Zookeeper 通知更新可靠吗? 解读源码找答案!

64次阅读

共计 12344 个字符,预计需要花费 31 分钟才能阅读完成。

欢迎大家前往腾讯云 + 社区,获取更多腾讯海量技术实践干货哦~
本文由特鲁门发表于云 + 社区专栏

导读:
遇到 Keepper 通知更新无法收到的问题,思考节点变更通知的可靠性,通过阅读源码解析了解到 zk Watch 的注册以及触发的机制,本地调试运行模拟 zk 更新的不可靠的场景以及得出相应的解决方案。
过程很曲折,但问题的根本原因也水落石出了,本文最后陈述了更新无法收到的根本原因,希望对其他人有所帮助。—————————————–
通常 Zookeeper 是作为配置存储、分布式锁等功能被使用,配置读取如果每一次都是去 Zookeeper server 读取效率是非常低的,幸好 Zookeeper 提供节点更新的通知机制,只需要对节点设置 Watch 监听,节点的任何更新都会以通知的方式发送到 Client 端。

如上图所示:应用 Client 通常会连接上某个 ZkServer,forPath 不仅仅会读取 Zk 节点 zkNode 的数据(通常存储读取到的数据会存储在应用内存中,例如图中 Value),而且会设置一个 Watch,当 zkNode 节点有任何更新时,ZkServer 会发送 notify,Client 运行 Watch 来才走出相应的事件相应。这里假设操作为更新 Client 本地的数据。这样的模型使得配置异步更新到 Client 中,而无需 Client 每次都远程读取,大大提高了读的性能,(图中的 re-regist 重新注册是因为对节点的监听是一次性的,每一次通知完后,需要重新注册)。但这个 Notify 是可靠的吗?如果通知失败,那岂不是 Client 永远都读取的本地的未更新的值?
由于现网环境定位此类问题比较困难,因此本地下载源码并模拟运行 ZkServer & ZkClient 来看通知的发送情况。

1、git 下载源码 https://github.com/apache/zookeeper
2、cd 到路径下,运行 ant eclipse 加载工程的依赖。
3、导入 Idea 中。
https://stackoverflow.com/que…
查看相关问题和步骤。
首先运行 ZkServer。QuorumPeerMain 是 Server 的启动类。这个可以根据 bin 下 ZkServer.sh 找到入口。注意启动参数配置参数文件,指定例如启动端口等相关参数。

在此之前,需要设置相关的断点。
首先我们要看 client 设置监听后,server 是如何处理的
ZkClient 是使用 Nio 的方式与 ZkServer 进行通信的,Zookeeper 的线程模型中使用两个线程:
SendThread 专门成立的请求的发送,请求会被封装为 Packet(包含节点名称、Watch 描述等信息)类发送给 Sever。
EventThread 则专门处理 SendThread 接收后解析出的 Event。
ZkClient 的主要有两个 Processor,一个是 SycProcessor 负责 Cluster 之间的数据同步(包括集群 leader 选取)。另一个是叫 FinalRuestProcessor,专门处理对接受到的请求(Packet)进行处理。
//ZookeeperServer 的 processPacket 方法专门对收到的请求进行处理。
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
h.deserialize(bia, “header”);
// Through the magic of byte buffers, txn will not be
// pointing
// to the start of the txn
incomingBuffer = incomingBuffer.slice();
// 鉴权请求处理
if (h.getType() == OpCode.auth) {
LOG.info(“got auth packet ” + cnxn.getRemoteSocketAddress());
AuthPacket authPacket = new AuthPacket();
ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
Code authReturn = KeeperException.Code.AUTHFAILED;
if(ap != null) {
try {
authReturn = ap.handleAuthentication(new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth());
} catch(RuntimeException e) {
LOG.warn(“Caught runtime exception from AuthenticationProvider: ” + scheme + ” due to ” + e);
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn == KeeperException.Code.OK) {
if (LOG.isDebugEnabled()) {
LOG.debug(“Authentication succeeded for scheme: ” + scheme);
}
LOG.info(“auth success ” + cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
} else {
if (ap == null) {
LOG.warn(“No authentication provider for scheme: ”
+ scheme + ” has ”
+ ProviderRegistry.listProviders());
} else {
LOG.warn(“Authentication failed for scheme: ” + scheme);
}
// send a response…
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.AUTHFAILED.intValue());
cnxn.sendResponse(rh, null, null);
// … and close connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
cnxn.disableRecv();
}
return;
} else {

if (h.getType() == OpCode.sasl) {
Record rsp = processSasl(incomingBuffer,cnxn);
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh,rsp, “response”); // not sure about 3rd arg..what is it?
return;
}
else {
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
// Always treat packet from the client as a possible
// local request.
setLocalSessionFlag(si);
// 交给 finalRequestProcessor 处理
submitRequest(si);
}
}
cnxn.incrOutstandingRequests(h);
}
FinalRequestProcessor 对请求进行解析,Client 连接成功后,发送的 exist 命令会落在这部分处理逻辑。

