乐趣区

Hadoop篇09Hadoop序列化

知之为知之,不知为不知

Hadoop 序列化

序列化概念

​ 序列化就是 把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。

序列化作用

​“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机

为什么不用 Java 的序列化

​ Java 的序列化是一个 重量级序列化框架 Serializable,一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop 自己开发了一套序列化机制(Writable)。
Hadoop 序列化特点

(1)紧凑:高效使用存储空间。(2)快速:读写数据的额外开销小。(3)可扩展:随着通信协议的升级而可升级(4)互操作:支持多语言的交互

常用数据的序列化类型

Java 类型 Hadoop Writable 类型
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
String Text
map MapWritable
array ArrayWritable

自定义 bean 对象实现序列化接口步骤

(1)必须实现 Writable 接口

(2)空参构造函数

public FlowBean() {    super();}

(3)重写序列化方法

@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);    out.writeLong(downFlow);    out.writeLong(sumFlow);}

(4)重写反序列化方法

@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();    downFlow = in.readLong();    sumFlow = in.readLong();}

(5)方法顺序一致

注意反序列化的顺序和序列化的顺序完全一致

(6)重写 toString

要想把结果显示在文件中,需要重写 toString(),可用”t”分开,方便后续用。

(7)实现 Comparable 接口

如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为 MapReduce 框中的 Shuffle 过程要求对 key 必须能排序。

@Overridepublic int compareTo(FlowBean o) {// 倒序排列,从大到小    return this.sumFlow > o.getSumFlow() ? -1 : 1;}

序列化案例实操

需求

统计每一个手机号耗费的总上行流量、下行流量、总流量

输出数据

13560436666         1116              954             2070
手机号码            上行流量        下行流量        总流量

需求分析

Bean 代码

// 1 实现 writable 接口
public class FlowBean implements Writable{

    private long upFlow ;
    private long downFlow;
    private long sumFlow;
    
    //2  反序列化时,需要反射调用空参构造函数,所以必须有
    public FlowBean() {super();
    }

    public FlowBean(long upFlow, long downFlow) {super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
    
    //3  写序列化方法
    @Override
    public void write(DataOutput out) throws IOException {out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
    
    //4 反序列化方法
    //5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致
    @Override
    public void readFields(DataInput in) throws IOException {this.upFlow  = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();}

    // 6 编写 toString 方法,方便后续打印到文本
    @Override
    public String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}

    public long getUpFlow() {return upFlow;}

    public void setUpFlow(long upFlow) {this.upFlow = upFlow;}

    public long getDownFlow() {return downFlow;}

    public void setDownFlow(long downFlow) {this.downFlow = downFlow;}

    public long getSumFlow() {return sumFlow;}

    public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}
}

Mapper 代码

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{FlowBean v = new FlowBean();
    Text k = new Text();
    
    @Override
    protected void map(LongWritable key, Text value, Context context)    throws IOException, InterruptedException {
        
        // 1 获取一行
        String line = value.toString();
        
        // 2 切割字段
        String[] fields = line.split("\t");
        
        // 3 封装对象
        // 取出手机号码
        String phoneNum = fields[1];
        // 取出上行流量和下行流量
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);

        k.set(phoneNum);
        v.set(downFlow, upFlow);
        
        // 4 写出
        context.write(k, v);
    }
}

Reducer 代码

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)throws IOException, InterruptedException {

        long sum_upFlow = 0;
        long sum_downFlow = 0;

        // 1 遍历所用 bean,将其中的上行流量,下行流量分别累加
        for (FlowBean flowBean : values) {sum_upFlow += flowBean.getUpFlow();
            sum_downFlow += flowBean.getDownFlow();}

        // 2 封装对象
        FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
        
        // 3 写出
        context.write(key, resultBean);
    }
}

Driver 代码

public class FlowsumDriver {public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[] { "e:/input/inputflow", "e:/output1"};

        // 1 获取配置信息,或者 job 对象实例
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 6 指定本程序的 jar 包所在的本地路径
        job.setJarByClass(FlowsumDriver.class);

        // 2 指定本业务 job 要使用的 mapper/Reducer 业务类
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 3 指定 mapper 输出数据的 kv 类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 4 指定最终输出的数据的 kv 类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        
        // 5 指定 job 的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 将 job 中配置的相关参数,以及 job 所用的 java 类所在的 jar 包,提交给 yarn 去运行
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

相关资料

本文配套GitHub:https://github.com/zhutiansam…

退出移动版