服务端用 InstanceController#beat 方法接收心跳请求。
InstanceController#beat
这里会判断是否已经有实例,如果没有就创建实例,然后再开始检查心跳。
/**
 * Handles a client heartbeat request (excerpt; parts of the original method are omitted).
 *
 * <p>Visible flow: look up the instance for the reported address; if it is missing and no
 * beat payload is available, answer with RESOURCE_NOT_FOUND; otherwise register the instance.
 * Then resolve the owning service, hand the beat to {@code service.processClientBeat}, and
 * return a response carrying the result code, the heartbeat interval and the light-beat flag.
 *
 * @param request the HTTP request; the "beat" parameter is read from it
 * @return the JSON response node
 * @throws Exception a NacosException with SERVER_ERROR when the service cannot be found
 */
public ObjectNode beat(HttpServletRequest request) throws Exception {ObjectNode result = JacksonUtils.createEmptyJsonNode();
// Put the server-configured heartbeat interval into the response
// (per the original note, the client adopts this value as its own interval).
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
// ... (rest omitted in this excerpt; namespaceId, serviceName, clusterName, ip, port
// and clientBeat are presumably resolved here)
// Look up the Instance by namespaceId, serviceName, clusterName, ip and port.
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
// No such instance: register it.
if (instance == null) {
// clientBeat is derived from the "beat" parameter (parsing omitted in this excerpt).
// Per the original note: the first heartbeat carries beat info, so clientBeat exists.
// On later heartbeats the instance normally exists, so a missing instance together
// with a missing clientBeat suggests the instance may have been removed.
if (clientBeat == null) {result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
return result;
}
// ... (rest omitted in this excerpt; presumably builds `instance` from clientBeat)
// Register the instance.
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
// Fetch the Service (the original note says this comes from the serviceMap cache).
Service service = serviceManager.getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.SERVER_ERROR,
"service not found:" + serviceName + "@" + namespaceId);
}
// Not the first heartbeat: assemble a minimal clientBeat from the request address.
if (clientBeat == null) {clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
// Process the heartbeat.
service.processClientBeat(clientBeat);
result.put(CommonParams.CODE, NamingResponseCode.OK);
// An instance-level heartbeat interval in the metadata overrides the value set above.
if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
}
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result;
}
ServiceManager#getInstance
通过 ip 和端口获取实例
/**
 * Finds the instance of a service that matches the given IP and port within one cluster.
 *
 * @param namespaceId namespace used to resolve the service
 * @param serviceName name used to resolve the service
 * @param cluster     name of the cluster whose instances are searched
 * @param ip          IP address the instance must have
 * @param port        port the instance must have
 * @return the first matching instance, or {@code null} when the service is unknown,
 *         the cluster has no instances, or nothing matches
 */
public Instance getInstance(String namespaceId, String serviceName, String cluster, String ip, int port) {
    final Service owner = getService(namespaceId, serviceName);
    if (owner == null) {
        return null;
    }

    // allIPs takes a list of cluster names; only the requested cluster is passed in.
    final List<String> clusterNames = new ArrayList<>();
    clusterNames.add(cluster);

    final List<Instance> candidates = owner.allIPs(clusterNames);
    if (candidates == null || candidates.isEmpty()) {
        return null;
    }

    for (Instance candidate : candidates) {
        if (!candidate.getIp().equals(ip)) {
            continue;
        }
        if (candidate.getPort() != port) {
            continue;
        }
        return candidate;
    }
    return null;
}
Service#processClientBeat
封装 Runnable 对象,放入线程池。
/**
 * Wraps the heartbeat in a {@code ClientBeatProcessor} bound to this service and hands it
 * to {@code HealthCheckReactor.scheduleNow}.
 *
 * @param rsInfo heartbeat payload to process
 */
public void processClientBeat(final RsInfo rsInfo) {
    // Per the original note, ClientBeatProcessor is a Runnable whose run() is invoked
    // by the executor behind HealthCheckReactor.
    final ClientBeatProcessor beatTask = new ClientBeatProcessor();
    beatTask.setService(this);
    beatTask.setRsInfo(rsInfo);

    HealthCheckReactor.scheduleNow(beatTask);
}
ClientBeatProcessor#run
找到对应的 Instance,设置最后心跳时间,并设置为健康的,最后广播消息。
/**
 * Applies the heartbeat held in {@code rsInfo} to every instance of the reported cluster
 * whose IP and port match: refreshes the last-beat timestamp and, when the instance is
 * neither marked nor healthy, flips it to healthy and notifies the push service.
 */
