共计 5349 个字符,预计需要花费 14 分钟才能阅读完成。
作者:fredalxin\
地址:https://fredal.xin/zookeeper-…
咱们能够应用 zookeeper 作为注册核心来实现服务的注册与发现,curator 框架提供了 curator-x-discovery 扩大实现了开箱即用的服务注册发现,但更多时候咱们还是抉择本人去实现,那这个时候咱们须要额定关注 zookeeper 的 1 个个性,即 wathcer。
在微服务场景中,watcher 机制次要提供了服务告诉性能,比方 Instance1 这个实例在 Service1 服务节点下注册了一个 emphemeral 子节点后,它的某个服务消费者依据依赖配置在 Service1 节点上注册了一个子节点 watcher,就如图中的红钥匙。
子节点类型的 watcher 会观测 Service1 的子节点,即 InstanceX 节点,但不会观测孙子节点 config1。那么当 Instance1 节点挂掉之后,watcher 能够做到告诉给注册本身的那个服务消费者,告诉完一次后 wacther 也就被销毁了。
wacther 原理框架
zookeeper 的 watcher 次要由 client、server 以及 watchManager 之间配合实现,包含 watcher 注册以及触发 2 个阶段。
在 client 端注册表为 ZkWatchManager,其中包含了 dataWatches、existWatches 以及 childWatches。在 server 端的注册表在 DataTree 类中,封装了 2 类 WatchManager,即 dataWatches 和 existWatches。dataWatches 代表以后节点的数据监听,childWathes 代表子节点监听,与 client 比少的 existWatches 也很容易了解,因为节点是否存在须要客户端去判断。
注册阶段客户端的 getData 和 exists 申请能够注册 dataWatches,getChilden 能够注册 childWatches。而触发阶段,setData 申请会触发以后节点 dataWatches,create 申请会触发以后节点 dataWatches 以及父节点的 childWatches,delete 申请则会触发以后节点、父节点、子节点的 dataWatches,以及父节点的 childWatches。
watchManager 蕴含两个十分重要的数据结构:watchTable 和 watch2Paths。前者示意 path-set<watcher>,后者示意 watcher-set<path>。留神这里的 watcher 含意示意近程连贯,所以 watchTable 示意一个目录下可能有多个消费者的监听连贯,而 watch2Paths 示意一个消费者可能会对多个目录建设监听,显然多目录的监听会复用一个连贯。
申请阶段的传输数据 (包含 watcher 信息) 会封装在 request 和 response 中,比方 getData 申请会封装 getDataRequest/getDataResponse。而触发阶段的 watcher 告诉则通过事件 event 进行通信,server 端会发送一个 watcherEvent,而 client 端则会将其转换成 watchedEvent 再进行解决。
每个客户端都会保护 2 个线程,SendThread 负责解决客户端与服务端的申请通信,比方发送 getDataRequest,而 EventThread 则负责解决服务端的事件告诉,即 watcher 的事件。
watcher 注册源码
咱们来看看 watcher 注册的局部源码。首先是在客户端,以 Zookeeper 中 getData 办法为例,会入队一个 watch 为 true 的 packet。
public byte[] getData(final String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException {
...
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
...
}
能够看到这边封装了 GetDataRequest 以及 GetDataResponse,而 request 中设置了 watch 参数为 true,最初将其进行 submitRequest,submitRequest 干的事儿其实就是将这些放入事件队列期待 sendThread 调度发送。
接着这个申请会被服务端所接管到,所有申请的服务端解决都在 FinalRequestProcessor#processRequest 办法中进行。
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
...
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
...
}
这边会通过一些 case 来判断申请类型,还是以 getData 为例,最终会调用到 DataTree 的 getData 办法,咱们之前讲到 DataTree 里蕴含了 2 种 watcher,那这边除了获取数据外,天然是注册 dataWatchers 了。
public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException {DataNode n = (DataNode)this.nodes.get(path);
if (n == null) {throw new NoNodeException();
} else {synchronized(n) {n.copyStat(stat);
if (watcher != null) {this.dataWatches.addWatch(path, watcher);
}
return n.data;
}
}
}
addWatch 办法次要是将数据节点的门路以及 ServerCnxn(近程通信信息) 信息存储到 WatchManager 的 watchTable 和 watch2Paths 中。至此服务端曾经承受到了 watcher 并注册到了 watchManager 中了。
咱们将客户端本人也会保留一个 watchManager,这里其实是在接管到 getData 响应后进行的,在 ClientCnxn$SendThread 类的 readResponse->finishPacket 办法中。
private void finishPacket(ClientCnxn.Packet p) {if (p.watchRegistration != null) {p.watchRegistration.register(p.replyHeader.getErr());
}
if (p.cb == null) {synchronized(p) {
p.finished = true;
p.notifyAll();}
} else {
p.finished = true;
this.eventThread.queuePacket(p);
}
}
能够看到这边调用了 watchRegistration 的 register 办法,而它就是依据申请类型来装入对应的 watchManager 中了(dataWatches、existWatches、childWatches)。
整个大抵的时序图能够参考上面:
watcher 触发源码
wathcer 触发局部,咱们还以 服务端 DataTree 类解决 setData 申请 为例。
public Stat setData(String path, byte data[], int version, long zxid,
long time) throws KeeperException.NoNodeException {
...
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
能够看到在解决完数据后调用了 triggerWatch,它干的事儿是从之前的 watchManager 中取得 watchers,而后一个个调用 process 办法。
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for" + path);
}
return null;
}
for (Watcher w : watchers) {HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {paths.remove(path);
}
}
}
for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}
w.process(e);
}
return watchers;
}
获取了须要本次触发的监听后,在 watchTable 和 watch2Paths 中还移除了本身,所以 watcher 是单次的。这里封装好了 watchedEvent 后塞入到了 Watcher 的 process 办法中,process 办法其实就是发送告诉,以 Watcher 的一个实现类 NioServerCnxn 为例就是调用了其 sendResponse 办法将告诉事件发送到客户端,发送前会将 watchedEvent 转换成 watcherEvent 进行发送。
那么客户端首先接管到申请的依然是 ClientCnxn$sendThread 的 readResponse 办法,这里讲 watcherEvent 转换为 watchedEvent 后入列 eventThread 的事件队列 期待后续进行解决。
...
WatchedEvent we = new WatchedEvent(event);
if (ClientCnxn.LOG.isDebugEnabled()) {ClientCnxn.LOG.debug("Got" + we + "for sessionid 0x" + Long.toHexString(ClientCnxn.this.sessionId));
}
ClientCnxn.this.eventThread.queueEvent(we);
...
咱们间接看下 EventThread 的 run 办法吧,办法很简略,就是一直从 waitingEvents 事件队列中取告诉事件。而后调用 processEvent 办法处理事件。
private void processEvent(Object event) {
try {if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {watcher.process(pair.event);
} catch (Throwable t) {LOG.error("Error while calling watcher", t);
}
}
} else {... 省略}
这里就是简略地取出本次事件须要告诉的 watcher 汇合,而后循环调用每个 watcher 的 process 办法了。那么在本人实现服务注册发现的场景里,显然 watcher 的 process 办法是咱们自定义的啦。
整个 watcher 触发的时序图能够参考上面:
至此,zookeeper 的整个 watcher 交互逻辑就曾经完结了。
近期热文举荐:
1.600+ 道 Java 面试题及答案整顿(2021 最新版)
2. 终于靠开源我的项目弄到 IntelliJ IDEA 激活码了,真香!
3. 阿里 Mock 工具正式开源,干掉市面上所有 Mock 工具!
4.Spring Cloud 2020.0.0 正式公布,全新颠覆性版本!
5.《Java 开发手册(嵩山版)》最新公布,速速下载!
感觉不错,别忘了顺手点赞 + 转发哦!