The server handles heartbeat requests in the InstanceController#beat method.

InstanceController#beat

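This method first checks whether the instance already exists. If it does not, the instance is registered, and only then is the heartbeat processed.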

public ObjectNode beat(HttpServletRequest request) throws Exception {

    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Set the heartbeat interval; this directly overrides the client's heartbeat interval
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
    // (rest omitted)
    // Look up the Instance by namespaceId, serviceName, clusterName, ip, port
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
    // If there is none, register it
    if (instance == null) {
        // This is decided by the beat parameter: on the first heartbeat, beat carries data, so clientBeat is created.
        // On later heartbeats the instance is normally non-null, so a null instance here means it may have been removed.
        if (clientBeat == null) {
            result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
            return result;
        }
        // (rest omitted)
        // Register
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }
    // Get the Service from the serviceMap cache
    Service service = serviceManager.getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.SERVER_ERROR,
                "service not found: " + serviceName + "@" + namespaceId);
    }
    // Not the first heartbeat: assemble clientBeat
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }
    // Process the heartbeat
    service.processClientBeat(clientBeat);

    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result;
}
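The branching in beat can be hard to follow with the omitted parts, so here is a minimal, self-contained sketch of the decision flow only. It is not Nacos code: BeatFlowSketch, its in-memory map, the fullBeat flag (standing in for "the beat parameter carried data") and the two result constants are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BeatFlowSketch {
    // Hypothetical result codes standing in for NamingResponseCode values.
    public static final int OK = 1;
    public static final int RESOURCE_NOT_FOUND = 2;

    // Hypothetical registry: "ip:port" -> last heartbeat time in milliseconds.
    private final Map<String, Long> instances = new ConcurrentHashMap<>();

    /**
     * @param fullBeat true when the request carries full beat data (first heartbeat),
     *                 false when it only identifies the instance
     */
    public int beat(String ip, int port, boolean fullBeat, long nowMillis) {
        String key = ip + ":" + port;
        if (!instances.containsKey(key)) {
            if (!fullBeat) {
                // Unknown instance and no data to register it from:
                // tell the client the resource was not found.
                return RESOURCE_NOT_FOUND;
            }
            // First heartbeat: register the instance.
            instances.put(key, nowMillis);
        }
        // Process the heartbeat: refresh the last heartbeat time.
        instances.put(key, nowMillis);
        return OK;
    }

    public boolean isRegistered(String ip, int port) {
        return instances.containsKey(ip + ":" + port);
    }
}
```

In this sketch, a beat without data for an unknown instance is rejected, which gives the client a signal that it should register again.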

ServiceManager#getInstance

Looks up an instance by IP and port.

public Instance getInstance(String namespaceId, String serviceName, String cluster, String ip, int port) {
    // Get the Service from the serviceMap cache
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        return null;
    }

    List<String> clusters = new ArrayList<>();
    clusters.add(cluster);
    // Get the list of Instances from the given clusters
    List<Instance> ips = service.allIPs(clusters);
    if (ips == null || ips.isEmpty()) {
        return null;
    }
    // Find the instance by IP and port
    for (Instance instance : ips) {
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            return instance;
        }
    }

    return null;
}

Service#processClientBeat

Wraps the beat in a Runnable object and submits it to the thread pool.

public void processClientBeat(final RsInfo rsInfo) {
    // Create a ClientBeatProcessor. It is a Runnable, so the thread pool will call its run method
    ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
    clientBeatProcessor.setService(this);
    clientBeatProcessor.setRsInfo(rsInfo);
    HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

ClientBeatProcessor#run

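Finds the matching Instance and updates its last heartbeat time. If the instance was unhealthy (and not marked), it is set to healthy and the change is broadcast.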

public void run() {
    Service service = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }

    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();
    Cluster cluster = service.getClusterMap().get(clusterName);
    // Get all Instances
    List<Instance> instances = cluster.allIPs(true);

    for (Instance instance : instances) {
        // Find the Instance by IP and port
        if (instance.getIp().equals(ip) && instance.getPort() == port) {
            if (Loggers.EVT_LOG.isDebugEnabled()) {
                Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
            }
            // Update the last heartbeat time
            instance.setLastBeat(System.currentTimeMillis());
            // Not marked and currently unhealthy: set to healthy
            if (!instance.isMarked()) {
                if (!instance.isHealthy()) {
                    instance.setHealthy(true);
                    Loggers.EVT_LOG
                            .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                    cluster.getService().getName(), ip, port, cluster.getName(),
                                    UtilsAndCommons.LOCALHOST_SITE);
                    // Broadcast the change
                    getPushService().serviceChanged(service);
                }
            }
        }
    }
}
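One detail worth isolating from the method above: the last heartbeat time is refreshed on every beat, but the broadcast only happens on the transition from unhealthy to healthy. The following sketch shows just that rule. BeatRefreshSketch and its Instance class are hypothetical stand-ins, not the Nacos types.

```java
public class BeatRefreshSketch {

    // Hypothetical, stripped-down stand-in for a registered instance.
    public static class Instance {
        public long lastBeat;
        public boolean healthy;
        public boolean marked;
    }

    /**
     * Applies one heartbeat to the instance.
     *
     * @return true if the caller should broadcast a service change,
     *         i.e. the instance just flipped from unhealthy to healthy
     */
    public static boolean refresh(Instance instance, long nowMillis) {
        // Always record the heartbeat time.
        instance.lastBeat = nowMillis;
        // Only an unmarked, unhealthy instance is flipped to healthy.
        if (!instance.marked && !instance.healthy) {
            instance.healthy = true;
            return true;
        }
        return false;
    }
}
```

Under this rule a steady stream of beats from a healthy instance produces no pushes at all, which keeps UDP traffic proportional to state changes rather than to heartbeat frequency.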

PushService#onApplicationEvent

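Once the change has been broadcast, listeners for ServiceChangeEvent have their onApplicationEvent method invoked. The implementation here mainly builds the UDP data and sends it.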

public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();
    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }
            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            // Iterate over the PushClient collection
            for (PushClient client : clients.values()) {
                // Skip and remove clients that have expired
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }
                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                // Build the UDP data; data larger than 1 KB is compressed (decided in compressIfNecessary)
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }
                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));
                // Send the UDP data
                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
        } finally {
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }
    }, 1000, TimeUnit.MILLISECONDS);
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
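The comment above says that data larger than 1 KB is compressed before it is sent. The body of compressIfNecessary is not shown in this article, so the following is only a sketch of what such a size-gated compression step could look like, assuming GZIP and a 1024-byte threshold. CompressSketch and THRESHOLD_BYTES are names invented for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;

public class CompressSketch {
    // Assumed threshold: payloads below this size are sent uncompressed.
    public static final int THRESHOLD_BYTES = 1024;

    public static byte[] compressIfNecessary(byte[] data) {
        if (data.length < THRESHOLD_BYTES) {
            // Small payload: compression overhead is not worth it.
            return data;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // try-with-resources closes the stream, which also writes the GZIP trailer.
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(data);
        } catch (IOException e) {
            // Writing to an in-memory buffer is not expected to fail.
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

Keeping the payload small matters for UDP in particular, since a datagram that exceeds the path MTU is fragmented and losing any fragment loses the whole push.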

PushService#udpPush

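Sends the UDP data. If no acknowledgement arrives, the push is retried until the retry count exceeds MAX_RETRY_TIMES. The acknowledgement check is scheduled 10 seconds (ACK_TIMEOUT_NANOS) after each send.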

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }
    // Still unsuccessful after the maximum number of retries: remove the entry from ackMap and udpSendTimeMap
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }
    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        // Send over UDP
        udpSocket.send(ackEntry.origin);
        ackEntry.increaseRetryTime();
        // Check for the acknowledgement after 10 seconds
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return null;
    }
}

Retransmitter#run

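Runs 10 seconds after a send to check whether the push was acknowledged. If the entry is still in ackMap, the push has not succeeded, so it is sent again, until the maximum number of retries (MAX_RETRY_TIMES) is reached.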

public void run() {
    if (ackMap.containsKey(ackEntry.key)) {
        Loggers.PUSH.info("retry to push data, key: " + ackEntry.key);
        udpPush(ackEntry);
    }
}

Receiver#run

When the PushService class is initialized, its static block starts the Receiver thread.

static {
    // (rest omitted)
    Receiver receiver = new Receiver();
    Thread inThread = new Thread(receiver);
    inThread.setDaemon(true);
    inThread.setName("com.alibaba.nacos.naming.push.receiver");
    inThread.start();
    // (rest omitted)
}

The Receiver runs a while (true) loop. Whenever it receives an acknowledgement, it removes the corresponding key from ackMap.

public void run() {
    while (true) {
        // (rest omitted)
        String ackKey = getAckKey(ip, port, ackPacket.lastRefTime);
        AckEntry ackEntry = ackMap.remove(ackKey);
        // (rest omitted)
    }
}

Broadcast summary

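When a change is broadcast, an entry is stored in ackMap. If the send itself throws an exception, the entry is removed from ackMap right away. Sometimes the UDP packet goes out but is never acknowledged, in which case the entry stays in ackMap. The Retransmitter, scheduled 10 seconds after each send, checks ackMap to see whether the push succeeded. If the entry is still there, it retries, until the maximum number of retries is reached. In addition, a separate thread listens for UDP responses and removes the corresponding entry from ackMap when one arrives. These UDP packets are sent to the client. The article "Nacos - HostReactor creation" covers what the client does on receiving one: it updates its own service information.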
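The interplay between ackMap, the Retransmitter and the Receiver can be sketched without sockets or timers. In the sketch below, onTimeout stands in for the Retransmitter firing and onAck for the Receiver getting a response. AckRetrySketch, its method names and the MAX_RETRY_TIMES value of 1 are assumptions chosen for illustration, not taken from Nacos.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AckRetrySketch {
    // Illustrative value only; the real constant lives in PushService.
    public static final int MAX_RETRY_TIMES = 1;

    private static class Entry {
        final String key;
        int retryTimes;

        Entry(String key) {
            this.key = key;
        }
    }

    // Entries that were sent but not yet acknowledged.
    private final Map<String, Entry> ackMap = new ConcurrentHashMap<>();
    private int sendCount;

    public void push(String key) {
        send(new Entry(key));
    }

    private void send(Entry entry) {
        if (entry.retryTimes > MAX_RETRY_TIMES) {
            // Give up: stop tracking this push.
            ackMap.remove(entry.key);
            return;
        }
        ackMap.put(entry.key, entry);
        sendCount++; // a real implementation would send the datagram here
        entry.retryTimes++;
    }

    /** Stands in for the Retransmitter running after the ack timeout. */
    public void onTimeout(String key) {
        Entry entry = ackMap.get(key);
        if (entry != null) {
            send(entry); // still unacknowledged: try again
        }
    }

    /** Stands in for the Receiver thread getting an acknowledgement. */
    public void onAck(String key) {
        ackMap.remove(key);
    }

    public boolean isPending(String key) {
        return ackMap.containsKey(key);
    }

    public int sendCount() {
        return sendCount;
    }
}
```

With the check written as retryTimes > MAX_RETRY_TIMES, an unacknowledged push is sent MAX_RETRY_TIMES + 1 times in total before it is dropped, while an acknowledged push turns every later timeout into a no-op.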

Heartbeat summary

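In short, when the server receives a heartbeat request, it updates the instance's last heartbeat time and health status, and broadcasts the change if the instance has just become healthy again.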