服务端用InstanceController#beat方法接收心跳请求。
InstanceController#beat
这里会判断是否已经有实例,如果没有就创建实例,然后再开始检查心跳。
/**
 * Handles a client heartbeat request (excerpt; parts of the original method are omitted).
 *
 * <p>Looks up the instance for the request, registers it when it is missing and a
 * {@code clientBeat} is available, hands the beat to {@code Service#processClientBeat},
 * and returns a JSON node carrying the response code and the beat interval.
 *
 * @param request the incoming HTTP request; the optional {@code "beat"} parameter is read from it
 * @return a JSON node with {@code CommonParams.CODE}, {@code SwitchEntry.CLIENT_BEAT_INTERVAL}
 *         and {@code SwitchEntry.LIGHT_BEAT_ENABLED} populated
 * @throws Exception a {@code NacosException} with {@code SERVER_ERROR} is thrown when the
 *         service cannot be found
 */
public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Set the heartbeat interval; per the original note this directly changes the client's interval
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    // Remainder omitted
    // Look up the Instance by namespaceId, serviceName, clusterName, ip and port
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // No instance found: register one
    if (instance == null) {
        // Per the original note: clientBeat is derived from the beat parameter. On the first
        // heartbeat beat carries data, so clientBeat gets created. On later heartbeats the
        // instance normally exists, so reaching this point suggests it may have been removed.
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        // Remainder omitted
        // Register the instance
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    // Fetch the Service (per the original note, from the serviceMap cache)
    Service service = serviceManager.getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    // Not the first heartbeat: assemble clientBeat from the request values
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // Process the heartbeat
    service.processClientBeat(clientBeat);
    result.put(CommonParams.CODE, NamingResponseCode.OK);
    // An instance-level heartbeat interval in the metadata overrides the value set above
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
ServiceManager#getInstance
通过ip和端口获取实例
public Instance getInstance(String namespaceId, String serviceName, String cluster, String ip, int port) { // 从serviceMap缓存获取Service Service service = getService(namespaceId, serviceName); if (service == null) { return null; } List<String> clusters = new ArrayList<>(); clusters.add(cluster); // 从clusters集群获取Instance汇合 List<Instance> ips = service.allIPs(clusters); if (ips == null || ips.isEmpty()) { return null; } // 通过ip和端口获取实例 for (Instance instance : ips) { if (instance.getIp().equals(ip) && instance.getPort() == port) { return instance; } } return null;}
Service#processClientBeat
封装Runnable对象,放入线程池。
public void processClientBeat(final RsInfo rsInfo) { // 创立ClientBeatProcessor对象,这个是Runnable,所以线程池会调用他的run办法 ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor(); clientBeatProcessor.setService(this); clientBeatProcessor.setRsInfo(rsInfo); HealthCheckReactor.scheduleNow(clientBeatProcessor);}
ClientBeatProcessor#run
找到对应的Instance,设置最后心跳时间,并设置为健康的,最后广播消息。
public void run() { Service service = this.service; if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString()); } String ip = rsInfo.getIp(); String clusterName = rsInfo.getCluster(); int port = rsInfo.getPort(); Cluster cluster = service.getClusterMap().get(clusterName); // 获取所有Instance List<Instance> instances = cluster.allIPs(true); for (Instance instance : instances) { // 通过ip和端口获取Instance if (instance.getIp().equals(ip) && instance.getPort() == port) { if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString()); } // 设置最初心跳工夫 instance.setLastBeat(System.currentTimeMillis()); // 没有被标记且不不衰弱的,设置为衰弱 if (!instance.isMarked()) { if (!instance.isHealthy()) { instance.setHealthy(true); Loggers.EVT_LOG .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok", cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE); // 播送音讯 getPushService().serviceChanged(service); } } } }}
PushService#onApplicationEvent
广播消息后,监听ServiceChangeEvent类型的类会调用onApplicationEvent方法。这里主要是封装UDP数据并发送。
/**
 * Reacts to a {@code ServiceChangeEvent} by scheduling a task that pushes the service's
 * data over UDP to each of its registered {@code PushClient}s.
 *
 * <p>The task is scheduled with a delay of 1000 ms and its {@code Future} is stored in
 * {@code futureMap} under the full service name; the task removes that entry again in
 * its {@code finally} block.
 *
 * @param event the change event carrying the affected service
 */
public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();
    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            // Nothing to push when no client is registered for this service
            if (MapUtils.isEmpty(clients)) {
                return;
            }
            // Per-run cache of prepared payloads, keyed by getPushCacheKey(...)
            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            // Iterate over the PushClient collection
            for (PushClient client : clients.values()) {
                // Expired (zombie) clients are removed from the map and skipped
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    // NOTE(review): identical message is logged a second time here; looks like a
                    // copy-paste leftover - confirm whether one of the two can be dropped
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }
                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                // Reuse a payload prepared earlier in this run when the push cache setting allows it
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                // Build the UDP payload. Per the original note, data larger than 1 KB is compressed,
                // as decided by compressIfNecessary (not shown here)
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        // Remember the freshly built payload for later clients with the same key
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }
                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));
                // Send the UDP data
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
        } finally {
            // The task is finished (successfully or not), so drop its Future from the map
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }
    }, 1000, TimeUnit.MILLISECONDS);
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
PushService#udpPush
发送UDP数据,会重试10次。每10秒检查一次。
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) { if (ackEntry == null) { Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null."); return null; } // 重试最大次数还没胜利,就删除ackMap和udpSendTimeMap的内容 if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) { Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key); ackMap.remove(ackEntry.key); udpSendTimeMap.remove(ackEntry.key); failedPush += 1; return ackEntry; } try { if (!ackMap.containsKey(ackEntry.key)) { totalPush++; } ackMap.put(ackEntry.key, ackEntry); udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis()); Loggers.PUSH.info("send udp packet: " + ackEntry.key); // udp发送 udpSocket.send(ackEntry.origin); ackEntry.increaseRetryTime(); // 10秒查看一次 GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS); return ackEntry; } catch (Exception e) { Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e); ackMap.remove(ackEntry.key); udpSendTimeMap.remove(ackEntry.key); failedPush += 1; return null; }}
Retransmitter#run
每10秒检查是否发送成功,如果没发送成功,就继续发送,最多10次。
/**
 * Re-sends the push entry if it is still present in {@code ackMap}.
 */
public void run() {
    // Entry no longer tracked in ackMap: nothing left to resend
    if (!ackMap.containsKey(ackEntry.key)) {
        return;
    }

    Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
    udpPush(ackEntry);
}
Receiver#run
PushService创立的时候,会开启Receiver的线程。
// Static initializer (excerpt): starts the Receiver on its own daemon thread.
static {
    // Remainder omitted
    Receiver receiver = new Receiver();
    Thread inThread = new Thread(receiver);
    // Daemon thread, so it does not keep the JVM alive on its own
    inThread.setDaemon(true);
    inThread.setName("com.alibaba.nacos.naming.push.receiver");
    inThread.start();
    // Remainder omitted
}
它这里会有个while(true),收到请求后移除ackMap对应的key。
/**
 * Receiver loop (excerpt): for each handled packet, builds the ack key and removes
 * the matching entry from {@code ackMap}.
 */
public void run() {
    while (true) {
        // Remainder omitted (ip, port and ackPacket come from the omitted part)
        String ackKey = getAckKey(ip, port, ackPacket.lastRefTime);
        // Remove the entry from ackMap under the computed key
        AckEntry ackEntry = ackMap.remove(ackKey);
        // Remainder omitted
    }
}
播送总结
广播的时候,会往ackMap存入值,广播过程失败就从ackMap移除对应的值。有时候UDP请求不成功,那这个值一直会在ackMap,这个时候,Retransmitter每隔10秒就会去ackMap看看有没有成功,如果没有成功,它就会去重试,直至达到重试最大次数。另外还有一个线程,会去监听UDP响应,如果收到了响应,就会从ackMap移除对应的值。这个UDP是发送给客户端的,《Nacos - HostReactor的创建》提到了收到请求后的处理,让客户端自己去更新信息。
心跳总结
主要是收到心跳请求后,更新心跳的时间、健康状态以及广播。