共计 2322 个字符,预计需要花费 6 分钟才能阅读完成。
本文的目的
说明 Giraph 如何借助 ZooKeeper 来实现 Master 与 Workers 间的同步(不太确定)。
环境
在单机上(机器名:giraphx)启动了 2 个 workers。
Giraph 遵从单 Master 多 Workers 结构,BSPServiceMaster 使用 MasterThread 线程来进行全局的同步。每个 Worker 启动成功后,会向 Master 汇报自身的健康状况,那么 Master 是如何检测 Workers 是否都成功启动了?
Master 在 ZooKeeper 上创两个目录,_workerHealthyDir 和 _workerUnhealthyDir,分别用来记录 Healthy Workers 和 UnHealthy Workers。
主要在 BspServiceMaster 类中的 getAllWorkerInfos() 方法来完成,其调用关系如下,注意下 getAllWorkerInfos() 到 MasterThread.run() 方法调用关系,比较难找。
创建的两个目录如下:
/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerUnhealthyDir
每个 Worker 在 setup() 中,调用 registerHealth() 方法来注册自身的状态。
若自身是 Healthy 的,则在_workerHealthyDir 目录下添加子节点 /wokerInfo.getHostNameId(),否则在_workerUnhealthyDir 目录下添加。wokerInfo.getHostNameId() 为:Hostname+“_”+TaskId。Task1 和 Task2(Task 0 是 master)创建的子节点如下:
/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_1
/_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir/giraphx_2
Master 在 checkWorkers() 方法中,在 While 死循环中(实际有超时限制),通过调用 getAllWorkerInfos() 方法来获取_workerHealthyDir 目录下的子节点,然后比较子节点数目是否达到 maxWorkers(启动 job 时定义的,- w 参数)。
若小于 maxWorkers,则继续调用 getAllWorkerInfos() 方法进行下一轮检测;若等于 maxWorker,退出 While 循环,然后返回 healthyWorkersInfoList:[Worker(hostname=giraphx, MRtaskID=1, port=30001), Worker(hostname=giraphx, MRtaskID=2, port=30002)]。
问题: 由于在分布式环境中,每个 Worker 和 Maste 都是并行运行,彼此不知道对方的运行情况。上述第 3 步骤中,若还有子节点还没有创建,就一直在 while 死循环中调用来检测 getAllWorkerInfos() 方法检测,效率比较低下,当然也比较笨!
Giraph 借用 ZooKeeper 来高效的进行检测。设计理念如下:
- master 在获取子节点时,注册 Watcher(为注册器,用于触发相应事件)。
若某个 task 创建了子节点后,就会触发 Watcher 事件。
若子节点数目小于 maxWorkers,就调用 workerHealthRegistrationChanged 的 await() 方法释放当前线程的锁,陷入等待状态。不会进行无用的检测。
说明:workerHealthRegistrationChanged 为 PredicateLock 类型(implements BspEvent 接口),PredicateLock 里面使用可重入锁 ReentrantLock 和 Condition 进行线程的控制。
当某个 task 创建了子节点后,触发 Watcher 事件。
调用 BspService 中的 public final void Process(WatchedEvent event) 事件,该方法根据事件的路径来激活相应的 BspEvent 事件。此处对应的是:
实验运行如下:
s(926)) - process: Got a new event, path = /_hadoopBsp/job_201404102333_0002/_applicationAttemptsDir/0/_superstepDir/-1/_workerHealthyDir, type = NodeChildrenChanged, state = SyncConnected INFO bsp.BspService (BspService.java:process(960)) - process: workerHealthRegistrationChanged (worker health reported - healthy/unhealthy)
这样就会激活 master 线程,开始下一轮检测。
子节点数目等于 maxWorkers 时,就停止。
总结:每创建一个子节点时,才会进行一次检测,效率较高!