乐趣区

关于注册中心:得物技术常用注册中心原理及比较

目前比拟罕用的注册核心有 Eureka、Zookeeper、Consul 和 Nacos。最近对这四种注册核心的整体框架和实现进行了学习,并次要针对 Nacos 从源码角度学习了服务注册和订阅的具体实现。最初比拟了这四种注册核心的区别。

一.Eureka

左上角的 Eureka Client 是服务提供者:向 Eureka Server 注册和更新本人的信息,同时能从 Eureka Server 注册表中获取到其余服务的信息。具体有以下四种操作:

  • Register 注册:Client 端向 Server 端注册本身的元数据以供服务发现;
  • Renew 续约:通过发送心跳到 Server 以维持和更新注册表中服务实例元数据的有效性。当在肯定时长内,Server 没有收到 Client 的心跳信息,将默认服务下线,会把服务实例的信息从注册表中删除;
  • Cancel 下线:Client 在敞开时被动向 Server 登记服务实例元数据,这时 Client 的服务实例数据将从 Server 的注册表中删除;
  • Get Registry 获取注册表:Client 向 Server 申请注册表信息,用于服务发现,从而发动服务间近程调用。

Eureka Server 服务注册核心:提供服务注册和发现的性能。每个 Eureka Client 向 Eureka Server 注册本人的信息,也能够通过 Eureka Server 获取到其余服务的信息达到发现和调用其余服务的目标。

Eureka Client 服务消费者:通过 Eureka Server 获取注册到其上其余服务的信息,从而依据信息找到所需的服务发动近程调用。

Replicate 同步复制:Eureka Server 之间注册表信息的同步复制,使 Eureka Server 集群中不同注册表中服务实例信息保持一致。因为集群间的同步复制是通过 HTTP 的形式进行,基于网络的不可靠性,集群中的 Eureka Server 间的注册表信息不免存在不同步的工夫节点,不满足 CAP 中的 C(数据一致性)。

Make Remote Call 近程调用:服务客户端之间的近程调用。

二.Zookeeper

2.1 Zookeeper 整体框架

  • Leader:zookeeper 集群工作的外围,事务申请(写操作)的惟一调度和解决者,保障集群事务处理的程序性;集群外部各个服务的调度者。对于 create,set data,delete 等有写操作的申请,则须要对立转发给 leader 解决,leader 须要决定编号、执行操作,这个过程称为一个事务。
  • Follower:解决客户端非事务(读操作)申请 转发事务申请给 Leader 参加集群 leader。
  • Observer:观察者角色是针对访问量较大的 zookeeper 集群新增的角色。察看 zookeeper 集群的最新状态变动并将这些状态同步过去,其对于非事务申请能够进行独立解决,对于事务申请,则会转发给 Leader 服务器解决。不会参加任何模式的投票只提供服务,通常用于在不影响集群事务处理能力的前提下晋升集群的非事务处理能力,用于减少并发的申请。

2.2 Zookeeper 存储构造

下图形容了用于内存示意的 ZooKeeper 文件系统的树结构。ZooKeeper 节点称为 znode。每个 znode 由一个名称标识,并用门路 (/) 序列分隔。在图中,首先有一个由“/”分隔的 znode。在根目录下有两个逻辑命名空间 config 和 workers。config 命名空间用于集中式配置管理,workers 命名空间用于命名。

在 config 命名空间下,每个 znode 最多可存储 1MB 的数据。这与 UNIX 文件系统相相似,除了父 znode 也能够存储数据。这种构造的次要目标是存储同步数据并形容 znode 的元数据。此构造称为 ZooKeeper 数据模型。ZooKeeper 命名空间中的每个节点都由门路标识。

znode 兼具文件和目录两种特点。既像文件一样保护着数据长度、元信息、ACL、工夫戳等数据结构,又像目录一样能够作为门路标识的一部分:

  • 版本号 – 每个 znode 都有版本号,这意味着每当与 znode 相关联的数据发生变化时,其对应的版本号也会减少。当多个 zookeeper 客户端尝试在同一 znode 上执行操作时,版本号的应用就很重要。
  • 操作控制列表(ACL) – ACL 基本上是拜访 znode 的认证机制。它治理所有 znode 读取和写入操作。
  • 工夫戳 – 工夫戳示意创立和批改 znode 所通过的工夫。它通常以毫秒为单位。ZooKeeper 从“事务 ID”(zxid)标识 znode 的每个更改。Zxid 是惟一的,并且为每个事务保留工夫,以便你能够轻松地确定从一个申请到另一个申请所通过的工夫。
  • 数据长度 – 存储在 znode 中的数据总量是数据长度。最多能够存储 1MB 的数据。