zkDataBase 由 zkServer 从 disk 持久化的数据建立而来,上图可以看到这里就是添加监听 Watch 的地方。
然后我们需要了解到,当 Server 收到节点更新事件后,是如何触发 Watch 的。
首先了解两个概念,FinalRequestProcessor 处理的请求分为两种,一种是事务型的,一种非事务型,exist 的 event-type 是一个非事物型的操作,上面代码中是对其处理逻辑,对于事物的操作,例如 SetData 的操作。则在下面代码中处理。
private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
Record txn) {
ProcessTxnResult rc;
int opCode = request != null ? request.type : hdr.getType();
long sessionId = request != null ? request.sessionId : hdr.getClientId();
if (hdr != null) {
//hdr 为事物头描述,例如 SetData 的操作就会被 ZkDataBase 接管操作,
// 因为是对 Zk 的数据存储机型修改
rc = getZKDatabase().processTxn(hdr, txn);
} else {
rc = new ProcessTxnResult();
}
if (opCode == OpCode.createSession) {
if (hdr != null && txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
} else if (request != null && request.isLocalSession()) {
request.request.rewind();
int timeout = request.request.getInt();
request.request.rewind();
sessionTracker.addSession(request.sessionId, timeout);
} else {
LOG.warn(“*****>>>>> Got ”
+ txn.getClass() + ” ”
+ txn.toString());
}
} else if (opCode == OpCode.closeSession) {
sessionTracker.removeSession(sessionId);
}
return rc;
}

这里设置了断点,就可以拦截对节点的更新操作。
这两个设置了断点,就可以了解到 Watch 的设置过程。
接下来看如何启动 Zookeeper 的 Client。ZookeeperMain 为 Client 的入口,同样在 bin/zkCli.sh 中可以找到。注意设置参数,设置 Server 的连接地址。

修改 ZookeeperMain 方法,设置对节点的 Watch 监听。
public ZooKeeperMain(String args[]) throws IOException, InterruptedException, KeeperException {
cl.parseOptions(args);
System.out.println(“Connecting to ” + cl.getOption(“server”));
connectToZK(cl.getOption(“server”));
while (true) {
// 模拟注册对 /zookeeper 节点的 watch 监听
zk.exists(“/zookeeper”, true);
System.out.println(“wait”);
}
}
启动 Client。
由于我们要观察节点变更的过程,上面这个 Client 设置了对节点的监听,那么我们需要另外一个 cleint 对节点进行更改,这个我们只需要在命令上进行就可以了。

此时命令行的 zkClient 更新了 /zookeeper 节点,Server 此时会停在 setData 事件的处理代码段。
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
byte lastdata[] = null;
synchronized (n) {
lastdata = n.data;
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
// now update if the path is in a quota subtree.
String lastPrefix = getMaxPrefixWithQuota(path);
if(lastPrefix != null) {
this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
– (lastdata == null ? 0 : lastdata.length));
}
// 触发 watch 监听
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
此时,我们重点关注的类出现了。WatchManager
package org.apache.zookeeper.server;

