1.注册中心的作用

利用注册中心,服务提供者可以动态添加删除服务,服务消费者在收到更新通知后,可以拉取最新的服务从而实现同步。可以在注册中心实现统一配置,参数的动态调整可以自动通知到所有服务节点。

2.Dubbo四种注册中心实现

Dubbo注册中心的实现在dubbo-registry模块。

2.1 ZooKeeper

基于Zookeeper。ZooKeeper学习

2.1.1 Zookeeper注册中心数据结构

2.1.2 Zookeeper注册中心实现原理

Zookeeper注册中心采用"事件通知" + “客户端拉取”的实现方式:

  • 客户端在第一次连接注册中心的时候,会获取对应目录下的全量数据。
  • 客户端会在订阅的节点上注册一个watch,客户端与注册中心之间保持TCP长连接。
  • 当节点发生事务操作,节点的版本号发生改变,就会触发watch事件,推送数据给订阅方,订阅方收到通知之后,就会拉取对应目录下的数据。
  • 服务治理中心会订阅所有service层的数据,service被设置成"*",表示订阅全部。
2.1.3 Zookeeper注册和取消注册
//注册protected void doRegister(URL url) {    try {        //创建目录        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));    } catch (Throwable e) {        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);    }}//取消注册protected void doUnregister(URL url) {    try {        //删除目录        zkClient.delete(toUrlPath(url));    } catch (Throwable e) {        throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);    }}private String toUrlPath(URL url) {    return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());}//返回"/dubbo/接口名称/providers"private String toCategoryPath(URL url) {    return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);}//返回"/dubbo/接口名称"private String toServicePath(URL url) {    String name = url.getServiceInterface();    if (Constants.ANY_VALUE.equals(name)) {        return toRootPath();    }    return toRootDir() + URL.encode(name);}//返回"/"或者"/dubbo/"private String toRootDir() {    if (root.equals(Constants.PATH_SEPARATOR)) {        return root;    }    return root + Constants.PATH_SEPARATOR;}private String toRootPath() {    return root;}private final static String DEFAULT_ROOT = "dubbo";private final ZookeeperClient zkClient;public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {    //省略n行代码    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);    //添加前缀"/"    if (!group.startsWith(Constants.PATH_SEPARATOR)) {        group = Constants.PATH_SEPARATOR + group;    }    this.root = group;    zkClient = zookeeperTransporter.connect(url);    //省略n行代码}

ZookeeperClient和ZookeeperTransporter都是接口,在Dubbo中有两种实现,一种是基于curator客户端的CuratorZookeeperClient和CuratorZookeeperTransporter;另一种是基于zkclient客户端的ZkclientZookeeperClient和ZkclientZookeeperTransporter。默认实现是curator。

@SPI("curator")public interface ZookeeperTransporter {    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})    ZookeeperClient connect(URL url);}

在不能根据Constants.CLIENT_KEY和Constants.TRANSPORTER_KEY找到匹配的URL的时候,就会使用默认的Curator实现。

2.1.4 Zookeeper订阅的实现

先看看ConcurrentMap的put和putIfAbsent的区别

