乐趣区

Giraph-源码分析五-加载数据同步总结

作者 | 白松

关于 Giraph 共有九个章节,本文第五个章节。

环境:在单机上(机器名:giraphx)启动了 2 个 workers。

输入:SSSP 文件夹,里面有 1.txt 和 2.txt 两个文件。

1、在 Worker 向 Master 汇报健康状况后,就开始等待 Master 创建 InputSplit。

方法:每个 Worker 通过检某个 Znode 节点是否存在,同时在此 Znode 上设置 Watcher。若不存在,就通过 BSPEvent 的 waitForever()方法释放当前线程的锁,陷入等待状态。一直等到 master 创建该 znode。此步骤位于 BSPServiceWorker 类中的 startSuperStep 方法中,等待代码如下:


2、Master 调用 createInputSplits()方法创建 InputSplit。

在 generateInputSplits()方法中,根据用户设定的 VertexInputFormat 获得 InputSplits。代码如下:

其中 minSplitCountHint 为创建 split 的最小数目,其值如下:

minSplitCountHint = Workers 数目 * NUM_INPUT_THREADS

NUM_INPUT_THREADS 表示 每个 Input split loading 的线程数目,默认值为 1。经查证,在 TextVertexValueInputFormat 抽象类中的 getSplits()方法中的 minSplitCountHint 参数被忽略。用户输入的 VertexInputFormat 继承 TextVertexValueInputFormat 抽象类。

如果得到的 splits.size 小于 minSplitCountHint,那么有些 worker 就没被用上。

得到 split 信息后,要把这些信息写到 Zookeeper 上,以便其他 workers 访问。上面得到的 split 信息如下:

[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]

遍历 splits List,为每个 split 创建一个 Znode,值为 split 的信息。如为 split- 0 创建 Znode,值为:hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0

为 split- 1 创建 znode(如下),值为:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1

最后创建 znode:/_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady 表示所有 splits 都创建好了。

3、Master 根据 splits 创建 Partitions。首先确定 partition 的数目。

BSPServiceMaster 中的 MasterGraphPartitioner<I.V,E,M> 对象默认为 HashMasterPartitioner。它的 createInitialPartitionOwners()方法如下:

上面代码中是在工具类 PartitionUtils 计算 Partition 的数目,计算公式如下:

partitionCount=PARTITION_COUNT_MULTIPLIER availableWorkerInfos.size() availableWorkerInfos.size(),其中 PARTITION_COUNT_MULTIPLIER 表示 Multiplier for the current workers squared,默认值为 1。

可见,partitionCount 值为 4(122)。创建的 partitionOwnerList 信息如下:

[(id=0,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),

(id=1,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null),

(id=2,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),

(id=3,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null)]

4、Master 创建 Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir,用于后面的 exchange partition。

5、Master 最后在 assignPartitionOwners()方法中

把 masterinfo,chosenWorkerInfoList,partitionOwners 等信息写入 Znode 中(作为 Znode 的 data),该 Znode 的路径为:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions。

Master 调用 barrierOnWorkerList()方法开始等待各个 Worker 完成数据加载。调用关系如下:

barrierOnWorkerList 中创建 znode,path=/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir。然后检查该 znode 的子节点数目是否等于 workers 的数目,若不等于,则线程陷入等待状态。后面某个 worker 完成数据加载后,会创建子 node(如 /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1)来激活该线程继续判断。

6、当 Master 创建第 5 步的 znode 后,会激活 worker。

每个 worker 从 znode 上读出 data,data 包含 masterInfo,WorkerInfoList 和 partitionOwnerList,然后各个 worker 开始加载数据。

把 partitionOwnerList 复制给 BSPServiceWorker 类中的 workerGraphPartitioner(默认为 HashWorkerPartitioner 类型)对象的 partitionOwnerList 变量,后续每个顶点把根据 vertexID 通过 workerGraphPartitioner 对象获取其对应的 partitionOwner。

每个 Worker 从 znode:/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir 获取子节点,得到 inputSplitPathList,内容如下:

[/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1,

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0]

然后每个 Worker 创建 N 个 InputsCallable 线程读取数据。N=Min(NUM_INPUT_THREADS,maxInputSplitThread),其中 NUM_INPUT_THREADS 默认值为 1,maxInputSplitThread=(InputSplitSize-1/maxWorkers +1

那么,默认每个 worker 就是创建一个线程来加载数据。

在 InputSplitsHandler 类中的 reserveInputSplit()方法中,每个 worker 都是遍历 inputSplitPathList,通过创建 znode 来保留 (标识要处理) 的 split。代码及注释如下:

当用 reserveInputSplit()方法获取某个 znode 后,loadSplitsCallable 类的 loadInputSplit 方法就开始通过该 znode 获取其 HDFS 的路径信息,然后读入数据、重分布数据。

VertexInputSplitsCallable 类的 readInputSplit()方法如下:

7、每个 worker 加载完数据后,调用 waitForOtherWorkers()方法等待其他 workers 都处理完 split。

策略如下,每个 worker 在 /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir 目录下创建子节点,后面追加自己的 worker 信息,如 worker1、worker2 创建的子节点分别如下:

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_2

创建完后,然后等待 master 创建 /_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone。

8、从第 5 步骤可知,若 master 发现 /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir 下的子节点数目等于 workers 的总数目,就会在 coordinateInputSplits()方法中创建

_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone,告诉每个 worker,所有的 worker 都处理完了 split。

9、最后就是就行全局同步。

master 创建 znode,path=/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir,然后再调用 barrierOnWorkerList 方法检查该 znode 的子节点数目是否等于 workers 的数目,若不等于,则线程陷入等待状态。等待 worker 创建子节点来激活该线程继续判断。

每个 worker 获取自身的 Partition Stats,进入 finishSuperStep 方法中,等待所有的 Request 都被处理完;把自身的 Aggregator 信息发送给 master;创建子节点,如 /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir/giraphx_1,data 为该 worker 的 partitionStatsList 和 workerSentMessages 统计量;

最后调用 waitForOtherWorkers()方法等待 master 创建 /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 节点。

master 发现 /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir 的子节点数目等于 workers 数目后,根据 /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir 子节点上的 data 收集每个 worker 发送的 aggregator 信息,汇总为 globalStats。

Master 若发现全局信息中(1)所有顶点都 voteHalt 且没有消息传递,或(2)达到最大迭代次数 时,设置 globalStats.setHaltComputation(true)。告诉 works 结束迭代。

master 创建 /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 节点,data 为 globalStats。告诉所有 workers 当前超级步结束。

每个 Worker 检测到 master 创建 /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 节点后,读出该 znode 的数据,即全局的统计信息。然后决定是否继续下一次迭代。

10、同步之后开始下一个超级步。

11、master 和 workers 同步过程总结。

(1)master 创建 znode A,然后检测 A 的子节点数目是否等于 workers 数目,不等于就陷入等待。某个 worker 创建一个子节点后,就会唤醒 master 进行检测一次。

(2)每个 worker 进行自己的工作,完成后,创建 A 的子节点 A1。然后等待 master 创建 znode B。

(3)若 master 检测到 A 的子节点数目等于 workers 的数目时,创建 Znode B

(4)master 创建 B 节点后,会激活各个 worker。同步结束,各个 worker 就可以开始下一个超步。

本质是通过 znode B 来进行全局同步的。

退出移动版