Preface
This article takes a look at the ServerListManager in nacos.
ServerListManager
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java
@Component("serverListManager")
public class ServerListManager {

    private static final int STABLE_PERIOD = 60 * 1000;

    @Autowired
    private SwitchDomain switchDomain;

    private List<ServerChangeListener> listeners = new ArrayList<>();

    private List<Server> servers = new ArrayList<>();

    private List<Server> healthyServers = new ArrayList<>();

    private Map<String, List<Server>> distroConfig = new ConcurrentHashMap<>();

    private Map<String, Long> distroBeats = new ConcurrentHashMap<>(16);

    private Set<String> liveSites = new HashSet<>();

    private final static String LOCALHOST_SITE = UtilsAndCommons.UNKNOWN_SITE;

    private long lastHealthServerMillis = 0L;

    private boolean autoDisabledHealthCheck = false;

    private Synchronizer synchronizer = new ServerStatusSynchronizer();

    public void listen(ServerChangeListener listener) {
        listeners.add(listener);
    }

    @PostConstruct
    public void init() {
        GlobalExecutor.registerServerListUpdater(new ServerListUpdater());
        GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 5000);
    }

    //......
}
- The init method of ServerListManager registers two scheduled tasks: ServerListUpdater and ServerStatusReporter.
GlobalExecutor
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java
public class GlobalExecutor {

    public static final long HEARTBEAT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5L);

    public static final long LEADER_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15L);

    public static final long RANDOM_MS = TimeUnit.SECONDS.toMillis(5L);

    public static final long TICK_PERIOD_MS = TimeUnit.MILLISECONDS.toMillis(500L);

    private static final long NACOS_SERVER_LIST_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(5);

    private static final long PARTITION_DATA_TIMED_SYNC_INTERVAL = TimeUnit.SECONDS.toMillis(5);

    private static final long SERVER_STATUS_UPDATE_PERIOD = TimeUnit.SECONDS.toMillis(5);

    //......

    public static void registerServerListUpdater(Runnable runnable) {
        executorService.scheduleAtFixedRate(runnable, 0, NACOS_SERVER_LIST_REFRESH_INTERVAL, TimeUnit.MILLISECONDS);
    }

    public static void registerServerStatusReporter(Runnable runnable, long delay) {
        SERVER_STATUS_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS);
    }

    //......
}
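- registerServerListUpdater schedules the task at a fixed rate with no initial delay, once every NACOS_SERVER_LIST_REFRESH_INTERVAL (5 seconds); registerServerStatusReporter only schedules a one-shot task that runs after the given delay.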
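The difference between the two registration styles can be illustrated with a plain ScheduledExecutorService. The following is a minimal sketch, not the nacos code: the class SchedulingSketch, its method names, and the run limits are hypothetical and exist only so that the example terminates. A fixed-rate task is re-triggered by the executor itself, while a one-shot task keeps running only if it schedules itself again, which is what ServerStatusReporter does in its finally block.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of nacos.
final class SchedulingSketch {

    // Fixed rate: the executor re-runs the task every periodMillis on its own.
    static int runFixedRate(int minRuns, long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(minRuns);
        try {
            executor.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                latch.countDown();
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            await(latch);
            return runs.get();
        } finally {
            executor.shutdownNow();
        }
    }

    // One-shot: the task runs once per schedule() call, so it must re-register
    // itself to keep going. maxRuns is an artificial limit for this sketch.
    static int runSelfRescheduling(int maxRuns, long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(maxRuns);
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    runs.incrementAndGet();
                    latch.countDown();
                } finally {
                    // Re-register in finally so a failure in the body does not stop the loop.
                    if (runs.get() < maxRuns) {
                        executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                    }
                }
            }
        };
        try {
            executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
            await(latch);
            return runs.get();
        } finally {
            executor.shutdownNow();
        }
    }

    private static void await(CountDownLatch latch) {
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for scheduled runs");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

One consequence of the self-rescheduling style is that the delay can change between runs, which fits a period read from configuration on every pass.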
ServerListUpdater
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java
public class ServerListUpdater implements Runnable {

    @Override
    public void run() {
        try {
            List<Server> refreshedServers = refreshServerList();
            List<Server> oldServers = servers;

            if (CollectionUtils.isEmpty(refreshedServers)) {
                Loggers.RAFT.warn("refresh server list failed, ignore it.");
                return;
            }

            boolean changed = false;

            List<Server> newServers = (List<Server>) CollectionUtils.subtract(refreshedServers, oldServers);
            if (CollectionUtils.isNotEmpty(newServers)) {
                servers.addAll(newServers);
                changed = true;
                Loggers.RAFT.info("server list is updated, new: {} servers: {}", newServers.size(), newServers);
            }

            List<Server> deadServers = (List<Server>) CollectionUtils.subtract(oldServers, refreshedServers);
            if (CollectionUtils.isNotEmpty(deadServers)) {
                servers.removeAll(deadServers);
                changed = true;
                Loggers.RAFT.info("server list is updated, dead: {}, servers: {}", deadServers.size(), deadServers);
            }

            if (changed) {
                notifyListeners();
            }

        } catch (Exception e) {
            Loggers.RAFT.info("error while updating server list.", e);
        }
    }
}

private List<Server> refreshServerList() {

    List<Server> result = new ArrayList<>();

    if (STANDALONE_MODE) {
        Server server = new Server();
        server.setIp(NetUtils.getLocalAddress());
        server.setServePort(RunningConfig.getServerPort());
        result.add(server);
        return result;
    }

    List<String> serverList = new ArrayList<>();
    try {
        serverList = readClusterConf();
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("failed to get config:" + CLUSTER_CONF_FILE_PATH, e);
    }

    if (Loggers.SRV_LOG.isDebugEnabled()) {
        Loggers.SRV_LOG.debug("SERVER-LIST from cluster.conf: {}", result);
    }

    //use system env
    if (CollectionUtils.isEmpty(serverList)) {
        serverList = SystemUtils.getIPsBySystemEnv(UtilsAndCommons.SELF_SERVICE_CLUSTER_ENV);
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("SERVER-LIST from system variable: {}", result);
        }
    }

    if (CollectionUtils.isNotEmpty(serverList)) {

        for (int i = 0; i < serverList.size(); i++) {

            String ip;
            int port;
            String server = serverList.get(i);
            if (server.contains(UtilsAndCommons.IP_PORT_SPLITER)) {
                ip = server.split(UtilsAndCommons.IP_PORT_SPLITER)[0];
                port = Integer.parseInt(server.split(UtilsAndCommons.IP_PORT_SPLITER)[1]);
            } else {
                ip = server;
                port = RunningConfig.getServerPort();
            }

            Server member = new Server();
            member.setIp(ip);
            member.setServePort(port);
            result.add(member);
        }
    }

    return result;
}

private void notifyListeners() {

    GlobalExecutor.notifyServerListChange(new Runnable() {
        @Override
        public void run() {
            for (ServerChangeListener listener : listeners) {
                listener.onChangeServerList(servers);
                listener.onChangeHealthyServerList(healthyServers);
            }
        }
    });
}
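- ServerListUpdater implements the Runnable interface. Its run method calls refreshServerList to load the latest server list: in standalone mode that is just the local server, otherwise the entries come from cluster.conf, falling back to the system environment variable when the file yields nothing. An empty result is treated as a failed refresh and ignored. Otherwise the result is compared with oldServers to find which servers were added and which were removed; if anything changed, notifyListeners calls back onChangeServerList and onChangeHealthyServerList on every ServerChangeListener.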
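The entry parsing and the two-way difference described above can be sketched with plain java.util collections. This is an illustration under assumptions, not the nacos implementation: servers are represented as "ip:port" strings instead of Server objects, List.removeAll stands in for CollectionUtils.subtract, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; not part of nacos.
final class ServerListDiffSketch {

    // Turns "ip:port" or a bare "ip" into a canonical "ip:port" key.
    // defaultPort stands in for RunningConfig.getServerPort().
    // IPv4-style entries only; IPv6 literals are out of scope for this sketch.
    static String normalize(String entry, int defaultPort) {
        String[] parts = entry.split(":");
        int port = parts.length > 1 ? Integer.parseInt(parts[1]) : defaultPort;
        return parts[0] + ":" + port;
    }

    // Elements of a that do not appear in b.
    static List<String> subtract(List<String> a, List<String> b) {
        List<String> result = new ArrayList<>(a);
        result.removeAll(b);
        return result;
    }

    // Applies a refreshed list to the current one in place and reports whether
    // anything changed. An empty refresh is ignored, as in ServerListUpdater.
    static boolean applyRefresh(List<String> current, List<String> refreshed) {
        if (refreshed.isEmpty()) {
            return false;
        }
        List<String> added = subtract(refreshed, current);
        List<String> dead = subtract(current, refreshed);
        current.addAll(added);
        current.removeAll(dead);
        return !added.isEmpty() || !dead.isEmpty();
    }
}
```

Both differences are computed before the current list is modified, so additions cannot mask removals or the other way round.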
ServerStatusReporter
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java
private class ServerStatusReporter implements Runnable {

    @Override
    public void run() {
        try {

            if (RunningConfig.getServerPort() <= 0) {
                return;
            }

            checkDistroHeartbeat();

            int weight = Runtime.getRuntime().availableProcessors() / 2;
            if (weight <= 0) {
                weight = 1;
            }

            long curTime = System.currentTimeMillis();
            String status = LOCALHOST_SITE + "#" + NetUtils.localServer() + "#" + curTime + "#" + weight + "\r\n";

            //send status to itself
            onReceiveServerStatus(status);

            List<Server> allServers = getServers();

            if (!contains(NetUtils.localServer())) {
                Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}", NetUtils.localServer(), allServers);
                return;
            }

            if (allServers.size() > 0 && !NetUtils.localServer().contains(UtilsAndCommons.LOCAL_HOST_IP)) {
                for (com.alibaba.nacos.naming.cluster.servers.Server server : allServers) {
                    if (server.getKey().equals(NetUtils.localServer())) {
                        continue;
                    }

                    Message msg = new Message();
                    msg.setData(status);

                    synchronizer.send(server.getKey(), msg);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e);
        } finally {
            GlobalExecutor.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());
        }
    }
}
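- ServerStatusReporter implements the Runnable interface. Its run method first calls checkDistroHeartbeat, then builds a status string and passes it to its own onReceiveServerStatus, and then fetches the server list and sends the status to every other server through synchronizer.send(server.getKey(), msg). In the finally block it calls registerServerStatusReporter again to schedule its next run.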
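The status string built in run has the shape site#ip:port#timestamp#weight followed by "\r\n". A minimal sketch of building and splitting such a message follows. The class and method names are hypothetical, and the parse method is only an assumption about how a receiver could split the fields; the quoted source does not show onReceiveServerStatus.

```java
// Hypothetical illustration class; not part of nacos.
final class StatusMessageSketch {

    // Same rule as in ServerStatusReporter: half the processors, but at least 1.
    static int weightFor(int availableProcessors) {
        int weight = availableProcessors / 2;
        return weight <= 0 ? 1 : weight;
    }

    // Builds "site#serverKey#timestamp#weight\r\n", where serverKey is "ip:port".
    static String build(String site, String serverKey, long timestampMillis, int weight) {
        return site + "#" + serverKey + "#" + timestampMillis + "#" + weight + "\r\n";
    }

    // Assumed receiver-side split: [site, serverKey, timestamp, weight].
    static String[] parse(String status) {
        return status.trim().split("#");
    }
}
```

For example, build("siteA", "10.0.0.1:8848", 1000L, 4) yields "siteA#10.0.0.1:8848#1000#4" plus the trailing line break; "siteA" is a placeholder site name chosen for this sketch.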
checkDistroHeartbeat
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java
private void checkDistroHeartbeat() {

    Loggers.SRV_LOG.debug("check distro heartbeat.");

    List<Server> servers = distroConfig.get(LOCALHOST_SITE);
    if (CollectionUtils.isEmpty(servers)) {
        return;
    }

    List<Server> newHealthyList = new ArrayList<>(servers.size());
    long now = System.currentTimeMillis();
    for (Server s: servers) {
        Long lastBeat = distroBeats.get(s.getKey());
        if (null == lastBeat) {
            continue;
        }
        s.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis());
    }

    //local site servers
    List<String> allLocalSiteSrvs = new ArrayList<>();
    for (Server server : servers) {

        if (server.getKey().endsWith(":0")) {
            continue;
        }

        server.setAdWeight(switchDomain.getAdWeight(server.getKey()) == null ? 0 : switchDomain.getAdWeight(server.getKey()));

        for (int i = 0; i < server.getWeight() + server.getAdWeight(); i++) {

            if (!allLocalSiteSrvs.contains(server.getKey())) {
                allLocalSiteSrvs.add(server.getKey());
            }

            if (server.isAlive() && !newHealthyList.contains(server)) {
                newHealthyList.add(server);
            }
        }
    }

    Collections.sort(newHealthyList);
    float curRatio = (float) newHealthyList.size() / allLocalSiteSrvs.size();

    if (autoDisabledHealthCheck
        && curRatio > switchDomain.getDistroThreshold()
        && System.currentTimeMillis() - lastHealthServerMillis > STABLE_PERIOD) {
        Loggers.SRV_LOG.info("[NACOS-DISTRO] distro threshold restored and" +
            "stable now, enable health check. current ratio: {}", curRatio);

        switchDomain.setHealthCheckEnabled(true);

        // we must set this variable, otherwise it will conflict with user's action
        autoDisabledHealthCheck = false;
    }

    if (!CollectionUtils.isEqualCollection(healthyServers, newHealthyList)) {
        // for every change disable healthy check for some while
        if (switchDomain.isHealthCheckEnabled()) {
            Loggers.SRV_LOG.info("[NACOS-DISTRO] healthy server list changed," +
                    "disable health check for {} ms from now on, old: {}, new: {}", STABLE_PERIOD,
                healthyServers, newHealthyList);

            switchDomain.setHealthCheckEnabled(false);
            autoDisabledHealthCheck = true;

            lastHealthServerMillis = System.currentTimeMillis();
        }

        healthyServers = newHealthyList;
        notifyListeners();
    }
}
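- checkDistroHeartbeat iterates over the servers of the local site and looks up each one's lastBeat in distroBeats. A server is marked alive if the time elapsed since lastBeat is less than distroServerExpiredMillis and not alive otherwise; servers without a recorded beat are skipped. It then iterates over the servers again to update adWeight and to collect the local site's server keys and the alive servers.
- Next it computes curRatio, the number of healthy servers divided by the number of local site servers. If curRatio is greater than distroThreshold, autoDisabledHealthCheck is true, and more than STABLE_PERIOD has passed since lastHealthServerMillis, it calls switchDomain.setHealthCheckEnabled(true) and resets autoDisabledHealthCheck to false.
- Finally it checks whether healthyServers and newHealthyList contain the same elements. If they differ and switchDomain.isHealthCheckEnabled() is true, it calls switchDomain.setHealthCheckEnabled(false), sets autoDisabledHealthCheck to true, and updates lastHealthServerMillis. Whenever the lists differ, it also replaces healthyServers with the new list and calls notifyListeners.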
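The enable and disable logic above behaves like a small state machine, which can be sketched in isolation. This is a simplified model under assumptions, not the nacos code: the class DistroHealthGateSketch is hypothetical, the threshold passed to the constructor stands in for switchDomain.getDistroThreshold(), the current time is passed in explicitly so the behavior is deterministic, and totalCount is assumed to be greater than zero.

```java
// Hypothetical illustration class; not part of nacos.
final class DistroHealthGateSketch {

    static final long STABLE_PERIOD = 60 * 1000L;

    private final float threshold;      // stand-in for switchDomain.getDistroThreshold()
    boolean healthCheckEnabled = true;  // stand-in for switchDomain.isHealthCheckEnabled()
    boolean autoDisabled = false;       // mirrors autoDisabledHealthCheck
    long lastChangeMillis = 0L;         // mirrors lastHealthServerMillis

    DistroHealthGateSketch(float threshold) {
        this.threshold = threshold;
    }

    // A server counts as alive while its last beat is newer than the expiry window.
    static boolean isAlive(long now, long lastBeat, long expiredMillis) {
        return now - lastBeat < expiredMillis;
    }

    // One pass of the check, in the same order as checkDistroHeartbeat.
    void check(long now, int healthyCount, int totalCount, boolean healthyListChanged) {
        float ratio = (float) healthyCount / totalCount;

        // Re-enable only if this code disabled it, the ratio has recovered,
        // and the healthy list has been stable for STABLE_PERIOD.
        if (autoDisabled && ratio > threshold && now - lastChangeMillis > STABLE_PERIOD) {
            healthCheckEnabled = true;
            autoDisabled = false;
        }

        // Any change to the healthy list pauses health checks for a while.
        if (healthyListChanged && healthCheckEnabled) {
            healthCheckEnabled = false;
            autoDisabled = true;
            lastChangeMillis = now;
        }
    }
}
```

Because lastChangeMillis is only updated while health checks are enabled, further list changes during the pause do not extend it in this model; the same holds for the quoted method, where the timestamp update sits inside the isHealthCheckEnabled branch.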
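Summary
- The init method of ServerListManager registers two scheduled tasks: ServerListUpdater and ServerStatusReporter.
- ServerListUpdater implements the Runnable interface. Its run method calls refreshServerList to load the latest server list, compares it with oldServers to find which servers were added and which were removed, and, if anything changed, uses notifyListeners to call back onChangeServerList and onChangeHealthyServerList on every ServerChangeListener.
- ServerStatusReporter implements the Runnable interface. Its run method first calls checkDistroHeartbeat, then onReceiveServerStatus, and then fetches the server list and sends its own status to the other servers through synchronizer.send(server.getKey(), msg); finally it calls registerServerStatusReporter again to schedule its next run.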
doc
- ServerListManager