Giraph源码分析四-Master-如何检查Worker启动成功

38次阅读

共计 2322 个字符,预计需要花费 6 分钟才能阅读完成。

本文的目的

说明 Giraph 如何借助 ZooKeeper 来实现 Master 与 Workers 间的同步(不太确定)。

环境

在单机上(机器名:giraphx)启动了 2 个 workers。

Giraph 遵从单 Master 多 Workers 结构,BSPServiceMaster 使用 MasterThread 线程来进行全局的同步。每个 Worker 启动成功后,会向 Master 汇报自身的健康状况,那么 Master 是如何检测 Workers 是否都成功启动了?

Master 在 ZooKeeper 上创两个目录,_workerHealthyDir 和 _workerUnhealthyDir,分别用来记录 Healthy Workers 和 UnHealthy Workers。

主要在 BspServiceMaster 类中的 getAllWorkerInfos() 方法来完成,其调用关系如下,注意下 getAllWorkerInfos() 到 MasterThread.run() 方法调用关系,比较难找。

创建的两个目录如下:

/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerUnhealthyDir

每个 Worker 在 setup() 中,调用 registerHealth() 方法来注册自身的状态。

若自身是 Healthy 的,则在_workerHealthyDir 目录下添加子节点 /wokerInfo.getHostNameId(),否则在_workerUnhealthyDir 目录下添加。wokerInfo.getHostNameId() 为:Hostname+“_”+TaskId。Task1 和 Task2(Task 0 是 master)创建的子节点如下:

/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_1
/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_2

Master 在 checkWorkers() 方法中,在 While 死循环中(实际有超时限制),通过调用 getAllWorkerInfos() 方法来获取_workerHealthyDir 目录下的子节点,然后比较子节点数目是否达到 maxWorkers(启动 job 时定义的,- w 参数)。

若小于 maxWorkers,则继续调用 getAllWorkerInfos() 方法进行下一轮检测;若等于 maxWorker,退出 While 循环,然后返回 healthyWorkersInfoList:[Worker(hostname=giraphx, MRtaskID=1, port=30001), Worker(hostname=giraphx, MRtaskID=2, port=30002)]。

问题: 由于在分布式环境中,每个 Worker 和 Maste 都是并行运行,彼此不知道对方的运行情况。上述第 3 步骤中,若还有子节点还没有创建,就一直在 while 死循环中调用来检测 getAllWorkerInfos() 方法检测,效率比较低下,当然也比较笨!

Giraph 借用 ZooKeeper 来高效的进行检测。设计理念如下:

  1. master 在获取子节点时,注册 Watcher(为注册器,用于触发相应事件)。

若某个 task 创建了子节点后,就会触发 Watcher 事件。

若子节点数目小于 maxWorkers,就调用 workerHealthRegistrationChanged 的 await() 方法释放当前线程的锁,陷入等待状态。不会进行无用的检测。

说明:workerHealthRegistrationChanged 为 PredicateLock 类型(implements BspEvent 接口),PredicateLock 里面使用可重入锁 ReentrantLock 和 Condition 进行线程的控制。

当某个 task 创建了子节点后,触发 Watcher 事件。

调用 BspService 中的 public final void Process(WatchedEvent event) 事件,该方法根据事件的路径来激活相应的 BspEvent 事件。此处对应的是:

实验运行如下:

s(926)) - process: Got a new event, path = /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir, type = NodeChildrenChanged, state = SyncConnected INFO bsp.BspService (BspService.java:process(960)) - process: workerHealthRegistrationChanged (worker health reported - healthy/unhealthy)

这样就会激活 master 线程,开始下一轮检测。

子节点数目等于 maxWorkers 时,就停止。

总结:每创建一个子节点时,才会进行一次检测,效率较高!

正文完
 0