序
本文主要研究一下 curator recipes 的 LeaderLatch
实例
@Test
public void testCuratorLeaderLatch() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(“localhost:2181”, new ExponentialBackoffRetry(1000, 3));
client.start();
String leaderLockPath = “/leader-lock2″;
List<LeaderLatch> latchList = IntStream.rangeClosed(1,10)
.parallel()
.mapToObj(i -> new LeaderLatch(client,leaderLockPath,”client”+i))
.collect(Collectors.toList());
latchList.parallelStream()
.forEach(latch -> {
try {
latch.start();
} catch (Exception e) {
e.printStackTrace();
}
});
TimeUnit.SECONDS.sleep(5);
Iterator<LeaderLatch> iterator = latchList.iterator();
while (iterator.hasNext()){
LeaderLatch latch = iterator.next();
if(latch.hasLeadership()){
System.out.println(latch.getId() + ” hasLeadership”);
try {
latch.close();
} catch (IOException e) {
e.printStackTrace();
}
iterator.remove();
}
}
TimeUnit.SECONDS.sleep(5);
latchList.stream()
.filter(latch -> latch.hasLeadership())
.forEach(latch -> System.out.println(latch.getId() + ” hasLeadership”));
Participant participant = latchList.get(0).getLeader();
System.out.println(participant);
TimeUnit.MINUTES.sleep(15);
latchList.stream()
.forEach(latch -> {
try {
latch.close();
} catch (IOException e) {
e.printStackTrace();
}
});
client.close();
}
zkCli 查询
[zk: localhost:2181(CONNECTED) 17] ls /
[leader-lock1, leader-lock2, zookeeper, leader-lock]
[zk: localhost:2181(CONNECTED) 18] ls /leader-lock2
[_c_4e86edb9-075f-4e18-a00c-cbf4fbf11b23-latch-0000000048, _c_b53efe1b-39ba-48df-8edb-905ddcccf5c9-latch-0000000042, _c_5ea234cc-8350-47ef-beda-8795694b62f6-latch-0000000045, _c_5f3330d9-384c-4abf-8f3e-21623213a374-latch-0000000044, _c_3fdec032-b8a4-44b9-9a9f-20285553a23e-latch-0000000049, _c_97a53125-0ab1-48ea-85cc-cdba631ce20f-latch-0000000047, _c_2bb56be2-ba17-485e-bbd3-10aa1d6af57c-latch-0000000043, _c_93fb732d-541b-48c6-aca7-dd2cd9b6f93e-latch-0000000041, _c_e09f0307-344c-4041-ab71-d68e10a48d02-latch-0000000046, _c_754a4f90-b03c-4803-915b-0654ad35ec9f-latch-0000000040]
LeaderLatch.start
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
/**
* Add this instance to the leadership election and attempt to acquire leadership.
*
* @throws Exception errors
*/
public void start() throws Exception
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), “Cannot be started more than once”);
startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
{
@Override
public void run()
{
try
{
internalStart();
}
finally
{
startTask.set(null);
}
}
}));
}
private synchronized void internalStart()
{
if (state.get() == State.STARTED )
{
client.getConnectionStateListenable().addListener(listener);
try
{
reset();
}
catch (Exception e)
{
ThreadUtils.checkInterrupted(e);
log.error(“An error occurred checking resetting leadership.”, e);
}
}
}
@VisibleForTesting
void reset() throws Exception
{
setLeadership(false);
setNode(null);
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if (debugResetWaitLatch != null)
{
debugResetWaitLatch.await();
debugResetWaitLatch = null;
}
if (event.getResultCode() == KeeperException.Code.OK.intValue())
{
setNode(event.getName());
if (state.get() == State.CLOSED )
{
setNode(null);
}
else
{
getChildren();
}
}
else
{
log.error(“getChildren() failed. rc = ” + event.getResultCode());
}
}
};
client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
这里 start 方法表示参与选举,reset 方法通过 forPath 创建子节点
这里 ZKPaths.makePath(latchPath, LOCK_NAME) 返回的是 /latchPath/latch-
这里有个 callback 主要做 getChildren 处理
CreateBuilderImpl.forPath
curator-framework-4.0.1-sources.jar!/org/apache/curator/framework/imps/CreateBuilderImpl.java
@VisibleForTesting
static final String PROTECTED_PREFIX = “_c_”;
/**
 * Creates the node for {@code givenPath} with the given payload.
 *
 * The payload is compressed first when compression is enabled. The path is fixed for the
 * client's namespace and adjusted for protected mode, and the request is validated against
 * the schema registered for {@code givenPath} before anything is sent.
 *
 * @param givenPath path as supplied by the caller
 * @param data      node payload
 * @return the created path with the namespace removed, or {@code null} when the operation
 *         was handed off to the background
 * @throws Exception errors
 */
