作者 | 白松
【注:本文为原创,引用转载需与数澜联系。】
Giraph 介绍:
Apache Giraph is an iterative graph processing system built for high scalability. For example, it is currently used at Facebook to analyze the social graph formed by users and their connections. Giraph originated as the open-source counterpart to Pregel, the graph processing architecture developed at Google and described in a 2010 paper. Both systems are inspired by the Bulk Synchronous Parallelmodel of distributed computation introduced by Leslie Valiant. Giraph adds several features beyond the basic Pregel model, including master computation, sharded aggregators, edge-oriented input, out-of-core computation, and more. With a steady development cycle and a growing community of users worldwide, Giraph is a natural choice for unleashing the potential of structured datasets at a massive scale.
原理:
Giraph 基于 Hadoop 而建,将 MapReduce 中 Mapper 进行封装,未使用 reducer。在 Mapper 中进行多次迭代,每次迭代等价于 BSP 模型中的 SuperStep。一个 Hadoop Job 等价于一次 BSP 作业。基础结构如下图所示。
每部分的功能如下:
1. ZooKeeper: responsible for computation state
–partition/worker mapping
–global state: #superstep
–checkpoint paths, aggregator values, statistics
2. Master: responsible for coordination
–assigns partitions to workers
–coordinates synchronization
–requests checkpoints
–aggregates aggregator values
–collects health statuses
3. Worker: responsible for vertices
–invokes active vertices compute() function
–sends, receives and assigns messages
–computes local aggregation values
说明
(1)实验环境
三台服务器:test165、test62、test63。test165 同时是 JobTracker 和 TaskTracker.
测试例子:官网自带的 SSSP 程序,数据是自己模拟生成。
运行命令:Hadoop jar giraph-examples-1.0.0-for-hadoop-0.20.203.0-jar-with-dependencies.jar org.apache.giraph.GiraphRunner org.apache.giraph.examples.SimpleShortestPathsVertex -vif org.apache.giraph.io.formats.JsonLongDoubleFloatDoubleVertexInputFormat -vip /user/giraph/SSSP -of org.apache.giraph.io.formats.IdWithValueTextOutputFormat -op /user/giraph/output-sssp-debug-7 -w 5
(2)为节约空间,下文中所有代码均为核心代码片段。
(3)core-site.xml 中 hadoop.tmp.dir 的路径设为:/home/hadoop/hadooptmp
(4)写本文是多次调试完成的,故文中的 JobID 不一样,读者可理解为同一 JobID.
(5)后续文章也遵循上述规则。
org.apache.giraph.graph.GraphMapper 类
Giraph 中自定义 org.apache.giraph.graph.GraphMapper 类来继承 Hadoop 中的 org.apache.hadoop.mapreduce.Mapper<Object,Object,Object,Object> 类,覆写了 setup()、map()、cleanup() 和 run() 方法。GraphMapper 类的说明如下:
“This mapper that will execute the BSP graph tasks alloted to this worker. All tasks will be performed by calling the GraphTaskManager object managed by this GraphMapper wrapper classs. Since this mapper will not be passing data by key-value pairs through the MR framework, the Mapper parameter types are irrelevant, and set to Object type.”
BSP 的运算逻辑被封装在 GraphMapper 类中,其拥有一 GraphTaskManager 对象,用来管理 Job 的 tasks。每个 GraphMapper 对象都相当于 BSP 中的一个计算节点(compute node)。
在 GraphMapper 类中的 setup() 方法中,创建 GraphTaskManager 对象并调用其 setup() 方法进行一些初始化工作。如下:
map() 方法为空,因为所有操作都被封装在了 GraphTaskManager 类中。在 run() 方法中调用 GraphTaskManager 对象的 execute() 方法进行 BSP 迭代计算。
org.apache.giraph.graph.GraphMapper 类
功能:The Giraph-specific business logic for a single BSP compute node in whatever underlying type of cluster our Giraph job will run on. Owning object will provide the glue into the underlying cluster framework and will call this object to perform Giraph work.
下面讲述 setup() 方法,代码如下:
依次介绍每个方法的功能:
1、locateZookeeperClasspath(zkPathList)
找到 ZK jar 的本地副本,其路径为:/home/hadoop/hadooptmp/mapred/local/taskTracker/root/jobcache/job_201403270456_0001/jars/job.jar , 用于启动 ZooKeeper 服务。
2、startZooKeeperManager(),初始化和配置 ZooKeeperManager。
定义如下:
3、org.apache.giraph.zk.ZooKeeperManager 类
功能:Manages the election of ZooKeeper servers, starting/stopping the services, etc.
ZooKeeperManager 类的 setup() 定义如下:
createCandidateStamp() 方法在 HDFS 上 的_bsp/_defaultZkManagerDir/job_201403301409_0006/_task 目录下为每个 task 创建一个文件,文件内容为空。文件名为本机的 Hostname+taskPartition,如下截图:
运行时指定了 5 个 workers(-w 5),再加上一个 master,所有上面有 6 个 task。
getZooKeeperServerList() 方法中,taskPartition 为 0 的 task 会调用 createZooKeeperServerList() 方法创建 ZooKeeper server List,也是创建一个空文件,通过文件名来描述 Zookeeper servers。
首先获取 taskDirectory(_bsp/_defaultZkManagerDir/job_201403301409_0006/_task)目录下文件,如果当前目录下有文件,则把文件名(Hostname+taskPartition)中的 Hostname 和 taskPartition 存入到 hostNameTaskMap 中。扫描 taskDirectory 目录后,若 hostNameTaskMap 的 size 大于 serverCount(等于 GiraphConstants.java 中的 ZOOKEEPER_SERVER_COUNT 变量,定义为 1),就停止外层的循环。外层循环的目的是:因为 taskDirectory 下的文件每个 task 文件时多个 task 在分布式条件下创建的,有可能 task 0 在此创建 server List 时,别的 task 还没有生成后 task 文件。Giraph 默认为每个 Job 启动一个 ZooKeeper 服务,也就是说只有一个 task 会启动 ZooKeeper 服务。
经过多次测试,task 0 总是被选为 ZooKeeper Server,因为在同一进程中,扫描 taskDirectory 时,只有它对应的 task 文件(其他 task 的文件还没有生成好),然后退出 for 循环,发现 hostNameTaskMap 的 size 等于 1,直接退出 while 循环。那么此处就选了 test162 0。
最后,创建了文件:_bsp/_defaultZkManagerDir/job_201403301409_0006/zkServerList_test162 0
onlineZooKeeperServers(),根据 zkServerList_test162 0 文件,Task 0 先生成 zoo.cfg 配置文件,使用 ProcessBuilder 来创建 ZooKeeper 服务进程,然后 Task 0 再通过 socket 连接到 ZooKeeper 服务进程上,最后创建文件 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0 来标记 master 任务已完成。worker 一直在进行循环检测 master 是否生成好 _bsp/_defaultZkManagerDir/job_201403301409_0006/_zkServer/test162 0, 即 worker 等待直到 master 上的 ZooKeeper 服务已经启动完成。
启动 ZooKeeper 服务的命令如下:
4、determineGraphFunctions()。
GraphTaskManager 类中有 CentralizedServiceMaster 对象和 CentralizedServiceWorker 对象,分别对应于 master 和 worker。每个 BSP compute node 扮演的角色判定逻辑如下:
a) If not split master, everyone does the everything and/or running ZooKeeper.
b) If split master/worker, masters also run ZooKeeper
c) If split master/worker == true and giraph.zkList is set, the master will not instantiate a ZK instance, but will assume a quorum is already active on the cluster for Giraph to use.
该判定在 GraphTaskManager 类中的静态方法 determineGraphFunctions() 中定义,片段代码如下:
默认的,Giraph 会区分 master 和 worker。会在 master 上面启动 zookeeper 服务,不会在 worker 上启动 ZooKeeper 服务。那么 Task 0 就是 master+ZooKeeper,其他 Tasks 就是 workers