乐趣区

关于hadoop:Hadoop-入门笔记-二十-MapReduce-Counter计数器

一. 计数器概述

执行 MapReduce 程序的时候, 控制台输入信息中通常有上面所示片段内容:

输入信息中的外围词是 counters,中文叫做计数器,Hadoop 内置的计数器性能收集作业的次要统计信息,能够帮忙用户了解程序的运行状况,辅助用户诊断故障。

二. MapReduce 内置计数器

Hadoop 为每个 MapReduce 作业保护一些内置的计数器,这些计数器报告各种指标,例如和 MapReduce 程序执行中每个阶段输入输出的数据量相干的计数器,能够帮忙用户进行判断程序逻辑是否失效、正确。

Hadoop 内置计数器 依据性能进行分组 。每个组包含若干个不同的计数器,别离是:MapReduce 工作计数器 (Map-Reduce Framework)、 文件系统计数器 (File System Counters)、 作业计数器 (Job Counters)、 输出文件工作计数器 (File Input Format Counters)、 输入文件计数器(File Output Format Counters)。

须要留神的是,内置的计数器都是 MapReduce 程序中全局的计数器,跟 MapReduce 分布式运算没有关系,不是所谓的每个部分的统计信息。

1. Map-Reduce Framework Counters

计数器名称 阐明
MAP_INPUT_RECORDS 所有 mapper 已解决的输出记录数
MAP_OUTPUT_RECORDS 所有 mapper 产生的输入记录数
MAP_OUTPUT_BYTES 所有 mapper 产生的未经压缩的输入数据的字节数
MAP_OUTPUT_MATERIALIZED_BYTES mapper 输入后的确写到磁盘上字节数
COMBINE_INPUT_RECORDS 所有 combiner(如果有)已解决的输出记录数
COMBINE_OUTPUT_RECORDS 所有 combiner(如果有)已产生的输入记录数
REDUCE_INPUT_GROUPS 所有 reducer 已解决分组的个数
REDUCE_INPUT_RECORDS 所有 reducer 曾经解决的输出记录的个数。每当某个 reducer 的迭代器读一个值时,该计数器的值减少
REDUCE_OUTPUT_RECORDS 所有 reducer 输入记录数
REDUCE_SHUFFLE_BYTES Shuffle 时复制到 reducer 的字节数
SPILLED_RECORDS 所有 map 和 reduce 工作溢出到磁盘的记录数
CPU_MILLISECONDS 一个工作的总 CPU 工夫,以毫秒为单位,可由 /proc/cpuinfo 获取
PHYSICAL_MEMORY_BYTES 一个工作所用的物理内存,以字节数为单位,可由 /proc/meminfo 获取
VIRTUAL_MEMORY_BYTES 一个工作所用虚拟内存的字节数,由 /proc/meminfo 获取

2. File System Counters Counters

文件系统的计数器会针对不同的文件系统应用状况进行统计,比方 HDFS、本地文件系统:

计数器名称 阐明
BYTES_READ 程序从文件系统中读取的字节数
BYTES_WRITTEN 程序往文件系统中写入的字节数
READ_OPS 文件系统中进行的读操作的数量(例如,open 操作,filestatus 操作)
LARGE_READ_OPS 文件系统中进行的大规模读操作的数量
WRITE_OPS 文件系统中进行的写操作的数量(例如,create 操作,append 操作)

3. Job Counters

计数器名称 阐明
Launched map tasks 启动的 map 工作数,包含以“揣测执行”形式启动的工作
Launched reduce tasks 启动的 reduce 工作数,包含以“揣测执行”形式启动的工作
Data-local map tasks 与输人数据在同一节点上的 map 工作数
Total time spent by all maps in occupied slots (ms) 所有 map 工作在占用的插槽中破费的总工夫(毫秒)
Total time spent by all reduces in occupied slots (ms) 所有 reduce 工作在占用的插槽中破费的总工夫(毫秒)
Total time spent by all map tasks (ms) 所有 map task 破费的工夫
Total time spent by all reduce tasks (ms) 所有 reduce task 破费的工夫

4. File Input|Output Format Counters

计数器名称 阐明
读取的字节数(BYTES_READ) 由 map 工作通过 FilelnputFormat 读取的字节数
写的字节数(BYTES_WRITTEN) 由 map 工作(针对仅含 map 的作业)或者 reduce 工作通过 FileOutputFormat 写的字节数

3. MapReduce 自定义计数器

尽管 Hadoop 内置的计数器比拟全面,给作业运行过程的监控带了不便,然而对于一些业务中的特定要求 (统计过程中对某种状况产生进行计数统计)MapReduce 还是提供了用户编写自定义计数器的办法。最重要的是, 计数器是全局的统计,防止了用户本人保护全局变量的不利性。
自定义计数器的应用分为两步:

  1. 通过 context.getCounter 办法获取一个全局计数器,创立的时候须要指定计数器所属的组名和计数器的名字:
  2. 在程序中须要应用计数器的中央,调用 counter 提供的办法即可,比方 + 1 操作:

4. 案例:MapReduce 自定义计数器应用

1. 需要

针对一批文件进行词频统计,不知何种起因,在任意文件的任意中央都有可能插入单词”apple”, 现 要求应用计数器统计出数据中 apple 呈现的次数,便于用户执行程序时判断。

2. 代码实现

1. Mapper 类

public class WordCountMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 从程序上下文对象获取一个全局计数器:用于统计 apple 呈现的个数
        // 须要指定计数器组 和计数器的名字
        Counter counter = context.getCounter("itcast_counters", "apple Counter");

        String[] words = value.toString().split("\\s+");
        for (String word : words) {
            // 判断读取内容是否为 apple  如果是 计数器加 1
            if("apple".equals(word)){counter.increment(1);
            }
            context.write(new Text(word),new LongWritable(1));
        }
    }
}

2. Reducer 类

public class WordCountReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {count +=value.get();
        }
        context.write(key,new LongWritable(count));
    }
}

3. 运行主类

public class WordCountDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // 创立作业实例
        Job job = Job.getInstance(getConf(), WordCountDriver.class.getSimpleName());
        // 设置作业驱动类
        job.setJarByClass(this.getClass());

        // 设置作业 mapper reducer 类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 设置作业 mapper 阶段输入 key value 数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 设置作业 reducer 阶段输入 key value 数据类型 也就是程序最终输入数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 配置作业的输出数据门路
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // 配置作业的输入数据门路
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 提交作业并期待执行实现
        return job.waitForCompletion(true) ? 0 : 1;

    }


    public static void main(String[] args) throws Exception {
        // 配置文件对象
        Configuration conf = new Configuration();
        // 应用工具类 ToolRunner 提交程序
        int status = ToolRunner.run(conf, new WordCountDriver(), args);
        // 退出客户端程序 客户端退出状态码和 MapReduce 程序执行后果绑定
        System.exit(status);
    }
}

4. 执行后果

退出移动版