import java.io.PrintWriter;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class manages watches. It allows watches to be associated with a string
* and removes watchers and their watches in addition to managing triggers.
*/
class WatchManager {
private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
// 存储 path 对 watch 的关系
private final Map<String, Set<Watcher>> watchTable =
new HashMap<String, Set<Watcher>>();
// 存储 watch 监听了哪些 path 节点
private final Map<Watcher, Set<String>> watch2Paths =
new HashMap<Watcher, Set<String>>();

synchronized int size(){
int result = 0;
for(Set<Watcher> watches : watchTable.values()) {
result += watches.size();
}
return result;
}
// 添加监听
synchronized void addWatch(String path, Watcher watcher) {
Set<Watcher> list = watchTable.get(path);
if (list == null) {
// don’t waste memory if there are few watches on a node
// rehash when the 4th entry is added, doubling size thereafter
// seems like a good compromise
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);

Set<String> paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}
// 移除
synchronized void removeWatcher(Watcher watcher) {
Set<String> paths = watch2Paths.remove(watcher);
if (paths == null) {
return;
}
for (String p : paths) {
Set<Watcher> list = watchTable.get(p);
if (list != null) {
list.remove(watcher);
if (list.size() == 0) {
watchTable.remove(p);
}
}
}
}

Set<Watcher> triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
}
// 触发 watch
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
Set<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
“No watchers for ” + path);
}
return null;
}
for (Watcher w : watchers) {
Set<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
// 通知发送
w.process(e);
}
return watchers;
}
}
重点关注 triggerWatch 的方法,可以发现 watch 被移除后,即往 watch 中存储的 client 信息进行通知发送。
@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
“Deliver event ” + event + ” to 0x”
+ Long.toHexString(this.sessionId)
+ ” through ” + this);
}

// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();

sendResponse(h, e, “notification”);
}
没有任何确认机制,不会由于发送失败,而回写 watch。
结论:
到这里,可以知道 watch 的通知机制是不可靠的,zkServer 不会保证通知的可靠抵达。虽然 zkclient 与 zkServer 端是会有心跳机制保持链接,但是如果通知过程中断开,即时重新建立连接后,watch 的状态是不会恢复。

现在已经知道了通知是不可靠的,会有丢失的情况,那 ZkClient 的使用需要进行修正。

本地的存储不再是一个静态的等待 watch 更新的状态,而是引入缓存机制,定期的去从 Zk 主动拉取并注册 Watch(ZkServer 会进行去重,对同一个 Node 节点的相同时间类型的 Watch 不会重复)。
另外一种方式是,Client 端收到断开连接的通知,重新注册所有关注节点的 Watch。但作者遇到的现网情况是 client 没有收到更新通知的同时,也没有查看到连接断开的错误信息。这块仍需进一步确认。水平有限,欢迎指正 :D

在 StackOverFlow 上的提问有了新进展:
https://stackoverflow.com/que…
原来官方文档已经解释了在连接断开的时候,client 对 watch 的一些恢复操做,ps:原来上面我提到的客户端的策略已经官方实现。。。

客户端会通过心跳保活,如果发现断开了连接,会重新建立连接,并发送之前对节点设置的 watch 以及节点 zxid,如果 zxid 与服务端的小则说明断开期间有更改,那么 server 会触发通知。
这么来看,Zookeeper 的通知机制至少在官方的文档说明上是可靠的,至少是有相应机制去保证。ps:除 Exist watch 外。但是本人遇到的问题仍未解开。。后悔当初没有保留现场,深入发掘。计划先把实现改回原来的,后续进一步验证。找到原因再更新这里。

最终结论更新!
通过深入阅读 apache 的 zk 论坛以及源码,有一个重要的信息。
上面提到的连接断开分为 recoverble 以及 unrecoverble 两种场景,这两种的区别主要是基于 Session 的有效期,所有的 client 操作包括 watch 都是和 Session 关联的,当 Session 在超时过期时间内,重新成功建立连接,则 watch 会在连接建立后重新设置。但是当 Session Timeout 后仍然没有成功重新建立连接,那么 Session 则处于 Expire 的状态。下面连接讲述了这个过程
How should I handle SESSION_EXPIRED?
这种情况下,ZookeeperClient 会重新连接,但是 Session 将会是全新的一个。同时之前的状态是不会保存的。
private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
return;
}
switch (state) {
case AUTH_FAILED:
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
break;
case CLOSED:
// session 关闭状态,直接返回。
p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
break;
default:
p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
}
// 如果 session 未过期,这里进行 session 的状态(watches)会重新注册。
finishPacket(p);
}
*1、什么是 zookeeper 的会话过期?*
一般来说,我们使用 zookeeper 是集群形式,如下图,client 和 zookeeper 集群 (3 个实例) 建立一个会话 session。

