乐趣区

Druid源码解析-overlord的TaskMaster

TaskMaster 是 Druid overlord 服务中最重要的一个类,它封装了索引服务的 leadership 生命周期。

TaskMaster 被 LifeCycle 类管理启动和停止,LifeCycle 启动时会调用 TaskMaster 的 start()方法。TaskMaster 在 CliOverlord 类的 getModules 方法中被绑定到 LifeCycle:

binder.bind(TaskMaster.class).in(ManageLifecycle.class);

在 TaskMaster 的 start()方法中,主要有如下代码:

overlordLeaderSelector.registerListener(leadershipListener);

这里先看一下 overlordLeaderSelector 这个 TaskMaster 的成员。overlordLeaderSelector 的类型是DruidLeaderSelector,通过 TaskMaster 的构造函数注入:

@Inject TaskMaster(
  ...
  @IndexingService final DruidLeaderSelector overlordLeaderSelector
)

DiscoveryModule 的 configure 方法中有:

PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, IndexingService.class))
        .addBinding(CURATOR_KEY)
        .toProvider(
            new DruidLeaderSelectorProvider((zkPathsConfig) -> ZKPaths.makePath(zkPathsConfig.getOverlordPath(), "_OVERLORD")
            )
        )
        .in(LazySingleton.class);

DruidLeaderSelectorProvider 的 get()方法定义如下:

public DruidLeaderSelector get() 
{
  return new CuratorDruidLeaderSelector(
      curatorFramework,
      druidNode,
      latchPathFn.apply(zkPathsConfig)
  )
}

又有 CuratorDruidLeaderSelector 的构造函数:

public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self, String latchPath)
{
  this.curator = curator;
  this.self = self;
  this.latchPath = latchPath;

  ...
}

可见 overlordLeaderSelector 的实际类型是CuratorDruidLeaderSelector,并且它的 latchPath 成员的值为/druid/overlord/_OVERLORD

CuratorDruidLeaderSelector 的 registerListener 方法

listener 成员变量赋值

this.listener = listener;

这里传入的 listener 类型为DruidLeaderSelector.Listener,在 TaskMaster 中定义,这个下面会详细说到。

listenerExecutor 成员变量赋值

this.listenerExecutor = Execs.singleThreaded(StringUtils.format("LeaderSelector[%s]", latchPath))

创建 LeaderLatch

createNewLeaderLatchWithListener 方法中,首先调用 createNewLeaderLatch()方法创建一个 LeaderLatch 实例:

final LeaderLatch newLeaderLatch = createNewLeaderLatch(); 
private LeaderLatch createNewLeaderLatch() 
{return new LeaderLatch(curator, latchPath, self.getServiceScheme() + "://" + self.getHostAndPortToUse());
}

稍后调用 LeaderLatch 的 start()方法,会在 latchPath(这里是 /druid/overlord/_OVERLORD)下创建一个 EPHEMERAL_SEQUENTIAL 类型的 znode,这个 znode 的 data 就是 LeaderLatch 构造函数的第三个参数,这里可能是http://localhost:8090

然后对这个 LeaderLatch 实例调用 addListener 设置 listener,LeaderLatch 的 listener 必须是 LeaderLatchListener 接口的实现:

public interface LeaderLatchListener {void isLeader();
  void notLeader();}

需要实现 isLeader()和 notLeader()两个接口,分别在当前服务成为 leader 以及当前服务失去 leader 角色时调用。

这里设置的 LeaderLatchListener 的实现,对于 isLeader()方法,主要的逻辑就是调用在前面赋值的 DruidLeaderSelector.Listener 类型的 listener 的 becomeLeader()方法。后面我会详细分析这个方法。

调用 LeaderLatch 实例的 start()方法

下面就是调用刚刚在 createNewLeaderLatchWithListener 方法中创建的 LeaderLatch(这个 LeaderLatch 被设置在了成员变量 leaderLatch 这个 AtomicReference 中)的 start()方法 进行 leader 的选举,并在选举为 leader 时在 LeaderLatchListener 的 isLeader()方法中调用 DruidLeaderSelector.Listener 的 becomeLeader 方法

如何利用 LeaderLatch 进行 leader 选举?

