关于hadoop:Hadoop-入门笔记-二十一-MapReduce-DB操作

4次阅读

共计 12286 个字符,预计需要花费 31 分钟才能阅读完成。

一. 背景常识

通常组织会使用关系型数据库来存储业务相关的数据,但随着数据的规模越来越大,尤其是像 MySQL 这种,在单表超过 5 千万条记录时,尽管对表使用了特定的存储引擎和索引优化,但仍然不可避免地存在性能下降问题。
此时,我们可以通过使用 MapReduce 从 MySQL 中定期迁移使用频率较低的历史数据到 HDFS 中,一方面可以降低对 MySQL 的存储和计算负载,另一方面,通过分布式计算引擎可以更加高效地处理过去的历史数据。

对于 MapReduce 框架来说,使用 InputFormat 进行数据读取操作,读取的数据首先由 mapper 处理,然后交给 reducer 处理,最终使用 OutputFormat 进行数据的输出操作。默认情况下,输入输出的组件实现都是针对文本数据处理的,分别是 TextInputFormat、TextOutputFormat。
为了方便 MapReduce 直接访问关系型数据库(MySQL、Oracle),Hadoop 提供了 DBInputFormat 和 DBOutputFormat 两个类。其中 DBInputFormat 负责从数据库中读取数据,而 DBOutputFormat 负责把数据最终写入数据库中。

二. 读取数据库操作

1. 需求

在 mysql 中 itcast_shop 数据库下创建表 itheima_goods 并加载数据到表中。要求使用 MapReduce 程序将表中的数据导出存放在指定的文件下。
数据库:
链接:https://pan.baidu.com/s/1ImrI…
提取码:pz9b

由于涉及到 java 操作 mysql,因此需要在 pom 依赖中额外添加 mysql-jdbc 驱动。

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.32</version>
    </dependency>

2. DBInputFormat 类

DBInputFormat 类用于从 SQL 表读取数据。DBInputFormat 底层一行一行读取表中的数据,返回 <k,v> 键值对。其中 k 是 LongWritable 类型,表示表中数据的记录行号,从 0 开始;v 是 DBWritable 类型,表示该行数据对应的对象类型。
此外还需要使用 setInput 方法设置 SQL 查询的语句相关信息。

3. 代码实现

1. 编写 GoodsBean 类

定义 GoodsBean 的实体类,用于封装查询返回的结果(如果要查询表的所有字段,那么属性就跟表的字段一一对应即可)。并且需要实现序列化机制 Writable。
此外,从数据库读取 / 写入数据库的对象应实现 DBWritable。DBWritable 与 Writable 类似,区别在于 write(PreparedStatement) 方法采用 PreparedStatement,而 readFields(ResultSet) 采用 ResultSet。

package com.uuicon.sentiment_upload.db;


import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * One row of the goods table.
 *
 * <p>Implements both {@code Writable} (binary stream form) and {@code DBWritable}
 * (JDBC form). In both forms the fields are always handled in the same order:
 * goodsId, goodsSn, goodsName, marketPrice, shopPrice, saleNum. For JDBC that
 * order is the 1-based column/parameter index.
 */
public class GoodsBean implements Writable, DBWritable {

    private Long goodsId;
    private String goodsSn;
    private String goodsName;
    private Double marketPrice;
    private Double shopPrice;
    private Long saleNum;

    /** Creates an empty bean; every field starts out as {@code null}. */
    public GoodsBean() {
    }

    /** Creates a bean with all six fields populated. */
    public GoodsBean(Long goodsId, String goodsSn, String goodsName,
                     Double marketPrice, Double shopPrice, Long saleNum) {
        this.goodsId = goodsId;
        this.goodsSn = goodsSn;
        this.goodsName = goodsName;
        this.marketPrice = marketPrice;
        this.shopPrice = shopPrice;
        this.saleNum = saleNum;
    }

    /** Overwrites all six fields at once, so a single instance can be reused per record. */
    public void set(Long goodsId, String goodsSn, String goodsName,
                    Double marketPrice, Double shopPrice, Long saleNum) {
        this.goodsId = goodsId;
        this.goodsSn = goodsSn;
        this.goodsName = goodsName;
        this.marketPrice = marketPrice;
        this.shopPrice = shopPrice;
        this.saleNum = saleNum;
    }

    public Long getGoodsId() {
        return goodsId;
    }

