共计 4768 个字符,预计需要花费 12 分钟才能阅读完成。
序
本文主要研究一下 nacos Service 的 processClientBeat
Service.processClientBeat
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {private static final String SERVICE_NAME_SYNTAX = "[0-9a-zA-Z@\\.:_-]+";
@JSONField(serialize = false)
private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);
private String token;
private List<String> owners = new ArrayList<>();
private Boolean resetWeight = false;
private Boolean enabled = true;
private Selector selector = new NoneSelector();
private String namespaceId;
/**
* IP will be deleted if it has not send beat for some time, default timeout is 30 seconds.
*/
private long ipDeleteTimeout = 30 * 1000;
private volatile long lastModifiedMillis = 0L;
private volatile String checksum;
/**
* TODO set customized push expire time:
*/
private long pushCacheMillis = 0L;
private Map<String, Cluster> clusterMap = new HashMap<>();
//......
public void processClientBeat(final RsInfo rsInfo) {ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
clientBeatProcessor.setService(this);
clientBeatProcessor.setRsInfo(rsInfo);
HealthCheckReactor.scheduleNow(clientBeatProcessor);
}
//......
}
- Service 的 processClientBeat 方法会创建 ClientBeatProcessor,并使用 HealthCheckReactor 进行调度
ClientBeatProcessor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatProcessor.java
public class ClientBeatProcessor implements Runnable {public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);
private RsInfo rsInfo;
private Service service;
@JSONField(serialize = false)
public PushService getPushService() {return SpringContext.getAppContext().getBean(PushService.class);
}
public RsInfo getRsInfo() {return rsInfo;}
public void setRsInfo(RsInfo rsInfo) {this.rsInfo = rsInfo;}
public Service getService() {return service;}
public void setService(Service service) {this.service = service;}
@Override
public void run() {
Service service = this.service;
if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
String ip = rsInfo.getIp();
String clusterName = rsInfo.getCluster();
int port = rsInfo.getPort();
Cluster cluster = service.getClusterMap().get(clusterName);
List<Instance> instances = cluster.allIPs(true);
for (Instance instance : instances) {if (instance.getIp().equals(ip) && instance.getPort() == port) {if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
instance.setLastBeat(System.currentTimeMillis());
if (!instance.isMarked()) {if (!instance.isHealthy()) {instance.setHealthy(true);
Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
getPushService().serviceChanged(service);
}
}
}
}
}
}
- ClientBeatProcessor 实现了 Runnable 方法,它会遍历 instances 更新指定 ip 及 port 的 instance 的 lastBeat 时间;同时对于非 marked 且 healthy 为 false 的 instance 更新其 healthy 为 true 并通过 getPushService().serviceChanged 发布变更事件
HealthCheckReactor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckReactor.java
public class HealthCheckReactor {
private static final ScheduledExecutorService EXECUTOR;
private static Map<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>();
static {int processorCount = Runtime.getRuntime().availableProcessors();
EXECUTOR
= Executors
.newScheduledThreadPool(processorCount <= 1 ? 1 : processorCount / 2, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {Thread thread = new Thread(r);
thread.setDaemon(true);
thread.setName("com.alibaba.nacos.naming.health");
return thread;
}
});
}
public static ScheduledFuture<?> scheduleCheck(HealthCheckTask task) {task.setStartTime(System.currentTimeMillis());
return EXECUTOR.schedule(task, task.getCheckRTNormalized(), TimeUnit.MILLISECONDS);
}
public static void scheduleCheck(ClientBeatCheckTask task) {futureMap.putIfAbsent(task.taskKey(), EXECUTOR.scheduleWithFixedDelay(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
public static void cancelCheck(ClientBeatCheckTask task) {ScheduledFuture scheduledFuture = futureMap.get(task.taskKey());
if (scheduledFuture == null) {return;}
try {scheduledFuture.cancel(true);
} catch (Exception e) {Loggers.EVT_LOG.error("[CANCEL-CHECK] cancel failed!", e);
}
}
public static ScheduledFuture<?> scheduleNow(Runnable task) {return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS);
}
}
- HealthCheckReactor 在 static 代码块创建了 EXECUTOR,它提供了 HealthCheckTask、ClientBeatCheckTask 的 schedule 方法以及 ClientBeatCheckTask 的 cancel 方法,并提供了 Runnable 的 scheduleNow 方法
小结
- Service 的 processClientBeat 方法会创建 ClientBeatProcessor,并使用 HealthCheckReactor 进行调度
- ClientBeatProcessor 实现了 Runnable 方法,它会遍历 instances 更新指定 ip 及 port 的 instance 的 lastBeat 时间;同时对于非 marked 且 healthy 为 false 的 instance 更新其 healthy 为 true 并通过 getPushService().serviceChanged 发布变更事件
- HealthCheckReactor 在 static 代码块创建了 EXECUTOR,它提供了 HealthCheckTask、ClientBeatCheckTask 的 schedule 方法以及 ClientBeatCheckTask 的 cancel 方法,并提供了 Runnable 的 scheduleNow 方法
doc
- Service
正文完