在这个会话 session 当中,client 其实是随机与其中一个 zk provider 建立的链接,并且互发心跳 heartbeat。zk 集群负责管理这个 session,并且在所有的 provider 上维护这个 session 的信息,包括这个 session 中定义的临时数据和监视点 watcher。
如果再网络不佳或者 zk 集群中某一台 provider 挂掉的情况下,有可能出现 connection loss 的情况,例如 client 和 zk provider1 连接断开,这时候 client 不需要任何的操作(zookeeper api 已经给我们做好了),只需要等待 client 与其他 provider 重新连接即可。这个过程可能导致两个结果:
1)在 session timeout 之内连接成功
这个时候 client 成功切换到连接另一个 provider 例如是 provider2,由于 zk 在所有的 provider 上同步了 session 相关的数据,此时可以认为无缝迁移了。
2)在 session timeout 之内没有重新连接
这就是 session expire 的情况,这时候 zookeeper 集群会任务会话已经结束,并清除和这个 session 有关的所有数据,包括临时节点和注册的监视点 Watcher。
在 session 超时之后,如果 client 重新连接上了 zookeeper 集群,很不幸,zookeeper 会发出 session expired 异常,且不会重建 session,也就是不会重建临时数据和 watcher。
我们实现的 ZookeeperProcessor 是基于 Apache Curator 的 Client 封装实现的。
Apache Curator 错误处理机制
它对于 Session Expire 的处理是提供了处理的监听注册 ConnectionStateListner,当遇到 Session Expire 时,执行使用者要做的逻辑。(例如:重新设置 Watch)遗憾的是,我们没有对这个事件进行处理,因此连接是一致断开的,但是!我们应用仍然会读到老的数据!
在这里,我们又犯了另外一个错误,本地缓存了 zookeeper 的节点数据。。其实 zookeeperClient 已经做了本地缓存的机制,但是我们有加了一层(注:这里也有一个原因,是因为 zk 节点的数据时二进制的数组,业务要使用通常要反序列化,我们这里的缓存是为了减少反序列化带来的开销!),正式由于我们本地缓存了,因此即使 zk 断开了,仍然读取了老的值!
至此,谜团已经全部解开,看来之前的实现有许多姿势是错误的,导致后续出现了各种奇怪的 BUG。现在处理的方案,是监听 Reconnect 的通知,当收到这个通知后,主动让本地缓存失效(这里仍然做了缓存,是因为减少反序列化的开销,zkClient 的缓存只是缓存了二进制,每次拿出来仍然需要反序列化)。代码:
curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {

switch (newState) {
case CONNECTED:
break;
case RECONNECTED:
LOG.error(“zookeeper connection reconnected”);
System.out.println(“zookeeper connection reconnected”);
// 本来使用 invalidateAll,但是这个会使得 cache 所有缓存值同时失效
// 如果关注节点比较多,导致同时请求 zk 读值,可能服务会瞬时阻塞在这一步
// 因此使用 guava cache refresh 方法,异步更新,更新过程中,
// 老值返回,知道更新完成
for (String key : classInfoMap.keySet()) {
zkDataCache.refresh(key);
}

break;
case LOST:
// session 超时,断开连接,这里不要做任何操作,缓存保持使用
LOG.error(“zookeeper connection lost”);
System.out.println(“zookeeper connection lost”);
break;
case SUSPENDED:
break;
default:
break;
}

}
});

问答如何阅读 Zookeeper 事务日志?相关阅读 Zookeeper 总览 ZooKeeper 入门 zookeeper 原理【每日课程推荐】机器学习实战!快速入门在线广告业务及 CTR 相应知识

正文完
 0