一. 计数器概述
执行MapReduce程序的时候,控制台输入信息中通常有上面所示片段内容:
输入信息中的外围词是counters,中文叫做计数器,Hadoop内置的计数器性能收集作业的次要统计信息,能够帮忙用户了解程序的运行状况,辅助用户诊断故障。
二. MapReduce内置计数器
Hadoop为每个MapReduce作业保护一些内置的计数器,这些计数器报告各种指标,例如和MapReduce程序执行中每个阶段输入输出的数据量相干的计数器,能够帮忙用户进行判断程序逻辑是否失效、正确。
Hadoop内置计数器依据性能进行分组。每个组包含若干个不同的计数器,别离是:MapReduce工作计数器(Map-Reduce Framework)、文件系统计数器(File System Counters)、作业计数器(Job Counters)、输出文件工作计数器(File Input Format Counters)、输入文件计数器(File Output Format Counters)。
须要留神的是,内置的计数器都是MapReduce程序中全局的计数器,跟MapReduce分布式运算没有关系,不是所谓的每个部分的统计信息。
1. Map-Reduce Framework Counters
计数器名称 | 阐明 |
---|---|
MAP_INPUT_RECORDS | 所有mapper已解决的输出记录数 |
MAP_OUTPUT_RECORDS | 所有mapper产生的输入记录数 |
MAP_OUTPUT_BYTES | 所有mapper产生的未经压缩的输入数据的字节数 |
MAP_OUTPUT_MATERIALIZED_BYTES | mapper输入后的确写到磁盘上字节数 |
COMBINE_INPUT_RECORDS | 所有combiner(如果有)已解决的输出记录数 |
COMBINE_OUTPUT_RECORDS | 所有combiner(如果有)已产生的输入记录数 |
REDUCE_INPUT_GROUPS | 所有reducer已解决分组的个数 |
REDUCE_INPUT_RECORDS | 所有reducer曾经解决的输出记录的个数。每当某个reducer的迭代器读一个值时,该计数器的值减少 |
REDUCE_OUTPUT_RECORDS | 所有reducer输入记录数 |
REDUCE_SHUFFLE_BYTES | Shuffle时复制到reducer的字节数 |
SPILLED_RECORDS | 所有map和reduce工作溢出到磁盘的记录数 |
CPU_MILLISECONDS | 一个工作的总CPU工夫,以毫秒为单位,可由/proc/cpuinfo获取 |
PHYSICAL_MEMORY_BYTES | 一个工作所用的物理内存,以字节数为单位,可由/proc/meminfo获取 |
VIRTUAL_MEMORY_BYTES | 一个工作所用虚拟内存的字节数,由/proc/meminfo获取 |
2. File System Counters Counters
文件系统的计数器会针对不同的文件系统应用状况进行统计,比方HDFS、本地文件系统:
计数器名称 | 阐明 |
---|---|
BYTES_READ | 程序从文件系统中读取的字节数 |
BYTES_WRITTEN | 程序往文件系统中写入的字节数 |
READ_OPS | 文件系统中进行的读操作的数量(例如,open操作,filestatus操作) |
LARGE_READ_OPS | 文件系统中进行的大规模读操作的数量 |
WRITE_OPS | 文件系统中进行的写操作的数量(例如,create操作,append操作) |
3. Job Counters
计数器名称 | 阐明 |
---|---|
Launched map tasks | 启动的map工作数,包含以“揣测执行”形式启动的工作 |
Launched reduce tasks | 启动的reduce工作数,包含以“揣测执行”形式启动的工作 |
Data-local map tasks | 与输人数据在同一节点上的map工作数 |
Total time spent by all maps in occupied slots (ms) | 所有map工作在占用的插槽中破费的总工夫(毫秒) |
Total time spent by all reduces in occupied slots (ms) | 所有reduce工作在占用的插槽中破费的总工夫(毫秒) |
Total time spent by all map tasks (ms) | 所有map task破费的工夫 |
Total time spent by all reduce tasks (ms) | 所有reduce task破费的工夫 |
4. File Input|Output Format Counters
计数器名称 | 阐明 |
---|---|
读取的字节数(BYTES_READ) | 由map工作通过FilelnputFormat读取的字节数 |
写的字节数(BYTES_WRITTEN) | 由map工作(针对仅含map的作业)或者reduce工作通过FileOutputFormat写的字节数 |
3. MapReduce 自定义计数器
尽管Hadoop内置的计数器比拟全面,给作业运行过程的监控带了不便,然而对于一些业务中的特定要求(统计过程中对某种状况产生进行计数统计)MapReduce还是提供了用户编写自定义计数器的办法。最重要的是,计数器是全局的统计,防止了用户本人保护全局变量的不利性。
自定义计数器的应用分为两步:
- 通过context.getCounter办法获取一个全局计数器,创立的时候须要指定计数器所属的组名和计数器的名字:
- 在程序中须要应用计数器的中央,调用counter提供的办法即可,比方+1操作:
4. 案例:MapReduce自定义计数器应用
1. 需要
针对一批文件进行词频统计,不知何种起因,在任意文件的任意中央都有可能插入单词”apple”,现要求应用计数器统计出数据中apple呈现的次数,便于用户执行程序时判断。
2. 代码实现
1. Mapper类
public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //从程序上下文对象获取一个全局计数器:用于统计apple呈现的个数 //须要指定计数器组 和计数器的名字 Counter counter = context.getCounter("itcast_counters", "apple Counter"); String[] words = value.toString().split("\\s+"); for (String word : words) { //判断读取内容是否为apple 如果是 计数器加1 if("apple".equals(word)){ counter.increment(1); } context.write(new Text(word),new LongWritable(1)); } }}
2. Reducer类
public class WordCountReducer extends Reducer<Text, LongWritable,Text,LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long count = 0; for (LongWritable value : values) { count +=value.get(); } context.write(key,new LongWritable(count)); }}
3. 运行主类
public class WordCountDriver extends Configured implements Tool { @Override public int run(String[] args) throws Exception { // 创立作业实例 Job job = Job.getInstance(getConf(), WordCountDriver.class.getSimpleName()); // 设置作业驱动类 job.setJarByClass(this.getClass()); // 设置作业mapper reducer类 job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 设置作业mapper阶段输入key value数据类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); //设置作业reducer阶段输入key value数据类型 也就是程序最终输入数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 配置作业的输出数据门路 FileInputFormat.addInputPath(job, new Path(args[0])); // 配置作业的输入数据门路 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 提交作业并期待执行实现 return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { //配置文件对象 Configuration conf = new Configuration(); //应用工具类ToolRunner提交程序 int status = ToolRunner.run(conf, new WordCountDriver(), args); //退出客户端程序 客户端退出状态码和MapReduce程序执行后果绑定 System.exit(status); }}