关于大数据:图解大数据-实操案例MapReduce大数据统计

12次阅读

共计 5142 个字符,预计需要花费 13 分钟才能阅读完成。

作者:韩信子 @ShowMeAI
教程地址:http://www.showmeai.tech/tutorials/84
本文地址:http://www.showmeai.tech/article-detail/170
申明:版权所有,转载请分割平台与作者并注明出处

1. 引言

本教程 ShowMeAI 具体给大家解说 Hadoop 应用 Map-Reduce 进行数据统计的办法,对于 Hadoop 与 map-reduce 的基础知识,大家能够回顾 ShowMeAI 的基础知识解说篇分布式平台 Hadoop 与 Map-reduce 详解。

只管大部分人应用 Hadoop 都是用 java 实现,然而 Hadoop 程序能够用 python、C++、ruby 等实现。本示例教大家用 python 实现 MapReduce 实例统计输出文件的单词的词频。

  • 输出:文本文件
  • 输入:单词和词频信息,用 \t 隔开

2.Python 实现 MapReduce 代码

应用 python 实现 MapReduce 须要利用 Hadoop 流的 API,通过 STDIN(规范输出)、STDOUT(规范输入)在 Map 函数和 Reduce 函数之间传递数据。

咱们会利用 Python 的 sys.stdin 读取输出数据,并把咱们的输入传送给 sys.stdout。Hadoop 流将会实现其余的工作。

一个形象的 Hadoop 大数据处理流程如下图所示

对于本文提到的工作,咱们做一个更具体的拆解,整个 Hadoop Map-Reduce 过程如下图所示

从上图,咱们能够看到,咱们在当前任务中,须要外围通过代码实现的步骤是:

  • Map:产生词与次数标记键值对
  • Reduce:聚合同一个词 (key) 的值,实现统计

上面咱们来看看,通过 python 如何实现这里的 Map 和 Reduce 阶段。

2.1 Map 阶段:mapper.py

在这里,咱们假如 map 阶段应用到的 python 脚本寄存地址为 ShowMeAI/hadoop/code/mapper.py

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print "%s\t%s" % (word, 1)

解释一下上述代码:

  • 文件从 STDIN 读取文件。
  • 把单词切开,并把单词和词频输入 STDOUT。
  • Map 脚本不会计算单词的总数,而是间接输入 1(Reduce 阶段会实现统计工作)。

为了使脚本可执行,减少 mapper.py 的可执行权限:

chmod +x ShowMeAI/hadoop/code/mapper.py

2.2 Reduce 阶段:reducer.py

在这里,咱们假如 reduce 阶段应用到的 python 脚本寄存地址为 ShowMeAI/hadoop/code/reducer.py

#!/usr/bin/env python
from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:  #count 如果不是数字的话,间接疏忽掉
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print "%s\t%s" % (current_word, current_count)
        current_count = count
        current_word = word

if word == current_word:  #不要遗记最初的输入
    print "%s\t%s" % (current_word, current_count)

文件会读取 mapper.py 的后果作为 reducer.py 的输出,并统计每个单词呈现的总的次数,把最终的后果输入到 STDOUT。

为了是脚本可执行,减少 reducer.py 的可执行权限

chmod +x ShowMeAI/hadoop/code/reducer.py

3. 本地测试 MapReduce 流程

通常咱们在把数据处理流程提交到集群进行运行之前,会本地做一个简略测试,咱们会借助 linux 的管道命令 (cat data | map | sort | reduce) 对数据流进行串接,验证咱们写的 mapper.pyreducer.py脚本性能是否失常。这种测试形式,能保障输入的最终后果是咱们冀望的。

测试的命令如下:

cd ShowMeAI/hadoop/code/
echo "foo foo quux labs foo bar quux" | python mapper.py
echo ``"foo foo quux labs foo bar quux"` `| python mapper.py | sort -k1, 1  | python reducer.py

其中的 sort 过程次要是实现以 key 为基准的排序,不便 reduce 阶段进行聚合统计。

4.Hadoop 集群运行 python 代码

4.1 数据筹备

咱们对以下三个文件进行词频统计,先依据下述门路下载:

  • Plain Text UTF-8 http://www.gutenberg.org/ebooks/4300.txt.utf-8
  • Plain Text UTF-8 http://www.gutenberg.org/ebooks/5000.txt.utf-8
  • Plain Text UTF-8 http://www.gutenberg.org/ebooks/20417.txt.utf-8

将文件搁置到 ShowMeAI/hadoop/datas/ 目录下。

4.2 执行程序

把本地的数据文件拷贝到分布式文件系统 HDFS 中。

