共计 10106 个字符,预计需要花费 26 分钟才能阅读完成。
通过阅读 NACOS 的源码,了解其心跳与选举机制。开始阅读此篇文章之前,建议先阅读如下两篇文章:
Spring Cloud Alibaba Nacos(功能篇)
Spring Cloud Alibaba Nacos(源码篇)
一、心跳机制
只有 NACOS 服务与所注册的 Instance 之间才会有直接的心跳维持机制,换言之,这是一种典型的集中式管理机制。
在 client 这一侧是心跳的发起源,进入 NacosNamingService,可以发现,只有注册服务实例的时候才会构造心跳包:
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {if (instance.isEphemeral()) {BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
没有特殊情况,目前 ephemeral 都是 true。BeatReactor 维护了一个 Map 对象,记录了需要发送心跳的 BeatInfo,构造了一个心跳包后,BeatReactor.addBeatInfo 方法将 BeatInfo 放入 Map 中。然后,内部有一个定时器,每隔 5 秒发送一次心跳。
class BeatProcessor implements Runnable {
@Override
public void run() {
try {for (Map.Entry<String, BeatInfo> entry : dom2Beat.entrySet()) {BeatInfo beatInfo = entry.getValue();
if (beatInfo.isScheduled()) {continue;}
beatInfo.setScheduled(true);
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e);
} finally {executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS);
}
}
}
通过设置 scheduled 的值来控制是否已经下发了心跳任务,具体的心跳任务逻辑放在了 BeatTask。
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {this.beatInfo = beatInfo;}
@Override
public void run() {long result = serverProxy.sendBeat(beatInfo);
beatInfo.setScheduled(false);
if (result > 0) {clientBeatInterval = result;}
}
}
sendBeat 就是请求了 /instance/beat 接口,只返回了一个心跳间隔时长,将这个返回值用于 client 设置定时任务间隔,同时将 scheduled 置为 false,表示完成了此次心跳发送任务,可以进行下次心跳。
NACOS 接到心跳后,会有一段 instance 判空的逻辑,如果找不到对应的 instance,就会直接创建出来,也就是默认相信心跳的请求源是合理的。
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clientBeat.getCluster(), clientBeat.getIp(), clientBeat.getPort());
if (instance == null) {instance = new Instance();
instance.setPort(clientBeat.getPort());
instance.setIp(clientBeat.getIp());
instance.setWeight(clientBeat.getWeight());
instance.setMetadata(clientBeat.getMetadata());
instance.setClusterName(clusterName);
instance.setServiceName(serviceName);
instance.setInstanceId(instance.generateInstanceId());
instance.setEphemeral(clientBeat.isEphemeral());
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {throw new NacosException(NacosException.SERVER_ERROR, "service not found:" + serviceName + "@" + namespaceId);
}
service.processClientBeat(clientBeat);
对于 client 的心跳处理,放在了对应的 Service 里面,处理心跳的代码逻辑放在了 ClientBeatProcessor:
@Override
public void run() {
Service service = this.service;
String ip = rsInfo.getIp();
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
Cluster cluster = service.getClusterMap().get(clusterName);
List<Instance> instances = cluster.allIPs(true);
for (Instance instance : instances) {if (instance.getIp().equals(ip) && instance.getPort() == port) {instance.setLastBeat(System.currentTimeMillis());
if (!instance.isMarked()) {if (!instance.isHealthy()) {instance.setHealthy(true);
getPushService().serviceChanged(service.getNamespaceId(), this.service.getName());
}
}
}
}
}
逻辑很简单,将集群下所有 ephemeral=true 的实例找出来,然后根据 ip 和 port 匹配到对应的 instance,然后记录此次心态时间。
marked 属性暂时没有发现有什么用处,唯一调用过 setMarked 的地方是通过解析一段字符串来构建 Instance,下划线分割,可以指定 marked。
// 7 possible formats of config:
// ip:port
// ip:port_weight
// ip:port_weight_cluster
// ip:port_weight_valid
// ip:port_weight_valid_cluster
// ip:port_weight_valid_marked
// ip:port_weight_valid_marked_cluster
按照处理心跳的逻辑,如果 marked=true 的话,这个实例就不会处理 ServiceChanged 事件,状态也就得不到改变了。
PushService 在处理 ServiceChanged 事件的时候,主要做了两件事情:
其一,根据上次记录的心跳时间,判断现有的实例在缓存的时效内(默认 10s)是否有心跳发送过来,主要的调用方法:
public boolean zombie() {return System.currentTimeMillis() - lastRefTime > switchDomain.getPushCacheMillis(serviceName);
}
其二、是发送 udp 广播通知所有的 client,有 instance 发生了变更。
if (compressData != null) {ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
udpPush(ackEntry);
最后再关注一个问题:NACOS 怎么处理长时间没有发送心跳的服务实例?
相关代码逻辑放在了 PushService:
static {
try {udpSocket = new DatagramSocket();
Receiver receiver = new Receiver();
Thread inThread = new Thread(receiver);
inThread.setDaemon(true);
inThread.setName("com.alibaba.nacos.naming.push.receiver");
inThread.start();
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {removeClientIfZombie();
} catch (Throwable e) {Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");
}
}
}, 0, 20, TimeUnit.SECONDS);
} catch (SocketException e) {Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
}
}
可以看到,又是熟悉的定时器,移除 client 的判断依据仍然是 zombie 方法。
二、选举机制
NACOS 选举机制的底层原理是 RAFT 共识算法,NACOS 没有依赖诸如 zookeeper 之类的第三方库,而是自实现了一套 RAFT 算法。
相较于大名鼎鼎的 Paxos 算法,RAFT 算法最突出的优势就是易于理解,学习起来很轻松。
在 RAFT 算法领域中,有三种基本的状态(角色):follower、candidate、leader。
处于 follower 状态的 server 不会发起任何的 request,只是被动的响应 leader 和 candidate。
处于 leader 状态的 server 会主动的发送心跳包给各个 follower,并且接收 client 所有的 request。
而 candidate 是一种过渡状态,只有整个 cluster 在进行新的选举的时候,才会出现此种状态的 server。
还有一个重要的概念是term,可以理解为一个任意(随机)的时间片段,在这个时间段内实施选举。
更多的 RAFT 算法知识不在此展开讲述了,接下来进入源码阅读,核心部分在 RaftCore 类中。此类被注解为 @component,我们从 init()方法开始阅读:
@PostConstruct
public void init() throws Exception {executor.submit(notifier);
long start = System.currentTimeMillis();
datums = raftStore.loadDatums(notifier);
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
while (true) {if (notifier.tasks.size() <= 0) {break;}
Thread.sleep(1000L);
}
initialized = true;
GlobalExecutor.registerMasterElection(new MasterElection());
GlobalExecutor.registerHeartbeat(new HeartBeat());
}
咋一看,有一个比较扎眼的 while(true)死循环,当然也会有跳出循环的条件。再细看 loadDatums()方法,是在读取本地缓存目录,如果不为空,就会调用 notifier.addTask,这种情况下就会导致死循环跳出条件得不到满足,而 notifier 内部又是一个死循环,调用了 tasks.take()取出任务,如果没有没有任务可取了,就会阻塞于此,上述初始化方法中的死循环也就顺利跳出了。
其实这一步操作是在利用 Failover 机制来同步本地的 service 和 instance 信息,与选举机制无关。
最后两行代码才是关键,分别启动了 MasterElection 和 HeartBeat 两个任务。看这部分代码的时候,需要你有分布式的思维方式,可以在脑海中假定有三个独立部署的 NACOS 服务组成的 cluster。
先来看选举任务的定义:
public class MasterElection implements Runnable {
@Override
public void run() {
try {if (!peers.isReady()) {return;}
RaftPeer local = peers.local();
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.leaderDueMs > 0) {return;}
// reset timeout
local.resetLeaderDue();
local.resetHeartbeatDue();
sendVote();} catch (Exception e) {Loggers.RAFT.warn("[RAFT] error while master election {}", e);
}
}
比较关键的是对时效性的处理。local.leaderDueMs 的值一开始是随机生成的,范围是 [0, 15000),单位是毫秒,此后按照 500 毫秒一个梯度进行递减,减少到≤0 后,就会触发选举操作。当然,选举之前,把超时时间重置一下。resetLeaderDue() 方法是把 leaderDueMs 变量重新赋值,但是并不是像初始化随机赋值一样的逻辑,而是在 15000 毫秒的基础上加上了一个随机值,其随机值的范围是[0, 5000)毫秒。
接下来就是选举方法:
public void sendVote() {RaftPeer local = peers.get(NetUtils.localServer());
peers.reset();
local.term.incrementAndGet();
local.voteFor = local.ip;
local.state = RaftPeer.State.CANDIDATE;
Map<String, String> params = new HashMap<String, String>(1);
params.put("vote", JSON.toJSONString(local));
for (final String server : peers.allServersWithoutMySelf()) {final String url = buildURL(server, API_VOTE);
try {HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {return 1;}
RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
peers.decideLeader(peer);
return 0;
}
});
} catch (Exception e) {Loggers.RAFT.warn("error while sending vote to server: {}", server);
}
}
}
进入这个方法之后,会将自身的 term 自增 1,为自己投一票,状态变成了 candidate,然后将自己投票的结果发送给其他 NACOS 服务。发送投票是若干次 HTTP Request,由各自的 RaftController 来接收处理,最终调用的还是 RaftCore.receivedVote 方法:
public RaftPeer receivedVote(RaftPeer remote) {if (!peers.contains(remote)) {throw new IllegalStateException("can not find peer:" + remote.ip);
}
RaftPeer local = peers.get(NetUtils.localServer());
if (remote.term.get() <= local.term.get()) {if (StringUtils.isEmpty(local.voteFor)) {local.voteFor = local.ip;}
return local;
}
local.resetLeaderDue();
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
local.term.set(remote.term.get());
return local;
}
这里面主要是对 term 的数值大小比较,如果一旦发现 request 请求过来的 term 比自己本地的 term 要大,那就放弃竞选,自己转为 follower 的状态,将 term 设置为 request 中带过来的 term 参数值;反之,就不做任何处理,直接返回 local。
发送投票后,可以同时拿到对方的投票结果,然后根据各方投票结果来计算最终哪台 server 出于 leader 状态。
public RaftPeer decideLeader(RaftPeer candidate) {peers.put(candidate.ip, candidate);
SortedBag ips = new TreeBag();
int maxApproveCount = 0;
String maxApprovePeer = null;
for (RaftPeer peer : peers.values()) {if (StringUtils.isEmpty(peer.voteFor)) {continue;}
ips.add(peer.voteFor);
if (ips.getCount(peer.voteFor) > maxApproveCount) {maxApproveCount = ips.getCount(peer.voteFor);
maxApprovePeer = peer.voteFor;
}
}
if (maxApproveCount >= majorityCount()) {RaftPeer peer = peers.get(maxApprovePeer);
peer.state = RaftPeer.State.LEADER;
if (!Objects.equals(leader, peer)) {leader = peer;}
}
return leader;
}
因为发起投票的请求是异步进行的,进入 decideLeader 方法中并不意味着所有的 candidate 都完成了投票,所以会在 for 循环中忽略未投票的 peer。接下来要经过两个步骤,其一是计算出得票数最多的 peer,其二是最多的得票数还必须超过整个集群实例数的一半。这也就说明了服务实例为什么要是奇数个,并且是三个及以上。RAFT 算法推荐的是 5 个实例。
然后就是由 leader 发送心跳包给各个 follower。对于心跳时效性的处理逻辑和选举的时候如出一辙,这里就不再赘述了。run 方法里面没有调用 resetLeaderDue()方法,而是推迟到了 sendBeat()方法里面进行调用了,达到的效果是一样的,防止在不必要的时候发起了选举。
public class HeartBeat implements Runnable {
@Override
public void run() {
try {if (!peers.isReady()) {return;}
RaftPeer local = peers.local();
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.heartbeatDueMs > 0) {return;}
local.resetHeartbeatDue();
sendBeat();} catch (Exception e) {Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
}
}
当然,follower 在接收到心跳包之后,也会调用 resetLeaderDue()方法,也就意味着,如果 follower 长时间收不到 leader 的心态,就认为 leader 已经不可用了,随机触发选举操作,选出新的 leader。
最后来讨论两种可能发生的情况,可能性微乎其微的奇葩场景就不再讨论了。
1、在一轮投票中,大家很凑巧的都投给了自己,那就等待下一轮投票开始吧!不可能每次都这么凑巧。同时,NACOS 选举加入了时间的随机性,如果发现不满足时间点的要求,就会放弃选举,维持上一轮的 term,最终肯定是处于 follower 的状态。
2、leader 选举成功后,因为网络抖动,follower 接收不到心跳包,将会重新发起选举。重新选举的过程中,如果旧的 leader 恢复了,那就皆大欢喜,一切照常;如果没有及时恢复过来,那就造成了双 leader 的问题。不过 NACOS 在处理心跳包的时候会修正,当发现自己不是 follower 状态,却收到了心跳包的时候,会强制把自己的状态变为 follower。如果凑巧两个 leader 都将自己变为了 follower,也没关系,心跳过期时间一到,马上就可以开始新的选举流程了。
三、总结
本文深入探讨了 NACOS 的心跳和选举机制,并且对可能遇到的情况进行了进一步分析。总的来说,心跳机制是比较好理解的,而选举机制则需要一些 RAFT 算法的基础知识,加之目前 NACOS 源码的注解甚少,如果对 NACOS 没有一定的了解,阅读起来还是有些困难的。总之,把握整体,不要在意太多细节。
扫描下方二维码,进入原创干货,搞“技”圣地。