乐趣区

使用Python操作Hadoop,Python-MapReduce

环境
环境使用:hadoop3.1,Python3.6,ubuntu18.04
Hadoop 是使用 Java 开发的,推荐使用 Java 操作 HDFS。
有时候也需要我们使用 Python 操作 HDFS。
本次我们来讨论如何使用 Python 操作 HDFS,进行文件上传,下载,查看文件夹,以及如何使用 Python 进行 MapReduce 编程。
使用 Python 操作 HDFS
首先需要安装和导入 hdfs 库,使用 pip install hdfs。
1. 连接并查看指定路径下的数据
from hdfs import *
client = Client(‘http://ip:port’) #2.X 版本 port 使用 50070 3.x 版本 port 使用 9870
client.list(‘/’) #查看 hdfs / 下的目录
2. 创建目录
client.makedirs(‘/test’)
client.makedirs(‘/test’,permision = 777) # permision 可以设置参数
3. 重命名、删除
client.rename(‘/test’,’123′) #将 /test 目录改名为 123
client.delete(‘/test’,True) #第二个参数表示递归删除
4. 下载
将 /test/log.txt 文件下载至 /home 目录下。
client.download(‘/test/log.txt’,’/home’)
5. 读取
with client.read(“/test/[PPT]Google Protocol Buffers.pdf”) as reader:   
print reader.read()
其他参数:

read(args, *kwds)            
hdfs_path:hdfs 路径            
offset:设置开始的字节位置 
l- ength:读取的长度(字节为单位)
buffer_size:用于传输数据的字节的缓冲区的大小。默认值设置在 HDFS 配置。
encoding:指定编码            
chunk_size:如果设置为正数,上下文管理器将返回一个发生器产生的每一 chunk_size 字节而不是一个类似文件的对象            
delimiter:如果设置,上下文管理器将返回一个发生器产生每次遇到分隔符。此参数要求指定的编码。
progress:回调函数来跟踪进度,为每一 chunk_size 字节(不可用,如果块大小不是指定)。它将传递两个参数,文件上传的路径和传输的字节数。称为一次与 - 1 作为第二个参数。

6. 上传数据
将文件上传至 hdfs 的 /test 下。
client.upload(‘/test’,’/home/test/a.log’)
Python-MapReduce
编写 mapper 代码,map.py:
import sys

for line in sys.stdin:
fields = line.strip().split()
for item in fields:
print(item + ‘ ‘ + ‘1’)
编写 reducer 代码,reduce.py:
import sys

result = {}
for line in sys.stdin:
kvs = line.strip().split(‘ ‘)
k = kvs[0]
v = kvs[1]
if k in result:
result[k]+=1
else:
result[k] = 1
for k,v in result.items():
print(“%s\t%s” %(k,v))
添加测试文本,test1.txt:
tale as old as time
true as it can be
beauty and the beast
本地测试执行 map 代码:
`cat test1.txt | python map.py` 结果:
tale 1
as 1
old 1
as 1
time 1
true 1
as 1
it 1
can 1
be 1
beauty 1
and 1
the 1
beast 1
本地测试执行 reduce 代码:
cat test1.txt | python map.py | sort -k1,1 | python reduce.py
执行结果:
and 1
be 1
old 1
beauty 1
true 1
it 1
beast 1
as 3
can 1
time 1
the 1
tale 1
在 Hadoop 平台执行 map-reduce 程序
本地测试完毕,编写脚本在 HDFS 中执行程序
脚本:run.sh(请根据本机环境修改)
HADOOP_CMD=”/app/hadoop-3.1.2/bin/hadoop”

STREAM_JAR_PATH=”/app/hadoop-3.1.2/share/hadoop/tools/lib/hadoop-streaming-3.1.2.jar”

INPUT_FILE_PATH_1=”/py/input/”

OUTPUT_PATH=”/output”

$HADOOP_CMD fs -rmr-skipTrash $OUTPUT_PATH

# Step 1.

$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper “python map.py” \
-reducer “python reduce.py” \
-file ./map.py \
-file ./reduce.py \
添加执行权限 chmod a+x run.sh;执行测试:bash run.sh,查看结果:

练习
1. 文件合并去重
输入文件 file1 的样例如下:20150101 x20150102 y20150103 x20150104 y20150105 z20150106 x
输入文件 file2 的样例如下:20150101 y20150102 y20150103 x20150104 z20150105 y
根据输入文件 file1 和 file2 合并得到的输出文件 file3 的样例如下:
20150101 x20150101 y20150102 y20150103 x20150104 y20150104 z20150105 y20150105 z20150106 x
对于两个输入文件,即文件 file1 和文件 file2,请编写 MapReduce 程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件 file3。为了完成文件合并去重的任务,你编写的程序要能将含有重复内容的不同文件合并到一个没有重复的整合文件,规则如下:

第一列按学号排列;
学号相同,按 x,y,z 排列。

2. 挖掘父子关系
输入文件内容如下:child parentSteven LucySteven JackJone LucyJone JackLucy MaryLucy FrankJack AliceJack JesseDavid AliceDavid JessePhilip DavidPhilip AlmaMark DavidMark Alma
输出文件内容如下:
grandchild grandparentSteven AliceSteven JesseJone AliceJone JesseSteven MarySteven FrankJone MaryJone FrankPhilip AlicePhilip JesseMark AliceMark Jesse
你编写的程序要能挖掘父子辈关系,给出祖孙辈关系的表格。规则如下:

孙子在前,祖父在后
孙子相同,祖父的名字按照 A - Z 排列

退出移动版