关于hadoop:Hadoop之MapReduce四序列化和排序

5次阅读

共计 4564 个字符,预计需要花费 12 分钟才能阅读完成。

1.Hadoop 序列化和自定义实现序列化

序列化就是把内存中的对象,转换成字节序列,或这是其余传入协定,而后进行网络传输或者长久化到磁盘中。
反序列化就是一个相同的过程,将收到的字节序列或者磁盘中长久化的数据,转换成内存中的对象。

hadoop 有一套本人的序列化机制,像后面提到的 BooleanWritable、ByteWritable、IntWritable、FloatWritable、LongWritable、DoubleWritable、Text、MapWritable、ArrayWritable,这些都是 hadoop 实现好的序列化类型,咱们能够间接拿来应用。

但有些时候,这些根本的序列化类型不能满足咱们的需要,咱们就须要本人去实现一个序列化类型。

该怎么去实现?先看看 hadoop 中实现形式,就看 IntWritable

public class IntWritable implements WritableComparable<IntWritable> 

它实现 WritableComparable 这接口,那咱们自定义的类也能够实现这个接口,那么先看看这个接口有哪些要实现的办法

public interface WritableComparable<T> extends Writable, Comparable<T> {}

好难堪,这个接口啥都没有啊,那持续看它继承的接口,有两个。
第一个 Writable 接口

public interface Writable {
    // 序列化办法
    void write(DataOutput var1) throws IOException;
    // 反序列化办法
    void readFields(DataInput var1) throws IOException;
}

两个要实现的办法,一个序列化办法和一个反序列化办法

而后咱们第二个接口

public interface Comparable<T> {public int compareTo(T o);
}

为什么要实现这个接口?

在 MapReduce 的 shuffle 过程要求 key 必须可能排序,所以如果自定义序列化类须要放在 key 中传输,那么就要实现这个接口中的办法,当然如果不须要,就能够略过了。

示例,实现一个能够序列化的 StuInfoBean

public class StuInfoBean implements WritableComparable<StuInfoBean> {
    private Integer stuid;
    private String name;
    private Integer age;
    private Boolean sex;

    public StuInfoBean(){}

    public Integer getStuid() {return stuid;}

    public void setStuid(Integer stuid) {this.stuid = stuid;}

    public String getName() {return name;}

    public void setName(String name) {this.name = name;}

    public Integer getAge() {return age;}

    public void setAge(Integer age) {this.age = age;}

    public Boolean getSex() {return sex;}

    public void setSex(Boolean sex) {this.sex = sex;}

    @Override
    public String toString() {
        return "StuInfoBean{" +
                "stuid=" + stuid +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", sex=" + sex +
                '}';
    }

    /**
     * 如果须要就实现,不须要就略过
     */
    @Override
    public int compareTo(StuInfoBean o) {return this.stuid.compareTo(o.getStuid());
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {dataOutput.writeInt(stuid);
        dataOutput.writeUTF(name);
        dataOutput.writeInt(age);
        dataOutput.writeBoolean(sex);
    }

    /**
     * 这里要留神,反序列的程序要和序列化保持一致
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {stuid = dataInput.readInt();
        name = dataInput.readUTF();
        age = dataInput.readInt();
        sex = dataInput.readBoolean();}
}

2. 排序

MapTask 收集咱们的 map() 办法输入的 kv 对,放到内存缓冲区中,这是一个环形数据结构,其实就是一个字节数组,叫 kvbuffer。
外面寄存着咱们的数据及索引数据 kvmeta。
kvbuffer 的大小尽管能够设置,但终归是无限的,当写满的时候,内存中的数据就会刷到磁盘上,这个过程就是溢写,溢写产生的文件可能会不止一个,多个溢出文件会被合并成大的溢出文件。
在这个过程中,都要调用 Partitioner 进行分区和针对 key 进行排序 sort。
分区之前曾经说了,这里就说下排序 sort。

MapTask 和 ReduceTask 均会对数据依照 key 进行排序,任何应用程序中的数据不论业务逻辑是否须要,在 hadoop 中均会被排序。

默认是依照字典排序,实现形式是快排。

自定义类型排序的实现,就是后面 compareTo 办法。

  @Override
    public int compareTo(StuInfoBean o) {return this.stuid.compareTo(o.getStuid());
    }

3. 规约(Combiner)

这个也是合并,每一个 map 都可能会产生大量的本地输入,Combiner 的作用就是对 map 端的输入先做一次合并,以缩小在 map 和 reduce 节点之间的数据传输量,以进步网络 IO 性能,是 MapReduce 的一种优化伎俩之一。
但有个前提就是不能扭转业务最终逻辑,Combiner 的输入 kv 应该跟 Reducer 的输出 kv 类型要对应起来。

Combiner 是 MR 程序中 Mapper 和 Reducer 之外的一种组件,它的父类就是 Reduce。

举个例子

class1_aaa 50
class2_bbb 100
class3_ccc 80
class1_ddd 10
class2_eee 100
class3_fff 70
class1_hhh 150
class2_lll 100
class3_www 80

需要求出每个属性值的和,那么咱们能够自定义 MyCombiner,先部分求和,最初汇总到 Reduce 求和。

mapper 代码

public class ClMap extends Mapper<LongWritable, Text,Text, IntWritable> {
    // 输入的 k 和 v
    Text outk = new Text();
    IntWritable outv = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();
        String[] contents = line.split(" ");
        String outkey = contents[0].split("_")[0];

        outk.set(outkey);
        outv.set(Integer.parseInt(contents[1]));

        context.write(outk,outv);
    }
}

MyCombiner 代码

public class MyCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {private IntWritable v= new IntWritable();
    
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {sum+=value.get();
        }
        v.set(sum);
        context.write(key,v);
    }
}

Reduce 代码

public class ClReduce extends Reducer<Text, IntWritable,Text,IntWritable> {//    private Text outk = new Text();
    private IntWritable outv = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {for (IntWritable value : values) {outv.set(value.get());
        }
        context.write(key,outv);
    }
}

Driver 代码

public class ClDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setJarByClass(ClDriver.class);

        job.setMapperClass(ClMap.class);
        job.setReducerClass(ClReduce.class);

        job.setMapOutputValueClass(IntWritable.class);
        
        // 设置应用 MyCombiner
        job.setCombinerClass(MyCombiner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);

        System.exit(b ? 0:1);

    }
}
正文完
 0