
A look at processClientBeat in the nacos Service

This article examines the processClientBeat method of the nacos Service class.

Service.processClientBeat

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java

public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {

    private static final String SERVICE_NAME_SYNTAX = "[0-9a-zA-Z@\\.:_-]+";

    @JSONField(serialize = false)
    private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);

    private String token;
    private List<String> owners = new ArrayList<>();
    private Boolean resetWeight = false;
    private Boolean enabled = true;
    private Selector selector = new NoneSelector();
    private String namespaceId;

    /**
     * IP will be deleted if it has not send beat for some time, default timeout is 30 seconds.
     */
    private long ipDeleteTimeout = 30 * 1000;

    private volatile long lastModifiedMillis = 0L;

    private volatile String checksum;

    /**
     * TODO set customized push expire time:
     */
    private long pushCacheMillis = 0L;

    private Map<String, Cluster> clusterMap = new HashMap<>();

    //......

    public void processClientBeat(final RsInfo rsInfo) {
        ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
        clientBeatProcessor.setService(this);
        clientBeatProcessor.setRsInfo(rsInfo);
        HealthCheckReactor.scheduleNow(clientBeatProcessor);
    }

    //......
}
  • Service's processClientBeat method creates a ClientBeatProcessor, sets the service and the RsInfo on it, and hands it to HealthCheckReactor.scheduleNow for execution
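
The hand-off can be illustrated in isolation. What follows is a minimal sketch rather than nacos code: the class BeatHandOffSketch, its methods submitBeat and submitBeatAndWait, and the thread name beat-sketch are all hypothetical. It only shows that scheduling a Runnable with zero delay lets the caller return at once while the work runs on a pool thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-alone sketch; none of these names exist in nacos.
class BeatHandOffSketch {

    // Single daemon thread, so the pool never keeps the JVM alive.
    private static final ScheduledExecutorService EXECUTOR =
        Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("beat-sketch");
            return t;
        });

    // Records what was processed and on which thread (demonstration only).
    static final AtomicReference<String> lastProcessed = new AtomicReference<>();

    // Same shape as processClientBeat: wrap the payload in a Runnable
    // and schedule it with zero delay instead of running it inline.
    static ScheduledFuture<?> submitBeat(String ip, int port) {
        Runnable processor = () ->
            lastProcessed.set(ip + ":" + port + "@" + Thread.currentThread().getName());
        return EXECUTOR.schedule(processor, 0, TimeUnit.MILLISECONDS);
    }

    // Convenience wrapper that blocks until the beat has been processed.
    static String submitBeatAndWait(String ip, int port) {
        try {
            submitBeat(ip, port).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("beat was not processed in time", e);
        }
        return lastProcessed.get();
    }

    public static void main(String[] args) {
        System.out.println(submitBeatAndWait("10.0.0.1", 8080));
    }
}
```

In the sketch the caller of submitBeat never touches the shared state itself; only the pool thread does, which is the point of handing the beat to an executor.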

ClientBeatProcessor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatProcessor.java

public class ClientBeatProcessor implements Runnable {

    public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
    private RsInfo rsInfo;
    private Service service;

    @JSONField(serialize = false)
    public PushService getPushService() {
        return SpringContext.getAppContext().getBean(PushService.class);
    }

    public RsInfo getRsInfo() {
        return rsInfo;
    }

    public void setRsInfo(RsInfo rsInfo) {
        this.rsInfo = rsInfo;
    }

    public Service getService() {
        return service;
    }

    public void setService(Service service) {
        this.service = service;
    }

    @Override
    public void run() {
        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }

        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        Cluster cluster = service.getClusterMap().get(clusterName);
        List<Instance> instances = cluster.allIPs(true);

        for (Instance instance : instances) {
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked()) {
                    if (!instance.isHealthy()) {
                        instance.setHealthy(true);
                        Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                            cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
                        getPushService().serviceChanged(service);
                    }
                }
            }
        }
    }
}
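  • ClientBeatProcessor implements the Runnable interface. Its run method iterates over the cluster's instances and, for each instance whose ip and port match the beat, sets lastBeat to the current time; if that instance is not marked and its healthy flag is false, it sets healthy to true and publishes a change event through getPushService().serviceChanged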
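
The refresh rule can be modelled without any nacos classes. This is a simplified sketch under stated assumptions: BeatRefreshSketch, Node and onBeat are hypothetical names, the timestamp is passed in instead of read from the clock, and the push notification is reduced to a boolean return value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the beat-refresh rule; not the nacos classes.
class BeatRefreshSketch {

    static class Node {
        final String ip;
        final int port;
        long lastBeat;
        boolean healthy;
        boolean marked; // a marked node keeps its health flag when a beat arrives

        Node(String ip, int port, boolean healthy, boolean marked) {
            this.ip = ip;
            this.port = port;
            this.healthy = healthy;
            this.marked = marked;
        }
    }

    // Refreshes lastBeat on every node matching ip and port.
    // Returns true if some node went from unhealthy to healthy,
    // which is the case where a change notification would be pushed.
    static boolean onBeat(List<Node> nodes, String ip, int port, long now) {
        boolean changed = false;
        for (Node node : nodes) {
            if (node.ip.equals(ip) && node.port == port) {
                node.lastBeat = now;
                if (!node.marked && !node.healthy) {
                    node.healthy = true;
                    changed = true;
                }
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        List<Node> nodes = new ArrayList<>();
        nodes.add(new Node("10.0.0.1", 8080, false, false));
        nodes.add(new Node("10.0.0.2", 8080, false, true));
        System.out.println(onBeat(nodes, "10.0.0.1", 8080, 1000L)); // unhealthy, unmarked node
        System.out.println(onBeat(nodes, "10.0.0.1", 8080, 2000L)); // same node, now healthy
        System.out.println(onBeat(nodes, "10.0.0.2", 8080, 3000L)); // marked node
    }
}
```

Only the first call reports a change: the second finds the node already healthy, and the third hits a marked node, so in both cases lastBeat is refreshed and nothing else happens.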

HealthCheckReactor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckReactor.java

public class HealthCheckReactor {

    private static final ScheduledExecutorService EXECUTOR;

    private static Map<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>();

    static {
        int processorCount = Runtime.getRuntime().availableProcessors();
        EXECUTOR
                = Executors
                .newScheduledThreadPool(processorCount <= 1 ? 1 : processorCount / 2, new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r);
                        thread.setDaemon(true);
                        thread.setName("com.alibaba.nacos.naming.health");
                        return thread;
                    }
                });
    }

    public static ScheduledFuture<?> scheduleCheck(HealthCheckTask task) {
        task.setStartTime(System.currentTimeMillis());
        return EXECUTOR.schedule(task, task.getCheckRTNormalized(), TimeUnit.MILLISECONDS);
    }

    public static void scheduleCheck(ClientBeatCheckTask task) {
        futureMap.putIfAbsent(task.taskKey(), EXECUTOR.scheduleWithFixedDelay(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }

    public static void cancelCheck(ClientBeatCheckTask task) {
        ScheduledFuture scheduledFuture = futureMap.get(task.taskKey());
        if (scheduledFuture == null) {
            return;
        }
        try {
            scheduledFuture.cancel(true);
        } catch (Exception e) {
            Loggers.EVT_LOG.error("[CANCEL-CHECK] cancel failed!", e);
        }
    }


    public static ScheduledFuture<?> scheduleNow(Runnable task) {
        return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS);
    }
}
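  • HealthCheckReactor creates EXECUTOR in a static block, as a scheduled pool of daemon threads sized at half the available processors (minimum one). It provides scheduleCheck methods for HealthCheckTask and ClientBeatCheckTask, a cancelCheck method for ClientBeatCheckTask, and a scheduleNow method that accepts any Runnable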
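
The keyed schedule-and-cancel pattern can be sketched on its own. The code below is an illustrative variant, not the nacos implementation: PeriodicCheckSketch and its method signatures are hypothetical. It also differs from the quoted code in two ways, both deliberate choices of the sketch. It uses computeIfAbsent, so the task is scheduled only when the key is new, whereas a putIfAbsent call receives an already scheduled future as its argument (Java evaluates arguments before the call). And it removes the key on cancel.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone sketch of a keyed periodic-task registry.
class PeriodicCheckSketch {

    private static final ScheduledExecutorService EXECUTOR =
        Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // do not keep the JVM alive
            return t;
        });

    private static final Map<String, ScheduledFuture<?>> FUTURES = new ConcurrentHashMap<>();

    // Schedules the task with a fixed delay unless the key is already registered.
    // Returns true if a new schedule was created.
    static boolean scheduleCheck(String key, Runnable task, long delayMillis) {
        boolean[] created = {false};
        FUTURES.computeIfAbsent(key, k -> {
            created[0] = true;
            return EXECUTOR.scheduleWithFixedDelay(task, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
        });
        return created[0];
    }

    // Cancels the schedule and forgets the key.
    // Returns false if the key was not registered.
    static boolean cancelCheck(String key) {
        ScheduledFuture<?> future = FUTURES.remove(key);
        if (future == null) {
            return false;
        }
        future.cancel(true);
        return true;
    }

    public static void main(String[] args) {
        System.out.println(scheduleCheck("svc-a", () -> { }, 5000)); // new key
        System.out.println(scheduleCheck("svc-a", () -> { }, 5000)); // duplicate key
        System.out.println(cancelCheck("svc-a"));
    }
}
```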

Summary

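  • Service's processClientBeat method creates a ClientBeatProcessor and hands it to HealthCheckReactor.scheduleNow for execution
  • ClientBeatProcessor implements the Runnable interface. It iterates over the instances, updates lastBeat on the instance matching the beat's ip and port, and, if that instance is not marked and its healthy flag is false, sets healthy to true and publishes a change event through getPushService().serviceChanged
  • HealthCheckReactor creates EXECUTOR in a static block. It provides scheduleCheck methods for HealthCheckTask and ClientBeatCheckTask, a cancelCheck method for ClientBeatCheckTask, and a scheduleNow method that accepts any Runnable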
