Thinking

When global sorting comes up, is your first thought to collect the data on the map side, shuffle it to a single reduce, and sort it there? Clearly that is no different from sorting on a single machine. Keep in mind that the MapReduce framework sorts by key by default (you can also move a value into the key to sort by value, then swap them back in the reduce), and that this sorting only applies within a single partition, i.e. within one reduce, so it does not really exploit the cluster's parallelism. How, then, can we implement a global sort more elegantly?

Summary

Sorting in Hadoop comes in several flavors: partial (per-reducer) sort, total sort, grouping (auxiliary) sort, secondary sort, and so on. This article mainly covers how to implement a global sort by key, with three approaches:

  1. Use a single reduce
  2. Use a custom partitioner to route data, in key order, into multiple partitions
  3. Use the framework's built-in TotalOrderPartitioner

Implementation

First, prepare some input data: https://github.com/hulichao/b..., as follows:

/data/job/file.txt:

```
2
32
654
32
15
756
65223
```

Global sort with a single reduce

With a single reduce, nothing special is required. The mapper, reducer, and driver look like this:

```java
package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class JobMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value,
                       Context context) throws IOException, InterruptedException {
        IntWritable intWritable = new IntWritable(Integer.parseInt(value.toString()));
        context.write(intWritable, intWritable);
    }
}
```
```java
package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class JobReducer extends
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    private int index = 0; // global rank counter

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        for (IntWritable value : values)
            context.write(new IntWritable(++index), value);
    }
}
```
```java
package com.hoult.mr.job;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("input-path output-path");
            System.exit(1);
        }
        Job job = Job.getInstance(getConf());
        job.setJarByClass(JobDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(JobMapper.class);
        job.setReducerClass(JobReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        // the reducer emits IntWritable values, so the output value class must match
        job.setOutputValueClass(IntWritable.class);
        // use a single reduce to get a total order
        job.setNumReduceTasks(1);
        job.setJobName("JobDriver");
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
//        int exitCode = ToolRunner.run(new JobDriver(), args);
        int exitCode = ToolRunner.run(new JobDriver(), new String[] {"data/job/", "data/job/output"});
        System.exit(exitCode);
    }
}
```
With the rank index added, the job produces a single output file with the following content:

```
1    2
2    6
3    15
4    22
5    26
6    32
7    32
8    54
9    92
10    650
11    654
12    756
13    5956
14    65223
```

PS: the jobs above are launched through Hadoop's built-in ToolRunner. In what follows, code that repeats earlier listings is omitted; only the parts that differ are shown.

Using a custom partitioner to route data, in key order, into multiple partitions

How can a custom partitioner guarantee global order? The default HashPartitioner scatters keys across reduces by hash; instead, we can partition by key range. For example, if keys run from 1 to 100 million, reduce 1 can handle 1–10,000,000, reduce 2 can handle 10,000,001–20,000,000, and so on. Concatenating the output files in partition order then yields globally sorted data. Since our data set is small, we use 11 partitions here: 1–1000, 1001–2000, ..., with everything above 10,000 going to the last partition. Compared with the first approach, two things change:

```java
// Partitioner implementation
package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class JobPartitioner extends Partitioner<IntWritable, IntWritable> {
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int keyValue = key.get();
        // keys in [1000*i, 1000*(i+1)) go to partition i; everything >= 10000 goes to partition 10
        for (int i = 0; i < 10; i++) {
            if (keyValue < 1000 * (i + 1)) {
                System.out.println("key:" + keyValue + ", part:" + i);
                return i;
            }
        }
        return 10;
    }
}
```

The driver additionally needs:

```java
// set the custom partitioner
job.setPartitionerClass(JobPartitioner.class);
// the partitioner returns indices 0-10, so 11 reduces are needed
job.setNumReduceTasks(11);
```
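The range logic can be sanity-checked outside Hadoop. Below is a minimal standalone sketch (the class and method names are mine, not part of the original code) that reproduces the same key-to-partition mapping:

```java
public class RangePartitionDemo {
    // Keys in [1000*i, 1000*(i+1)) go to partition i for i in 0..9;
    // everything >= 10000 falls into the last partition, 10.
    static int partitionFor(int key) {
        for (int i = 0; i < 10; i++) {
            if (key < 1000 * (i + 1)) {
                return i;
            }
        }
        return 10;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor(500));    // partition 0
        System.out.println(partitionFor(1500));   // partition 1
        System.out.println(partitionFor(65223));  // partition 10
    }
}
```

Because the loop scans ranges in ascending order, the first matching upper bound decides the partition, which is why keys from different partitions never interleave.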

Running the job produces 11 output files, each sorted internally.

```
part-r-00000
part-r-00001
part-r-00002
part-r-00003
part-r-00004
part-r-00005
part-r-00006
part-r-00007
part-r-00008
part-r-00009
part-r-00010
```

Note: every partition that actually receives data must map to an existing reduce (partition index < number of reduces), otherwise the job fails with an "Illegal partition" error. A further drawback: if you know little about the data distribution, hand-picked ranges can cause data skew and even OOM problems.

Using the framework's built-in TotalOrderPartitioner

The second, hand-rolled approach can mitigate some skew, but when the distribution is unknown you can only guess ranges, inspect the results, and adjust the partitioner accordingly, which is really just sampling by hand. Hadoop already implements sampling-based partitioning for us, and it addresses the skew and OOM concerns: the TotalOrderPartitioner class. It works with samplers that draw a partial sample of the keys, pick the best split points from that sample, and thereby distribute keys evenly across partitions.

Three samplers are provided (via the InputSampler class):

  • SplitSampler: samples from input splits; not suitable for data that is already sorted
  • RandomSampler: samples a data set at a configured sampling frequency
  • IntervalSampler: samples at fixed intervals within a split; works very well on already sorted data

Each sampler implements the method K[] getSample(InputFormat<K,V> inf, Job job), which returns an array of sampled keys (InputFormat being the input helper class that feeds the map side). From that sample, numReduceTasks - 1 split points are chosen and written to a partition file; at run time each record is sent to the partition whose key range contains it.
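To make the split-point mechanics concrete, here is a simplified, self-contained sketch of how n-1 sorted split points route a key to one of n partitions. It mirrors the binary-search behavior of TotalOrderPartitioner but is not the actual Hadoop source; the names and sample values are mine:

```java
import java.util.Arrays;

public class SplitPointDemo {
    // Given n-1 sorted split points, map a key to one of n partitions.
    // Keys equal to a split point go to the partition on its right.
    static int partitionFor(int key, int[] splitPoints) {
        int pos = Arrays.binarySearch(splitPoints, key);
        // binarySearch returns (-(insertion point) - 1) when the key is absent
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    public static void main(String[] args) {
        int[] splits = {100, 1000, 10000}; // 3 split points -> 4 partitions
        System.out.println(partitionFor(50, splits));    // partition 0
        System.out.println(partitionFor(100, splits));   // partition 1
        System.out.println(partitionFor(5000, splits));  // partition 2
        System.out.println(partitionFor(99999, splits)); // partition 3
    }
}
```

Because every key below the first split point lands in partition 0 and every key above the last lands in partition n-1, concatenating the reduce outputs in partition order yields a total order.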

Code:

```java
// The mapper and driver types differ slightly from the earlier version
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalMapper extends Mapper<Text, Text, Text, IntWritable> {
    @Override
    protected void map(Text key, Text value,
                       Context context) throws IOException, InterruptedException {
        System.out.println("key:" + key.toString() + ", value:" + value.toString());
        context.write(key, new IntWritable(Integer.parseInt(key.toString())));
    }
}
```
```java
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalReducer extends Reducer<Text, IntWritable, IntWritable, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        for (IntWritable value : values)
            context.write(value, NullWritable.get());
    }
}
```
```java
// Comparator
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom comparator that orders keys numerically
 * @author hulichao
 * @date 20-9-20
 **/
public class KeyComparator extends WritableComparator {
    protected KeyComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        int num1 = Integer.valueOf(w1.toString());
        int num2 = Integer.valueOf(w2.toString());
        return num1 - num2;
    }
}
```
```java
package com.hoult.mr.job.totalsort;

// Driver implementation
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // sort by the custom comparator rather than the keys' natural order
        conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
        Job job = Job.getInstance(conf, "Total Driver");
        job.setJarByClass(TotalDriver.class);
        // input is read from HDFS; the paths come in as arguments
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // MapReduce results are written to files under the output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // comparator that decides the sort order; here it compares two keys numerically
        job.setSortComparatorClass(KeyComparator.class);
        job.setNumReduceTasks(10); // number of reduces
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // where the partition (split-point) file is stored
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));
        // sample the keys: 0.1 is the sampling frequency, 3 the max sample count, 100 the max splits sampled
        // (note: the sample count must be at least the number of partitions, or writePartitionFile fails)
        InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.1, 3, 100);
        // write the sampled split points to the partition file
        InputSampler.writePartitionFile(job, sampler);
        job.setMapperClass(TotalMapper.class);
        job.setReducerClass(TotalReducer.class);
        // use the total-order partitioner
        job.setPartitionerClass(TotalOrderPartitioner.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
//        int exitCode = ToolRunner.run(new TotalDriver(), new String[] {"data/job/input", "data/job/output", "data/job/partition", "data/job/partitio2"});
        int exitCode = ToolRunner.run(new TotalDriver(), args);
        System.exit(exitCode);
    }
}
```

The result is similar to the second approach. Note that this only works when tested on a cluster; a local run may fail with:

```
2020-09-20 16:36:10,664 WARN [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
    at com.hoult.mr.job.totalsort.TotalDriver.run(TotalDriver.java:32)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
    at com.hoult.mr.job.totalsort.TotalDriver.main(TotalDriver.java:60)
```

Wu Xie, Xiao San Ye, a rookie wandering the fields of backend, big data, and AI.
For more, follow:
<center> <img src="https://raw.githubusercontent.com/hulichao/myblog_pic/master/blog/wechat.png" width="40%" /> </center>