一. 背景知识

通常组织会使用关系型数据库来存储业务相关的数据,但随着数据的规模越来越大,尤其是像MySQL这种,在单表超过5千万条记录时,尽管对表使用了特定的存储引擎和索引优化,但仍然不可避免地存在性能下降问题。
此时,我们可以通过使用MapReduce从MySQL中定期迁移使用频率较低的历史数据到HDFS中,一方面可以降低MySQL的存储和计算负载,另一方面,通过分布式计算引擎可以更加高效地处理过去的历史数据。

对于MapReduce框架来说,使用InputFormat进行数据读取操作,读取的数据首先由mapper处理,然后交给reducer处理,最终使用OutputFormat进行数据的输出操作。默认情况下,输入输出的组件实现都是针对文本数据处理的,分别是TextInputFormat、TextOutputFormat。
为了方便MapReduce直接访问关系型数据库(MySQL、Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。其中DBInputFormat负责从数据库中读取数据,而DBOutputFormat负责把数据最终写入数据库中。

二. 读取数据库操作

1. 需求

在MySQL中itcast_shop数据库下创建表itheima_goods并加载数据到表中。要求使用MapReduce程序将表中的数据导出存放在指定的文件下。(注意:下文驱动类代码中JDBC URL使用的库名是itcast_goods,运行前请与实际创建的数据库名保持一致。)
数据库:
链接:https://pan.baidu.com/s/1ImrI...
提取码:pz9b

因为涉及到Java操作MySQL,因此需要在pom依赖中额外添加mysql-jdbc驱动。

    <!-- MySQL JDBC driver (Connector/J) required by the jobs below to connect to MySQL -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.32</version>
    </dependency>

2. DBInputFormat类

DBInputFormat是用于从SQL表读取数据的InputFormat实现。DBInputFormat底层一行一行读取表中的数据,返回<k,v>键值对。其中k是LongWritable类型,表示表中数据的记录行号,从0开始;v是DBWritable类型,表示该行数据对应的对象类型。
此外还需要使用setInput方法设置SQL查询语句的相关信息。

3. 代码实现

1. 编写GoodsBean类

定义GoodsBean实体类,用于封装查询返回的结果(如果要查询表的所有字段,那么属性就跟表的字段一一对应即可),并且需要实现序列化机制Writable。
此外,从数据库读取/写入数据库的对象应实现DBWritable。DBWritable与Writable相似,区别在于write(PreparedStatement)方法采用PreparedStatement,而readFields(ResultSet)采用ResultSet。

package com.uuicon.sentiment_upload.db;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapred.lib.db.DBWritable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;public class GoodsBean implements Writable, DBWritable {    private Long goodsId;    private String goodsSn;    private String goodsName;    private Double marketPrice;    private Double shopPrice;    private Long saleNum;    @Override    public String toString() {        return goodsId + "\t" + goodsSn + '\t' + goodsName + "\t" + marketPrice + "\t" + shopPrice + "\t" + saleNum;    }    public Long getGoodsId() {        return goodsId;    }    public void setGoodsId(Long goodsId) {        this.goodsId = goodsId;    }    public String getGoodsSn() {        return goodsSn;    }    public void setGoodsSn(String goodsSn) {        this.goodsSn = goodsSn;    }    public String getGoodsName() {        return goodsName;    }    public void setGoodsName(String goodsName) {        this.goodsName = goodsName;    }    public Double getMarketPrice() {        return marketPrice;    }    public void setMarketPrice(Double marketPrice) {        this.marketPrice = marketPrice;    }    public Double getShopPrice() {        return shopPrice;    }    public void setShopPrice(Double shopPrice) {        this.shopPrice = shopPrice;    }    public Long getSaleNum() {        return saleNum;    }    public void setSaleNum(Long saleNum) {        this.saleNum = saleNum;    }    public GoodsBean() {    }    public GoodsBean(Long goodsId, String goodsSn, String goodsName, Double marketPrice, Double shopPrice, Long saleNum) {        this.goodsId = goodsId;        this.goodsSn = goodsSn;        this.goodsName = goodsName;        this.marketPrice = marketPrice;        this.shopPrice = shopPrice;        this.saleNum = saleNum;    }    public void set(Long goodsId, String goodsSn, String goodsName, Double 
marketPrice, Double shopPrice, Long saleNum) {        this.goodsId = goodsId;        this.goodsSn = goodsSn;        this.goodsName = goodsName;        this.marketPrice = marketPrice;        this.shopPrice = shopPrice;        this.saleNum = saleNum;    }    @Override    public void write(DataOutput out) throws IOException {        out.writeLong(goodsId);        out.writeUTF(goodsSn);        out.writeUTF(goodsName);        out.writeDouble(marketPrice);        out.writeDouble(shopPrice);        out.writeLong(saleNum);    }    @Override    public void readFields(DataInput in) throws IOException {        this.goodsId = in.readLong();        this.goodsSn = in.readUTF();        this.goodsName = in.readUTF();        this.marketPrice = in.readDouble();        this.shopPrice = in.readDouble();        this.saleNum = in.readLong();    }    @Override    public void write(PreparedStatement ps) throws SQLException {        ps.setLong(1, goodsId);        ps.setString(2, goodsSn);        ps.setString(3, goodsName);        ps.setDouble(4, marketPrice);        ps.setDouble(5, shopPrice);        ps.setLong(6, saleNum);    }    @Override    public void readFields(ResultSet resultSet) throws SQLException {        this.goodsId = resultSet.getLong(1);        this.goodsSn = resultSet.getString(2);        this.goodsName = resultSet.getString(3);        this.marketPrice = resultSet.getDouble(4);        this.shopPrice = resultSet.getDouble(5);        this.saleNum = resultSet.getLong(6);    }}
2. 编写Mapper类
package com.uuicon.sentiment_upload.db;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class ReadDBMapper extends Mapper<LongWritable, GoodsBean, LongWritable, Text> {    Text outValue = new Text();    @Override    protected void map(LongWritable key, GoodsBean value, Context context) throws IOException, InterruptedException {        outValue.set(value.toString());        context.write(key, outValue);    }}
3. 创建程序驱动类
package com.uuicon.sentiment_upload.db;import com.uuicon.sentiment_upload.covidtopn.CovidTopNDriver;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapred.lib.db.DBConfiguration;import org.apache.hadoop.mapred.lib.db.DBInputFormat;import org.apache.hadoop.mapreduce.Job;import java.io.File;import java.io.IOException;public class ReadDBDriver {    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {        Configuration conf = new Configuration();        //数据库信息        DBConfiguration.configureDB(conf,                "com.mysql.jdbc.Driver",                "jdbc:mysql://localhost:3306/itcast_goods",                "root",                "root"        );        // 创立作业类        Job job = Job.getInstance(conf, ReadDBDriver.class.getSimpleName());        //设置mr 驱动类        job.setJarByClass(ReadDBDriver.class);        //设置mapper 类        job.setMapperClass(ReadDBMapper.class);        job.setOutputKeyClass(LongWritable.class);        job.setOutputValueClass(Text.class);        // todo 设置输出组件        FileOutputFormat.setOutputPath(job,                new Path("E:\\ml\\hadoop\\mysqlout"));        //设置Reducer 类 ,todo 本例不须要reduce ,操作形式是将tasknumber 设置为 0        job.setNumReduceTasks(0);        // todo 设置输出组件        job.setInputFormatClass(DBInputFormat.class);        //增加读取数据库相干参数        DBInputFormat.setInput(                job,                GoodsBean.class,                "select goodsId ,goodsSn,goodsName,marketPrice ,shopPrice , saleNum from itheima_goods",                "select count(goodsId) from itheima_goods"        );        boolean b = job.waitForCompletion(true);        System.exit(b ? 0 : 1);    }}
4. 运行程序

直接在驱动类中右键运行main方法,使用MapReduce的本地模式执行。也可以将程序使用maven插件打包成jar包,提交到YARN上进行分布式运行。

三. 输出到数据库操作

1. 需求

有一份结构化的数据文件,数据内容对应着MySQL中一张表的内容,要求使用MapReduce程序将文件的内容读取并写入到MySQL中。
就以上例的输出文件作为结构化文件,下面在MySQL中创建对应的表结构。
表结构:

-- Target table for the MapReduce write example; columns mirror the GoodsBean fields.
CREATE TABLE `itheima_goods_mr_write` (
  `goodsId` bigint(11) NOT NULL AUTO_INCREMENT COMMENT '商品id',
  `goodsSn` varchar(20) NOT NULL COMMENT '商品编号',
  `goodsName` varchar(200) NOT NULL COMMENT '商品名称',
  `marketPrice` decimal(11,2) NOT NULL DEFAULT '0.00' COMMENT '市场价',
  `shopPrice` decimal(11,2) NOT NULL DEFAULT '0.00' COMMENT '门店价',
  `saleNum` int(11) NOT NULL DEFAULT '0' COMMENT '总销售量',
  PRIMARY KEY (`goodsId`)
) ENGINE=InnoDB AUTO_INCREMENT=115909 DEFAULT CHARSET=utf8;

2. DBOutputFormat类

DBOutputFormat是一个OutputFormat,它将reduce的输出发送到SQL表。DBOutputFormat接受<key,value>键值对,其中key必须是实现了DBWritable的类型。
此外还需要使用setOutput方法设置SQL插入语句相关信息,比如表、字段等。

3. 代码实现

1. 编写GoodsBean类

定义GoodsBean实体类,用于封装插入表中的数据(对象属性跟表的字段一一对应即可),并且需要实现序列化机制Writable。
此外,从数据库读取/写入数据库的对象应实现DBWritable。DBWritable与Writable相似,区别在于write(PreparedStatement)方法采用PreparedStatement,而readFields(ResultSet)采用ResultSet。

package com.uuicon.sentiment_upload.db;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapred.lib.db.DBWritable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;public class GoodsBean implements Writable, DBWritable {    private Long goodsId;    private String goodsSn;    private String goodsName;    private Double marketPrice;    private Double shopPrice;    private Long saleNum;    @Override    public String toString() {        return goodsId + "\t" + goodsSn + '\t' + goodsName + "\t" + marketPrice + "\t" + shopPrice + "\t" + saleNum;    }    public Long getGoodsId() {        return goodsId;    }    public void setGoodsId(Long goodsId) {        this.goodsId = goodsId;    }    public String getGoodsSn() {        return goodsSn;    }    public void setGoodsSn(String goodsSn) {        this.goodsSn = goodsSn;    }    public String getGoodsName() {        return goodsName;    }    public void setGoodsName(String goodsName) {        this.goodsName = goodsName;    }    public Double getMarketPrice() {        return marketPrice;    }    public void setMarketPrice(Double marketPrice) {        this.marketPrice = marketPrice;    }    public Double getShopPrice() {        return shopPrice;    }    public void setShopPrice(Double shopPrice) {        this.shopPrice = shopPrice;    }    public Long getSaleNum() {        return saleNum;    }    public void setSaleNum(Long saleNum) {        this.saleNum = saleNum;    }    public GoodsBean() {    }    public GoodsBean(Long goodsId, String goodsSn, String goodsName, Double marketPrice, Double shopPrice, Long saleNum) {        this.goodsId = goodsId;        this.goodsSn = goodsSn;        this.goodsName = goodsName;        this.marketPrice = marketPrice;        this.shopPrice = shopPrice;        this.saleNum = saleNum;    }    public void set(Long goodsId, String goodsSn, String goodsName, Double 
marketPrice, Double shopPrice, Long saleNum) {        this.goodsId = goodsId;        this.goodsSn = goodsSn;        this.goodsName = goodsName;        this.marketPrice = marketPrice;        this.shopPrice = shopPrice;        this.saleNum = saleNum;    }    @Override    public void write(DataOutput out) throws IOException {        out.writeLong(goodsId);        out.writeUTF(goodsSn);        out.writeUTF(goodsName);        out.writeDouble(marketPrice);        out.writeDouble(shopPrice);        out.writeLong(saleNum);    }    @Override    public void readFields(DataInput in) throws IOException {        this.goodsId = in.readLong();        this.goodsSn = in.readUTF();        this.goodsName = in.readUTF();        this.marketPrice = in.readDouble();        this.shopPrice = in.readDouble();        this.saleNum = in.readLong();    }    @Override    public void write(PreparedStatement ps) throws SQLException {        ps.setLong(1, goodsId);        ps.setString(2, goodsSn);        ps.setString(3, goodsName);        ps.setDouble(4, marketPrice);        ps.setDouble(5, shopPrice);        ps.setLong(6, saleNum);    }    @Override    public void readFields(ResultSet resultSet) throws SQLException {        this.goodsId = resultSet.getLong(1);        this.goodsSn = resultSet.getString(2);        this.goodsName = resultSet.getString(3);        this.marketPrice = resultSet.getDouble(4);        this.shopPrice = resultSet.getDouble(5);        this.saleNum = resultSet.getLong(6);    }}
2. Mapper 类
package com.uuicon.sentiment_upload.db;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Counter;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WriteDBMapper extends Mapper<LongWritable, Text, NullWritable, GoodsBean> {    NullWritable outKey = NullWritable.get();    GoodsBean outValue = new GoodsBean();    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        Counter sc = context.getCounter("mr_to_sql", "SUCCESS");        Counter fc = context.getCounter("mr_to_sql", "FAILED");        String[] fields = value.toString().split("\\s+");        if (fields.length > 6) {            // 失常数据            outValue.set(                    Long.parseLong(fields[1]),                    fields[2],                    fields[3],                    Double.parseDouble(fields[4]),                    Double.parseDouble(fields[5]),                    Long.parseLong(fields[6])            );            context.write(outKey, outValue);            sc.increment(1);        } else {            // 异样数据            fc.increment(1);        }    }}
3. Reducer 类
package com.uuicon.sentiment_upload.db;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/** * todo 在应用DBOutputFormat 的时候,要求输入的key 必须实现DBWritable 因为只会把key写入数据库 */public class WriteDBReducer extends Reducer<NullWritable, GoodsBean, GoodsBean, NullWritable> {    @Override    protected void reduce(NullWritable key, Iterable<GoodsBean> values, Context context) throws IOException, InterruptedException {        for (GoodsBean value : values) {            context.write(value, key);        }    }}
4. 驱动类
package com.uuicon.sentiment_upload.db;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapred.lib.db.DBConfiguration;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;public class WriteDBDriver {    public static void main(String[] args) throws Exception {        Configuration conf = new Configuration();        //数据库信息        DBConfiguration.configureDB(conf,                "com.mysql.jdbc.Driver",                "jdbc:mysql://localhost:3306/itcast_goods?useUnicode=true&characterEncoding=utf8",                "root",                "root"        );        // 创立作业类        Job job = Job.getInstance(conf, WriteDBDriver.class.getSimpleName());        //设置mr 驱动类        job.setJarByClass(WriteDBDriver.class);        //设置mapper 类        job.setMapperClass(WriteDBMapper.class);        job.setMapOutputKeyClass(NullWritable.class);        job.setMapOutputValueClass(GoodsBean.class);        //设置 Reduce 相干        job.setReducerClass(WriteDBReducer.class);        job.setOutputKeyClass(GoodsBean.class);        job.setOutputValueClass(NullWritable.class);        // 设置以后作业的文件门路        FileInputFormat.setInputPaths(job, new Path("E:\\ml\\hadoop\\mysqlout"));        // todo 设置程序输入类        job.setOutputFormatClass(DBOutputFormat.class);        // 配置以后作业,写入数据库表 itheima_goods_mr_write        DBOutputFormat.setOutput(                job,                "itheima_goods_mr_write",                "goodsId","goodsSn","goodsName","marketPrice","shopPrice","saleNum"                );        boolean b = job.waitForCompletion(true);        System.exit(b ? 0 : 1);    }}
5. 运行结果