@Override
public String forPath(final String givenPath, byte[] data) throws Exception
{
    final byte[] payload = compress
        ? client.getCompressionProvider().compress(givenPath, data)
        : data;

    final boolean sequential = createMode.isSequential();
    final String namespacedPath = client.fixForNamespace(givenPath, sequential);
    final String adjustedPath = adjustPath(namespacedPath);

    final List<ACL> acls = acling.getAclList(adjustedPath);
    client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, payload, acls);

    if ( backgrounding.inBackground() )
    {
        // background mode: the result is delivered through the callback, nothing to return here
        pathInBackground(adjustedPath, payload, givenPath);
        return null;
    }

    final String createdPath = protectedPathInForeground(adjustedPath, payload, acls);
    return client.unfixForNamespace(createdPath);
}
/**
 * Rewrites the last path segment for protected mode.
 *
 * @param path full node path
 * @return {@code path} unchanged when protection is off; otherwise the same parent path with
 *         the protected prefix put in front of the node name
 * @throws Exception errors
 */
@VisibleForTesting
String adjustPath(String path) throws Exception
{
    if ( !doProtected )
    {
        return path;
    }

    final ZKPaths.PathAndNode split = ZKPaths.getPathAndNode(path);
    final String protectedName = getProtectedPrefix(protectedId) + split.getNode();
    return ZKPaths.makePath(split.getPath(), protectedName);
}
private static String getProtectedPrefix(String protectedId)
{
return PROTECTED_PREFIX + protectedId + “-“;
}
如果 CuratorFramework 创建的时候没有指定 namespace 的话,这里 client.fixForNamespace 返回原值
adjustPath 对于需要 doProtected 的进行处理,添加上 PROTECTED_PREFIX 以及 protectedId(UUID) 还有 -,比如原来是 latch-,处理之后变为_c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-
之后由于创建的是 EPHEMERAL_SEQUENTIAL,因而最后会添加上编号,比如 /leader-lock2/_c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-0000000045,而节点的值为 LeaderLatch 指定的 id
LeaderLatch.getChildren
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
/**
 * Requests the children of the latch path in the background and, when the request succeeds,
 * passes them to {@code checkLeadership}. Any other result code is ignored.
 *
 * @throws Exception if the background request cannot be issued
 */
private void getChildren() throws Exception
{
    final String latchParentPath = ZKPaths.makePath(latchPath, null);

    final BackgroundCallback onChildren = new BackgroundCallback()
    {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
        {
            final boolean succeeded = event.getResultCode() == KeeperException.Code.OK.intValue();
            if ( !succeeded )
            {
                return;
            }
            checkLeadership(event.getChildren());
        }
    };

    client.getChildren().inBackground(onChildren).forPath(latchParentPath);
}
private void checkLeadership(List<String> children) throws Exception
{
final String localOurPath = ourPath.get();
List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
if (ourIndex < 0)
{
log.error(“Can’t find our node. Resetting. Index: ” + ourIndex);
reset();
}
else if (ourIndex == 0)
{
setLeadership(true);
}
else
{
String watchPath = sortedChildren.get(ourIndex – 1);
Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
if ((state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) )
{
try
{
getChildren();
}
catch (Exception ex)
{
ThreadUtils.checkInterrupted(ex);
log.error(“An error occurred checking the leadership.”, ex);
}
}
}
};
BackgroundCallback callback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if (event.getResultCode() == KeeperException.Code.NONODE.intValue())
{
// previous node is gone – reset
reset();
}
}
};
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
}
}
这里主要是调用了 checkLeadership 方法,该方法对于 index 为 0 的标记为 leader,对于 index 大于 0 的则添加 watch,watch 的路径为前一个节点,如果前一个节点被删除了,则重新触发 getChildren 方法
这里还注册了一个 callback,如果 getData 返回 NONODE(即前一个节点已经不存在),则触发 reset 操作,重新创建自身节点
LeaderLatch.close
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
/**
 * Takes this instance out of the leadership election, using the latch's own
 * {@code closeMode}. If this instance currently holds leadership, it is given up.
 *
 * IMPORTANT: calling close() is the only way to release leadership, so every LeaderLatch
 * instance must eventually be closed.
 *
 * @throws IOException errors
 */
@Override
public void close() throws IOException
{
    this.close(this.closeMode);
}
/**
* Remove this instance from the leadership election. If this instance is the leader, leadership
* is released. IMPORTANT: the only way to release leadership is by calling close(). All LeaderLatch
* instances must eventually be closed.
*
* @param closeMode allows the default close mode to be overridden at the time the latch is closed.
* @throws IOException errors
*/
public synchronized void close(CloseMode closeMode) throws IOException
{
Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), “Already closed or has not been started”);
Preconditions.checkNotNull(closeMode, “closeMode cannot be null”);
cancelStartTask();
try
{
setNode(null);
client.removeWatchers();
}
catch (Exception e)
{
ThreadUtils.checkInterrupted(e);
throw new IOException(e);
}
finally
{
client.getConnectionStateListenable().removeListener(listener);
switch (closeMode)
{
case NOTIFY_LEADER:
{
setLeadership(false);
listeners.clear();
break;
}
default:
{
listeners.clear();
setLeadership(false);
break;
}
}
}
}
/**
 * Stores the new leadership flag and, when the flag actually changed, informs the registered
 * listeners: {@code isLeader()} on gaining leadership, {@code notLeader()} on losing it.
 * Finally calls {@code notifyAll()} on this latch.
 *
 * @param newValue the leadership flag to store
 */