ZooKeeper 还具备短暂节点的概念。只有创立 znode 的会话处于活动状态,这些 znode 就存在。会话完结时,将删除 znode。

2.3 Zookeeper 监督性能

ZooKeeper 反对 watch 的概念,客户端能够在 znode 上设置察看。znode 更改时,将触发并删除监督。触发监督后,客户端会收到一个数据包,阐明 znode 已更改。如果客户端和其中一个 ZooKeeper 服务器之间的连贯断开,则客户端将收到本地告诉。3.6.0 中的新增性能:客户端还能够在 znode 上设置永久性的递归监督,这些监督在触发时不会删除,并且会以递归形式触发注册 znode 以及所有子 znode 的更改。

2.4 Zookeeper 选举过程

ZooKeeper 至多须要三个节点能力工作,Zookeeper 节点状态个别认为有 4 个:

  • LOOKING:示意正在进行选举的节点,处于该状态须要进入选举流程
  • LEADING:领导者状态,处于该状态的节点阐明是角色曾经是 Leader
  • FOLLOWING:跟随者状态,示意 Leader 曾经选举进去,以后节点角色是 follower
  • OBSERVER:观察者状态,表明以后节点角色是 observer,observer 示意不会进入选举,仅仅只是承受选举后果,也就是说不会成为 Leader 节点,然而是 follower 节点一样提供服务。

推选 Leader 过程如下图所示:

在集群初始化阶段,当有一台服务器 ZK1 启动时,无奈独自进行和实现 Leader 选举,当第二台服务器 ZK2 启动时,此时两台机器能够互相通信,每台机器都试图找到 Leader,于是进入 Leader 选举过程。选举过程开始,过程如下:

(1) 每个 Server 收回一个投票。因为是初始状况,ZK1 和 ZK2 都会将本人作为 Leader 服务器来进行投票,每次投票会蕴含所推举的服务器的 ID 和 ZXID(事务 ID),应用 (ID, ZXID) 来示意,此时 ZK1 的投票为(1, 0),ZK2 的投票为(2, 0),而后各自将这个投票发给集群中其余机器。

(2) 承受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如查看是否是本轮投票、是否来自 LOOKING 状态的服务器。

(3) 解决投票。针对每一个投票,服务器都须要将他人的投票和本人的投票进行比拟,规定如下:

  • 优先查看 ZXID。ZXID 比拟大的服务器优先作为 Leader。
  • 如果 ZXID 雷同,那么就比拟服务器 ID。ID 较大的服务器作为 Leader 服务器。

对于 ZK1 而言,它的投票是(1, 0),接管 ZK2 的投票为(2, 0),首先会比拟两者的 ZXID,均为 0,再比拟 ID,此时 ZK2 的 ID 更大,于是 ZK2 胜。ZK1 更新本人的投票为(2, 0),并将投票从新发送给 ZK2。

(4) 统计投票。每次投票后,服务器都会统计投票信息,判断是否曾经有过半机器承受到雷同的投票信息,对于 ZK1、ZK2 而言,都统计出集群中曾经有两台机器承受了 (2, 0) 的投票信息,此时便认为曾经选出 ZK2 作为 Leader。

(5) 扭转服务器状态。一旦确定了 Leader,每个服务器就会更新本人的状态,如果是 Follower,那么就变更为 FOLLOWING,如果是 Leader,就变更为 LEADING。当新的 Zookeeper 节点 ZK3 启动时,发现曾经有 Leader 了,不再选举,间接将状态从 LOOKING 改为 FOLLOWING。

三.Consul

3.1 Consul 整体框架

Consul 反对多数据中心,在上图中有两个 Data Center,他们通过 WAN GOSSIP 在 Internet 互联,同时为了进步通信效率,只有 Server 节点才退出跨数据中心的通信。因而,consul 是能够反对多个数据中心之间基于 WAN 来做同步的。

在单个数据中心中,Consul 分为 Client 和 Server 两种节点(所有的节点也被称为 Agent)。

  • Server 节点:参加共识仲裁、存储群集状态 (日志存储)、解决查问、保护与周边(LAN/WAN) 各节点关系
  • Agent 节点:负责通过该节点注册到 consul 的微服务的健康检查、将客户端注册申请以及查问转化为对 server 的 RPC 申请、保护与周边 (LAN/WAN) 各节点关系

