Spring-Cloud-Alibaba-Nacos心跳与选举

27次阅读

共计 10106 个字符,预计需要花费 26 分钟才能阅读完成。

通过阅读 NACOS 的源码,了解其心跳与选举机制。开始阅读此篇文章之前,建议先阅读如下两篇文章:

Spring Cloud Alibaba Nacos(功能篇)

Spring Cloud Alibaba Nacos(源码篇)

一、心跳机制

只有 NACOS 服务与所注册的 Instance 之间才会有直接的心跳维持机制,换言之,这是一种典型的集中式管理机制。

在 client 这一侧是心跳的发起源,进入 NacosNamingService,可以发现,只有注册服务实例的时候才会构造心跳包:

    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {if (instance.isEphemeral()) {BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

没有特殊情况,目前 ephemeral 都是 true。BeatReactor 维护了一个 Map 对象,记录了需要发送心跳的 BeatInfo,构造了一个心跳包后,BeatReactor.addBeatInfo 方法将 BeatInfo 放入 Map 中。然后,内部有一个定时器,每隔 5 秒发送一次心跳。

    class BeatProcessor implements Runnable {

        @Override
        public void run() {
            try {for (Map.Entry<String, BeatInfo> entry : dom2Beat.entrySet()) {BeatInfo beatInfo = entry.getValue();
                    if (beatInfo.isScheduled()) {continue;}
                    beatInfo.setScheduled(true);
                    executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e);
            } finally {executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS);
            }
        }
    }

通过设置 scheduled 的值来控制是否已经下发了心跳任务,具体的心跳任务逻辑放在了 BeatTask。

    class BeatTask implements Runnable {

        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {this.beatInfo = beatInfo;}

        @Override
        public void run() {long result = serverProxy.sendBeat(beatInfo);
            beatInfo.setScheduled(false);
            if (result > 0) {clientBeatInterval = result;}
        }
    }

sendBeat 就是请求了 /instance/beat 接口,只返回了一个心跳间隔时长,将这个返回值用于 client 设置定时任务间隔,同时将 scheduled 置为 false,表示完成了此次心跳发送任务,可以进行下次心跳。

NACOS 接到心跳后,会有一段 instance 判空的逻辑,如果找不到对应的 instance,就会直接创建出来,也就是默认相信心跳的请求源是合理的。

        Instance instance = serviceManager.getInstance(namespaceId, serviceName, clientBeat.getCluster(), clientBeat.getIp(), clientBeat.getPort());
        if (instance == null) {instance = new Instance();
            instance.setPort(clientBeat.getPort());
            instance.setIp(clientBeat.getIp());
            instance.setWeight(clientBeat.getWeight());
            instance.setMetadata(clientBeat.getMetadata());
            instance.setClusterName(clusterName);
            instance.setServiceName(serviceName);
            instance.setInstanceId(instance.generateInstanceId());
            instance.setEphemeral(clientBeat.isEphemeral());
            serviceManager.registerInstance(namespaceId, serviceName, instance);
        }
        Service service = serviceManager.getService(namespaceId, serviceName);
        if (service == null) {throw new NacosException(NacosException.SERVER_ERROR, "service not found:" + serviceName + "@" + namespaceId);
        }
        service.processClientBeat(clientBeat);

对于 client 的心跳处理,放在了对应的 Service 里面,处理心跳的代码逻辑放在了 ClientBeatProcessor:

    @Override
    public void run() {
        Service service = this.service;
        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        Cluster cluster = service.getClusterMap().get(clusterName);
        List<Instance> instances = cluster.allIPs(true);
        for (Instance instance : instances) {if (instance.getIp().equals(ip) && instance.getPort() == port) {instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked()) {if (!instance.isHealthy()) {instance.setHealthy(true);
                        getPushService().serviceChanged(service.getNamespaceId(), this.service.getName());
                    }
                }
            }
        }
    }

逻辑很简单,将集群下所有 ephemeral=true 的实例找出来,然后根据 ip 和 port 匹配到对应的 instance,然后记录此次心态时间。

marked 属性暂时没有发现有什么用处,唯一调用过 setMarked 的地方是通过解析一段字符串来构建 Instance,下划线分割,可以指定 marked。

// 7 possible formats of config:
// ip:port
// ip:port_weight
// ip:port_weight_cluster
// ip:port_weight_valid
// ip:port_weight_valid_cluster
// ip:port_weight_valid_marked
// ip:port_weight_valid_marked_cluster

按照处理心跳的逻辑,如果 marked=true 的话,这个实例就不会处理 ServiceChanged 事件,状态也就得不到改变了。

PushService 在处理 ServiceChanged 事件的时候,主要做了两件事情:

其一,根据上次记录的心跳时间,判断现有的实例在缓存的时效内(默认 10s)是否有心跳发送过来,主要的调用方法:

        public boolean zombie() {return System.currentTimeMillis() - lastRefTime > switchDomain.getPushCacheMillis(serviceName);
        }