利用 LeaderLatch 进行选举的过程如下:

  1. 在 /druid/overlord/_OVERLORD 下创建一个 PHEMERAL_SEQUENTIAL 节点,这种类型的 znode 会在节点名称后加入 sequence_number,比如这里创建的 zonde 名称可能是 _c_ba96d70d-b0f6-4df7-8b62-af078aa3b4c5-latch-0000000006_c_40d06620-8ae7-4b44-987a-f8222b17847e-latch-0000000007 等等。
  2. 对 LeaderLatch 实例调用 getChildren()方法,监听 /druid/overlord/_OVERLORD 节点,如果节点下创建了子节点,也就是上面说的 PHEMERAL_SEQUENTIAL 节点,则在回调中调用 checkLeadership 检查当前 LeaderLatch 所在进程是否为 leader。
  3. 调用 checkLeadership(event.getChildren())方法检查当前 LeaderLatch 所在进程或服务是否为 leader。这个检查的过程很简单,就是对 /druid/overlord/_OVERLORD 下的子节点按名称进行排序,排在第一位(id 最小)的节点如果是当前 LeaderLatch 创建的 znode,则当前 LeaderLatch 所在服务成为 leader。比如 /druid/overlord/_OVERLORD 下有两个子节点 latch-0000000007 和 latch-0000000006,那么创建 latch-0000000006 的 LeaderLatch 所在的服务或进程就成为 leader;如果当前 LeaderLatch 创建的 znode 不是排在第一位的,则监听比当前节点排名靠前的第一个节点,在监听的回调中,如果监听的节点被删除了,则重新运行 getChildren 方法,从而进行重新选举。例如在 /druid/overlord/_OVERLORD 下有三个孩子节点 latch-0000000006,latch-0000000007 和 latch-0000000008,当前 LeaderLatch 创建的节点为 latch-0000000007,则这个 LeaderLatch 所在的进程就没有成为 leader,这时监听 latch-0000000006 节点,当 latch-0000000006 节点被删除则重新运行 getChildren 方法,这时 latch-0000000007 节点排在第一位,这时这个 LeaderLatch 所在的进程就成为了新的 leader。

至此对 CuratorDruidLeaderSelectorregisterListener方法的调用就结束了。

DruidLeaderSelector.Listener 的 becomeLeader()方法

leadershipListener是 TaskMaster 的成员,类型为 DruidLeaderSelector.Listener,我们调用CuratorDruidLeaderSelectorregisterListener方法传入的就是 leadershipListener 这个 listener。leadershipListener 在 TaskMaster 的构造函数中被定义为实现 DruidLeaderSelector.Listener 接口的匿名类。

interface Listener {void becomeLeader();
  void stopBeingLeader();}

在当前 overlord 服务成为 leader 时,会调用 LeaderLatch 注册的 listener 中的 isLeader()方法,进而调用 DruidLeaderSelector.Listener 的 becomeLeader()方法。

在 becomeLeader()方法中,首先调用了:

taskLockbox.syncFromStorage();

接着定义了 taskRunner:

taskRunner = runnerFactory.build();

然后定义了 taskQueue:

taskQueue = new TaskQueue(
    taskQueueConfig,
    taskStorage,
    taskRunner,
    taskActionClientFactory,
    taskLockbox,
    emitter
);

接着声明了一个本地的 Lifecycle 实例吗,并加入一些被 LifeCyle 管理的类:

final Lifecycle leaderLifecycle = new Lifecycle("task-master");
...
leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(supervisorManager);
leaderLifecycle.addManagedInstance(overlordHelperManager);

leaderLifecycle.addHandler(new Lifecycle.Handler() 
  {
    @Override
    public void start()
    {
      initialized = true;
      serviceAnnouncer.announce(node);
    }
    @Override
    public void stop() { serviceAnnouncer.unannounce(node); }
  }
)

最后调用这个 Lifecycle 的 start()方法启动这个本地 Lifecycle:

leaderLifecycle.start();

调用 Lifecycle 的 start()方法,也会顺序调用它所管理的 Handler 的 start 方法,具体说来,会依次调用:

  • taskRunner 的 start()方法;
  • taskQueue 的 start()方法;
  • supervisorManager 的 start()方法;
  • overlordHelperManager 的 start()方法;
  • serviceAnnouncer 的 announce 方法;

下面我们详细来看一下这些方法。

TaskLockbox 的 syncFromStorage 方法

退出移动版