dubbo之Zookeeper注册中心

6次阅读

共计 2711 个字符,预计需要花费 7 分钟才能阅读完成。

目前 dubbo 支持多种注册中心:Zookeeper、Redis、Simple、Multicast、Etcd3。
本编文章是分析使用 Zookeeper 作为注册中心,dubbo 如何整合 Zookeeper 进行服务注册和订阅服务。
首先 dubbo 将服务注册到 Zookeeper 后,目录结构如下所示:(注册接口名:com.bob.dubbo.service.CityDubboService)
在 consumer 和 provider 服务启动的时候,去把自身 URL 格式化成字符串,然后注册到 zookeeper 相应节点下,作为临时节点,断开连接后,节点删除;consumer 启动时,不仅会订阅服务,同时也会将自己的 URL 注册到 zookeeper 中;
ZookeeperRegistry
ZookeeperRegistry:dubbo 与 zookeeper 交互主要的类,已下结合源码进行分析,先来看 doSubcribe() 方法:
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
// 处理所有 service 层发起的订阅,例如监控中心的订阅
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
// 处理指定 service 层发起的订阅,例如服务消费者的订阅
} else {
List<URL> urls = new ArrayList<>();
// 循环分类数组 , router, configurator, provider
for (String path : toCategoriesPath(url)) {
// 获得 url 对应的监听器集合
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {// 不存在,进行创建
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
// 获得 ChildListener 对象
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {// 不存在子目录的监听器,进行创建 ChildListener 对象
// 订阅父级目录, 当有子节点发生变化时,触发此回调函数,回调 listener 中的 notify() 方法
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
// 向 Zookeeper,PATH 节点,发起订阅, 返回此节点下的所有子元素 path : / 根节点 / 接口全名 /providers, 比如:/dubbo/com.bob.service.CityService/providers
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
// 首次全量数据获取完成时,调用 `#notify(…)` 方法,回调 NotifyListener, 在这一步从连接 Provider, 实例化 Invoker
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException(“Failed to subscribe ” + url + ” to zookeeper ” + getUrl() + “, cause: ” + e.getMessage(), e);
}
}

正文完
 0