其二、是发送 udp 广播通知所有的 client,有 instance 发生了变更。

    if (compressData != null) {ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
    } else {ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
        if (ackEntry != null) {cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
        }
    }
    udpPush(ackEntry);

最后再关注一个问题:NACOS 怎么处理长时间没有发送心跳的服务实例?

相关代码逻辑放在了 PushService:

    static {
        try {udpSocket = new DatagramSocket();
            Receiver receiver = new Receiver();
            Thread inThread = new Thread(receiver);
            inThread.setDaemon(true);
            inThread.setName("com.alibaba.nacos.naming.push.receiver");
            inThread.start();
            executorService.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {removeClientIfZombie();
                    } catch (Throwable e) {Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");
                    }
                }
            }, 0, 20, TimeUnit.SECONDS);
        } catch (SocketException e) {Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
        }
    }

可以看到,又是熟悉的定时器,移除 client 的判断依据仍然是 zombie 方法。

二、选举机制

NACOS 选举机制的底层原理是 RAFT 共识算法,NACOS 没有依赖诸如 zookeeper 之类的第三方库,而是自实现了一套 RAFT 算法。

相较于大名鼎鼎的 Paxos 算法,RAFT 算法最突出的优势就是易于理解,学习起来很轻松。

在 RAFT 算法领域中,有三种基本的状态(角色):followercandidateleader

处于 follower 状态的 server 不会发起任何的 request,只是被动的响应 leader 和 candidate。

处于 leader 状态的 server 会主动的发送心跳包给各个 follower,并且接收 client 所有的 request。

而 candidate 是一种过渡状态,只有整个 cluster 在进行新的选举的时候,才会出现此种状态的 server。

还有一个重要的概念是term,可以理解为一个任意(随机)的时间片段,在这个时间段内实施选举。

更多的 RAFT 算法知识不在此展开讲述了,接下来进入源码阅读,核心部分在 RaftCore 类中。此类被注解为 @component,我们从 init()方法开始阅读:

    @PostConstruct
    public void init() throws Exception {executor.submit(notifier);
        long start = System.currentTimeMillis();
        datums = raftStore.loadDatums(notifier);
        setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
        while (true) {if (notifier.tasks.size() <= 0) {break;}
            Thread.sleep(1000L);
        }
        initialized = true;
        GlobalExecutor.registerMasterElection(new MasterElection());
        GlobalExecutor.registerHeartbeat(new HeartBeat());
    }

咋一看,有一个比较扎眼的 while(true)死循环,当然也会有跳出循环的条件。再细看 loadDatums()方法,是在读取本地缓存目录,如果不为空,就会调用 notifier.addTask,这种情况下就会导致死循环跳出条件得不到满足,而 notifier 内部又是一个死循环,调用了 tasks.take()取出任务,如果没有没有任务可取了,就会阻塞于此,上述初始化方法中的死循环也就顺利跳出了。

其实这一步操作是在利用 Failover 机制来同步本地的 service 和 instance 信息,与选举机制无关。

最后两行代码才是关键,分别启动了 MasterElection 和 HeartBeat 两个任务。看这部分代码的时候,需要你有分布式的思维方式,可以在脑海中假定有三个独立部署的 NACOS 服务组成的 cluster。

先来看选举任务的定义:

    public class MasterElection implements Runnable {
        @Override
        public void run() {
            try {if (!peers.isReady()) {return;}
                RaftPeer local = peers.local();
                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                if (local.leaderDueMs > 0) {return;}
                // reset timeout
                local.resetLeaderDue();
                local.resetHeartbeatDue();
                sendVote();} catch (Exception e) {Loggers.RAFT.warn("[RAFT] error while master election {}", e);
            }
        }

比较关键的是对时效性的处理。local.leaderDueMs 的值一开始是随机生成的,范围是 [0, 15000),单位是毫秒,此后按照 500 毫秒一个梯度进行递减,减少到≤0 后,就会触发选举操作。当然,选举之前,把超时时间重置一下。resetLeaderDue() 方法是把 leaderDueMs 变量重新赋值,但是并不是像初始化随机赋值一样的逻辑,而是在 15000 毫秒的基础上加上了一个随机值,其随机值的范围是[0, 5000)毫秒。

接下来就是选举方法:

    public void sendVote() {RaftPeer local = peers.get(NetUtils.localServer());
        peers.reset();
        local.term.incrementAndGet();
        local.voteFor = local.ip;
        local.state = RaftPeer.State.CANDIDATE;
        Map<String, String> params = new HashMap<String, String>(1);
        params.put("vote", JSON.toJSONString(local));
        for (final String server : peers.allServersWithoutMySelf()) {final String url = buildURL(server, API_VOTE);
            try {HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {return 1;}
                        RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
                        peers.decideLeader(peer);
                        return 0;
                    }
                });
            } catch (Exception e) {Loggers.RAFT.warn("error while sending vote to server: {}", server);
            }
        }
    }