    public void setGoodsId(Long goodsId) {
        this.goodsId = goodsId;
    }

    public String getGoodsSn() {
        return goodsSn;
    }

    public void setGoodsSn(String goodsSn) {
        this.goodsSn = goodsSn;
    }

    public String getGoodsName() {
        return goodsName;
    }

    public void setGoodsName(String goodsName) {
        this.goodsName = goodsName;
    }

    public Double getMarketPrice() {
        return marketPrice;
    }

    public void setMarketPrice(Double marketPrice) {
        this.marketPrice = marketPrice;
    }

    public Double getShopPrice() {
        return shopPrice;
    }

    public void setShopPrice(Double shopPrice) {
        this.shopPrice = shopPrice;
    }

    public Long getSaleNum() {
        return saleNum;
    }

    public void setSaleNum(Long saleNum) {
        this.saleNum = saleNum;
    }

    /**
     * Writes the six fields to the binary stream in declaration order.
     * The boxed numeric fields are unboxed here, so a {@code null} one
     * raises {@link NullPointerException}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(goodsId);
        out.writeUTF(goodsSn);
        out.writeUTF(goodsName);
        out.writeDouble(marketPrice);
        out.writeDouble(shopPrice);
        out.writeLong(saleNum);
    }

    /** Reads the six fields back in exactly the order {@link #write(DataOutput)} emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.goodsId = in.readLong();
        this.goodsSn = in.readUTF();
        this.goodsName = in.readUTF();
        this.marketPrice = in.readDouble();
        this.shopPrice = in.readDouble();
        this.saleNum = in.readLong();
    }

    /** Binds the six fields to statement parameters 1 through 6. */
    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setLong(1, goodsId);
        ps.setString(2, goodsSn);
        ps.setString(3, goodsName);
        ps.setDouble(4, marketPrice);
        ps.setDouble(5, shopPrice);
        ps.setLong(6, saleNum);
    }

    /** Populates the six fields from result-set columns 1 through 6. */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.goodsId = resultSet.getLong(1);
        this.goodsSn = resultSet.getString(2);
        this.goodsName = resultSet.getString(3);
        this.marketPrice = resultSet.getDouble(4);
        this.shopPrice = resultSet.getDouble(5);
        this.saleNum = resultSet.getLong(6);
    }

    /** Renders the six fields as a single tab-separated line. */
    @Override
    public String toString() {
        StringBuilder line = new StringBuilder();
        line.append(goodsId).append('\t')
                .append(goodsSn).append('\t')
                .append(goodsName).append('\t')
                .append(marketPrice).append('\t')
                .append(shopPrice).append('\t')
                .append(saleNum);
        return line.toString();
    }
}
2. 编写 Mapper 类
package com.uuicon.sentiment_upload.db;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper that turns each {@link GoodsBean} record into a text line, keeping
 * the incoming key unchanged.
 */
public class ReadDBMapper extends Mapper<LongWritable, GoodsBean, LongWritable, Text> {

    // Single Text instance, overwritten on every map() call.
    Text outValue = new Text();

    /**
     * Emits {@code (key, value.toString())}.
     *
     * @param key     key supplied by the input format, passed through as-is
     * @param value   the record to render as text
     * @param context job context used to emit the pair
     */
    @Override
    protected void map(LongWritable key, GoodsBean value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        outValue.set(line);
        context.write(key, outValue);
    }
}
3. 创建程序驱动类
package com.uuicon.sentiment_upload.db;


import com.uuicon.sentiment_upload.covidtopn.CovidTopNDriver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.Job;

import java.io.File;
import java.io.IOException;

/**
 * Driver for the job that reads table {@code itheima_goods} through
 * {@code DBInputFormat} and writes each row out as text. No reducer runs.
 *
 * <p>Usage: {@code ReadDBDriver [outputDir]}. When {@code outputDir} is omitted
 * the original hard-coded directory is used, so existing launches behave as before.
 */
public class ReadDBDriver {

    /** Output directory used when none is passed on the command line. */
    private static final String DEFAULT_OUTPUT_DIR = "E:\\ml\\hadoop\\mysqlout";

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args optional; {@code args[0]} overrides the output directory
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // A fixed Windows path only works on one machine; let the caller
        // supply the output location when the job is submitted elsewhere.
        String outputDir = (args != null && args.length > 0) ? args[0] : DEFAULT_OUTPUT_DIR;

