共计 5216 个字符,预计需要花费 14 分钟才能阅读完成。
TaskMaster 是 Druid overlord 服务中最重要的一个类,它封装了索引服务的 leadership 生命周期。
TaskMaster 被 LifeCycle 类管理启动和停止,LifeCycle 启动时会调用 TaskMaster 的 start()方法。TaskMaster 在 CliOverlord 类的 getModules 方法中被绑定到 LifeCycle:
binder.bind(TaskMaster.class).in(ManageLifecycle.class);
在 TaskMaster 的 start()方法中,主要有如下代码:
overlordLeaderSelector.registerListener(leadershipListener);
这里先看一下 overlordLeaderSelector 这个 TaskMaster 的成员。overlordLeaderSelector 的类型是DruidLeaderSelector
,通过 TaskMaster 的构造函数注入:
@Inject TaskMaster(
...
@IndexingService final DruidLeaderSelector overlordLeaderSelector
)
在
DiscoveryModule
的 configure 方法中有:PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, IndexingService.class)) .addBinding(CURATOR_KEY) .toProvider( new DruidLeaderSelectorProvider((zkPathsConfig) -> ZKPaths.makePath(zkPathsConfig.getOverlordPath(), "_OVERLORD") ) ) .in(LazySingleton.class);
DruidLeaderSelectorProvider 的 get()方法定义如下:
public DruidLeaderSelector get() { return new CuratorDruidLeaderSelector( curatorFramework, druidNode, latchPathFn.apply(zkPathsConfig) ) }
又有 CuratorDruidLeaderSelector 的构造函数:
public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self, String latchPath) { this.curator = curator; this.self = self; this.latchPath = latchPath; ... }
可见 overlordLeaderSelector 的实际类型是CuratorDruidLeaderSelector
,并且它的 latchPath 成员的值为/druid/overlord/_OVERLORD
。
CuratorDruidLeaderSelector 的 registerListener 方法
listener 成员变量赋值
this.listener = listener;
这里传入的 listener 类型为DruidLeaderSelector.Listener
,在 TaskMaster 中定义,这个下面会详细说到。
listenerExecutor 成员变量赋值
this.listenerExecutor = Execs.singleThreaded(StringUtils.format("LeaderSelector[%s]", latchPath))
创建 LeaderLatch
在 createNewLeaderLatchWithListener
方法中,首先调用 createNewLeaderLatch()方法创建一个 LeaderLatch 实例:
final LeaderLatch newLeaderLatch = createNewLeaderLatch();
private LeaderLatch createNewLeaderLatch()
{return new LeaderLatch(curator, latchPath, self.getServiceScheme() + "://" + self.getHostAndPortToUse());
}
稍后调用 LeaderLatch 的 start()方法,会在 latchPath(这里是 /druid/overlord/_OVERLORD)下创建一个 EPHEMERAL_SEQUENTIAL
类型的 znode,这个 znode 的 data 就是 LeaderLatch 构造函数的第三个参数,这里可能是http://localhost:8090
。
然后对这个 LeaderLatch 实例调用 addListener
设置 listener,LeaderLatch 的 listener 必须是 LeaderLatchListener
接口的实现:
public interface LeaderLatchListener {void isLeader();
void notLeader();}
需要实现 isLeader()和 notLeader()两个接口,分别在当前服务成为 leader 以及当前服务失去 leader 角色时调用。
这里设置的 LeaderLatchListener 的实现,对于 isLeader()方法,主要的逻辑就是调用在前面赋值的 DruidLeaderSelector.Listener
类型的 listener 的 becomeLeader()方法。后面我会详细分析这个方法。
调用 LeaderLatch 实例的 start()方法
下面就是调用刚刚在 createNewLeaderLatchWithListener
方法中创建的 LeaderLatch(这个 LeaderLatch 被设置在了成员变量 leaderLatch 这个 AtomicReference 中)的 start()方法 进行 leader 的选举,并在选举为 leader 时在 LeaderLatchListener 的 isLeader()方法中调用 DruidLeaderSelector.Listener 的 becomeLeader 方法。
如何利用 LeaderLatch 进行 leader 选举?
利用 LeaderLatch 进行选举的过程如下:
- 在 /druid/overlord/_OVERLORD 下创建一个 PHEMERAL_SEQUENTIAL 节点,这种类型的 znode 会在节点名称后加入 sequence_number,比如这里创建的 zonde 名称可能是
_c_ba96d70d-b0f6-4df7-8b62-af078aa3b4c5-latch-0000000006
,_c_40d06620-8ae7-4b44-987a-f8222b17847e-latch-0000000007
等等。- 对 LeaderLatch 实例调用 getChildren()方法,监听 /druid/overlord/_OVERLORD 节点,如果节点下创建了子节点,也就是上面说的 PHEMERAL_SEQUENTIAL 节点,则在回调中调用 checkLeadership 检查当前 LeaderLatch 所在进程是否为 leader。
- 调用 checkLeadership(event.getChildren())方法检查当前 LeaderLatch 所在进程或服务是否为 leader。这个检查的过程很简单,就是对 /druid/overlord/_OVERLORD 下的子节点按名称进行排序,排在第一位(id 最小)的节点如果是当前 LeaderLatch 创建的 znode,则当前 LeaderLatch 所在服务成为 leader。比如 /druid/overlord/_OVERLORD 下有两个子节点 latch-0000000007 和 latch-0000000006,那么创建 latch-0000000006 的 LeaderLatch 所在的服务或进程就成为 leader;如果当前 LeaderLatch 创建的 znode 不是排在第一位的,则监听比当前节点排名靠前的第一个节点,在监听的回调中,如果监听的节点被删除了,则重新运行 getChildren 方法,从而进行重新选举。例如在 /druid/overlord/_OVERLORD 下有三个孩子节点 latch-0000000006,latch-0000000007 和 latch-0000000008,当前 LeaderLatch 创建的节点为 latch-0000000007,则这个 LeaderLatch 所在的进程就没有成为 leader,这时监听 latch-0000000006 节点,当 latch-0000000006 节点被删除则重新运行 getChildren 方法,这时 latch-0000000007 节点排在第一位,这时这个 LeaderLatch 所在的进程就成为了新的 leader。
至此对 CuratorDruidLeaderSelector
的registerListener
方法的调用就结束了。
DruidLeaderSelector.Listener 的 becomeLeader()方法
leadershipListener
是 TaskMaster 的成员,类型为 DruidLeaderSelector.Listener
,我们调用CuratorDruidLeaderSelector
的registerListener
方法传入的就是 leadershipListener 这个 listener。leadershipListener 在 TaskMaster 的构造函数中被定义为实现 DruidLeaderSelector.Listener 接口的匿名类。
interface Listener {void becomeLeader();
void stopBeingLeader();}
在当前 overlord 服务成为 leader 时,会调用 LeaderLatch 注册的 listener 中的 isLeader()方法,进而调用 DruidLeaderSelector.Listener 的 becomeLeader()方法。
在 becomeLeader()方法中,首先调用了:
taskLockbox.syncFromStorage();
接着定义了 taskRunner:
taskRunner = runnerFactory.build();
然后定义了 taskQueue:
taskQueue = new TaskQueue(
taskQueueConfig,
taskStorage,
taskRunner,
taskActionClientFactory,
taskLockbox,
emitter
);
接着声明了一个本地的 Lifecycle 实例吗,并加入一些被 LifeCyle 管理的类:
final Lifecycle leaderLifecycle = new Lifecycle("task-master");
...
leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(supervisorManager);
leaderLifecycle.addManagedInstance(overlordHelperManager);
leaderLifecycle.addHandler(new Lifecycle.Handler()
{
@Override
public void start()
{
initialized = true;
serviceAnnouncer.announce(node);
}
@Override
public void stop() { serviceAnnouncer.unannounce(node); }
}
)
最后调用这个 Lifecycle 的 start()方法启动这个本地 Lifecycle:
leaderLifecycle.start();
调用 Lifecycle 的 start()方法,也会顺序调用它所管理的 Handler 的 start 方法,具体说来,会依次调用:
- taskRunner 的 start()方法;
- taskQueue 的 start()方法;
- supervisorManager 的 start()方法;
- overlordHelperManager 的 start()方法;
- serviceAnnouncer 的 announce 方法;
下面我们详细来看一下这些方法。
TaskLockbox 的 syncFromStorage 方法