TaskMaster是Druid overlord服务中最重要的一个类,它封装了索引服务的leadership生命周期。
TaskMaster被LifeCycle类管理启动和停止,LifeCycle启动时会调用TaskMaster的start()方法。TaskMaster在CliOverlord类的getModules方法中被绑定到LifeCycle:
binder.bind(TaskMaster.class).in(ManageLifecycle.class);
在TaskMaster的start()方法中,主要有如下代码:
overlordLeaderSelector.registerListener(leadershipListener);
这里先看一下overlordLeaderSelector这个TaskMaster的成员。overlordLeaderSelector的类型是DruidLeaderSelector
,通过TaskMaster的构造函数注入:
@Inject TaskMaster(
...
@IndexingService final DruidLeaderSelector overlordLeaderSelector
)
在
DiscoveryModule
的configure方法中有:PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, IndexingService.class)) .addBinding(CURATOR_KEY) .toProvider( new DruidLeaderSelectorProvider( (zkPathsConfig) -> ZKPaths.makePath(zkPathsConfig.getOverlordPath(), "_OVERLORD") ) ) .in(LazySingleton.class);
DruidLeaderSelectorProvider的get()方法定义如下:
public DruidLeaderSelector get() { return new CuratorDruidLeaderSelector( curatorFramework, druidNode, latchPathFn.apply(zkPathsConfig) ) }
又有CuratorDruidLeaderSelector的构造函数:
public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self, String latchPath) { this.curator = curator; this.self = self; this.latchPath = latchPath; ... }
可见overlordLeaderSelector的实际类型是CuratorDruidLeaderSelector
,并且它的latchPath成员的值为/druid/overlord/_OVERLORD
。
CuratorDruidLeaderSelector的registerListener方法
listener成员变量赋值
this.listener = listener;
这里传入的listener类型为DruidLeaderSelector.Listener
,在TaskMaster中定义,这个下面会详细说到。
listenerExecutor成员变量赋值
this.listenerExecutor = Execs.singleThreaded(StringUtils.format("LeaderSelector[%s]", latchPath))
创建LeaderLatch
在createNewLeaderLatchWithListener
方法中,首先调用createNewLeaderLatch()方法创建一个LeaderLatch实例:
final LeaderLatch newLeaderLatch = createNewLeaderLatch();
private LeaderLatch createNewLeaderLatch()
{
return new LeaderLatch(curator, latchPath, self.getServiceScheme() + "://" + self.getHostAndPortToUse());
}
稍后调用LeaderLatch的start()方法,会在latchPath(这里是/druid/overlord/_OVERLORD)下创建一个EPHEMERAL_SEQUENTIAL
类型的znode,这个znode的data就是LeaderLatch构造函数的第三个参数,这里可能是http://localhost:8090
。
然后对这个LeaderLatch实例调用addListener
设置listener,LeaderLatch的listener必须是LeaderLatchListener
接口的实现:
public interface LeaderLatchListener {
void isLeader();
void notLeader();
}
需要实现isLeader()和notLeader()两个接口,分别在当前服务成为leader以及当前服务失去leader角色时调用。
这里设置的LeaderLatchListener的实现,对于isLeader()方法,主要的逻辑就是调用在前面赋值的DruidLeaderSelector.Listener
类型的listener的becomeLeader()方法。后面我会详细分析这个方法。
调用LeaderLatch实例的start()方法
下面就是调用刚刚在createNewLeaderLatchWithListener
方法中创建的LeaderLatch(这个LeaderLatch被设置在了成员变量leaderLatch这个AtomicReference中)的start()方法进行leader的选举,并在选举为leader时在LeaderLatchListener的isLeader()方法中调用DruidLeaderSelector.Listener的becomeLeader方法。
如何利用LeaderLatch进行leader选举?
利用LeaderLatch进行选举的过程如下:
- 在/druid/overlord/_OVERLORD下创建一个PHEMERAL_SEQUENTIAL节点,这种类型的znode会在节点名称后加入sequence_number,比如这里创建的zonde名称可能是
_c_ba96d70d-b0f6-4df7-8b62-af078aa3b4c5-latch-0000000006
,_c_40d06620-8ae7-4b44-987a-f8222b17847e-latch-0000000007
等等。- 对LeaderLatch实例调用getChildren()方法,监听/druid/overlord/_OVERLORD节点,如果节点下创建了子节点,也就是上面说的PHEMERAL_SEQUENTIAL节点,则在回调中调用checkLeadership检查当前LeaderLatch所在进程是否为leader。
- 调用checkLeadership(event.getChildren())方法检查当前LeaderLatch所在进程或服务是否为leader。这个检查的过程很简单,就是对/druid/overlord/_OVERLORD下的子节点按名称进行排序,排在第一位(id最小)的节点如果是当前LeaderLatch创建的znode,则当前LeaderLatch所在服务成为leader。比如/druid/overlord/_OVERLORD下有两个子节点latch-0000000007和latch-0000000006,那么创建latch-0000000006的LeaderLatch所在的服务或进程就成为leader;如果当前LeaderLatch创建的znode不是排在第一位的,则监听比当前节点排名靠前的第一个节点,在监听的回调中,如果监听的节点被删除了,则重新运行getChildren方法,从而进行重新选举。例如在/druid/overlord/_OVERLORD下有三个孩子节点latch-0000000006,latch-0000000007和latch-0000000008,当前LeaderLatch创建的节点为latch-0000000007,则这个LeaderLatch所在的进程就没有成为leader,这时监听latch-0000000006节点,当latch-0000000006节点被删除则重新运行getChildren方法,这时latch-0000000007节点排在第一位,这时这个LeaderLatch所在的进程就成为了新的leader。
至此对CuratorDruidLeaderSelector
的registerListener
方法的调用就结束了。
DruidLeaderSelector.Listener的becomeLeader()方法
leadershipListener
是TaskMaster的成员,类型为DruidLeaderSelector.Listener
,我们调用CuratorDruidLeaderSelector
的registerListener
方法传入的就是leadershipListener这个listener。leadershipListener在TaskMaster的构造函数中被定义为实现DruidLeaderSelector.Listener接口的匿名类。
interface Listener {
void becomeLeader();
void stopBeingLeader();
}
在当前overlord服务成为leader时,会调用LeaderLatch注册的listener中的isLeader()方法,进而调用DruidLeaderSelector.Listener的becomeLeader()方法。
在becomeLeader()方法中,首先调用了:
taskLockbox.syncFromStorage();
接着定义了taskRunner:
taskRunner = runnerFactory.build();
然后定义了taskQueue:
taskQueue = new TaskQueue(
taskQueueConfig,
taskStorage,
taskRunner,
taskActionClientFactory,
taskLockbox,
emitter
);
接着声明了一个本地的Lifecycle实例吗,并加入一些被LifeCyle管理的类:
final Lifecycle leaderLifecycle = new Lifecycle("task-master");
...
leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(supervisorManager);
leaderLifecycle.addManagedInstance(overlordHelperManager);
leaderLifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start()
{
initialized = true;
serviceAnnouncer.announce(node);
}
@Override
public void stop() { serviceAnnouncer.unannounce(node); }
}
)
最后调用这个Lifecycle的start()方法启动这个本地Lifecycle:
leaderLifecycle.start();
调用Lifecycle的start()方法,也会顺序调用它所管理的Handler的start方法,具体说来,会依次调用:
- taskRunner的start()方法;
- taskQueue的start()方法;
- supervisorManager的start()方法;
- overlordHelperManager的start()方法;
- serviceAnnouncer的announce方法;
下面我们详细来看一下这些方法。
发表回复