        Configuration conf = new Configuration();
        // Database connection settings: driver class, JDBC URL, user, password.
        // NOTE(review): the accompanying text creates the table under database
        // "itcast_shop", while this URL targets "itcast_goods" - confirm the schema name.
        DBConfiguration.configureDB(conf,
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/itcast_goods",
                "root",
                "root"
        );

        // Create the job, named after this class.
        Job job = Job.getInstance(conf, ReadDBDriver.class.getSimpleName());
        // Driver class used to locate the job jar.
        job.setJarByClass(ReadDBDriver.class);

        // Mapper and the key/value types it emits.
        job.setMapperClass(ReadDBMapper.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Output side: where the result files are written.
        FileOutputFormat.setOutputPath(job, new Path(outputDir));

        // This example needs no reduce phase, so the reduce task count is set to 0.
        job.setNumReduceTasks(0);

        // Input side: read records from the database.
        job.setInputFormatClass(DBInputFormat.class);
        // Record class, the row query, and the query that counts the rows.
        DBInputFormat.setInput(
                job,
                GoodsBean.class,
                "select goodsId ,goodsSn,goodsName,marketPrice ,shopPrice , saleNum from itheima_goods",
                "select count(goodsId) from itheima_goods"
        );

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
4. 运行程序

直接在驱动类中右键运行 main 方法,使用 MapReduce 的本地模式执行。也可以将程序使用 maven 插件打包成 jar 包,提交到 yarn 上进行分布式运行。

三. 输出到数据库操作

1. 需求

有一份结构化的数据文件,数据内容对应着 mysql 中一张表的内容,要求使用 MapReduce 程序将文件的内容读取写入到 mysql 中。
就以上例的输出文件作为结构化文件,下面在 mysql 中创建对应的表结构。
表结构:

-- Target table for the MapReduce write job: one column per GoodsBean field.
CREATE TABLE `itheima_goods_mr_write` (
  `goodsId` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '商品 id',
  `goodsSn` varchar(20) NOT NULL COMMENT '商品编号',
  `goodsName` varchar(200) NOT NULL COMMENT '商品名称',
  `marketPrice` decimal(11,2) NOT NULL DEFAULT '0.00' COMMENT '市场价',
  `shopPrice` decimal(11,2) NOT NULL DEFAULT '0.00' COMMENT '门店价',
  `saleNum` int(11) NOT NULL DEFAULT '0' COMMENT '总销售量',
  PRIMARY KEY (`goodsId`)
) ENGINE=InnoDB AUTO_INCREMENT=115909 DEFAULT CHARSET=utf8;

2. DBOutputFormat 类

DBOutputFormat 是一种 OutputFormat,它将 reduce 输出发送到 SQL 表。DBOutputFormat 接受 <key,value> 键值对,其中 key 必须是实现了 DBWritable 的类型。
此外还需要使用 setOutput 方法设置 SQL 插入语句相关信息,比如表、字段等。

3. 代码实现

1. 编写 GoodsBean 类

定义 GoodsBean 的实体类,用于封装插入表中的数据(对象属性跟表的字段一一对应即可)。并且需要实现序列化机制 Writable。
此外,从数据库读取 / 写入数据库的对象应实现 DBWritable。DBWritable 与 Writable 类似,区别在于 write(PreparedStatement) 方法采用 PreparedStatement,而 readFields(ResultSet) 采用 ResultSet。

package com.uuicon.sentiment_upload.db;


import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * One row of the goods table.
 *
 * <p>Implements both {@code Writable} (binary stream form) and {@code DBWritable}
 * (JDBC form). In both forms the fields are always handled in the same order:
 * goodsId, goodsSn, goodsName, marketPrice, shopPrice, saleNum. For JDBC that
 * order is the 1-based column/parameter index.
 */
public class GoodsBean implements Writable, DBWritable {

    private Long goodsId;
    private String goodsSn;
    private String goodsName;
    private Double marketPrice;
    private Double shopPrice;
    private Long saleNum;

    /** Creates an empty bean; every field starts out as {@code null}. */
    public GoodsBean() {
    }

    /** Creates a bean with all six fields populated. */
    public GoodsBean(Long goodsId, String goodsSn, String goodsName,
                     Double marketPrice, Double shopPrice, Long saleNum) {
        this.goodsId = goodsId;
        this.goodsSn = goodsSn;
        this.goodsName = goodsName;
        this.marketPrice = marketPrice;
        this.shopPrice = shopPrice;
        this.saleNum = saleNum;
    }

    /** Overwrites all six fields at once, so a single instance can be reused per record. */
    public void set(Long goodsId, String goodsSn, String goodsName,
                    Double marketPrice, Double shopPrice, Long saleNum) {
        this.goodsId = goodsId;
        this.goodsSn = goodsSn;
        this.goodsName = goodsName;
        this.marketPrice = marketPrice;
        this.shopPrice = shopPrice;
        this.saleNum = saleNum;
    }

    public Long getGoodsId() {
        return goodsId;
    }

    public void setGoodsId(Long goodsId) {
        this.goodsId = goodsId;
    }

    public String getGoodsSn() {
        return goodsSn;
    }

    public void setGoodsSn(String goodsSn) {
        this.goodsSn = goodsSn;
    }

    public String getGoodsName() {
        return goodsName;
    }

    public void setGoodsName(String goodsName) {
        this.goodsName = goodsName;
    }

    public Double getMarketPrice() {
        return marketPrice;
    }

    public void setMarketPrice(Double marketPrice) {
        this.marketPrice = marketPrice;
    }

    public Double getShopPrice() {
        return shopPrice;
    }

    public void setShopPrice(Double shopPrice) {
        this.shopPrice = shopPrice;
    }

    public Long getSaleNum() {
        return saleNum;
    }

    public void setSaleNum(Long saleNum) {
        this.saleNum = saleNum;
    }

    /**
     * Writes the six fields to the binary stream in declaration order.
     * The boxed numeric fields are unboxed here, so a {@code null} one
     * raises {@link NullPointerException}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(goodsId);
        out.writeUTF(goodsSn);
        out.writeUTF(goodsName);
        out.writeDouble(marketPrice);
        out.writeDouble(shopPrice);
        out.writeLong(saleNum);
    }

    /** Reads the six fields back in exactly the order {@link #write(DataOutput)} emitted them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.goodsId = in.readLong();
        this.goodsSn = in.readUTF();
        this.goodsName = in.readUTF();
        this.marketPrice = in.readDouble();
        this.shopPrice = in.readDouble();
        this.saleNum = in.readLong();
    }

    /** Binds the six fields to statement parameters 1 through 6. */
    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setLong(1, goodsId);
        ps.setString(2, goodsSn);
        ps.setString(3, goodsName);
        ps.setDouble(4, marketPrice);
        ps.setDouble(5, shopPrice);
        ps.setLong(6, saleNum);
    }

    /** Populates the six fields from result-set columns 1 through 6. */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.goodsId = resultSet.getLong(1);
        this.goodsSn = resultSet.getString(2);
        this.goodsName = resultSet.getString(3);
        this.marketPrice = resultSet.getDouble(4);
        this.shopPrice = resultSet.getDouble(5);
        this.saleNum = resultSet.getLong(6);
    }