public static final String ANY_VALUE = "*";public static final String INTERFACE_KEY = "interface";public static final String CHECK_KEY = "check";protected void doSubscribe(final URL url, final NotifyListener listener) {        try {            //当接口名称是*的时候表示订阅全部Service            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {                //默认值是"/dubbo"                String root = toRootPath();                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);                if (listeners == null) {                    //listeners为空,说明缓存中没有                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());                    listeners = zkListeners.get(url);                }                ChildListener zkListener = listeners.get(listener);                if (zkListener == null) {                    //zkListener为空,说明是第一次调用                    listeners.putIfAbsent(listener, new ChildListener() {                        //这个内部方法不会立即执行,会在触发变更通知时执行                        @Override                        public void childChanged(String parentPath, List<String> currentChilds) {                            //遍历所有子节点                            for (String child : currentChilds) {                                child = URL.decode(child);                                //如果有子节点没有订阅,就进行订阅。                                if (!anyServices.contains(child)) {                                    anyServices.add(child);                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,                                            Constants.CHECK_KEY, String.valueOf(false)), listener);                                }                            }                        }                    });                    zkListener = listeners.get(listener);                }                //创建持久节点                zkClient.create(root, false);                //遍历订阅持久节点的直接子节点                List<String> services = zkClient.addChildListener(root, zkListener);                if (services != null && !services.isEmpty()) {                    for (String service : services) {                        service = URL.decode(service);                        anyServices.add(service);                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,                                Constants.CHECK_KEY, String.valueOf(false)), listener);                    }                }            } else {                List<URL> urls = new ArrayList<URL>();                for (String path : toCategoriesPath(url)) {                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);                    if (listeners == null) {                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());                        listeners = zkListeners.get(url);                    }                    ChildListener zkListener = listeners.get(listener);                    if (zkListener == null) {                        listeners.putIfAbsent(listener, new ChildListener() {                            @Override                            public void childChanged(String parentPath, List<String> currentChilds) {                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));                            }                        });                        zkListener = listeners.get(listener);                    }                    zkClient.create(path, false);                    List<String> children = zkClient.addChildListener(path, zkListener);                    if (children != null) {                        urls.addAll(toUrlsWithEmpty(url, path, children));                    }                }                notify(url, listener, urls);            }        } catch (Throwable e) {            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);        }    }
private String[] toCategoriesPath(URL url) {    String[] categories;    //如果类别是"*",订阅四种类型的路径(providers,consumers,routers,configurations)    if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) {        categories = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,                Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY};    } else {        //默认是providers        categories = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});    }    String[] paths = new String[categories.length];    for (int i = 0; i < categories.length; i++) {        paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i];    }    return paths;}

2.2 Redis

基于Redis。Redis学习

2.2.1 Redis注册中心数据结构

2.2.2 Redis续期Key和清理Key

org.apache.dubbo.registry.redis.RedisRegistry.deferExpired()定义了Redis刷新key的过期时间和清除过期的key。服务提供者发布一个服务,会先在Redis中创建一个Key,然后发布register事件,在RedisRegistry的构造方法中启动expireExecutor线程池周期性调用deferExpired方法刷新服务的过期时间,

private volatile boolean admin = false;//获取本地缓存所有已经注册的keyfor (URL url : new HashSet<URL>(getRegistered())) {    if (url.getParameter(Constants.DYNAMIC_KEY, true)) {        String key = toCategoryPath(url);        //刷新过期时间        if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {            //广播,重新发布            jedis.publish(key, Constants.REGISTER);        }    }}//如果是服务治理中心,需要清除过期的Keyif (admin) {    clean(jedis);}

org.apache.dubbo.common.Constants类定义常量。

public static final String DYNAMIC_KEY = "dynamic";

org.apache.dubbo.registry.support.AbstractRegistry.getRegistered()获取缓存的key。

private final Set<URL> registered = new ConcurrentHashSet<URL>();public Set<URL> getRegistered() {    return registered;}public void register(URL url) {    if (url == null) {        throw new IllegalArgumentException("register url == null");    }    if (logger.isInfoEnabled()) {        logger.info("Register: " + url);    }    registered.add(url);}

org.apache.dubbo.registry.redis.RedisRegistry.clean()服务治理中心删除过期的key。

private void clean(Jedis jedis) {    Set<String> keys = jedis.keys(root + Constants.ANY_VALUE);    if (keys != null && !keys.isEmpty()) {        for (String key : keys) {            //获取jedis连接所有的key            Map<String, String> values = jedis.hgetAll(key);            if (values != null && values.size() > 0) {                boolean delete = false;                long now = System.currentTimeMillis();                for (Map.Entry<String, String> entry : values.entrySet()) {                    URL url = URL.valueOf(entry.getKey());                    if (url.getParameter(Constants.DYNAMIC_KEY, true)) {                        long expire = Long.parseLong(entry.getValue());                        //如果key的过期时间早于当前时间,说明过期了。                        if (expire < now) {                            jedis.hdel(key, entry.getKey());                            delete = true;                            if (logger.isWarnEnabled()) {                                logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));                            }                        }                    }                }                if (delete) {                      //广播通知取消注册哪个Key。                    jedis.publish(key, Constants.UNREGISTER);                }            }        }    }}
2.2.3 Redis注册

(1)org.apache.dubbo.registry.redis.RedisRegistry.doRegister(URL url)

public void doRegister(URL url) {    // /dubbo/interface/providers(or consumers or configurations or routes)    String key = toCategoryPath(url);    //URL    String value = url.toFullString();    //计算过期时间    String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);    boolean success = false;    RpcException exception = null;    //遍历连接池中的所有节点    for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {        JedisPool jedisPool = entry.getValue();        try {            Jedis jedis = jedisPool.getResource();            try {                //保存键值                jedis.hset(key, value, expire);                //发布                jedis.publish(key, Constants.REGISTER);                success = true;                //非replicate模式只需要写入一个节点,否则写入全部节点                if (!replicate) {                    break; //  If the server side has synchronized data, just write a single machine                }            } finally {                jedis.close();            }        } catch (Throwable t) {            exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);        }    }    if (exception != null) {        if (success) {            logger.warn(exception.getMessage(), exception);        } else {            throw exception;        }    }}

(2)org.apache.dubbo.registry.redis.RedisRegistry.toCategoryPath(URL url)

private String toCategoryPath(URL url) {    //toServicePath(url) 得到根节点(默认得到"/dubbo/")和接口名称    //Constants.PATH_SEPARATOR("/")    //url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY)默认得到"providers"    return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);}//拼接根节点和接口名称private String toServicePath(URL url) {    return root + url.getServiceInterface();}

RedisRegistry的构造函数对root进行了赋值:

private final static String DEFAULT_ROOT = "dubbo";public RedisRegistry(URL url) {    ....省略N行代码...    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);    if (!group.startsWith(Constants.PATH_SEPARATOR)) {        group = Constants.PATH_SEPARATOR + group;    }    if (!group.endsWith(Constants.PATH_SEPARATOR)) {        group = group + Constants.PATH_SEPARATOR;    }    this.root = group;    ....省略N行代码...}

再来看看org.apache.dubbo.common.URL.getServiceInterface()

public String getServiceInterface() {    return getParameter(Constants.INTERFACE_KEY, path);}public String getParameter(String key, String defaultValue) {    String value = getParameter(key);    if (value == null || value.length() == 0) {        return defaultValue;    }    return value;}public String getParameter(String key) {    String value = parameters.get(key);    if (value == null || value.length() == 0) {        value = parameters.get(Constants.DEFAULT_KEY_PREFIX + key);    }    return value;}

在org.apache.dubbo.common.Constants类中定义了常量:

public final static String PATH_SEPARATOR = "/";public static final String CATEGORY_KEY = "category";public static final String DEFAULT_CATEGORY = PROVIDERS_CATEGORY;public static final String PROVIDERS_CATEGORY = "providers";public static final String GROUP_KEY = "group";public static final String DEFAULT_KEY_PREFIX = "default.";public static final String INTERFACE_KEY = "interface";
2.2.4 Redis订阅

首次订阅时,会创建一个内部线程类Notifier,在启动Notifier时异步订阅通道,同时主线程一次性拉取注册中心的所有事务信息。Notifier订阅的通道推送事件实现后续注册中心的信息变更。

RedisRegistry的 Notifier线程的run()方法部分代码如下:

if (service.endsWith(Constants.ANY_VALUE)) {    //如果以*结尾    if (!first) {        //如果订阅过,获取全部key,更新本地缓存        first = false;        Set<String> keys = jedis.keys(service);        if (keys != null && !keys.isEmpty()) {            for (String s : keys) {                doNotify(jedis, s);            }        }        resetSkip();//重置失败计数器    }    //订阅    jedis.psubscribe(new NotifySub(jedisPool), service);} else {    if (!first) {        first = false;        doNotify(jedis, service);        resetSkip();    }    jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); //订阅指定类别}

2.3 Simple

基于内存的默认实现。标准RPC服务,不支持集群,可能出现单点故障。

2.4 Multicast

通过广播地址实现互相发现。

3.缓存机制

缓存机制的目的是以空间换时间,如果每次远程调用都要从注册中心拉取一遍可用的远程服务列表,会给网络造成很大的压力。在AbstractRegistry实现了通用的缓存机制。Dubbo在内存存储一份,保存在properties对象中,在磁盘文件保存一份,放在file对象中。

//本地磁盘缓存private final Properties properties = new Properties();private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();//本地磁盘缓存文件private File file;private void loadProperties() {    if (file != null && file.exists()) {        InputStream in = null;        try {            //读取磁盘文件            in = new FileInputStream(file);            //加载到properties对象            properties.load(in);            if (logger.isInfoEnabled()) {                logger.info("Load registry store file " + file + ", data: " + properties);            }        } catch (Throwable e) {            logger.warn("Failed to load registry store file " + file, e);        } finally {            if (in != null) {                try {                    in.close();                } catch (IOException e) {                    logger.warn(e.getMessage(), e);                }            }        }    }}

org.apache.dubbo.registry.support.AbstractRegistry.saveProperties(URL url)

private final boolean syncSaveFile;if (syncSaveFile) {    //同步保存    doSaveProperties(version);} else {    //线程池异步保存    registryCacheExecutor.execute(new SaveProperties(version));}

4.重试机制

org.apache.dubbo.registry.support.FailBackRegistry添加了retry()方法。