进入这个方法之后,会将自身的 term 自增 1,为自己投一票,状态变成了 candidate,然后将自己投票的结果发送给其他 NACOS 服务。发送投票是若干次 HTTP Request,由各自的 RaftController 来接收处理,最终调用的还是 RaftCore.receivedVote 方法:

    public RaftPeer receivedVote(RaftPeer remote) {if (!peers.contains(remote)) {throw new IllegalStateException("can not find peer:" + remote.ip);
        }
        RaftPeer local = peers.get(NetUtils.localServer());
        if (remote.term.get() <= local.term.get()) {if (StringUtils.isEmpty(local.voteFor)) {local.voteFor = local.ip;}
            return local;
        }
        local.resetLeaderDue();
        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
        local.term.set(remote.term.get());
        return local;
    }

这里面主要是对 term 的数值大小比较,如果一旦发现 request 请求过来的 term 比自己本地的 term 要大,那就放弃竞选,自己转为 follower 的状态,将 term 设置为 request 中带过来的 term 参数值;反之,就不做任何处理,直接返回 local。

发送投票后,可以同时拿到对方的投票结果,然后根据各方投票结果来计算最终哪台 server 出于 leader 状态。

    public RaftPeer decideLeader(RaftPeer candidate) {peers.put(candidate.ip, candidate);
        SortedBag ips = new TreeBag();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        for (RaftPeer peer : peers.values()) {if (StringUtils.isEmpty(peer.voteFor)) {continue;}
            ips.add(peer.voteFor);
            if (ips.getCount(peer.voteFor) > maxApproveCount) {maxApproveCount = ips.getCount(peer.voteFor);
                maxApprovePeer = peer.voteFor;
            }
        }
        if (maxApproveCount >= majorityCount()) {RaftPeer peer = peers.get(maxApprovePeer);
            peer.state = RaftPeer.State.LEADER;
            if (!Objects.equals(leader, peer)) {leader = peer;}
        }
        return leader;
    }

因为发起投票的请求是异步进行的,进入 decideLeader 方法中并不意味着所有的 candidate 都完成了投票,所以会在 for 循环中忽略未投票的 peer。接下来要经过两个步骤,其一是计算出得票数最多的 peer,其二是最多的得票数还必须超过整个集群实例数的一半。这也就说明了服务实例为什么要是奇数个,并且是三个及以上。RAFT 算法推荐的是 5 个实例。

然后就是由 leader 发送心跳包给各个 follower。对于心跳时效性的处理逻辑和选举的时候如出一辙,这里就不再赘述了。run 方法里面没有调用 resetLeaderDue()方法,而是推迟到了 sendBeat()方法里面进行调用了,达到的效果是一样的,防止在不必要的时候发起了选举。

    public class HeartBeat implements Runnable {
        @Override
        public void run() {
            try {if (!peers.isReady()) {return;}
                RaftPeer local = peers.local();
                local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                if (local.heartbeatDueMs > 0) {return;}
                local.resetHeartbeatDue();
                sendBeat();} catch (Exception e) {Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
            }
        }

当然,follower 在接收到心跳包之后,也会调用 resetLeaderDue()方法,也就意味着,如果 follower 长时间收不到 leader 的心态,就认为 leader 已经不可用了,随机触发选举操作,选出新的 leader。

最后来讨论两种可能发生的情况,可能性微乎其微的奇葩场景就不再讨论了。

1、在一轮投票中,大家很凑巧的都投给了自己,那就等待下一轮投票开始吧!不可能每次都这么凑巧。同时,NACOS 选举加入了时间的随机性,如果发现不满足时间点的要求,就会放弃选举,维持上一轮的 term,最终肯定是处于 follower 的状态。

2、leader 选举成功后,因为网络抖动,follower 接收不到心跳包,将会重新发起选举。重新选举的过程中,如果旧的 leader 恢复了,那就皆大欢喜,一切照常;如果没有及时恢复过来,那就造成了双 leader 的问题。不过 NACOS 在处理心跳包的时候会修正,当发现自己不是 follower 状态,却收到了心跳包的时候,会强制把自己的状态变为 follower。如果凑巧两个 leader 都将自己变为了 follower,也没关系,心跳过期时间一到,马上就可以开始新的选举流程了。

三、总结

本文深入探讨了 NACOS 的心跳和选举机制,并且对可能遇到的情况进行了进一步分析。总的来说,心跳机制是比较好理解的,而选举机制则需要一些 RAFT 算法的基础知识,加之目前 NACOS 源码的注解甚少,如果对 NACOS 没有一定的了解,阅读起来还是有些困难的。总之,把握整体,不要在意太多细节。

扫描下方二维码,进入原创干货,搞“技”圣地。

正文完
 0