    /** Renders the six fields as a single tab-separated line. */
    @Override
    public String toString() {
        StringBuilder line = new StringBuilder();
        line.append(goodsId).append('\t')
                .append(goodsSn).append('\t')
                .append(goodsName).append('\t')
                .append(marketPrice).append('\t')
                .append(shopPrice).append('\t')
                .append(saleNum);
        return line.toString();
    }
}
2. Mapper 类
package com.uuicon.sentiment_upload.db;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Parses one text line into a {@link GoodsBean} and emits it under a
 * {@code NullWritable} key.
 *
 * <p>Expected line layout (7 columns): a leading column that is ignored,
 * followed by goodsId, goodsSn, goodsName, marketPrice, shopPrice, saleNum.
 * Lines that are too short or whose numeric columns cannot be parsed are
 * counted under {@code mr_to_sql/FAILED} and skipped instead of failing the task.
 */
public class WriteDBMapper extends Mapper<LongWritable, Text, NullWritable, GoodsBean> {

    /** Minimum number of columns a usable line must have. */
    private static final int MIN_FIELDS = 7;

    NullWritable outKey = NullWritable.get();
    // Reused for every record; overwritten by set(...) before each emit.
    GoodsBean outValue = new GoodsBean();

    /**
     * @param key     key supplied by the input format (not used)
     * @param value   one line of the input file
     * @param context job context used for counters and for emitting the bean
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Counter sc = context.getCounter("mr_to_sql", "SUCCESS");
        Counter fc = context.getCounter("mr_to_sql", "FAILED");

        String line = value.toString();
        // GoodsBean.toString() joins its fields with tabs, so split on tab first:
        // splitting on any whitespace would break a goodsName that contains spaces
        // and shift every later column.
        String[] fields = line.split("\t");
        if (fields.length < MIN_FIELDS) {
            // Not tab-delimited: fall back to splitting on whitespace runs.
            fields = line.split("\\s+");
        }
        if (fields.length < MIN_FIELDS) {
            // Malformed record: too few columns.
            fc.increment(1);
            return;
        }

        try {
            outValue.set(Long.parseLong(fields[1].trim()),
                    fields[2],
                    fields[3],
                    Double.parseDouble(fields[4].trim()),
                    Double.parseDouble(fields[5].trim()),
                    Long.parseLong(fields[6].trim())
            );
        } catch (NumberFormatException e) {
            // Malformed record: a numeric column is not a number. Count it and
            // move on rather than letting one bad line abort the whole task.
            fc.increment(1);
            return;
        }

        context.write(outKey, outValue);
        sc.increment(1);
    }
}
3. Reducer 类
package com.uuicon.sentiment_upload.db;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer that swaps key and value so that each {@link GoodsBean} becomes the output key.
 *
 * <p>When DBOutputFormat is used, the output key must implement DBWritable,
 * because only the key is written to the database.
 */
public class WriteDBReducer extends Reducer<NullWritable, GoodsBean, GoodsBean, NullWritable> {