它们之间通过 GRPC 通信。除此之外,Server 和 Client 之间,还有一条 LAN GOSSIP 通信,这是用于当 LAN 外部产生了拓扑变动时,存活的节点们可能及时感知,比方 Server 节点 down 掉后,Client 就会触发将对应 Server 节点从可用列表中剥离进来。所有的 Server 节点独特组成了一个集群,他们之间运行 raft 协定,通过共识仲裁选举出 leader。所有的业务数据都通过 leader 写入到集群中做长久化,当有半数以上的节点存储了该数据后,server 集群才会返回 ACK,从而保障了数据的强一致性。当然,Server 数量大了之后,也会影响写数据的效率。所有的 follower 会追随 leader 的脚步,保障其有最新的数据正本。集群内的 Consul 节点通过 gossip 协定保护成员关系,如集群内当初还有哪些节点,这些节点是 Client 还是 Server。

单个数据中心的谰言协定同时应用 TCP 和 UDP 通信,并且都应用 8301 端口。跨数据中心的谰言协定也同时应用 TCP 和 UDP 通信,端口应用 8302。集群内数据的读写申请既能够间接发到 Server,也能够通过 Client 应用 RPC 转发到 Server,申请最终会达到 Leader 节点。

四.Nacos

4.1 Nacos 整体框架

服务注册时在服务端本地会通过轮询注册核心集群节点地址进行服务得注册,在注册核心上,即 Nacos Server 上采纳了 Map 保留实例信息,配置了长久化的服务会被保留到数据库中,在服务的调用方,为了保障本地服务实例列表的动静感知,Nacos 与其余注册核心不同的是,采纳了 Pull/Push 同时运作的形式。

4.2 Nacos 选举

Nacos 的集群相似于 zookeeper,它分为 leader 角色和 follower 角色,那么从这个角色的名字能够看进去,这个集群存在选举的机制。因为如果本人不具备选举性能,角色的命名可能就是 master/slave 了。

选举算法:

Nacos 集群采纳 raft 算法来实现,它是绝对 zookeeper 的选举算法较为简单的一种。选举算法的外围在 RaftCore 中,包含数据的解决和数据同步。

在 Raft 中,节点有三种角色:

  • Leader:负责接管客户端的申请
  • Candidate:用于选举 Leader 的一种角色(竞选状态)
  • Follower:负责响应来自 Leader 或者 Candidate 的申请

所有节点启动的时候,都是 follower 状态。如果在一段时间内如果没有收到 leader 的心跳(可能是没有 leader,也可能是 leader 挂了),那么 follower 会变成 Candidate。而后发动选举,选举之前,会减少 term,这个 term 和 zookeeper 中的 epoch 的情理是一样的。

follower 会投本人一票,并且给其余节点发送票据信息,等到其余节点回复在这个过程中,可能呈现几种状况:

  • 收到过半的票数通过,则成为 leader
  • 被告知其余节点曾经成为 leader,则本人切换为 follower
  • 一段时间内没有收到过半的投票,则从新发动选举。约束条件在任一 term 中,单个节点最多只能投一票

第一种状况,博得选举之后,leader 会给所有节点发送音讯,防止其余节点触发新的选举。

第二种状况,比方有三个节点 A B C。A B 同时发动选举,而 A 的选举音讯先达到 C,C 给 A 投了一票,当 B 的音讯达到 C 时,曾经不能满足下面提到的约束条件,即 C 不会给 B 投票,而 A 和 B 显然都不会给对方投票。A 胜出之后,会给 B,C 发心跳音讯,节点 B 发现节点 A 的 term 不低于本人的 term,晓得有曾经有 Leader 了,于是转换成 follower。

第三种状况,没有任何节点取得大多数投票,可能是平票的状况。退出总共有四个节点(A/B/C/D),Node C、Node D 同时成为了 candidate,但 Node A 投了 Node D 一票,Node B 投了 Node C 一票,这就呈现了平票 的状况。这个时候大家都在期待,直到超时后从新发动选举。如果呈现平票的状况,那么就缩短了零碎不可用的工夫, 因而 raft 引入了 randomizedelection timeouts 来尽量避免平票状况。

4.3 Nacos 服务注册流程源码

Nacos 源码是在 https://github.com/alibaba/nacos 下载的最新版本 2.0.0-bugfix (Mar 30th, 2021)。

当须要注册时,Spring-Cloud 会注入实例 NacosServiceRegistry。

@Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        // 增加心跳信息
        if (instance.isEphemeral()) {BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        // 调用服务代理类进行注册
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }

而后调用 registerService 办法进行注册,构建申请参数,发动申请。

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
                instance);

        final Map<String, String> params = new HashMap<String, String>(16);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));

        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);

    }

进入 reqApi 办法,咱们能够看到服务在进行注册的时候会轮询配置好的注册核心的地址:

public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,
            String method) throws NacosException {params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

        if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) {throw new NacosException(NacosException.INVALID_PARAM, "no server available");
        }

        NacosException exception = new NacosException();
        //service 只有一个的状况
        if (StringUtils.isNotBlank(nacosDomain)) {for (int i = 0; i < maxRetry; i++) {
                try {return callServer(api, params, body, nacosDomain, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                    }
                }
            }
        } else {Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());

            for (int i = 0; i < servers.size(); i++) {String server = servers.get(index);
                try {return callServer(api, params, body, server, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("request {} failed.", server, e);
                    }
                }
                // 轮询
                index = (index + 1) % servers.size();}
        }

最初通过 callServer(api, params, server, method) 发动调用

public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,
            String method) throws NacosException {long start = System.currentTimeMillis();
        long end = 0;
        injectSecurityInfo(params);
        Header header = builderHeader();

        String url;
        // 发送 http 申请
        if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {url = curServer + api;} else {if (!IPUtil.containsPort(curServer)) {curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort;}
            url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;
        }
    }

Nacos 服务端的解决:

服务端提供了一个 InstanceController 类,在这个类中提供了服务注册相干的 API

@CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {

        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        // 从申请中解析出 instance 实例
        final Instance instance = parseInstance(request);

        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

而后调用 ServiceManager 进行服务的注册

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        // 创立一个空服务,在 Nacos 控制台服务列表展现的服务信息,实际上是初始化一个 serviceMap,它是一个 ConcurrentHashMap 汇合
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        // 从 serviceMap 中,依据 namespaceId 和 serviceName 失去一个服务对象
        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace:" + namespaceId + ", service:" + serviceName);
        }
        // 调用 addInstance 创立一个服务实例
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

创立空服务实例时

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
            throws NacosException {
        // 从 serviceMap 中获取服务对象
        Service service = getService(namespaceId, serviceName);
        // 如果为空。则初始化
        if (service == null) {Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();

            putServiceAndInit(service);
            if (!local) {addOrReplaceService(service);
            }
        }
    }

getService 办法中用到了 Map 进行存储:

private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

Nacos 是通过不同的 namespace 来保护服务的,而每个 namespace 下有不同的 group,不同的 group 下才有对应的 Service,再通过这个 serviceName 来确定服务实例。第一次进来则会进入初始化,初始化完会调用 putServiceAndInit。

private void putServiceAndInit(Service service) throws NacosException {
        // 把服务信息保留到 serviceMap 汇合
        putService(service);
        service = getService(service.getNamespaceId(), service.getName());
        // 建设心跳检测机制
        service.init();
        // 实现数据一致性监听,ephemeral(标识服务是否为长期服务,默认是长久化的,也就是 true)=true 示意采纳 raft 协定,false 示意采纳 Distro
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
                .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }

获取到服务当前把服务实例增加到汇合中,而后基于一致性协定进行数据的同步。而后调用 addInstance

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        // 组装 key
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        // 获取刚刚组装的服务
        Service service = getService(namespaceId, serviceName);

        synchronized (service) {List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            // 也就是上一步实现监听的类里增加注册服务
            consistencyService.put(key, instances);
        }
    }

4.4 Nacos 服务订阅源码

节点的订阅在不同的注册核心中都有不同的实现,个别分为拉取和推送两种。

推送是指当订阅的节点产生更新的时候会被动向订阅方进行推送,ZK 就是推送的实现形式,客户端和服务端会建设一个 TCP 长连贯,客户端会注册一个 watcher,而后当有数据更新的时候,服务端会通过长连贯进行推送。通过这种建设长连贯的模式,会重大耗费服务端的资源,所以当 watcher 比拟多,并且当更新频繁的时候,Zookeeper 的性能会非常低,甚至挂掉。

拉取是指订阅的节点被动定时获取服务端节点的信息,而后再本地去做一个比对,如果有扭转就会做一些更新。在 Consul 中也有一个 watcher 机制,但和 ZK 不一样的是,他是通过 Http 长轮询去实现的,Consul 服务端会对申请的 url 中是否蕴含 wait 参数进行立刻返回,还是先挂起期待指定 wait 工夫内如果服务有变动在返回。应用该办法的性能可能较高然而实时性可能不高。

