乐趣区

Dubbo注册中心

1. 注册中心的作用

利用注册中心,服务提供者可以动态添加删除服务,服务消费者在收到更新通知后,可以拉取最新的服务从而实现同步。可以在注册中心实现统一配置,参数的动态调整可以自动通知到所有服务节点。

2.Dubbo 四种注册中心实现

Dubbo 注册中心的实现在 dubbo-registry 模块。

2.1 ZooKeeper

基于 Zookeeper。ZooKeeper 学习

2.1.1 Zookeeper 注册中心数据结构

2.1.2 Zookeeper 注册中心实现原理

Zookeeper 注册中心采用 ” 事件通知 ” +“客户端拉取”的实现方式:

  • 客户端在第一次连接注册中心的时候,会获取对应目录下的全量数据。
  • 客户端会在订阅的节点上注册一个 watch, 客户端与注册中心之间保持 TCP 长连接。
  • 当节点发生事务操作,节点的版本号发生改变,就会触发 watch 事件,推送数据给订阅方,订阅方收到通知之后,就会拉取对应目录下的数据。
  • 服务治理中心会订阅所有 service 层的数据,service 被设置成 ”*”,表示订阅全部。
2.1.3 Zookeeper 注册和取消注册
// 注册
protected void doRegister(URL url) {
    try {
        // 创建目录
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {throw new RpcException("Failed to register" + url + "to zookeeper" + getUrl() + ", cause:" + e.getMessage(), e);
    }
}

// 取消注册
protected void doUnregister(URL url) {
    try {
        // 删除目录
        zkClient.delete(toUrlPath(url));
    } catch (Throwable e) {throw new RpcException("Failed to unregister" + url + "to zookeeper" + getUrl() + ", cause:" + e.getMessage(), e);
    }
}

private String toUrlPath(URL url) {return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}

// 返回 "/dubbo/ 接口名称 /providers"
private String toCategoryPath(URL url) {return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
}

// 返回 "/dubbo/ 接口名称"
private String toServicePath(URL url) {String name = url.getServiceInterface();
    if (Constants.ANY_VALUE.equals(name)) {return toRootPath();
    }
    return toRootDir() + URL.encode(name);
}

// 返回 "/" 或者 "/dubbo/"
private String toRootDir() {if (root.equals(Constants.PATH_SEPARATOR)) {return root;}
    return root + Constants.PATH_SEPARATOR;
}

private String toRootPath() {return root;}

private final static String DEFAULT_ROOT = "dubbo";
private final ZookeeperClient zkClient;

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    // 省略 n 行代码
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    // 添加前缀 "/"
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {group = Constants.PATH_SEPARATOR + group;}
    this.root = group;
    zkClient = zookeeperTransporter.connect(url);
    // 省略 n 行代码
}

ZookeeperClient 和 ZookeeperTransporter 都是接口,在 Dubbo 中有两种实现,一种是基于 curator 客户端的 CuratorZookeeperClient 和 CuratorZookeeperTransporter;另一种是基于 zkclient 客户端的 ZkclientZookeeperClient 和 ZkclientZookeeperTransporter。默认实现是 curator。

@SPI("curator")
public interface ZookeeperTransporter {@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    ZookeeperClient connect(URL url);

}

在不能根据 Constants.CLIENT_KEY 和 Constants.TRANSPORTER_KEY 找到匹配的 URL 的时候,就会使用默认的 Curator 实现。

2.1.4 Zookeeper 订阅的实现

先看看 ConcurrentMap 的 put 和 putIfAbsent 的区别

public static final String ANY_VALUE = "*";
public static final String INTERFACE_KEY = "interface";
public static final String CHECK_KEY = "check";

protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // 当接口名称是 * 的时候表示订阅全部 Service
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                // 默认值是 "/dubbo"
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    //listeners 为空,说明缓存中没有
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    //zkListener 为空,说明是第一次调用
                    listeners.putIfAbsent(listener, new ChildListener() {
                        // 这个内部方法不会立即执行,会在触发变更通知时执行
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            // 遍历所有子节点
                            for (String child : currentChilds) {child = URL.decode(child);
                                // 如果有子节点没有订阅,就进行订阅。if (!anyServices.contains(child)) {anyServices.add(child);
                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                                }
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                // 创建持久节点
                zkClient.create(root, false);
                // 遍历订阅持久节点的直接子节点
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (services != null && !services.isEmpty()) {for (String service : services) {service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {List<URL> urls = new ArrayList<URL>();
                for (String path : toCategoriesPath(url)) {ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {listeners.putIfAbsent(listener, new ChildListener() {
                            @Override
                            public void childChanged(String parentPath, List<String> currentChilds) {ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
            }
        } catch (Throwable e) {throw new RpcException("Failed to subscribe" + url + "to zookeeper" + getUrl() + ", cause:" + e.getMessage(), e);
        }
    }
private String[] toCategoriesPath(URL url) {String[] categories;
    // 如果类别是 "*", 订阅四种类型的路径(providers,consumers,routers,configurations)
    if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) {categories = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
                Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY};
    } else {
        // 默认是 providers
        categories = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
    }
    String[] paths = new String[categories.length];
    for (int i = 0; i < categories.length; i++) {paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i];
    }
    return paths;
}

2.2 Redis

基于 Redis。Redis 学习

2.2.1 Redis 注册中心数据结构

2.2.2 Redis 续期 Key 和清理 Key

org.apache.dubbo.registry.redis.RedisRegistry.deferExpired()定义了 Redis 刷新 key 的过期时间和清除过期的 key。服务提供者发布一个服务,会先在 Redis 中创建一个 Key,然后发布 register 事件,在 RedisRegistry 的构造方法中启动 expireExecutor 线程池周期性调用 deferExpired 方法刷新服务的过期时间,

private volatile boolean admin = false;

// 获取本地缓存所有已经注册的 key
for (URL url : new HashSet<URL>(getRegistered())) {if (url.getParameter(Constants.DYNAMIC_KEY, true)) {String key = toCategoryPath(url);
        // 刷新过期时间
        if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
            // 广播,重新发布
            jedis.publish(key, Constants.REGISTER);
        }
    }
}
// 如果是服务治理中心,需要清除过期的 Key
if (admin) {clean(jedis);
}

org.apache.dubbo.common.Constants 类定义常量。

public static final String DYNAMIC_KEY = "dynamic";

org.apache.dubbo.registry.support.AbstractRegistry.getRegistered()获取缓存的 key。

private final Set<URL> registered = new ConcurrentHashSet<URL>();

public Set<URL> getRegistered() {return registered;}

public void register(URL url) {if (url == null) {throw new IllegalArgumentException("register url == null");
    }
    if (logger.isInfoEnabled()) {logger.info("Register:" + url);
    }
    registered.add(url);
}

org.apache.dubbo.registry.redis.RedisRegistry.clean()服务治理中心删除过期的 key。

private void clean(Jedis jedis) {Set<String> keys = jedis.keys(root + Constants.ANY_VALUE);
    if (keys != null && !keys.isEmpty()) {for (String key : keys) {
            // 获取 jedis 连接所有的 key
            Map<String, String> values = jedis.hgetAll(key);
            if (values != null && values.size() > 0) {
                boolean delete = false;
                long now = System.currentTimeMillis();
                for (Map.Entry<String, String> entry : values.entrySet()) {URL url = URL.valueOf(entry.getKey());
                    if (url.getParameter(Constants.DYNAMIC_KEY, true)) {long expire = Long.parseLong(entry.getValue());
                        // 如果 key 的过期时间早于当前时间,说明过期了。if (expire < now) {jedis.hdel(key, entry.getKey());
                            delete = true;
                            if (logger.isWarnEnabled()) {logger.warn("Delete expired key:" + key + "-> value:" + entry.getKey() + ", expire:" + new Date(expire) + ", now:" + new Date(now));
                            }
                        }
                    }
                }
                if (delete) {  
                    // 广播通知取消注册哪个 Key。jedis.publish(key, Constants.UNREGISTER);
                }
            }
        }
    }
}
2.2.3 Redis 注册

(1)org.apache.dubbo.registry.redis.RedisRegistry.doRegister(URL url)

public void doRegister(URL url) {// /dubbo/interface/providers(or consumers or configurations or routes)
    String key = toCategoryPath(url);
    //URL
    String value = url.toFullString();
    // 计算过期时间
    String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
    boolean success = false;
    RpcException exception = null;
    // 遍历连接池中的所有节点
    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {JedisPool jedisPool = entry.getValue();
        try {Jedis jedis = jedisPool.getResource();
            try {
                // 保存键值
                jedis.hset(key, value, expire);
                // 发布
                jedis.publish(key, Constants.REGISTER);
                success = true;
                // 非 replicate 模式只需要写入一个节点,否则写入全部节点
                if (!replicate) {break; //  If the server side has synchronized data, just write a single machine}
            } finally {jedis.close();
            }
        } catch (Throwable t) {exception = new RpcException("Failed to register service to redis registry. registry:" + entry.getKey() + ", service:" + url + ", cause:" + t.getMessage(), t);
        }
    }
    if (exception != null) {if (success) {logger.warn(exception.getMessage(), exception);
        } else {throw exception;}
    }
}

(2)org.apache.dubbo.registry.redis.RedisRegistry.toCategoryPath(URL url)

private String toCategoryPath(URL url) {//toServicePath(url) 得到根节点 (默认得到 "/dubbo/") 和接口名称
    //Constants.PATH_SEPARATOR("/")
    //url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY)默认得到 "providers"
    return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
}

// 拼接根节点和接口名称
private String toServicePath(URL url) {return root + url.getServiceInterface();
}

RedisRegistry 的构造函数对 root 进行了赋值:

private final static String DEFAULT_ROOT = "dubbo";

public RedisRegistry(URL url) {
    .... 省略 N 行代码...
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {group = Constants.PATH_SEPARATOR + group;}
    if (!group.endsWith(Constants.PATH_SEPARATOR)) {group = group + Constants.PATH_SEPARATOR;}
    this.root = group;
    .... 省略 N 行代码...
}

再来看看 org.apache.dubbo.common.URL.getServiceInterface()

public String getServiceInterface() {return getParameter(Constants.INTERFACE_KEY, path);
}

public String getParameter(String key, String defaultValue) {String value = getParameter(key);
    if (value == null || value.length() == 0) {return defaultValue;}
    return value;
}

public String getParameter(String key) {String value = parameters.get(key);
    if (value == null || value.length() == 0) {value = parameters.get(Constants.DEFAULT_KEY_PREFIX + key);
    }
    return value;
}

在 org.apache.dubbo.common.Constants 类中定义了常量:

public final static String PATH_SEPARATOR = "/";
public static final String CATEGORY_KEY = "category";
public static final String DEFAULT_CATEGORY = PROVIDERS_CATEGORY;
public static final String PROVIDERS_CATEGORY = "providers";
public static final String GROUP_KEY = "group";
public static final String DEFAULT_KEY_PREFIX = "default.";
public static final String INTERFACE_KEY = "interface";
2.2.4 Redis 订阅

首次订阅时,会创建一个内部线程类 Notifier,在启动 Notifier 时异步订阅通道,同时主线程一次性拉取注册中心的所有事务信息。Notifier 订阅的通道推送事件实现后续注册中心的信息变更。

RedisRegistry 的 Notifier 线程的 run()方法部分代码如下:

if (service.endsWith(Constants.ANY_VALUE)) {
    // 如果以 * 结尾
    if (!first) {
        // 如果订阅过,获取全部 key, 更新本地缓存
        first = false;
        Set<String> keys = jedis.keys(service);
        if (keys != null && !keys.isEmpty()) {for (String s : keys) {doNotify(jedis, s);
            }
        }
        resetSkip();// 重置失败计数器}
    // 订阅
    jedis.psubscribe(new NotifySub(jedisPool), service);
} else {if (!first) {
        first = false;
        doNotify(jedis, service);
        resetSkip();}
    jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); // 订阅指定类别
}

2.3 Simple

基于内存的默认实现。标准 RPC 服务,不支持集群,可能出现单点故障。

2.4 Multicast

通过广播地址实现互相发现。

3. 缓存机制

缓存机制的目的是以空间换时间,如果每次远程调用都要从注册中心拉取一遍可用的远程服务列表,会给网络造成很大的压力。在 AbstractRegistry 实现了通用的缓存机制。Dubbo 在内存存储一份,保存在 properties 对象中,在磁盘文件保存一份,放在 file 对象中。

// 本地磁盘缓存
private final Properties properties = new Properties();
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();
// 本地磁盘缓存文件
private File file;

private void loadProperties() {if (file != null && file.exists()) {
        InputStream in = null;
        try {
            // 读取磁盘文件
            in = new FileInputStream(file);
            // 加载到 properties 对象
            properties.load(in);
            if (logger.isInfoEnabled()) {logger.info("Load registry store file" + file + ", data:" + properties);
            }
        } catch (Throwable e) {logger.warn("Failed to load registry store file" + file, e);
        } finally {if (in != null) {
                try {in.close();
                } catch (IOException e) {logger.warn(e.getMessage(), e);
                }
            }
        }
    }
}

org.apache.dubbo.registry.support.AbstractRegistry.saveProperties(URL url)

private final boolean syncSaveFile;
if (syncSaveFile) {
    // 同步保存
    doSaveProperties(version);
} else {
    // 线程池异步保存
    registryCacheExecutor.execute(new SaveProperties(version));
}

4. 重试机制

org.apache.dubbo.registry.support.FailBackRegistry 添加了 retry()方法。

退出移动版