    /**
     * Emits every incoming bean as {@code (bean, key)}.
     *
     * @param key     the shared {@code NullWritable} key
     * @param values  all beans grouped under that key
     * @param context job context used to emit the pairs
     */
    @Override
    protected void reduce(NullWritable key, Iterable<GoodsBean> values, Context context)
            throws IOException, InterruptedException {
        for (GoodsBean bean : values) {
            context.write(bean, key);
        }
    }
}
4. 驱动类
package com.uuicon.sentiment_upload.db;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * Driver for the job that reads a text file and inserts its rows into table
 * {@code itheima_goods_mr_write} through {@code DBOutputFormat}.
 *
 * <p>Usage: {@code WriteDBDriver [inputPath]}. When {@code inputPath} is omitted
 * the original hard-coded directory is used, so existing launches behave as before.
 */
public class WriteDBDriver {

    /** Input directory used when none is passed on the command line. */
    private static final String DEFAULT_INPUT_DIR = "E:\\ml\\hadoop\\mysqlout";

    /**
     * Configures and runs the job, then exits with status 0 on success and 1 on failure.
     *
     * @param args optional; {@code args[0]} overrides the input path
     */
    public static void main(String[] args) throws Exception {
        // A fixed Windows path only works on one machine; let the caller
        // supply the input location when the job is submitted elsewhere.
        String inputDir = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_DIR;

        Configuration conf = new Configuration();
        // Database connection settings: driver class, JDBC URL, user, password.
        // NOTE(review): the accompanying text refers to database "itcast_shop",
        // while this URL targets "itcast_goods" - confirm the schema name.
        DBConfiguration.configureDB(conf,
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/itcast_goods?useUnicode=true&characterEncoding=utf8",
                "root",
                "root"
        );

        // Create the job, named after this class.
        Job job = Job.getInstance(conf, WriteDBDriver.class.getSimpleName());
        // Driver class used to locate the job jar.
        job.setJarByClass(WriteDBDriver.class);

        // Mapper and the key/value types it emits.
        job.setMapperClass(WriteDBMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(GoodsBean.class);

        // Reducer and the final key/value types; the bean is the output key.
        job.setReducerClass(WriteDBReducer.class);
        job.setOutputKeyClass(GoodsBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Input side: the text file(s) to load.
        FileInputFormat.setInputPaths(job, new Path(inputDir));

        // Output side: write to the database.
        job.setOutputFormatClass(DBOutputFormat.class);
        // Target table and its column names, in the order GoodsBean binds them.
        DBOutputFormat.setOutput(
                job,
                "itheima_goods_mr_write",
                "goodsId", "goodsSn", "goodsName", "marketPrice", "shopPrice", "saleNum"
        );

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
5. 运行后果

正文完
 0