作者:韩信子@ShowMeAI
教程地址:http://www.showmeai.tech/tutorials/84
本文地址:http://www.showmeai.tech/article-detail/170
申明:版权所有,转载请分割平台与作者并注明出处
1.引言
本教程ShowMeAI具体给大家解说Hadoop应用Map-Reduce进行数据统计的办法,对于Hadoop与map-reduce的基础知识,大家能够回顾ShowMeAI的基础知识解说篇分布式平台Hadoop与Map-reduce详解。
只管大部分人应用Hadoop都是用java实现,然而Hadoop程序能够用python、C++、ruby等实现。本示例教大家用python实现MapReduce实例统计输出文件的单词的词频。
- 输出:文本文件
- 输入:单词和词频信息,用
\t
隔开
2.Python实现 MapReduce 代码
应用python实现MapReduce须要利用Hadoop流的API,通过STDIN(规范输出)、STDOUT(规范输入)在Map函数和Reduce函数之间传递数据。
咱们会利用Python的sys.stdin读取输出数据,并把咱们的输入传送给 sys.stdout
。Hadoop流将会实现其余的工作。
一个形象的Hadoop大数据处理流程如下图所示:
对于本文提到的工作,咱们做一个更具体的拆解,整个Hadoop Map-Reduce过程如下图所示:
从上图,咱们能够看到,咱们在当前任务中,须要外围通过代码实现的步骤是:
- Map:产生词与次数标记键值对
- Reduce:聚合同一个词(key)的值,实现统计
上面咱们来看看,通过python如何实现这里的 Map 和 Reduce 阶段。
2.1 Map阶段:mapper.py
在这里,咱们假如map阶段应用到的python脚本寄存地址为 ShowMeAI/hadoop/code/mapper.py
#!/usr/bin/env pythonimport sysfor line in sys.stdin: line = line.strip() words = line.split() for word in words: print "%s\t%s" % (word, 1)
解释一下上述代码:
- 文件从STDIN读取文件。
- 把单词切开,并把单词和词频输入STDOUT。
- Map脚本不会计算单词的总数,而是间接输入 1(Reduce阶段会实现统计工作)。
为了使脚本可执行,减少 mapper.py
的可执行权限:
chmod +x ShowMeAI/hadoop/code/mapper.py
2.2 Reduce阶段:reducer.py
在这里,咱们假如reduce阶段应用到的python脚本寄存地址为 ShowMeAI/hadoop/code/reducer.py
:
#!/usr/bin/env pythonfrom operator import itemgetterimport syscurrent_word = Nonecurrent_count = 0word = Nonefor line in sys.stdin: line = line.strip() word, count = line.split('\t', 1) try: count = int(count) except ValueError: #count如果不是数字的话,间接疏忽掉 continue if current_word == word: current_count += count else: if current_word: print "%s\t%s" % (current_word, current_count) current_count = count current_word = wordif word == current_word: #不要遗记最初的输入 print "%s\t%s" % (current_word, current_count)
文件会读取 mapper.py
的后果作为 reducer.py
的输出,并统计每个单词呈现的总的次数,把最终的后果输入到STDOUT。
为了是脚本可执行,减少 reducer.py
的可执行权限
chmod +x ShowMeAI/hadoop/code/reducer.py
3.本地测试MapReduce流程
通常咱们在把数据处理流程提交到集群进行运行之前,会本地做一个简略测试,咱们会借助linux的管道命令 (cat data | map | sort | reduce)
对数据流进行串接,验证咱们写的 mapper.py
和 reducer.py
脚本性能是否失常。这种测试形式,能保障输入的最终后果是咱们冀望的。
测试的命令如下:
cd ShowMeAI/hadoop/code/echo "foo foo quux labs foo bar quux" | python mapper.pyecho ``"foo foo quux labs foo bar quux"` `| python mapper.py | sort -k1, 1 | python reducer.py
其中的sort过程次要是实现以key为基准的排序,不便reduce阶段进行聚合统计。
4.Hadoop集群运行python代码
4.1 数据筹备
咱们对以下三个文件进行词频统计,先依据下述门路下载:
- Plain Text UTF-8 http://www.gutenberg.org/ebooks/4300.txt.utf-8
- Plain Text UTF-8 http://www.gutenberg.org/ebooks/5000.txt.utf-8
- Plain Text UTF-8 http://www.gutenberg.org/ebooks/20417.txt.utf-8
将文件搁置到 ShowMeAI/hadoop/datas/
目录下。
4.2 执行程序
把本地的数据文件拷贝到分布式文件系统HDFS中。
bin/hadoop dfs -copyFromLocal ShowMeAI/hadoop/datas hdfs_in
查看:
bin/hadoop dfs -ls
查看具体的文件:
bin/hadoop dfs -ls /user/showmeai/hdfs_in
执行MapReduce job:
bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar \-file ShowMeAI/hadoop/code/mapper.py -mapper ShowMeAI/hadoop/code/mapper.py \-file ShowMeAI/hadoop/code/reducer.py -reducer ShowMeAI/hadoop/code/reducer.py \-input /user/showmeai/hdfs_in/* -output /user/showmeai/hdfs_out
实例输入:
查看输入后果是否在目标目录 /user/showmeai/hdfs_out
:
bin/hadoop dfs -ls /user/showmeai/hdfs_out
查看后果:
bin/hadoop dfs -cat /user/showmeai/hdfs_out2/part-00000
输入:
5.Mapper 和 Reducer代码优化
5.1 python中的迭代器和生成器
咱们这里对Map-Reduce的代码优化次要基于迭代器和生成器,对这个局部不相熟的同学能够参考ShowMeAI的python局部内容 → 《图解python | 迭代器与生成器》 。
5.2 优化Mapper 和 Reducer代码
mapper.py#!/usr/bin/env pythonimport sysdef read_input(file): for line in file: yield line.split()def main(separator='\t'): data = read_input(sys.stdin) for words in data: for word in words: print "%s%s%d" % (word, separator, 1)if __name__ == "__main__": main()reducer.py#!/usr/bin/env pythonfrom operator import itemgetterfrom itertools import groupbyimport sysdef read_mapper_output(file, separator = '\t'): for line in file: yield line.rstrip().split(separator, 1)def main(separator = '\t'): data = read_mapper_output(sys.stdin, separator = separator) for current_word, group in groupby(data, itemgetter(0)): try: total_count = sum(int(count) for current_word, count in group) print "%s%s%d" % (current_word, separator, total_count) except valueError: passif __name__ == "__main__": main()
咱们对代码中的groupby做一个简略代码性能演示解说,如下:
from itertools import groupbyfrom operator import itemgetterthings = [('2009-09-02', 11), ('2009-09-02', 3), ('2009-09-03', 10), ('2009-09-03', 4), ('2009-09-03', 22), ('2009-09-06', 33)]sss = groupby(things, itemgetter(0))for key, items in sss: print key for subitem in items: print subitem print '-' * 20
后果:
2009-09-02('2009-09-02', 11)('2009-09-02', 3)--------------------2009-09-03('2009-09-03', 10)('2009-09-03', 4)('2009-09-03', 22)--------------------2009-09-06('2009-09-06', 33)--------------------
代码中:
groupby(things, itemgetter(0))
以第0列为排序指标groupby(things, itemgetter(1))
以第1列为排序指标groupby(things)
以整行为排序指标
6.参考资料
- python中的split函数中的参数问题 http://segmentfault.com/q/1010000000311861
- Writing an Hadoop MapReduce Program in Python http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
- shell的sort命令的-k参数 http://blog.chinaunix.net/uid-25513153-id-200481.html
ShowMeAI相干文章举荐
- 图解大数据 | 导论:大数据生态与利用
- 图解大数据 | 分布式平台:Hadoop与Map-reduce详解
- 图解大数据 | 实操案例:Hadoop零碎搭建与环境配置
- 图解大数据 | 实操案例:利用map-reduce进行大数据统计
- 图解大数据 | 海量数据库与查问:Hive与HBase详解
- 图解大数据 | 大数据分析开掘框架:Spark初步
- 图解大数据 | Spark操作:基于RDD的大数据处理剖析
- 图解大数据 | Spark操作:基于Dataframe与SQL的大数据处理剖析
- 图解大数据 | 综合案例:应用spark剖析美国新冠肺炎疫情数据
- 图解大数据 | 综合案例:应用Spark剖析开掘批发交易数据
- 图解大数据 | 综合案例:应用Spark剖析开掘音乐专辑数据
- 图解大数据 | 流式数据处理:Spark Streaming
- 图解大数据 | Spark机器学习(上)-工作流与特色工程
- 图解大数据 | Spark机器学习(下)-建模与超参调优
- 图解大数据 | Spark GraphFrames:基于图的数据分析开掘
ShowMeAI系列教程举荐
- 图解Python编程:从入门到精通系列教程
- 图解数据分析:从入门到精通系列教程
- 图解AI数学根底:从入门到精通系列教程
- 图解大数据技术:从入门到精通系列教程