在 Nacos 中,联合了这两个思维,既提供了拉取又提供了被动推送。

拉取的局部,从 hostReactor 获取 serviceInfo 的具体操作如下:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {NAMING_LOGGER.debug("failover-mode:" + failoverReactor.isFailoverSwitch());
        // 拼接服务名称 + 集群名称(默认为空)String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);
        }
        // 从 ServiceInfoMap 中依据 key 来查找服务提供者列表,ServiceInfoMap 是客户端的服务地址的本地缓存
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        // 如果为空,示意本地缓存不存在
        if (null == serviceObj) {serviceObj = new ServiceInfo(serviceName, clusters);
            // 如果找不到则创立一个新的而后放入 serviceInfoMap,同时放入 updatingMap,执行 updateServiceNow,再从 updatingMap 移除;serviceInfoMap.put(serviceObj.getKey(), serviceObj);

            updatingMap.put(serviceName, new Object());
            // 立马从 Nacos server 中去加载服务地址信息
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);

        } else if (updatingMap.containsKey(serviceName)) {
            // 如果从 serviceInfoMap 找进去的 serviceObj 在 updatingMap 中则期待 UPDATE_HOLD_INTERVAL
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        // 开启定时调度,每 10s 去查问一次服务地址
        // 如果本地缓存中存在,则通过 scheduleUpdateIfAbsent 开启定时工作,再从 serviceInfoMap 取出 serviceInfo
        scheduleUpdateIfAbsent(serviceName, clusters);
        return serviceInfoMap.get(serviceObj.getKey());
    }

Nacos 推送性能,Nacos 会记录下面咱们的订阅者到咱们的 PushService

而 PushService 类实现了 ApplicationListener<ServiceChangeEvent> 所以自身又会取监听该事件,监听服务状态变更事件,而后遍历所有的客户端,通过 udp 协定进行音讯的播送告诉:

public void onApplicationEvent(ServiceChangeEvent event) {Service service = event.getService();// 获取到服务
        String serviceName = service.getName();// 服务名
        String namespaceId = service.getNamespaceId();// 命名空间
        // 执行工作
        Future future = GlobalExecutor.scheduleUdpSender(() -> {
            try {Loggers.PUSH.info(serviceName + "is changed, add it to push queue.");
                ConcurrentMap<String, PushClient> clients = clientMap
                        .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) {return;}
                Map<String, Object> cache = new HashMap<>(16);
                long lastRefTime = System.nanoTime();
                for (PushClient client : clients.values()) {if (client.zombie()) {Loggers.PUSH.debug("client is zombie:" + client.toString());
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie:" + client.toString());
                        continue;
                    }
                    Receiver.AckEntry ackEntry;
                    Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                    String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    byte[] compressData = null;
                    Map<String, Object> data = null;
                    if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                        compressData = (byte[]) (pair.getValue0());
                        data = (Map<String, Object>) pair.getValue1();
                        Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                    }
                    if (compressData != null) {ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                        }
                    }
                    Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                            client.getServiceName(), client.getAddrStr(), client.getAgent(),
                            (ackEntry == null ? null : ackEntry.key));
                    // 执行 UDP  推送
                    udpPush(ackEntry);
                }
            } catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

            } finally {futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }

        }, 1000, TimeUnit.MILLISECONDS);

        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

    }

服务消费者此时需建设一个 udp 服务的监听,否则服务端无奈进行数据的推送。这个监听是在 HostReactor 的构造方法中初始化的。

Nacos 这种推送模式,对于 Zookeeper 那种通过 tcp 长连贯来说会节约很多资源,就算大量的节点更新也不会让 Nacos 呈现太多的性能瓶颈,在 Nacos 中客户端如果承受到了 udp 音讯会返回一个 ACK, 如果肯定工夫 Nacos-Server 没有收到 ACK,那么还会进行重发,当超过肯定重发工夫之后,就不在重发了,尽管通过 udp 并不能保障能真正的送到订阅者,然而 Nacos 还有定时轮训作为兜底,不须要放心数据不会更新的状况。

Nacos 通过这两种伎俩,既保证了实时性,又保障了数据更新不会漏掉。

五.四种注册核心比拟

四种注册核心有着各自的特点,通过以下列表能够比拟清晰地比照他们的不同点:

文 /hz

关注得物技术,携手走向技术的云端

退出移动版