一. 计数器概述
执行 MapReduce 程序的时候, 控制台输入信息中通常有上面所示片段内容:
输入信息中的外围词是 counters,中文叫做计数器,Hadoop 内置的计数器性能收集作业的次要统计信息,能够帮忙用户了解程序的运行状况,辅助用户诊断故障。
二. MapReduce 内置计数器
Hadoop 为每个 MapReduce 作业保护一些内置的计数器,这些计数器报告各种指标,例如和 MapReduce 程序执行中每个阶段输入输出的数据量相干的计数器,能够帮忙用户进行判断程序逻辑是否失效、正确。
Hadoop 内置计数器 依据性能进行分组 。每个组包含若干个不同的计数器,别离是:MapReduce 工作计数器 (Map-Reduce Framework)、 文件系统计数器 (File System Counters)、 作业计数器 (Job Counters)、 输出文件工作计数器 (File Input Format Counters)、 输入文件计数器(File Output Format Counters)。
须要留神的是,内置的计数器都是 MapReduce 程序中全局的计数器,跟 MapReduce 分布式运算没有关系,不是所谓的每个部分的统计信息。
1. Map-Reduce Framework Counters
计数器名称 | 阐明 |
---|---|
MAP_INPUT_RECORDS | 所有 mapper 已解决的输出记录数 |
MAP_OUTPUT_RECORDS | 所有 mapper 产生的输入记录数 |
MAP_OUTPUT_BYTES | 所有 mapper 产生的未经压缩的输入数据的字节数 |
MAP_OUTPUT_MATERIALIZED_BYTES | mapper 输入后的确写到磁盘上字节数 |
COMBINE_INPUT_RECORDS | 所有 combiner(如果有)已解决的输出记录数 |
COMBINE_OUTPUT_RECORDS | 所有 combiner(如果有)已产生的输入记录数 |
REDUCE_INPUT_GROUPS | 所有 reducer 已解决分组的个数 |
REDUCE_INPUT_RECORDS | 所有 reducer 曾经解决的输出记录的个数。每当某个 reducer 的迭代器读一个值时,该计数器的值减少 |
REDUCE_OUTPUT_RECORDS | 所有 reducer 输入记录数 |
REDUCE_SHUFFLE_BYTES | Shuffle 时复制到 reducer 的字节数 |
SPILLED_RECORDS | 所有 map 和 reduce 工作溢出到磁盘的记录数 |
CPU_MILLISECONDS | 一个工作的总 CPU 工夫,以毫秒为单位,可由 /proc/cpuinfo 获取 |
PHYSICAL_MEMORY_BYTES | 一个工作所用的物理内存,以字节数为单位,可由 /proc/meminfo 获取 |
VIRTUAL_MEMORY_BYTES | 一个工作所用虚拟内存的字节数,由 /proc/meminfo 获取 |
2. File System Counters Counters
文件系统的计数器会针对不同的文件系统应用状况进行统计,比方 HDFS、本地文件系统:
计数器名称 | 阐明 |
---|---|
BYTES_READ | 程序从文件系统中读取的字节数 |
BYTES_WRITTEN | 程序往文件系统中写入的字节数 |
READ_OPS | 文件系统中进行的读操作的数量(例如,open 操作,filestatus 操作) |
LARGE_READ_OPS | 文件系统中进行的大规模读操作的数量 |
WRITE_OPS | 文件系统中进行的写操作的数量(例如,create 操作,append 操作) |
3. Job Counters
计数器名称 | 阐明 |
---|---|
Launched map tasks | 启动的 map 工作数,包含以“揣测执行”形式启动的工作 |
Launched reduce tasks | 启动的 reduce 工作数,包含以“揣测执行”形式启动的工作 |
Data-local map tasks | 与输人数据在同一节点上的 map 工作数 |
Total time spent by all maps in occupied slots (ms) | 所有 map 工作在占用的插槽中破费的总工夫(毫秒) |
Total time spent by all reduces in occupied slots (ms) | 所有 reduce 工作在占用的插槽中破费的总工夫(毫秒) |
Total time spent by all map tasks (ms) | 所有 map task 破费的工夫 |
Total time spent by all reduce tasks (ms) | 所有 reduce task 破费的工夫 |
4. File Input|Output Format Counters
计数器名称 | 阐明 |
---|---|
读取的字节数(BYTES_READ) | 由 map 工作通过 FilelnputFormat 读取的字节数 |
写的字节数(BYTES_WRITTEN) | 由 map 工作(针对仅含 map 的作业)或者 reduce 工作通过 FileOutputFormat 写的字节数 |
3. MapReduce 自定义计数器
尽管 Hadoop 内置的计数器比拟全面,给作业运行过程的监控带了不便,然而对于一些业务中的特定要求 (统计过程中对某种状况产生进行计数统计)MapReduce 还是提供了用户编写自定义计数器的办法。最重要的是, 计数器是全局的统计,防止了用户本人保护全局变量的不利性。
自定义计数器的应用分为两步:
- 通过 context.getCounter 办法获取一个全局计数器,创立的时候须要指定计数器所属的组名和计数器的名字:
- 在程序中须要应用计数器的中央,调用 counter 提供的办法即可,比方 + 1 操作:
4. 案例:MapReduce 自定义计数器应用
1. 需要
针对一批文件进行词频统计,不知何种起因,在任意文件的任意中央都有可能插入单词”apple”, 现 要求应用计数器统计出数据中 apple 呈现的次数,便于用户执行程序时判断。
2. 代码实现
1. Mapper 类
public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 从程序上下文对象获取一个全局计数器:用于统计 apple 呈现的个数
// 须要指定计数器组 和计数器的名字
Counter counter = context.getCounter("itcast_counters", "apple Counter");
String[] words = value.toString().split("\\s+");
for (String word : words) {
// 判断读取内容是否为 apple 如果是 计数器加 1
if("apple".equals(word)){counter.increment(1);
}
context.write(new Text(word),new LongWritable(1));
}
}
}
2. Reducer 类
public class WordCountReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
for (LongWritable value : values) {count +=value.get();
}
context.write(key,new LongWritable(count));
}
}
3. 运行主类
public class WordCountDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
// 创立作业实例
Job job = Job.getInstance(getConf(), WordCountDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(this.getClass());
// 设置作业 mapper reducer 类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 设置作业 mapper 阶段输入 key value 数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 设置作业 reducer 阶段输入 key value 数据类型 也就是程序最终输入数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 配置作业的输出数据门路
FileInputFormat.addInputPath(job, new Path(args[0]));
// 配置作业的输入数据门路
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交作业并期待执行实现
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
// 配置文件对象
Configuration conf = new Configuration();
// 应用工具类 ToolRunner 提交程序
int status = ToolRunner.run(conf, new WordCountDriver(), args);
// 退出客户端程序 客户端退出状态码和 MapReduce 程序执行后果绑定
System.exit(status);
}
}
4. 执行后果