一.MapReduce mapReduce Patition Combiner
1. Partition分区
1.默认状况
在默认状况下,不论map阶段有多少个并发执行task,到reduce阶段,所有的后果都将有一个reduce来解决,并且最终后果输入到一个文件中。
默认状况下MapReduce 执行流程:
2. 批改ReduceTask个数
在MapReduce程序的驱动类中,通过job提供的办法,能够批改reducetask的个数。
应用api批改reducetask个数之后,输入后果文件的个数和reducetask个数对应。也就是说有几个ReduceTask,最终输入文件就是几个,比方设置为6个,此时的输入后果如下所示:
此时,MapReduce的执行流程如下所示:
3. Partition数据分区概念
- 默认状况下,MapReduce是只有一个reducetask来进行数据的解决。这就使得不论输出的数据量多大,最终的后果都是输入到一个文件中。
- 当扭转reducetask个数的时候,作为maptask就会波及到数据分区的问题:MapTask输入的后果如何调配给各个ReduceTask来解决。
4. partition默认规定
MapReduce默认分区规定是HashPartitioner。跟map输入的数据key无关。
5. partition注意事项
- reducetask个数的扭转导致了数据分区的产生,而不是有数据分区导致了reducetask个数扭转。
- 数据分区的外围是分区规定。即如何调配数据给各个reducetask。
- 默认的规定能够保障只有map阶段输入的key一样,数据就肯定能够分区到同一个reducetask,然而不能保证数据均匀分区。
- reducetask个数的扭转还会导致输入后果文件不再是一个整体,而是输入到多个文件中。
2. Combiner 规约
1.数据规约的含意
数据归约是指在尽可能保持数据原貌的前提下,最大限度地精简数据量。
2. MapReduce 弊病
- MapReduce是一种具备两个执行阶段的分布式计算程序,Map阶段和Reduce阶段之间会波及到跨网络数据传递。
- 每一个MapTask都可能会产生大量的本地输入,这就导致跨网络传输数据质变大,网络IO性能低。比方WordCount单词统计案例,如果文件中有1000个单词,其中999个为hello,这将产生999个<hello,1>的键值对在网络中传递,性能及其低下。
3. Combiner 组件概念
- Combiner中文叫做数据规约,是MapReduce的一种优化伎俩。
- Combiner的作用就是对map端的输入先做一次部分合并,以缩小在map和reduce节点之间的数据传输量。
4. Combiner 组件应用
- Combiner是MapReduce程序中除了Mapper和Reducer之外的一种组件,默认状况下不启用。
- Combiner实质就是Reducer,combiner和reducer的区别在于运行的地位:
combiner是在每一个maptask所在的节点本地运行,是部分聚合;reducer是对所有maptask的输入后果计算,是全局聚合。 - 具体实现步骤:
自定义一个CustomCombiner类,继承Reducer,重写reduce办法
job.setCombinerClass(CustomCombiner.class)
5. Combiner 应用注意事项
- Combiner可能利用的前提是不能影响最终的业务逻辑,而且,Combiner的输入kv应该跟reducer的输出kv类型要对应起来。
- 下述场景禁止应用Combiner,因为这样不仅优化了网络传输数据量,还扭转了最终的执行后果
- 业务和数据个数相干的。
- 业务和整体排序相干的。
- Combiner组件不是禁用,而是慎用。用的好进步程序性能,用不好,改变程序后果且不易发现。
二. MapReduce 编程指南
1. MapReduce 编程技巧概述
- MapReduce执行流程了然于心,可能晓得数据在MapReduce中流转过程。
- 业务需要解读精确,即须要明确做什么。
- 牢牢把握住key的抉择,因为MapReduce很多行为跟key相干,比方:排序、分区、分组。
- 学会自定义组件批改默认行为,当默认的行为不满足业务需要,能够尝试自定义规定。
- 通过画图梳理业务执行流程,确定每个阶段的数据类型。
2. MapReduce执行流程梳理
1. Map阶段执行过程
- 第一阶段是把输出目录下文件依照肯定的规范一一进行逻辑切片,造成切片布局。默认状况下,Split size = Block size。每一个切片由一个MapTask解决。(getSplits)
- 第二阶段是对切片中的数据依照肯定的规定解析成<key,value>对。默认规定是把每一行文本内容解析成键值对。key是每一行的起始地位(单位是字节),value是本行的文本内容。(TextInputFormat)
- 第三阶段是调用Mapper类中的map办法。上阶段中每解析进去的一个<k,v>,调用一次map办法。每次调用map办法会输入零个或多个键值对。
- 第四阶段是依照肯定的规定对第三阶段输入的键值对进行分区。默认是只有一个区。分区的数量就是Reducer工作运行的数量。默认只有一个Reducer工作。
- 第五阶段是对每个分区中的键值对进行排序。首先,依照键进行排序,对于键雷同的键值对,依照值进行排序。比方三个键值对<2,2>、<1,3>、<2,1>,键和值别离是整数。那么排序后的后果是<1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,间接输入到文件中。
- 第六阶段是对数据进行部分聚合解决,也就是combiner解决。键相等的键值对会调用一次reduce办法。通过这一阶段,数据量会缩小。本阶段默认是没有的。
2. Map阶段执行过程
- 第一阶段是Reducer工作会被动从Mapper工作复制其输入的键值对。Mapper工作可能会有很多,因而Reducer会复制多个Mapper的输入。
- 第二阶段是把复制到Reducer本地数据,全副进行合并,即把扩散的数据合并成一个大的数据。再对合并后的数据排序。
- 第三阶段是对排序后的键值对调用reduce办法。键相等的键值对调用一次reduce办法,每次调用会产生零个或者多个键值对。最初把这些输入的键值对写入到HDFS文件中。
1. key 的重要性体现
在MapReduce编程中,外围是牢牢把握住每个阶段的输入输出key是什么。因为mr中很多默认行为都跟key相干。
- 排序:key的字典序a-z 正序
- 分区:key.hashcode % reducetask 个数
- 分组:key雷同的分为一组
- 最重要的是,如果感觉默认的行为不满足业务需要,MapReduce还反对自定义排序、分区、分组的规定,这将使得编程更加灵便和不便。
三.美国新冠疫情Covid-19 病例统计
现有美国2021-1-28号,各个县county的新冠疫情累计案例信息,包含确诊病例和死亡病例,数据格式如下所示:
2021-01-28,Juneau City and Borough,Alaska,02110,1108,32021-01-28,Kenai Peninsula Borough,Alaska,02122,3866,182021-01-28,Ketchikan Gateway Borough,Alaska,02130,272,12021-01-28,Kodiak Island Borough,Alaska,02150,1021,52021-01-28,Kusilvak Census Area,Alaska,02158,1099,32021-01-28,Lake and Peninsula Borough,Alaska,02164,5,02021-01-28,Matanuska-Susitna Borough,Alaska,02170,7406,272021-01-28,Nome Census Area,Alaska,02180,307,02021-01-28,North Slope Borough,Alaska,02185,973,32021-01-28,Northwest Arctic Borough,Alaska,02188,567,12021-01-28,Petersburg Borough,Alaska,02195,43,0
字段含意如下:date(日期),county(县),state(州),fips(县编码code),cases(累计确诊病例),deaths(累计死亡病例)。
1. 案例一: 各州累计病例数量统计
统计美国2021-01-28,每个州state累计确诊案例数、累计死亡案例数。
1.需要剖析
- 自定义对象CovidCountBean,用于封装每个县的确诊病例数和死亡病例数。
- 留神自定义对象须要实现Hadoop的序列化机制。
- 以州作为map阶段输入的key,以CovidCountBean作为value,这样属于同一个州的数据就会变成一组进行reduce解决,进行累加即可得出每个州累计确诊病例。
2. 自定义CovidCountBean
package com.uuicon.sentiment_upload.covid;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;public class CovidCountBean implements WritableComparable<CovidCountBean> { private Long cases; private Long deaths; public CovidCountBean() { } public CovidCountBean(Long cases, Long deaths) { this.cases = cases; this.deaths = deaths; } public void set(Long cases, Long deaths) { this.cases = cases; this.deaths = deaths; } @Override public String toString() { return cases + "\t" + deaths; } public Long getCases() { return cases; } public void setCases(Long cases) { this.cases = cases; } public Long getDeaths() { return deaths; } public void setDeaths(Long deaths) { this.deaths = deaths; } /** * 序列化 * * @param out * @throws IOException */ @Override public void write(DataOutput out) throws IOException { out.writeLong(cases); out.writeLong(deaths); } /** * 反序列化,todo 反序列化的程序须要和序列化统一 * * @param in * @throws IOException */ @Override public void readFields(DataInput in) throws IOException { this.cases = in.readLong(); this.deaths = in.readLong(); } @Override public int compareTo(CovidCountBean o) { return this.cases - o.getCases() > 0 ? -1 : (this.cases - o.getCases() < 0 ? 1 : 0); }}
3. 实现mapper
package com.uuicon.sentiment_upload.covid;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class CovidMapper extends Mapper<LongWritable, Text, Text, CovidCountBean> { Text outkey = new Text(); CovidCountBean outValue = new CovidCountBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(","); outkey.set(fields[2]); String cases = ""; String death = ""; // 有一个州的数据死亡率为0 ,文件中显示为空 // 如果间接应用 fields.length - 2 和fields.length - 1 // 死亡率为0 的数据 fields.length - 2 是邮编 //所以做一下数据处理 if (fields.length == 5) { cases = fields[fields.length - 1]; death = "0"; } else { cases = fields[fields.length - 2]; death = fields[fields.length - 1]; } outValue.set(Long.parseLong(cases), Long.parseLong(death)); context.write(outkey, outValue); }}
4. 实现Reducer
package com.uuicon.sentiment_upload.covid;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class CovidReduce extends Reducer<Text, CovidCountBean, Text, CovidCountBean> { CovidCountBean outValue = new CovidCountBean(); @Override protected void reduce(Text key, Iterable<CovidCountBean> values, Context context) throws IOException, InterruptedException { long totalCases = 0; long totalDeaths = 0; for (CovidCountBean value : values) { totalCases += value.getCases(); totalDeaths += value.getDeaths(); } outValue.set(totalCases, totalDeaths); context.write(key, outValue); }}
4. 驱动程序
package com.uuicon.sentiment_upload.covid;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CovidDriver { public static void main(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 创立作业实例 Job job = Job.getInstance(conf, CovidDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(CovidDriver.class); // 设置作业Mapper reduce 类 job.setMapperClass(CovidMapper.class); job.setReducerClass(CovidReduce.class); // 设置作业 mapper 阶段输入key value 数据类型, job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(CovidCountBean.class); // 设置作业reducer 阶段输入key value 数据类型,也就是程序最终输入的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(CovidCountBean.class); // 配置作业的输出数据门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输入数据门路 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 判断输入门路是否存在,如果存在,删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(args[1]))) { fs.delete(new Path(args[1]), true); } boolean resultFlag = job.waitForCompletion(true); System.exit(resultFlag ? 0 : 1); }}
5. 运行程序
MapReduce 驱动程序 鼠标右键运行程序,会报错:
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0 at com.uuicon.sentiment_upload.covid.CovidDriver.main(CovidDriver.java:34)
没有传递运行程序的参数,导致args[0] 找不到- 增加参数
运行后果
Alabama 452734 7340Alaska 53524 253Arizona 745976 12861Arkansas 290856 4784California 3272207 39521Colorado 394668 5670Connecticut 248765 7020Delaware 76495 1075District of Columbia 36132 902Florida 1687586 26034Georgia 869165 13404Guam 8541 130Hawaii 25460 403Idaho 161863 1724
1. 案例一: 各州累计病例数量统计
1. 需要
将美国2021-01-28,每个州state的确诊案例数进行倒序排序。
2. 剖析
如果你的需要中须要依据某个属性进行排序 ,无妨把这个属性作为key。因为MapReduce中key有默认排序行为的。然而须要进行如下思考:
- 如果你的需要是正序,并且数据类型是Hadoop封装好的根本类型。这种状况下不须要任何批改,间接应用根本类型作为key即可。因为Hadoop封装好的类型曾经实现了排序规定。
比方:LongWritable类型:
如果你的需要是倒序,或者数据类型是自定义对象。须要重写排序规定。须要对象实现Comparable接口,重写ComparTo办法。
compareTo办法用于将以后对象与办法的参数进行比拟。
如果指定的数与参数相等返回0。
如果指定的数小于参数返回 -1。
如果指定的数2);
返回负数的话大于参数返回 1。
例如:o1.compareTo(o,以后对象(调用compareTo办法的对象o1)要排在比拟对象(compareTo传参对象o2)前面,返回正数的话,放在后面。
3. 代码实现
- 自定义对象排序
package com.uuicon.sentiment_upload.covid;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;public class CovidCountBean implements WritableComparable<CovidCountBean> { private Long cases; private Long deaths; public CovidCountBean() { } public CovidCountBean(Long cases, Long deaths) { this.cases = cases; this.deaths = deaths; } public void set(Long cases, Long deaths) { this.cases = cases; this.deaths = deaths; } @Override public String toString() { return cases + "\t" + deaths; } public Long getCases() { return cases; } public void setCases(Long cases) { this.cases = cases; } public Long getDeaths() { return deaths; } public void setDeaths(Long deaths) { this.deaths = deaths; } /** * 序列化 * * @param out * @throws IOException */ @Override public void write(DataOutput out) throws IOException { out.writeLong(cases); out.writeLong(deaths); } /** * 反序列化,todo 反序列化的程序须要和序列化统一 * * @param in * @throws IOException */ @Override public void readFields(DataInput in) throws IOException { this.cases = in.readLong(); this.deaths = in.readLong(); } @Override public int compareTo(CovidCountBean o) { return this.cases - o.getCases() > 0 ? -1 : (this.cases - o.getCases() < 0 ? 1 : 0); }}
- Mapper类
package com.uuicon.sentiment_upload.covidsumsort;import com.uuicon.sentiment_upload.covid.CovidCountBean;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * 应用上一个程序运行的后果作为输出 * KEYIN, 偏移量 * VALUEIN, 行内容 * KEYOUT, 须要对州确诊人数作为排序,所以须要用 CovidCountBean 作为key, 并且须要实现WritableComparable接口 * VALUEOUT, */public class CovidSumSortMapper extends Mapper<LongWritable, Text, CovidCountBean, Text> { CovidCountBean outKey = new CovidCountBean(); Text outValue = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\t"); System.out.println("provice="+fields[0]+"-f1--"+fields[1]+"--f2--"+fields[2]); outKey.set(Long.parseLong(fields[1]), Long.parseLong(fields[2])); outValue.set(fields[0]); context.write(outKey, outValue); }}
- Reduce 类
package com.uuicon.sentiment_upload.covidsumsort;import com.uuicon.sentiment_upload.covid.CovidCountBean;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/** * KEYIN, mapper 类型同输入key CovidCountBean * VALUEIN, mapper 类型同输入value Text * KEYOUT, Text * VALUEOUT CovidCountBean */public class CovidSumSortReducer extends Reducer<CovidCountBean, Text, Text, CovidCountBean> { Text outKey = new Text(); @Override protected void reduce(CovidCountBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { outKey.set(value); context.write(outKey,key); } }}
- 驱动类
package com.uuicon.sentiment_upload.covidsumsort;import com.uuicon.sentiment_upload.covid.CovidCountBean;import com.uuicon.sentiment_upload.covid.CovidMapper;import com.uuicon.sentiment_upload.covid.CovidReduce;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CovidSumSortDriver { public static void main(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 创立作业实例 Job job = Job.getInstance(conf, CovidSumSortDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(CovidSumSortDriver.class); // 设置作业Mapper reduce 类 job.setMapperClass(CovidSumSortMapper.class); job.setReducerClass(CovidSumSortReducer.class); // 设置作业 mapper 阶段输入key value 数据类型, job.setMapOutputKeyClass(CovidCountBean.class); job.setMapOutputValueClass(Text.class); // 设置作业reducer 阶段输入key value 数据类型,也就是程序最终输入的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(CovidCountBean.class); // 配置作业的输出数据门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输入数据门路 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 判断输入门路是否存在,如果存在,删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(args[1]))) { fs.delete(new Path(args[1]), true); } boolean resultFlag = job.waitForCompletion(true); System.exit(resultFlag ? 0 : 1); }}
三. 各州累计病例分区统计
1. 需要
将美国每个州的疫情数据输入到各自不同的文件中,即一个州的数据在一个后果文件中。
输入到不同文件中-->reducetask有多个(>2)-->默认只有1个,如何有多个?--->能够设置,job.setNumReduceTasks(N)--->当有多个reducetask 意味着数据分区---->默认分区规定是什么? hashPartitioner--->默认分区规定合乎你的业务需要么?---->合乎,间接应用--->不合乎自定义分区。
2. 代码实现
- 自定义分区器
package com.uuicon.sentiment_upload.covidpart;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;import java.util.HashMap;/** * KEY, 输出的数据key 类型 ,本例中是美国州 所以是Text * VALUE 输出的value 数据类型 ,本例中是行内容 也是Text */public class CovidPartition extends Partitioner<Text, Text> { // 模仿美国各州的数据字典;理论中如果是分区少,能够用数据汇合保留,如果分区量大能够应用radis 保留 public static HashMap<String, Integer> stateMap = new HashMap<>(); static { stateMap.put("Alabama", 0); stateMap.put("Arkansas", 1); stateMap.put("California", 2); stateMap.put("Colorado", 3); stateMap.put("Florida", 4); stateMap.put("Georgia", 5); stateMap.put("Idaho", 6); } /** * 自定义分区规定: 只有getpartition 返回的int一样,数据就会被分到同一个分区 * 所谓的同一个分区就是数据被放入同一个文件 * * @param key 州 * @param value 一行文本 * @param numPartitions * @return */ @Override public int getPartition(Text key, Text value, int numPartitions) { Integer code = stateMap.get(key.toString()); if (code != null) { return code; } return 7; }}
- Mapper类
package com.uuicon.sentiment_upload.covidpart;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class CovidPartitionMapper extends Mapper<LongWritable, Text, Text, Text> { Text keyOut = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(","); keyOut.set(fields[2]); context.write(keyOut, value); }}
Reduce 类
package com.uuicon.sentiment_upload.covidpart;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class CovidPartitionReducer extends Reducer<Text, Text, Text, NullWritable> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { context.write(value, NullWritable.get()); } }}
- 驱动类
package com.uuicon.sentiment_upload.covidpart;import com.uuicon.sentiment_upload.covid.CovidCountBean;import com.uuicon.sentiment_upload.covidsumsort.CovidSumSortMapper;import com.uuicon.sentiment_upload.covidsumsort.CovidSumSortReducer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CovidPartDriver { public static void main(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 创立作业实例 Job job = Job.getInstance(conf, CovidPartDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(CovidPartDriver.class); // 设置作业Mapper reduce 类 job.setMapperClass(CovidPartitionMapper.class); job.setReducerClass(CovidPartitionReducer.class); // 设置作业 mapper 阶段输入key value 数据类型, job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 设置作业reducer 阶段输入key value 数据类型,也就是程序最终输入的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 设置分区 job.setNumReduceTasks(8); job.setPartitionerClass(CovidPartition.class); // 配置作业的输出数据门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输入数据门路 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 判断输入门路是否存在,如果存在,删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(args[1]))) { fs.delete(new Path(args[1]), true); } boolean resultFlag = job.waitForCompletion(true); System.exit(resultFlag ? 0 : 1); }}
四. 各州累计病例数最多的县
统计美国2021-01-28,每个州state的确诊案例数最多的县是哪一个。
1. 需要
- 在map阶段将“州state、县county、县确诊病例cases”通过自定义对象封装,作为key输入;
- 重写对象的排序规定,首先依据州的正序排序,如果州相等,依照确诊病例数cases倒序排序,发送到reduce。
- 在reduce端利用自定义分组规定,将州state雷同的分为一组,而后取第一个即是最大值。
2. 分组概念 默认分组规定
- 分组在产生在reduce阶段,决定了同一个reduce中哪些数据将组成一组去调用reduce办法解决。
- 默认分组规定是:key雷同的就会分为一组(前后两个key间接比拟是否相等)。
- 须要留神的是,在reduce阶段进行分组之前,因为进行了数据排序,因而排序+分组将会使得key一样的数据肯定被分到同一组,一组去调用reduce办法解决。
自定义分组规定
- 写类继承 WritableComparator,重写Compare办法。
- 只有Compare办法返回为0,MapReduce框架在分组的时候就会认为前后两个相等,分为一组。
- 在job对象中进行设置,让本人的重写分组类失效。job.setGroupingComparatorClass(xxxx.class);
3. 自定义CovidBean
package com.uuicon.sentiment_upload.covidtop;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;public class CovidBean implements WritableComparable<CovidBean> { private String state; private String county; private Long cases; public CovidBean() { } public CovidBean(String state, String county, Long cases) { this.state = state; this.county = county; this.cases = cases; } public void set(String state, String county, Long cases) { this.state = state; this.county = county; this.cases = cases; } @Override public String toString() { return state + ":" + county + "--" + cases; } public String getState() { return state; } public void setState(String state) { this.state = state; } public Long getCases() { return cases; } public void setCases(Long cases) { this.cases = cases; } public String getCounty() { return county; } public void setCounty(String county) { this.county = county; } @Override public int compareTo(CovidBean o) { int i = state.compareTo(o.getState()); int result; if (i > 0) { result = 1; } else if (i < 0) { result = -1; } else { result = cases > o.getCases() ? -1 : 1; } return result; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(state); out.writeUTF(county); out.writeLong(cases); } @Override public void readFields(DataInput in) throws IOException { this.state = in.readUTF(); this.county = in.readUTF(); this.cases = in.readLong(); }}
Mapper 类
package com.uuicon.sentiment_upload.covidtop;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * */public class CovidTopOneMapper extends Mapper<LongWritable, Text, CovidBean, NullWritable> { CovidBean outKey = new CovidBean(); NullWritable outValue = NullWritable.get(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(","); outKey.set(fields[2], fields[1], Long.parseLong(fields[4])); context.write(outKey, outValue); }}
Reduce 类
package com.uuicon.sentiment_upload.covidtop;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class CovidTopOneReduce extends Reducer<CovidBean, NullWritable, CovidBean, NullWritable> { @Override protected void reduce(CovidBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); }}
自定义分组
package com.uuicon.sentiment_upload.covidtop;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.io.WritableComparator;public class CovidGroupingComparator extends WritableComparator { protected CovidGroupingComparator() { super(CovidBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) { CovidBean aBean = (CovidBean) a; CovidBean bBean = (CovidBean) b; // 只有state 一样,就应该分到同一组 // 只有compare 返回0 mapreduce 就认为两个对象一样 return aBean.getState().compareTo(bBean.getState()); }}
驱动程序
package com.uuicon.sentiment_upload.covidtop;import com.uuicon.sentiment_upload.covidpart.CovidPartition;import com.uuicon.sentiment_upload.covidpart.CovidPartitionMapper;import com.uuicon.sentiment_upload.covidpart.CovidPartitionReducer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CovidTopDriver { public static void main(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 创立作业实例 Job job = Job.getInstance(conf, CovidTopDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(CovidTopDriver.class); // 设置作业Mapper reduce 类 job.setMapperClass(CovidTopOneMapper.class); job.setReducerClass(CovidTopOneReduce.class); // 设置作业 mapper 阶段输入key value 数据类型, job.setMapOutputKeyClass(CovidBean.class); job.setMapOutputValueClass(NullWritable.class); // 设置作业reducer 阶段输入key value 数据类型,也就是程序最终输入的数据类型 job.setOutputKeyClass(CovidBean.class); job.setOutputValueClass(NullWritable.class); // 设置分区// job.setNumReduceTasks(8);// job.setPartitionerClass(CovidPartition.class); job.setGroupingComparatorClass(CovidGroupingComparator.class); // 配置作业的输出数据门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输入数据门路 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 判断输入门路是否存在,如果存在,删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(args[1]))) { fs.delete(new Path(args[1]), true); } boolean resultFlag = job.waitForCompletion(true); System.exit(resultFlag ? 0 : 1); }}
四. 各州累计病例数最多TopN的县
找出美国2021-01-28,每个州state的确诊案例数最多前3个县。所谓Top3问题
1. 需要剖析
- 在map阶段将“州state、县county、县确诊病例cases”通过自定义对象封装,作为key输入;
- 重写对象的排序规定,首先依据州的正序排序,如果州相等,依照确诊病例数cases倒序排序,发送到reduce。
- 在reduce端利用自定义分组规定,将州state雷同的分为一组,而后取前N个即是TopN。为了验证验证后果不便,能够在输入的时候以cases作为value,实际上为空即可,value并不实际意义。
2. 代码实现
Mapper类
package com.uuicon.sentiment_upload.covidtopn;import com.uuicon.sentiment_upload.covidtop.CovidBean;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class CovidTopNMapper extends Mapper<LongWritable, Text, CovidBean, LongWritable> { CovidBean outKey = new CovidBean(); LongWritable outValue = new LongWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(","); outKey.set(fields[2], fields[1], Long.parseLong(fields[4])); outValue.set(Long.parseLong(fields[4])); context.write(outKey, outValue); }}
Reduce 类
package com.uuicon.sentiment_upload.covidtopn;import com.uuicon.sentiment_upload.covidtop.CovidBean;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class CovidTopNReducer extends Reducer<CovidBean, LongWritable, CovidBean, LongWritable> { @Override protected void reduce(CovidBean key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { int num = 0; for (LongWritable value : values) { if (num < 3) { context.write(key, value); num++; }else{ return; } } }}
驱动类
package com.uuicon.sentiment_upload.covidtopn;import com.uuicon.sentiment_upload.covidtop.CovidBean;import com.uuicon.sentiment_upload.covidtop.CovidGroupingComparator;import com.uuicon.sentiment_upload.covidtop.CovidTopOneMapper;import com.uuicon.sentiment_upload.covidtop.CovidTopOneReduce;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CovidTopNDriver { public static void main(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 创立作业实例 Job job = Job.getInstance(conf, CovidTopNDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(CovidTopNDriver.class); // 设置作业Mapper reduce 类 job.setMapperClass(CovidTopNMapper.class); job.setReducerClass(CovidTopNReducer.class); // 设置作业 mapper 阶段输入key value 数据类型, job.setMapOutputKeyClass(CovidBean.class); job.setMapOutputValueClass(LongWritable.class); // 设置作业reducer 阶段输入key value 数据类型,也就是程序最终输入的数据类型 job.setOutputKeyClass(CovidBean.class); job.setOutputValueClass(LongWritable.class); // 设置分区// job.setNumReduceTasks(8);// job.setPartitionerClass(CovidPartition.class); job.setGroupingComparatorClass(CovidGroupingComparator.class); // 配置作业的输出数据门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输入数据门路 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 判断输入门路是否存在,如果存在,删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(args[1]))) { fs.delete(new Path(args[1]), true); } boolean resultFlag = job.waitForCompletion(true); System.exit(resultFlag ? 0 : 1); }}