一. 计数器概述

执行MapReduce程序的时候,控制台输入信息中通常有上面所示片段内容:

输入信息中的外围词是counters,中文叫做计数器,Hadoop内置的计数器性能收集作业的次要统计信息,能够帮忙用户了解程序的运行状况,辅助用户诊断故障。

二. MapReduce内置计数器

Hadoop为每个MapReduce作业保护一些内置的计数器,这些计数器报告各种指标,例如和MapReduce程序执行中每个阶段输入输出的数据量相干的计数器,能够帮忙用户进行判断程序逻辑是否失效、正确。

Hadoop内置计数器依据性能进行分组。每个组包含若干个不同的计数器,别离是:MapReduce工作计数器(Map-Reduce Framework)、文件系统计数器(File System Counters)、作业计数器(Job Counters)、输出文件工作计数器(File Input Format Counters)、输入文件计数器(File Output Format Counters)。

须要留神的是,内置的计数器都是MapReduce程序中全局的计数器,跟MapReduce分布式运算没有关系,不是所谓的每个部分的统计信息。

1. Map-Reduce Framework Counters

计数器名称阐明
MAP_INPUT_RECORDS所有mapper已解决的输出记录数
MAP_OUTPUT_RECORDS所有mapper产生的输入记录数
MAP_OUTPUT_BYTES所有mapper产生的未经压缩的输入数据的字节数
MAP_OUTPUT_MATERIALIZED_BYTESmapper输入后的确写到磁盘上字节数
COMBINE_INPUT_RECORDS所有combiner(如果有)已解决的输出记录数
COMBINE_OUTPUT_RECORDS所有combiner(如果有)已产生的输入记录数
REDUCE_INPUT_GROUPS所有reducer已解决分组的个数
REDUCE_INPUT_RECORDS所有reducer曾经解决的输出记录的个数。每当某个reducer的迭代器读一个值时,该计数器的值减少
REDUCE_OUTPUT_RECORDS所有reducer输入记录数
REDUCE_SHUFFLE_BYTESShuffle时复制到reducer的字节数
SPILLED_RECORDS所有map和reduce工作溢出到磁盘的记录数
CPU_MILLISECONDS一个工作的总CPU工夫,以毫秒为单位,可由/proc/cpuinfo获取
PHYSICAL_MEMORY_BYTES一个工作所用的物理内存,以字节数为单位,可由/proc/meminfo获取
VIRTUAL_MEMORY_BYTES一个工作所用虚拟内存的字节数,由/proc/meminfo获取

2. File System Counters Counters

文件系统的计数器会针对不同的文件系统应用状况进行统计,比方HDFS、本地文件系统:

计数器名称阐明
BYTES_READ程序从文件系统中读取的字节数
BYTES_WRITTEN程序往文件系统中写入的字节数
READ_OPS文件系统中进行的读操作的数量(例如,open操作,filestatus操作)
LARGE_READ_OPS文件系统中进行的大规模读操作的数量
WRITE_OPS文件系统中进行的写操作的数量(例如,create操作,append操作)

3. Job Counters

计数器名称阐明
Launched map tasks启动的map工作数,包含以“揣测执行”形式启动的工作
Launched reduce tasks启动的reduce工作数,包含以“揣测执行”形式启动的工作
Data-local map tasks与输人数据在同一节点上的map工作数
Total time spent by all maps in occupied slots (ms)所有map工作在占用的插槽中破费的总工夫(毫秒)
Total time spent by all reduces in occupied slots (ms)所有reduce工作在占用的插槽中破费的总工夫(毫秒)
Total time spent by all map tasks (ms)所有map task破费的工夫
Total time spent by all reduce tasks (ms)所有reduce task破费的工夫

4. File Input|Output Format Counters

计数器名称阐明
读取的字节数(BYTES_READ)由map工作通过FilelnputFormat读取的字节数
写的字节数(BYTES_WRITTEN)由map工作(针对仅含map的作业)或者reduce工作通过FileOutputFormat写的字节数

3. MapReduce 自定义计数器

尽管Hadoop内置的计数器比拟全面,给作业运行过程的监控带了不便,然而对于一些业务中的特定要求(统计过程中对某种状况产生进行计数统计)MapReduce还是提供了用户编写自定义计数器的办法。最重要的是,计数器是全局的统计,防止了用户本人保护全局变量的不利性。
自定义计数器的应用分为两步:

  1. 通过context.getCounter办法获取一个全局计数器,创立的时候须要指定计数器所属的组名和计数器的名字:
  2. 在程序中须要应用计数器的中央,调用counter提供的办法即可,比方+1操作:

4. 案例:MapReduce自定义计数器应用

1. 需要

针对一批文件进行词频统计,不知何种起因,在任意文件的任意中央都有可能插入单词”apple”,现要求应用计数器统计出数据中apple呈现的次数,便于用户执行程序时判断。

2. 代码实现

1. Mapper类

public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        //从程序上下文对象获取一个全局计数器:用于统计apple呈现的个数        //须要指定计数器组 和计数器的名字        Counter counter = context.getCounter("itcast_counters", "apple Counter");        String[] words = value.toString().split("\\s+");        for (String word : words) {            //判断读取内容是否为apple  如果是 计数器加1            if("apple".equals(word)){                counter.increment(1);            }            context.write(new Text(word),new LongWritable(1));        }    }}

2. Reducer类

public class WordCountReducer extends Reducer<Text, LongWritable,Text,LongWritable> {    @Override    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {        long count = 0;        for (LongWritable value : values) {            count +=value.get();        }        context.write(key,new LongWritable(count));    }}

3. 运行主类

public class WordCountDriver extends Configured implements Tool {    @Override    public int run(String[] args) throws Exception {        // 创立作业实例        Job job = Job.getInstance(getConf(), WordCountDriver.class.getSimpleName());        // 设置作业驱动类        job.setJarByClass(this.getClass());        // 设置作业mapper reducer类        job.setMapperClass(WordCountMapper.class);        job.setReducerClass(WordCountReducer.class);        // 设置作业mapper阶段输入key value数据类型        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(LongWritable.class);        //设置作业reducer阶段输入key value数据类型 也就是程序最终输入数据类型        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(LongWritable.class);        // 配置作业的输出数据门路        FileInputFormat.addInputPath(job, new Path(args[0]));        // 配置作业的输入数据门路        FileOutputFormat.setOutputPath(job, new Path(args[1]));        // 提交作业并期待执行实现        return job.waitForCompletion(true) ? 0 : 1;    }    public static void main(String[] args) throws Exception {        //配置文件对象        Configuration conf = new Configuration();        //应用工具类ToolRunner提交程序        int status = ToolRunner.run(conf, new WordCountDriver(), args);        //退出客户端程序 客户端退出状态码和MapReduce程序执行后果绑定        System.exit(status);    }}

4. 执行后果