public void run() {
    final Service owner = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }

    final String beatIp = rsInfo.getIp();
    final String beatClusterName = rsInfo.getCluster();
    final int beatPort = rsInfo.getPort();
    final Cluster cluster = owner.getClusterMap().get(beatClusterName);

    // Walk every instance of the cluster and pick those matching the beat's address.
    final List<Instance> allInstances = cluster.allIPs(true);
    for (Instance candidate : allInstances) {
        if (!candidate.getIp().equals(beatIp) || candidate.getPort() != beatPort) {
            continue;
        }

        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
        }

        // Record when the last heartbeat arrived.
        candidate.setLastBeat(System.currentTimeMillis());

        // Only an unmarked, currently unhealthy instance is switched back to healthy.
        if (candidate.isMarked() || candidate.isHealthy()) {
            continue;
        }

        candidate.setHealthy(true);
        Loggers.EVT_LOG
                .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                        cluster.getService().getName(), beatIp, beatPort, cluster.getName(),
                        UtilsAndCommons.LOCALHOST_SITE);
        // Announce the change so that subscribers get pushed the new state.
        getPushService().serviceChanged(owner);
    }
}
PushService#onApplicationEvent
广播消息后,监听 ServiceChangeEvent 类型的类会调用 onApplicationEvent 方法。这里主要是封装 UDP 数据并发送。
/**
 * Reacts to a service change by scheduling a task, delayed by 1000 ms, that pushes the
 * service data over UDP to every registered push client of that service.
 *
 * <p>The scheduled future is stored in {@code futureMap} under the full service name and
 * removed again when the task finishes, whether it succeeds or fails.
 *
 * @param event the change event carrying the affected service
 */
public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();

    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }

            // Per-run cache so clients sharing a cache key can reuse one prepared payload.
            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();

            // Iterate over all push clients of this service.
            for (PushClient client : clients.values()) {
                // Expired clients are dropped instead of being pushed to.
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie:" + client.toString());
                    clients.remove(client.toString());
                    continue;
                }

                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                // The cache is only consulted when the configured push-cache time is at least 20000 ms.
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }

                // Build the UDP payload. Per the original note, data larger than 1 KB is
                // compressed, decided inside compressIfNecessary (not shown here).
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }

                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));

                // Send the UDP packet.
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
        } finally {
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }
    }, 1000, TimeUnit.MILLISECONDS);

    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
PushService#udpPush
发送 UDP 数据,会重试 10 次。每 10 秒检查一次。
/**
 * Sends one UDP push and schedules a retransmission check for it.
 *
 * @param ackEntry the prepared packet plus its bookkeeping; may be {@code null}
 * @return {@code ackEntry} when the packet was sent or when the retry limit was exceeded;
 *         {@code null} when {@code ackEntry} is {@code null} or sending threw an exception
 */
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (null == ackEntry) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }

    // Once the retry limit is exceeded, give up and clear the bookkeeping for this key.
    final boolean retriesExhausted = ackEntry.getRetryTimes() > MAX_RETRY_TIMES;
    if (retriesExhausted) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush++;
        return ackEntry;
    }

    try {
        // A key not yet awaiting an ACK counts as a new push rather than a retry.
        final boolean notYetTracked = !ackMap.containsKey(ackEntry.key);
        if (notYetTracked) {
            totalPush += 1;
        }
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

        Loggers.PUSH.info("send udp packet:" + ackEntry.key);
        // Actual UDP send.
        udpSocket.send(ackEntry.origin);

        ackEntry.increaseRetryTime();

        // Re-check after the ACK timeout (the original note describes this as every 10 seconds).
        final Retransmitter retryTask = new Retransmitter(ackEntry);
        final long ackTimeoutMillis = TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS);
        GlobalExecutor.scheduleRetransmitter(retryTask, ackTimeoutMillis, TimeUnit.MILLISECONDS);

        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush++;

        return null;
    }
}
Retransmitter#run
每 10 秒检查是否发送成功,如果没发送成功,就继续发送,最多 10 次。
/**
 * Re-sends the push when its key is still present in {@code ackMap}; does nothing otherwise.
 */
public void run() {
    final boolean stillAwaitingAck = ackMap.containsKey(ackEntry.key);
    if (!stillAwaitingAck) {
        return;
    }

    Loggers.PUSH.info("retry to push data, key:" + ackEntry.key);
    udpPush(ackEntry);
}
Receiver#run
PushService 创建的时候,会开启 Receiver 的线程。
// Static initializer (excerpt): starts a daemon thread that runs a Receiver.
static {
// ... (rest omitted in this excerpt)
Receiver receiver = new Receiver();
Thread inThread = new Thread(receiver);
// Mark the thread as a daemon before starting it.
inThread.setDaemon(true);
inThread.setName("com.alibaba.nacos.naming.push.receiver");
inThread.start();
// ... (rest omitted in this excerpt)
}
它这里会有个 while(true),收到请求后移除 ackMap 对应的 key。
// Receiver loop (excerpt): runs forever and removes the matching entry from ackMap
// for each acknowledgement it handles.
public void run() {while (true) {
// ... (rest omitted in this excerpt; presumably receives the packet and parses
// ip, port and ackPacket)
String ackKey = getAckKey(ip, port, ackPacket.lastRefTime);
// Removing the entry is what makes Retransmitter#run skip the retry for this key.
AckEntry ackEntry = ackMap.remove(ackKey);
// ... (rest omitted in this excerpt)
}
}
广播总结
广播的时候,会往 ackMap 存入值,广播过程失败就从 ackMap 移除对应的值。有时候 UDP 请求不成功,那这个值会一直留在 ackMap,这个时候,Retransmitter 每隔 10 秒就会去 ackMap 看看有没有成功,如果没有成功,它就会去重试,直至达到最大重试次数。另外还有一个线程,会去监听 UDP 响应,如果收到了响应,就会从 ackMap 移除对应的值。这个 UDP 是发送给客户端的,《Nacos – HostReactor 的创建》提到了收到请求后的处理,让客户端自己去更新信息。
心跳总结
主要是收到心跳请求后,更新心跳的时间、健康状态以及广播。