bin/hadoop dfs -copyFromLocal ShowMeAI/hadoop/datas  hdfs_in

查看:

bin/hadoop dfs -ls

查看具体的文件:

bin/hadoop dfs -ls /user/showmeai/hdfs_in

执行 MapReduce job:

bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \
-file ShowMeAI/hadoop/code/mapper.py     -mapper ShowMeAI/hadoop/code/mapper.py \
-file ShowMeAI/hadoop/code/reducer.py    -reducer ShowMeAI/hadoop/code/reducer.py \
-input /user/showmeai/hdfs_in/*    -output /user/showmeai/hdfs_out

实例输入:

查看输入后果是否在目标目录 /user/showmeai/hdfs_out

bin/hadoop dfs -ls /user/showmeai/hdfs_out

查看后果:

bin/hadoop dfs -cat /user/showmeai/hdfs_out2/part-00000

输入:

5.Mapper 和 Reducer 代码优化

5.1 python 中的迭代器和生成器

咱们这里对 Map-Reduce 的代码优化次要基于迭代器和生成器,对这个局部不相熟的同学能够参考 ShowMeAI 的 python 局部内容 →《图解 python | 迭代器与生成器》。

5.2 优化 Mapper 和 Reducer 代码

mapper.py
#!/usr/bin/env python
import sys
def read_input(file):
    for line in file:
        yield line.split()

def main(separator='\t'):
    data = read_input(sys.stdin)
    for words in data:
        for word in words:
            print "%s%s%d" % (word, separator, 1)

if __name__ == "__main__":
    main()

reducer.py
#!/usr/bin/env python
from operator import itemgetter
from itertools import groupby
import sys

def read_mapper_output(file, separator = '\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator = '\t'):
    data = read_mapper_output(sys.stdin, separator = separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except valueError:
            pass

if __name__ == "__main__":
    main()

咱们对代码中的 groupby 做一个简略代码性能演示解说,如下:

from itertools import groupby
from operator import itemgetter

things = [('2009-09-02', 11),
          ('2009-09-02', 3),
          ('2009-09-03', 10),
          ('2009-09-03', 4),
          ('2009-09-03', 22),
          ('2009-09-06', 33)]

sss = groupby(things, itemgetter(0))
for key, items in sss:
    print key
    for subitem in items:
        print subitem
    print '-' * 20

后果:

2009-09-02
('2009-09-02', 11)
('2009-09-02', 3)
--------------------
2009-09-03
('2009-09-03', 10)
('2009-09-03', 4)
('2009-09-03', 22)
--------------------
2009-09-06
('2009-09-06', 33)
--------------------

代码中:

  • groupby(things, itemgetter(0)) 以第 0 列为排序指标
  • groupby(things, itemgetter(1)) 以第 1 列为排序指标
  • groupby(things) 以整行为排序指标

6. 参考资料

  • python 中的 split 函数中的参数问题 http://segmentfault.com/q/1010000000311861
  • Writing an Hadoop MapReduce Program in Python http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
  • shell 的 sort 命令的 - k 参数 http://blog.chinaunix.net/uid-25513153-id-200481.html

ShowMeAI 相干文章举荐

  • 图解大数据 | 导论:大数据生态与利用
  • 图解大数据 | 分布式平台:Hadoop 与 Map-reduce 详解
  • 图解大数据 | 实操案例:Hadoop 零碎搭建与环境配置
  • 图解大数据 | 实操案例:利用 map-reduce 进行大数据统计
  • 图解大数据 | 海量数据库与查问:Hive 与 HBase 详解
  • 图解大数据 | 大数据分析开掘框架:Spark 初步
  • 图解大数据 | Spark 操作:基于 RDD 的大数据处理剖析
  • 图解大数据 | Spark 操作:基于 Dataframe 与 SQL 的大数据处理剖析
  • 图解大数据 | 综合案例:应用 spark 剖析美国新冠肺炎疫情数据
  • 图解大数据 | 综合案例:应用 Spark 剖析开掘批发交易数据
  • 图解大数据 | 综合案例:应用 Spark 剖析开掘音乐专辑数据
  • 图解大数据 | 流式数据处理:Spark Streaming
  • 图解大数据 | Spark 机器学习(上)- 工作流与特色工程
  • 图解大数据 | Spark 机器学习(下)- 建模与超参调优
  • 图解大数据 | Spark GraphFrames:基于图的数据分析开掘

ShowMeAI 系列教程举荐

  • 图解 Python 编程:从入门到精通系列教程
  • 图解数据分析:从入门到精通系列教程
  • 图解 AI 数学根底:从入门到精通系列教程
  • 图解大数据技术:从入门到精通系列教程

正文完
 0