一.MapReduce mapReduce Patition Combiner
1. Partition 分区
1. 默认状况
在默认状况下,不论 map 阶段有多少个并发执行 task, 到 reduce 阶段,所有的后果都将有一个 reduce 来解决 ,并且最终后果输入到一个文件中。
默认状况下 MapReduce 执行流程:
2. 批改 ReduceTask 个数
在 MapReduce 程序的驱动类中,通过 job 提供的办法,能够批改 reducetask 的个数。
应用 api 批改 reducetask 个数之后,输入后果文件的个数和 reducetask 个数对应。也就是说有几个 ReduceTask, 最终输入文件就是几个, 比方设置为 6 个,此时的输入后果如下所示:
此时,MapReduce 的执行流程如下所示:
3. Partition 数据分区概念
- 默认状况下,MapReduce 是只有一个 reducetask 来进行数据的解决。这就使得不论输出的数据量多大,最终的后果都是输入到一个文件中。
- 当扭转 reducetask 个数的时候,作为 maptask 就会波及到 数据分区的问题:MapTask 输入的后果如何调配给各个 ReduceTask 来解决。
4. partition 默认规定
MapReduce 默认分区规定是 HashPartitioner。跟 map 输入的数据 key 无关。
5. partition 注意事项
- reducetask 个数的扭转导致了数据分区的产生,而不是有数据分区导致了 reducetask 个数扭转。
- 数据分区的外围是分区规定。即如何调配数据给各个 reducetask。
- 默认的规定能够保障只有map 阶段输入的 key 一样,数据就肯定能够分区到同一个 reducetask,然而不能保证数据均匀分区。
- reducetask 个数的扭转还会导致输入后果文件不再是一个整体,而是输入到多个文件中。
2. Combiner 规约
1. 数据规约的含意
数据归约是指 在尽可能保持数据原貌的前提下,最大限度地精简数据量。
2. MapReduce 弊病
- MapReduce 是一种具备两个执行阶段的分布式计算程序,Map 阶段和 Reduce 阶段之间会波及到跨网络数据传递。
- 每一个 MapTask 都可能会产生大量的本地输入,这就导致跨网络传输数据质变大,网络 IO 性能低。比方 WordCount 单词统计案例,如果文件中有 1000 个单词,其中 999 个为 hello, 这将产生 999 个 <hello,1> 的键值对在网络中传递,性能及其低下。
3. Combiner 组件概念
- Combiner 中文叫做数据规约,是 MapReduce 的一种优化伎俩。
- Combiner 的作用就是 对 map 端的输入先做一次部分合并,以缩小在 map 和 reduce 节点之间的数据传输量。
4. Combiner 组件应用
- Combiner 是 MapReduce 程序中除了 Mapper 和 Reducer 之外的一种组件,默认状况下不启用。
- Combiner 实质就是 Reducer,combiner 和 reducer 的区别在于运行的地位:
combiner 是在每一个 maptask 所在的节点本地运行,是部分聚合;reducer 是对所有 maptask 的输入后果计算,是全局聚合。 - 具体实现步骤:
自定义一个CustomCombiner 类,继承 Reducer,重写 reduce 办法
job.setCombinerClass(CustomCombiner.class)
5. Combiner 应用注意事项
- Combiner 可能利用的前提是不能影响最终的业务逻辑,而且,Combiner 的输入 kv 应该跟 reducer 的输出 kv 类型要对应起来。
- 下述场景禁止应用 Combiner,因为这样不仅优化了网络传输数据量,还扭转了最终的执行后果
- 业务和数据个数相干的。
- 业务和整体排序相干的。
- Combiner 组件不是禁用,而是慎用。用的好进步程序性能,用不好,改变程序后果且不易发现。
二. MapReduce 编程指南
1. MapReduce 编程技巧概述
- MapReduce 执行流程了然于心,可能晓得数据在 MapReduce 中流转过程。
- 业务需要解读精确,即须要明确做什么。
- 牢牢把握住 key 的抉择,因为 MapReduce 很多行为跟 key 相干,比方:排序、分区、分组。
- 学会自定义组件批改默认行为,当默认的行为不满足业务需要,能够尝试自定义规定。
- 通过画图梳理业务执行流程,确定每个阶段的数据类型。
2. MapReduce 执行流程梳理
1. Map 阶段执行过程
- 第一阶段是 把输出目录下文件依照肯定的规范一一进行逻辑切片,造成切片布局。默认状况下,Split size = Block size。每一个切片由一个 MapTask 解决。(getSplits)
- 第二阶段是 对切片中的数据依照肯定的规定解析成 <key,value> 对。默认规定是把每一行文本内容解析成键值对。key 是每一行的起始地位(单位是字节),value 是本行的文本内容。(TextInputFormat)
- 第三阶段是 调用 Mapper 类中的 map 办法。上阶段中每解析进去的一个 <k,v>,调用一次 map 办法。每次调用 map 办法会输入零个或多个键值对。
- 第四阶段是依照肯定的规定对第三阶段 输入的键值对进行分区。默认是只有一个区。分区的数量就是 Reducer 工作运行的数量。默认只有一个 Reducer 工作。
- 第五阶段是 对每个分区中的键值对进行排序。首先,依照键进行排序,对于键雷同的键值对,依照值进行排序。比方三个键值对 <2,2>、<1,3>、<2,1>,键和值别离是整数。那么排序后的后果是 <1,3>、<2,1>、<2,2>。如果有第六阶段,那么进入第六阶段;如果没有,间接输入到文件中。
- 第六阶段是 对数据进行部分聚合解决,也就是 combiner 解决。键相等的键值对会调用一次 reduce 办法。通过这一阶段,数据量会缩小。本阶段默认是没有的。
2. Map 阶段执行过程
- 第一阶段是 R educer 工作会被动从 Mapper 工作复制其输入的键值对。Mapper 工作可能会有很多,因而 Reducer 会复制多个 Mapper 的输入。
- 第二阶段是 把复制到 Reducer 本地数据,全副进行合并,即把扩散的数据合并成一个大的数据。再对合并后的数据排序。
- 第三阶段是 对排序后的键值对调用 reduce 办法。键相等的键值对调用一次 reduce 办法,每次调用会产生零个或者多个键值对。最初把这些输入的键值对写入到 HDFS 文件中。
1. key 的重要性体现
-
在 MapReduce 编程中,外围是 牢牢把握住每个阶段的输入输出 key 是什么。因为 mr 中很多默认行为都跟 key 相干。
- 排序:key 的字典序 a -z 正序
- 分区:key.hashcode % reducetask 个数
- 分组:key 雷同的分为一组
- 最重要的是,如果感觉默认的行为不满足业务需要,MapReduce 还反对自定义排序、分区、分组的规定,这将使得编程更加灵便和不便。
三. 美国新冠疫情 Covid-19 病例统计
现有美国 2021-1-28 号,各个县 county 的新冠疫情累计案例信息,包含确诊病例和死亡病例,数据格式如下所示:
2021-01-28,Juneau City and Borough,Alaska,02110,1108,3
2021-01-28,Kenai Peninsula Borough,Alaska,02122,3866,18
2021-01-28,Ketchikan Gateway Borough,Alaska,02130,272,1
2021-01-28,Kodiak Island Borough,Alaska,02150,1021,5
2021-01-28,Kusilvak Census Area,Alaska,02158,1099,3
2021-01-28,Lake and Peninsula Borough,Alaska,02164,5,0
2021-01-28,Matanuska-Susitna Borough,Alaska,02170,7406,27
2021-01-28,Nome Census Area,Alaska,02180,307,0
2021-01-28,North Slope Borough,Alaska,02185,973,3
2021-01-28,Northwest Arctic Borough,Alaska,02188,567,1
2021-01-28,Petersburg Borough,Alaska,02195,43,0
字段含意如下:date(日期),county(县),state(州),fips(县编码 code),cases(累计确诊病例),deaths(累计死亡病例)。
1. 案例一: 各州累计病例数量统计
统计美国 2021-01-28,每个州 state 累计确诊案例数、累计死亡案例数。
1. 需要剖析
- 自定义对象 CovidCountBean,用于封装每个县的确诊病例数和死亡病例数。
- 留神自定义对象须要实现 Hadoop 的序列化机制。
- 以州作为 map 阶段输入的 key, 以 CovidCountBean 作为 value,这样属于同一个州的数据就会变成一组进行 reduce 解决,进行累加即可得出每个州累计确诊病例。
2. 自定义 CovidCountBean
package com.uuicon.sentiment_upload.covid;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class CovidCountBean implements WritableComparable<CovidCountBean> {
private Long cases;
private Long deaths;
public CovidCountBean() {}
public CovidCountBean(Long cases, Long deaths) {
this.cases = cases;
this.deaths = deaths;
}
public void set(Long cases, Long deaths) {
this.cases = cases;
this.deaths = deaths;
}
@Override
public String toString() {return cases + "\t" + deaths;}
public Long getCases() {return cases;}
public void setCases(Long cases) {this.cases = cases;}
public Long getDeaths() {return deaths;}
public void setDeaths(Long deaths) {this.deaths = deaths;}
/**
* 序列化
*
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {out.writeLong(cases);
out.writeLong(deaths);
}
/**
* 反序列化,todo 反序列化的程序须要和序列化统一
*
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {this.cases = in.readLong();
this.deaths = in.readLong();}
@Override
public int compareTo(CovidCountBean o) {return this.cases - o.getCases() > 0 ? -1 : (this.cases - o.getCases() < 0 ? 1 : 0);
}
}
3. 实现 mapper
package com.uuicon.sentiment_upload.covid;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class CovidMapper extends Mapper<LongWritable, Text, Text, CovidCountBean> {Text outkey = new Text();
CovidCountBean outValue = new CovidCountBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] fields = value.toString().split(",");
outkey.set(fields[2]);
String cases = "";
String death = "";
// 有一个州的数据死亡率为 0 , 文件中显示为空
// 如果间接应用 fields.length - 2 和 fields.length - 1
// 死亡率为 0 的数据 fields.length - 2 是邮编
// 所以做一下数据处理
if (fields.length == 5) {cases = fields[fields.length - 1];
death = "0";
} else {cases = fields[fields.length - 2];
death = fields[fields.length - 1];
}
outValue.set(Long.parseLong(cases), Long.parseLong(death));
context.write(outkey, outValue);
}
}
4. 实现 Reducer
package com.uuicon.sentiment_upload.covid;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class CovidReduce extends Reducer<Text, CovidCountBean, Text, CovidCountBean> {CovidCountBean outValue = new CovidCountBean();
@Override
protected void reduce(Text key, Iterable<CovidCountBean> values, Context context) throws IOException, InterruptedException {
long totalCases = 0;
long totalDeaths = 0;
for (CovidCountBean value : values) {totalCases += value.getCases();
totalDeaths += value.getDeaths();}
outValue.set(totalCases, totalDeaths);
context.write(key, outValue);
}
}
4. 驱动程序
package com.uuicon.sentiment_upload.covid;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CovidDriver {public static void main(String[] args) throws Exception {
// 配置文件对象
Configuration conf = new Configuration();
// 创立作业实例
Job job = Job.getInstance(conf, CovidDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(CovidDriver.class);
// 设置作业 Mapper reduce 类
job.setMapperClass(CovidMapper.class);
job.setReducerClass(CovidReduce.class);
// 设置作业 mapper 阶段输入 key value 数据类型,
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CovidCountBean.class);
// 设置作业 reducer 阶段输入 key value 数据类型, 也就是程序最终输入的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CovidCountBean.class);
// 配置作业的输出数据门路
FileInputFormat.addInputPath(job, new Path(args[0]));
// 配置作业的输入数据门路
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 判断输入门路是否存在, 如果存在, 删除
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(args[1]))) {fs.delete(new Path(args[1]), true);
}
boolean resultFlag = job.waitForCompletion(true);
System.exit(resultFlag ? 0 : 1);
}
}
5. 运行程序
-
MapReduce 驱动程序 鼠标右键运行程序, 会报错:
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0 at com.uuicon.sentiment_upload.covid.CovidDriver.main(CovidDriver.java:34)
没有传递运行程序的参数, 导致 args[0] 找不到 - 增加参数
-
运行后果
Alabama 452734 7340 Alaska 53524 253 Arizona 745976 12861 Arkansas 290856 4784 California 3272207 39521 Colorado 394668 5670 Connecticut 248765 7020 Delaware 76495 1075 District of Columbia 36132 902 Florida 1687586 26034 Georgia 869165 13404 Guam 8541 130 Hawaii 25460 403 Idaho 161863 1724
1. 案例一: 各州累计病例数量统计
1. 需要
将美国 2021-01-28,每个州 state 的确诊案例数进行倒序排序。
2. 剖析
如果你的需要中须要依据某个属性进行排序,无妨把这个属性作为 key。因为 MapReduce 中 key 有默认排序行为的。然而须要进行如下思考:
- 如果你的需要是正序,并且数据类型是 Hadoop 封装好的根本类型。这种状况下不须要任何批改,间接应用根本类型作为 key 即可。因为 Hadoop 封装好的类型曾经实现了排序规定。
比方:LongWritable 类型:
如果你的需要是倒序,或者数据类型是自定义对象。须要重写排序规定。须要对象实现 Comparable 接口,重写 ComparTo 办法。
compareTo 办法用于将以后对象与办法的参数进行比拟。
如果指定的数与参数相等返回 0。
如果指定的数小于参数返回 -1。
如果指定的数 2);
返回负数的话大于参数返回 1。
例如:o1.compareTo(o,以后对象(调用 compareTo 办法的对象 o1)要排在比拟对象(compareTo 传参对象 o2)前面,返回正数的话,放在后面。
3. 代码实现
- 自定义对象排序
package com.uuicon.sentiment_upload.covid;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class CovidCountBean implements WritableComparable<CovidCountBean> {
private Long cases;
private Long deaths;
public CovidCountBean() {}
public CovidCountBean(Long cases, Long deaths) {
this.cases = cases;
this.deaths = deaths;
}
public void set(Long cases, Long deaths) {
this.cases = cases;
this.deaths = deaths;
}
@Override
public String toString() {return cases + "\t" + deaths;}
public Long getCases() {return cases;}
public void setCases(Long cases) {this.cases = cases;}
public Long getDeaths() {return deaths;}
public void setDeaths(Long deaths) {this.deaths = deaths;}
/**
* 序列化
*
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {out.writeLong(cases);
out.writeLong(deaths);
}
/**
* 反序列化,todo 反序列化的程序须要和序列化统一
*
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {this.cases = in.readLong();
this.deaths = in.readLong();}
@Override
public int compareTo(CovidCountBean o) {return this.cases - o.getCases() > 0 ? -1 : (this.cases - o.getCases() < 0 ? 1 : 0);
}
}
- Mapper 类
package com.uuicon.sentiment_upload.covidsumsort;
import com.uuicon.sentiment_upload.covid.CovidCountBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 应用上一个程序运行的后果作为输出
* KEYIN, 偏移量
* VALUEIN, 行内容
* KEYOUT, 须要对州确诊人数作为排序, 所以须要用 CovidCountBean 作为 key, 并且须要实现 WritableComparable 接口
* VALUEOUT,
*/
public class CovidSumSortMapper extends Mapper<LongWritable, Text, CovidCountBean, Text> {CovidCountBean outKey = new CovidCountBean();
Text outValue = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] fields = value.toString().split("\t");
System.out.println("provice="+fields[0]+"-f1--"+fields[1]+"--f2--"+fields[2]);
outKey.set(Long.parseLong(fields[1]), Long.parseLong(fields[2]));
outValue.set(fields[0]);
context.write(outKey, outValue);
}
}
- Reduce 类
package com.uuicon.sentiment_upload.covidsumsort;
import com.uuicon.sentiment_upload.covid.CovidCountBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* KEYIN, mapper 类型同输入 key CovidCountBean
* VALUEIN, mapper 类型同输入 value Text
* KEYOUT, Text
* VALUEOUT CovidCountBean
*/
public class CovidSumSortReducer extends Reducer<CovidCountBean, Text, Text, CovidCountBean> {Text outKey = new Text();
@Override
protected void reduce(CovidCountBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for (Text value : values) {outKey.set(value);
context.write(outKey,key);
}
}
}
- 驱动类
package com.uuicon.sentiment_upload.covidsumsort;
import com.uuicon.sentiment_upload.covid.CovidCountBean;
import com.uuicon.sentiment_upload.covid.CovidMapper;
import com.uuicon.sentiment_upload.covid.CovidReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CovidSumSortDriver {public static void main(String[] args) throws Exception {
// 配置文件对象
Configuration conf = new Configuration();
// 创立作业实例
Job job = Job.getInstance(conf, CovidSumSortDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(CovidSumSortDriver.class);
// 设置作业 Mapper reduce 类
job.setMapperClass(CovidSumSortMapper.class);
job.setReducerClass(CovidSumSortReducer.class);
// 设置作业 mapper 阶段输入 key value 数据类型,
job.setMapOutputKeyClass(CovidCountBean.class);
job.setMapOutputValueClass(Text.class);
// 设置作业 reducer 阶段输入 key value 数据类型, 也就是程序最终输入的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(CovidCountBean.class);
// 配置作业的输出数据门路
FileInputFormat.addInputPath(job, new Path(args[0]));
// 配置作业的输入数据门路
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 判断输入门路是否存在, 如果存在, 删除
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(args[1]))) {fs.delete(new Path(args[1]), true);
}
boolean resultFlag = job.waitForCompletion(true);
System.exit(resultFlag ? 0 : 1);
}
}
三. 各州累计病例分区统计
1. 需要
将美国每个州的疫情数据输入到各自不同的文件中,即 一个州的数据在一个后果文件中。
输入到不同文件中 –>reducetask 有多个(>2)–> 默认只有 1 个,如何有多个?—> 能够设置,job.setNumReduceTasks(N)—> 当有多个 reducetask 意味着数据分区 —-> 默认分区规定是什么?hashPartitioner—> 默认分区规定合乎你的业务需要么?—-> 合乎,间接应用 —> 不合乎自定义分区。
2. 代码实现
- 自定义分区器
package com.uuicon.sentiment_upload.covidpart;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import java.util.HashMap;
/**
* KEY, 输出的数据 key 类型 , 本例中是美国州 所以是 Text
* VALUE 输出的 value 数据类型 , 本例中是行内容 也是 Text
*/
public class CovidPartition extends Partitioner<Text, Text> {
// 模仿美国各州的数据字典; 理论中如果是分区少, 能够用数据汇合保留, 如果分区量大能够应用 radis 保留
public static HashMap<String, Integer> stateMap = new HashMap<>();
static {stateMap.put("Alabama", 0);
stateMap.put("Arkansas", 1);
stateMap.put("California", 2);
stateMap.put("Colorado", 3);
stateMap.put("Florida", 4);
stateMap.put("Georgia", 5);
stateMap.put("Idaho", 6);
}
/**
* 自定义分区规定: 只有 getpartition 返回的 int 一样, 数据就会被分到同一个分区
* 所谓的同一个分区就是数据被放入同一个文件
*
* @param key 州
* @param value 一行文本
* @param numPartitions
* @return
*/
@Override
public int getPartition(Text key, Text value, int numPartitions) {Integer code = stateMap.get(key.toString());
if (code != null) {return code;}
return 7;
}
}
- Mapper 类
package com.uuicon.sentiment_upload.covidpart;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class CovidPartitionMapper extends Mapper<LongWritable, Text, Text, Text> {Text keyOut = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] fields = value.toString().split(",");
keyOut.set(fields[2]);
context.write(keyOut, value);
}
}
-
Reduce 类
package com.uuicon.sentiment_upload.covidpart; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class CovidPartitionReducer extends Reducer<Text, Text, Text, NullWritable> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for (Text value : values) {context.write(value, NullWritable.get()); } } }
- 驱动类
package com.uuicon.sentiment_upload.covidpart;
import com.uuicon.sentiment_upload.covid.CovidCountBean;
import com.uuicon.sentiment_upload.covidsumsort.CovidSumSortMapper;
import com.uuicon.sentiment_upload.covidsumsort.CovidSumSortReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CovidPartDriver {public static void main(String[] args) throws Exception {
// 配置文件对象
Configuration conf = new Configuration();
// 创立作业实例
Job job = Job.getInstance(conf, CovidPartDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(CovidPartDriver.class);
// 设置作业 Mapper reduce 类
job.setMapperClass(CovidPartitionMapper.class);
job.setReducerClass(CovidPartitionReducer.class);
// 设置作业 mapper 阶段输入 key value 数据类型,
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置作业 reducer 阶段输入 key value 数据类型, 也就是程序最终输入的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 设置分区
job.setNumReduceTasks(8);
job.setPartitionerClass(CovidPartition.class);
// 配置作业的输出数据门路
FileInputFormat.addInputPath(job, new Path(args[0]));
// 配置作业的输入数据门路
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 判断输入门路是否存在, 如果存在, 删除
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(args[1]))) {fs.delete(new Path(args[1]), true);
}
boolean resultFlag = job.waitForCompletion(true);
System.exit(resultFlag ? 0 : 1);
}
}
四. 各州累计病例数最多的县
统计美国 2021-01-28,每个州 state 的确诊案例数最多的县是哪一个。
1. 需要
- 在 map 阶段将“州 state、县 county、县确诊病例 cases”通过自定义对象封装,作为 key 输入;
- 重写对象的排序规定,首先依据州的正序排序,如果州相等,依照确诊病例数 cases 倒序排序,发送到 reduce。
- 在 reduce 端利用自定义分组规定,将州 state 雷同的分为一组,而后取第一个即是最大值。
2. 分组概念 默认分组规定
- 分组在产生在 reduce 阶段,决定了同一个 reduce 中哪些数据将组成一组去调用 reduce 办法解决。
- 默认分组规定是:key 雷同的就会分为一组(前后两个 key 间接比拟是否相等)。
- 须要留神的是,在 reduce 阶段进行分组之前,因为进行了数据排序,因而排序 + 分组将会使得 key 一样的数据肯定被分到同一组,一组去调用 reduce 办法解决。
-
自定义分组规定
- 写类继承 WritableComparator,重写 Compare 办法。
- 只有 Compare 办法返回为 0,MapReduce 框架在分组的时候就会认为前后两个相等,分为一组。
- 在 job 对象中进行设置,让本人的重写分组类失效。job.setGroupingComparatorClass(xxxx.class);
3. 自定义 CovidBean
package com.uuicon.sentiment_upload.covidtop;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class CovidBean implements WritableComparable<CovidBean> {
private String state;
private String county;
private Long cases;
public CovidBean() {}
public CovidBean(String state, String county, Long cases) {
this.state = state;
this.county = county;
this.cases = cases;
}
public void set(String state, String county, Long cases) {
this.state = state;
this.county = county;
this.cases = cases;
}
@Override
public String toString() {return state + ":" + county + "--" + cases;}
public String getState() {return state;}
public void setState(String state) {this.state = state;}
public Long getCases() {return cases;}
public void setCases(Long cases) {this.cases = cases;}
public String getCounty() {return county;}
public void setCounty(String county) {this.county = county;}
@Override
public int compareTo(CovidBean o) {int i = state.compareTo(o.getState());
int result;
if (i > 0) {result = 1;} else if (i < 0) {result = -1;} else {result = cases > o.getCases() ? -1 : 1;
}
return result;
}
@Override
public void write(DataOutput out) throws IOException {out.writeUTF(state);
out.writeUTF(county);
out.writeLong(cases);
}
@Override
public void readFields(DataInput in) throws IOException {this.state = in.readUTF();
this.county = in.readUTF();
this.cases = in.readLong();}
}
-
Mapper 类
package com.uuicon.sentiment_upload.covidtop; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * */ public class CovidTopOneMapper extends Mapper<LongWritable, Text, CovidBean, NullWritable> {CovidBean outKey = new CovidBean(); NullWritable outValue = NullWritable.get(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] fields = value.toString().split(","); outKey.set(fields[2], fields[1], Long.parseLong(fields[4])); context.write(outKey, outValue); } }
-
Reduce 类
package com.uuicon.sentiment_upload.covidtop; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class CovidTopOneReduce extends Reducer<CovidBean, NullWritable, CovidBean, NullWritable> { @Override protected void reduce(CovidBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {context.write(key, NullWritable.get()); } }
-
自定义分组
package com.uuicon.sentiment_upload.covidtop; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class CovidGroupingComparator extends WritableComparator {protected CovidGroupingComparator() {super(CovidBean.class, true); } @Override public int compare(WritableComparable a, WritableComparable b) {CovidBean aBean = (CovidBean) a; CovidBean bBean = (CovidBean) b; // 只有 state 一样, 就应该分到同一组 // 只有 compare 返回 0 mapreduce 就认为两个对象一样 return aBean.getState().compareTo(bBean.getState()); } }
-
驱动程序
package com.uuicon.sentiment_upload.covidtop; import com.uuicon.sentiment_upload.covidpart.CovidPartition; import com.uuicon.sentiment_upload.covidpart.CovidPartitionMapper; import com.uuicon.sentiment_upload.covidpart.CovidPartitionReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class CovidTopDriver {public static void main(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 创立作业实例 Job job = Job.getInstance(conf, CovidTopDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(CovidTopDriver.class); // 设置作业 Mapper reduce 类 job.setMapperClass(CovidTopOneMapper.class); job.setReducerClass(CovidTopOneReduce.class); // 设置作业 mapper 阶段输入 key value 数据类型, job.setMapOutputKeyClass(CovidBean.class); job.setMapOutputValueClass(NullWritable.class); // 设置作业 reducer 阶段输入 key value 数据类型, 也就是程序最终输入的数据类型 job.setOutputKeyClass(CovidBean.class); job.setOutputValueClass(NullWritable.class); // 设置分区 // job.setNumReduceTasks(8); // job.setPartitionerClass(CovidPartition.class); job.setGroupingComparatorClass(CovidGroupingComparator.class); // 配置作业的输出数据门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输入数据门路 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 判断输入门路是否存在, 如果存在, 删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(args[1]))) {fs.delete(new Path(args[1]), true); } boolean resultFlag = job.waitForCompletion(true); System.exit(resultFlag ? 0 : 1); } }
四. 各州累计病例数最多 TopN 的县
找出美国 2021-01-28,每个州 state 的确诊案例数最多前 3 个县。所谓 Top3 问题
1. 需要剖析
- 在 map 阶段将“州 state、县 county、县确诊病例 cases”通过自定义对象封装,作为 key 输入;
- 重写对象的排序规定,首先依据州的正序排序,如果州相等,依照确诊病例数 cases 倒序排序,发送到 reduce。
- 在 reduce 端利用自定义分组规定,将州 state 雷同的分为一组,而后取前 N 个即是 TopN。为了验证验证后果不便,能够在输入的时候以 cases 作为 value,实际上为空即可,value 并不实际意义。
2. 代码实现
-
Mapper 类
package com.uuicon.sentiment_upload.covidtopn; import com.uuicon.sentiment_upload.covidtop.CovidBean; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class CovidTopNMapper extends Mapper<LongWritable, Text, CovidBean, LongWritable> {CovidBean outKey = new CovidBean(); LongWritable outValue = new LongWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] fields = value.toString().split(","); outKey.set(fields[2], fields[1], Long.parseLong(fields[4])); outValue.set(Long.parseLong(fields[4])); context.write(outKey, outValue); } }
-
Reduce 类
package com.uuicon.sentiment_upload.covidtopn; import com.uuicon.sentiment_upload.covidtop.CovidBean; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class CovidTopNReducer extends Reducer<CovidBean, LongWritable, CovidBean, LongWritable> { @Override protected void reduce(CovidBean key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { int num = 0; for (LongWritable value : values) {if (num < 3) {context.write(key, value); num++; }else{return;} } } }
-
驱动类
package com.uuicon.sentiment_upload.covidtopn; import com.uuicon.sentiment_upload.covidtop.CovidBean; import com.uuicon.sentiment_upload.covidtop.CovidGroupingComparator; import com.uuicon.sentiment_upload.covidtop.CovidTopOneMapper; import com.uuicon.sentiment_upload.covidtop.CovidTopOneReduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class CovidTopNDriver {public static void main(String[] args) throws Exception { // 配置文件对象 Configuration conf = new Configuration(); // 创立作业实例 Job job = Job.getInstance(conf, CovidTopNDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(CovidTopNDriver.class); // 设置作业 Mapper reduce 类 job.setMapperClass(CovidTopNMapper.class); job.setReducerClass(CovidTopNReducer.class); // 设置作业 mapper 阶段输入 key value 数据类型, job.setMapOutputKeyClass(CovidBean.class); job.setMapOutputValueClass(LongWritable.class); // 设置作业 reducer 阶段输入 key value 数据类型, 也就是程序最终输入的数据类型 job.setOutputKeyClass(CovidBean.class); job.setOutputValueClass(LongWritable.class); // 设置分区 // job.setNumReduceTasks(8); // job.setPartitionerClass(CovidPartition.class); job.setGroupingComparatorClass(CovidGroupingComparator.class); // 配置作业的输出数据门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输入数据门路 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 判断输入门路是否存在, 如果存在, 删除 FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(args[1]))) {fs.delete(new Path(args[1]), true); } boolean resultFlag = job.waitForCompletion(true); System.exit(resultFlag ? 0 : 1); } }