private synchronized void setLeadership(boolean newValue)
{
    final boolean wasLeader = hasLeadership.getAndSet(newValue);

    if ( wasLeader != newValue )
    {
        if ( newValue )
        {
            // false -> true: leadership gained
            listeners.forEach(new Function<LeaderLatchListener, Void>()
            {
                @Override
                public Void apply(LeaderLatchListener gainedListener)
                {
                    gainedListener.isLeader();
                    return null;
                }
            });
        }
        else
        {
            // true -> false: leadership lost
            listeners.forEach(new Function<LeaderLatchListener, Void>()
            {
                @Override
                public Void apply(LeaderLatchListener lostListener)
                {
                    lostListener.notLeader();
                    return null;
                }
            });
        }
    }

    // wake up any thread waiting on this object's monitor
    notifyAll();
}
close 方法用于将该 LeaderLatch 退出选举,如果该 latch 是 leader,则需要释放 leadership
close 方法首先 cancel 掉 StartTask,设置节点值为 null,然后移除了 watcher 以及 ConnectionStateListener,最后设置 leadership 为 false,然后触发相关 listener
注意如果 closeMode 是 NOTIFY_LEADER,则先设置 leadership 为 false,触发相关 listener 之后再移除 listener;否则是先移除 listener,再设置为 false
setLeadership 根据新旧值调用 listener.notLeader() 或者 input.isLeader()
ConnectionStateListener
curator-recipes-4.0.1-sources.jar!/org/apache/curator/framework/recipes/leader/LeaderLatch.java
// Connection state hook: passes every reported state change on to handleStateChange.
private final ConnectionStateListener listener = new ConnectionStateListener()
{
    @Override
    public void stateChanged(CuratorFramework curator, ConnectionState connectionState)
    {
        // the framework instance handed in is not needed; only the new state is used
        handleStateChange(connectionState);
    }
};
private void handleStateChange(ConnectionState newState)
{
switch (newState)
{
default:
{
// NOP
break;
}
case RECONNECTED:
{
try
{
if (client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get())
{
reset();
}
}
catch (Exception e)
{
ThreadUtils.checkInterrupted(e);
log.error(“Could not reset leader latch”, e);
setLeadership(false);
}
break;
}
case SUSPENDED:
{
if (client.getConnectionStateErrorPolicy().isErrorState(ConnectionState.SUSPENDED) )
{
setLeadership(false);
}
break;
}
case LOST:
{
setLeadership(false);
break;
}
}
}
LeaderLatch 注册了一个自定义的 ConnectionStateListener,分别在 RECONNECTED、SUSPENDED、LOST 的时候进行相应处理
setLeadership(false) 的时候,会根据新旧值通知相应的 listener 做处理,如果原来是 leader,则回调 listener.notLeader()
对于 RECONNECTED 状态,如果 ConnectionStateErrorPolicy 将 SUSPENDED 视为错误状态,或者当前 latch 不是 leader,则调用 reset,重新走 start 过程注册节点
小结
curator recipes 的 LeaderLatch 给我们提供了 leader 选举的便利方法,并提供了 LeaderLatchListener 供自定义处理
LeaderLatch 使用了 zk 的 EPHEMERAL_SEQUENTIAL,节点名会自动带上编号,默认 LOCK_NAME 为 latch-,另外对于 protected 的,会自动添加上 PROTECTED_PREFIX(_c_) 以及 protectedId(UUID),因而最后的节点名的格式为 PROTECTED_PREFIX+UUID+LOCK_NAME+ 编号,类似_c_a749fd26-b739-4510-9e1b-d2974f6dd1d1-latch-0000000045
LeaderLatch 使用了 ConnectionStateListener 对连接状态变化进行相应处理;选举时取 index 为 0 的节点为 leader,对于非 leader 的还对前一个节点添加 watcher 针对前一节点删除进行处理,触发 checkLeadership 操作,重新检查自身的 index 是否是在 children 排在第一位,如果是则更新为 leader,触发相应操作,如果不是则重新 watch 前面一个节点。如此一环扣一环的实现显得十分精妙。
doc
Leader Latch
Apache Curator Leader 选举 简单示例
基于 Apache Curator 框架的两种分布式 Leader 选举策略详解