This article takes a look at Nacos's ServerListManager.

ServerListManager

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java

@Component("serverListManager")
public class ServerListManager {
    private static final int STABLE_PERIOD = 60 * 1000;
    @Autowired
    private SwitchDomain switchDomain;
    private List<ServerChangeListener> listeners = new ArrayList<>();
    private List<Server> servers = new ArrayList<>();
    private List<Server> healthyServers = new ArrayList<>();
    private Map<String, List<Server>> distroConfig = new ConcurrentHashMap<>();
    private Map<String, Long> distroBeats = new ConcurrentHashMap<>(16);
    private Set<String> liveSites = new HashSet<>();
    private final static String LOCALHOST_SITE = UtilsAndCommons.UNKNOWN_SITE;
    private long lastHealthServerMillis = 0L;
    private boolean autoDisabledHealthCheck = false;
    private Synchronizer synchronizer = new ServerStatusSynchronizer();

    public void listen(ServerChangeListener listener) {
        listeners.add(listener);
    }

    @PostConstruct
    public void init() {
        GlobalExecutor.registerServerListUpdater(new ServerListUpdater());
        GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 5000);
    }

    //......
}
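  • ServerListManager's init method, annotated with @PostConstruct, registers two scheduled tasks: ServerListUpdater and ServerStatusReporter (the latter with an initial delay of 5000 ms)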

GlobalExecutor

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/misc/GlobalExecutor.java

public class GlobalExecutor {
    public static final long HEARTBEAT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(5L);
    public static final long LEADER_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(15L);
    public static final long RANDOM_MS = TimeUnit.SECONDS.toMillis(5L);
    public static final long TICK_PERIOD_MS = TimeUnit.MILLISECONDS.toMillis(500L);
    private static final long NACOS_SERVER_LIST_REFRESH_INTERVAL = TimeUnit.SECONDS.toMillis(5);
    private static final long PARTITION_DATA_TIMED_SYNC_INTERVAL = TimeUnit.SECONDS.toMillis(5);
    private static final long SERVER_STATUS_UPDATE_PERIOD = TimeUnit.SECONDS.toMillis(5);

    //......

    public static void registerServerListUpdater(Runnable runnable) {
        executorService.scheduleAtFixedRate(runnable, 0, NACOS_SERVER_LIST_REFRESH_INTERVAL, TimeUnit.MILLISECONDS);
    }

    public static void registerServerStatusReporter(Runnable runnable, long delay) {
        SERVER_STATUS_EXECUTOR.schedule(runnable, delay, TimeUnit.MILLISECONDS);
    }

    //......
}
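  • registerServerListUpdater schedules the task at a fixed rate of NACOS_SERVER_LIST_REFRESH_INTERVAL (5 seconds), starting immediately. registerServerStatusReporter only schedules a one-shot delayed task on SERVER_STATUS_EXECUTOR, so that task must re-register itself to keep running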
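The two registration styles behave differently when a run fails. According to the `ScheduledExecutorService` Javadoc, if any execution of a `scheduleAtFixedRate` task throws, all later executions are suppressed. That is presumably why `ServerListUpdater.run` wraps its whole body in try/catch. A one-shot task that re-registers itself in `finally` survives failures and can re-read its delay on every run. The sketch below shows both styles with a plain JDK executor. The class `SchedulingSketch` and its methods are hypothetical, and the `LongSupplier` only stands in for `switchDomain.getServerStatusSynchronizationPeriodMillis()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

// Hypothetical sketch contrasting the two scheduling styles used by GlobalExecutor.
class SchedulingSketch {

    // Style 1, like registerServerListUpdater: scheduleAtFixedRate.
    // The task throws on its 2nd run without catching, so the executor
    // suppresses every later run. Returns how many runs happened in waitMillis.
    static int fixedRateWithUncaughtFailure(long periodMillis, long waitMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        executor.scheduleAtFixedRate(() -> {
            if (runs.incrementAndGet() == 2) {
                throw new IllegalStateException("simulated failure");
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        sleepQuietly(waitMillis);
        executor.shutdownNow();
        return runs.get();
    }

    // Style 2, like registerServerStatusReporter + ServerStatusReporter: a one-shot
    // task that catches its own failure and re-registers itself in `finally`,
    // re-reading the delay each time (stand-in for switchDomain's period).
    static int selfRescheduling(int times, LongSupplier periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(times);
        AtomicInteger runs = new AtomicInteger();
        Runnable task = new Runnable() {
            @Override
            public void run() {
                try {
                    if (runs.incrementAndGet() == 2) {
                        throw new IllegalStateException("simulated failure");
                    }
                } catch (RuntimeException e) {
                    System.err.println("run failed, rescheduling anyway: " + e.getMessage());
                } finally {
                    latch.countDown();
                    try {
                        executor.schedule(this, periodMillis.getAsLong(), TimeUnit.MILLISECONDS);
                    } catch (RejectedExecutionException ignored) {
                        // executor already shut down: the chain simply ends
                    }
                }
            }
        };
        executor.schedule(task, 0, TimeUnit.MILLISECONDS);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdownNow();
        return runs.get();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With a 10 ms period, the fixed-rate variant stops after its second run. The self-rescheduling variant keeps going past the simulated failure.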

ServerListUpdater

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java

    public class ServerListUpdater implements Runnable {
        @Override
        public void run() {
            try {
                List<Server> refreshedServers = refreshServerList();
                List<Server> oldServers = servers;
                if (CollectionUtils.isEmpty(refreshedServers)) {
                    Loggers.RAFT.warn("refresh server list failed, ignore it.");
                    return;
                }
                boolean changed = false;
                List<Server> newServers = (List<Server>) CollectionUtils.subtract(refreshedServers, oldServers);
                if (CollectionUtils.isNotEmpty(newServers)) {
                    servers.addAll(newServers);
                    changed = true;
                    Loggers.RAFT.info("server list is updated, new: {} servers: {}", newServers.size(), newServers);
                }
                List<Server> deadServers = (List<Server>) CollectionUtils.subtract(oldServers, refreshedServers);
                if (CollectionUtils.isNotEmpty(deadServers)) {
                    servers.removeAll(deadServers);
                    changed = true;
                    Loggers.RAFT.info("server list is updated, dead: {}, servers: {}", deadServers.size(), deadServers);
                }
                if (changed) {
                    notifyListeners();
                }
            } catch (Exception e) {
                Loggers.RAFT.info("error while updating server list.", e);
            }
        }
    }

    private List<Server> refreshServerList() {
        List<Server> result = new ArrayList<>();
        if (STANDALONE_MODE) {
            Server server = new Server();
            server.setIp(NetUtils.getLocalAddress());
            server.setServePort(RunningConfig.getServerPort());
            result.add(server);
            return result;
        }
        List<String> serverList = new ArrayList<>();
        try {
            serverList = readClusterConf();
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("failed to get config: " + CLUSTER_CONF_FILE_PATH, e);
        }
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("SERVER-LIST from cluster.conf: {}", result);
        }
        //use system env
        if (CollectionUtils.isEmpty(serverList)) {
            serverList = SystemUtils.getIPsBySystemEnv(UtilsAndCommons.SELF_SERVICE_CLUSTER_ENV);
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("SERVER-LIST from system variable: {}", result);
            }
        }
        if (CollectionUtils.isNotEmpty(serverList)) {
            for (int i = 0; i < serverList.size(); i++) {
                String ip;
                int port;
                String server = serverList.get(i);
                if (server.contains(UtilsAndCommons.IP_PORT_SPLITER)) {
                    ip = server.split(UtilsAndCommons.IP_PORT_SPLITER)[0];
                    port = Integer.parseInt(server.split(UtilsAndCommons.IP_PORT_SPLITER)[1]);
                } else {
                    ip = server;
                    port = RunningConfig.getServerPort();
                }
                Server member = new Server();
                member.setIp(ip);
                member.setServePort(port);
                result.add(member);
            }
        }
        return result;
    }

    private void notifyListeners() {
        GlobalExecutor.notifyServerListChange(new Runnable() {
            @Override
            public void run() {
                for (ServerChangeListener listener : listeners) {
                    listener.onChangeServerList(servers);
                    listener.onChangeHealthyServerList(healthyServers);
                }
            }
        });
    }
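  • ServerListUpdater implements Runnable. Its run method calls refreshServerList to get the latest server list. In standalone mode this is just the local address; otherwise it comes from cluster.conf, falling back to the system environment variable when the file yields nothing. run then compares the result with oldServers to find which servers were added and which were removed. If anything changed, notifyListeners asynchronously calls back each ServerChangeListener's onChangeServerList and onChangeHealthyServerList methods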
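The parse-then-diff flow described above can be sketched with plain JDK collections. This is a minimal sketch under several assumptions. It represents servers as `"ip:port"` strings instead of Nacos's `Server` class. It assumes `UtilsAndCommons.IP_PORT_SPLITER` is `":"`. It also uses `LinkedHashSet.removeAll` in place of Commons Collections' `CollectionUtils.subtract`; the two give the same result when entries are distinct. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of refreshServerList's parsing plus ServerListUpdater's diff,
// using "ip:port" strings instead of Nacos's Server class.
class ServerListDiff {
    // Assumed separator; stands in for UtilsAndCommons.IP_PORT_SPLITER.
    private static final String IP_PORT_SEPARATOR = ":";

    // Entries without an explicit port fall back to the local server port,
    // as refreshServerList does with RunningConfig.getServerPort().
    static List<String> parse(List<String> entries, int defaultPort) {
        List<String> result = new ArrayList<>();
        for (String entry : entries) {
            if (entry.contains(IP_PORT_SEPARATOR)) {
                String[] parts = entry.split(IP_PORT_SEPARATOR);
                result.add(parts[0] + IP_PORT_SEPARATOR + Integer.parseInt(parts[1]));
            } else {
                result.add(entry + IP_PORT_SEPARATOR + defaultPort);
            }
        }
        return result;
    }

    // refreshed minus old: servers that joined the cluster
    static Set<String> added(List<String> oldServers, List<String> refreshed) {
        Set<String> diff = new LinkedHashSet<>(refreshed);
        diff.removeAll(oldServers);
        return diff;
    }

    // old minus refreshed: servers that disappeared from the configuration
    static Set<String> removed(List<String> oldServers, List<String> refreshed) {
        Set<String> diff = new LinkedHashSet<>(oldServers);
        diff.removeAll(refreshed);
        return diff;
    }
}
```

For example, take an old list of `10.0.0.1:8848, 10.0.0.2:8848` and configuration entries `10.0.0.2, 10.0.0.3:8849` with a default port of 8848. Then `added` yields `10.0.0.3:8849` and `removed` yields `10.0.0.1:8848`. A listener would be notified only because at least one of the two sets is non-empty.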

ServerStatusReporter

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java

    private class ServerStatusReporter implements Runnable {
        @Override
        public void run() {
            try {
                if (RunningConfig.getServerPort() <= 0) {
                    return;
                }
                checkDistroHeartbeat();
                int weight = Runtime.getRuntime().availableProcessors() / 2;
                if (weight <= 0) {
                    weight = 1;
                }
                long curTime = System.currentTimeMillis();
                String status = LOCALHOST_SITE + "#" + NetUtils.localServer() + "#" + curTime + "#" + weight + "\r\n";
                //send status to itself
                onReceiveServerStatus(status);
                List<Server> allServers = getServers();
                if (!contains(NetUtils.localServer())) {
                    Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}", NetUtils.localServer(), allServers);
                    return;
                }
                if (allServers.size() > 0 && !NetUtils.localServer().contains(UtilsAndCommons.LOCAL_HOST_IP)) {
                    for (com.alibaba.nacos.naming.cluster.servers.Server server : allServers) {
                        if (server.getKey().equals(NetUtils.localServer())) {
                            continue;
                        }
                        Message msg = new Message();
                        msg.setData(status);
                        synchronizer.send(server.getKey(), msg);
                    }
                }
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e);
            } finally {
                GlobalExecutor.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());
            }
        }
    }
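  • ServerStatusReporter implements Runnable. Its run method returns early if the server port is not yet set. Otherwise it first calls checkDistroHeartbeat. It then builds a status line of the form site#ip:port#timestamp#weight, where weight is half the available processors and at least 1, and passes it to onReceiveServerStatus for the local node itself. Next, provided the local server appears in the server list and is not bound to UtilsAndCommons.LOCAL_HOST_IP, it sends the same status to every other server via synchronizer.send(server.getKey(), msg). Finally, in its finally block, even after an early return or an exception, it calls registerServerStatusReporter with switchDomain.getServerStatusSynchronizationPeriodMillis() as the delay to schedule its next run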
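The status message is a single `#`-separated line terminated by `\r\n`. A small value class makes the format and the weight rule explicit. This is an illustrative sketch, not Nacos code. `ServerStatusLine` and its methods are hypothetical, and `decode` only shows what a receiver would have to split; it is not the body of `onReceiveServerStatus`. The site value `"unknown"` used in the example is an assumed placeholder for `UtilsAndCommons.UNKNOWN_SITE`.

```java
// Hypothetical sketch of the status line that ServerStatusReporter builds and sends.
class ServerStatusLine {
    final String site;        // LOCALHOST_SITE in the original code
    final String serverKey;   // "ip:port", i.e. NetUtils.localServer()
    final long timestamp;     // System.currentTimeMillis() at send time
    final int weight;

    ServerStatusLine(String site, String serverKey, long timestamp, int weight) {
        this.site = site;
        this.serverKey = serverKey;
        this.timestamp = timestamp;
        this.weight = weight;
    }

    // Same rule as ServerStatusReporter: half the CPU count, but never below 1.
    static int weightFor(int availableProcessors) {
        return Math.max(1, availableProcessors / 2);
    }

    // site#ip:port#timestamp#weight followed by CRLF
    String encode() {
        return site + "#" + serverKey + "#" + timestamp + "#" + weight + "\r\n";
    }

    // Illustrative receiver-side split; trim() drops the trailing CRLF.
    static ServerStatusLine decode(String line) {
        String[] parts = line.trim().split("#");
        if (parts.length != 4) {
            throw new IllegalArgumentException("unexpected status line: " + line);
        }
        return new ServerStatusLine(parts[0], parts[1], Long.parseLong(parts[2]), Integer.parseInt(parts[3]));
    }
}
```

On an 8-core machine the weight is 4, and on a single-core machine it is clamped to 1. The line for `192.168.1.10:8848` at timestamp `1700000000000` then encodes as `unknown#192.168.1.10:8848#1700000000000#4` plus CRLF.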

checkDistroHeartbeat

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/ServerListManager.java

    private void checkDistroHeartbeat() {
        Loggers.SRV_LOG.debug("check distro heartbeat.");
        List<Server> servers = distroConfig.get(LOCALHOST_SITE);
        if (CollectionUtils.isEmpty(servers)) {
            return;
        }
        List<Server> newHealthyList = new ArrayList<>(servers.size());
        long now = System.currentTimeMillis();
        for (Server s: servers) {
            Long lastBeat = distroBeats.get(s.getKey());
            if (null == lastBeat) {
                continue;
            }
            s.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis());
        }
        //local site servers
        List<String> allLocalSiteSrvs = new ArrayList<>();
        for (Server server : servers) {
            if (server.getKey().endsWith(":0")) {
                continue;
            }
            server.setAdWeight(switchDomain.getAdWeight(server.getKey()) == null ? 0 : switchDomain.getAdWeight(server.getKey()));
            for (int i = 0; i < server.getWeight() + server.getAdWeight(); i++) {
                if (!allLocalSiteSrvs.contains(server.getKey())) {
                    allLocalSiteSrvs.add(server.getKey());
                }
                if (server.isAlive() && !newHealthyList.contains(server)) {
                    newHealthyList.add(server);
                }
            }
        }
        Collections.sort(newHealthyList);
        float curRatio = (float) newHealthyList.size() / allLocalSiteSrvs.size();
        if (autoDisabledHealthCheck
            && curRatio > switchDomain.getDistroThreshold()
            && System.currentTimeMillis() - lastHealthServerMillis > STABLE_PERIOD) {
            Loggers.SRV_LOG.info("[NACOS-DISTRO] distro threshold restored and " +
                "stable now, enable health check. current ratio: {}", curRatio);
            switchDomain.setHealthCheckEnabled(true);
            // we must set this variable, otherwise it will conflict with user's action
            autoDisabledHealthCheck = false;
        }
        if (!CollectionUtils.isEqualCollection(healthyServers, newHealthyList)) {
            // for every change disable healthy check for some while
            if (switchDomain.isHealthCheckEnabled()) {
                Loggers.SRV_LOG.info("[NACOS-DISTRO] healthy server list changed, " +
                        "disable health check for {} ms from now on, old: {}, new: {}", STABLE_PERIOD,
                    healthyServers, newHealthyList);
                switchDomain.setHealthCheckEnabled(false);
                autoDisabledHealthCheck = true;
                lastHealthServerMillis = System.currentTimeMillis();
            }
            healthyServers = newHealthyList;
            notifyListeners();
        }
    }
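  • checkDistroHeartbeat takes the local-site servers from distroConfig and looks up each one's lastBeat in distroBeats. A server is marked alive if now - lastBeat is less than distroServerExpiredMillis and not alive otherwise; servers with no recorded beat are skipped. It then iterates the servers again, skipping keys that end in ":0". For each remaining server it refreshes adWeight from switchDomain (defaulting to 0). As long as weight + adWeight is positive, it records the server key in allLocalSiteSrvs and, if the server is alive, adds it to newHealthyList
  • It then computes curRatio = newHealthyList.size() / allLocalSiteSrvs.size(). If autoDisabledHealthCheck is true, curRatio is greater than distroThreshold, and more than STABLE_PERIOD (60 seconds) has passed since lastHealthServerMillis, it calls switchDomain.setHealthCheckEnabled(true) and resets autoDisabledHealthCheck to false
  • Finally, it checks whether healthyServers and newHealthyList differ. If they do and switchDomain.isHealthCheckEnabled() is true, it calls switchDomain.setHealthCheckEnabled(false), sets autoDisabledHealthCheck to true, and records the current time in lastHealthServerMillis. Whenever the lists differ, it then replaces healthyServers with newHealthyList and calls notifyListeners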
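The interplay of autoDisabledHealthCheck, lastHealthServerMillis and STABLE_PERIOD forms a small state machine. Any change in the healthy list switches health checking off. It is switched back on only if it was this code that switched it off, the healthy ratio is above the threshold, and the last change is older than STABLE_PERIOD. The sketch below is a simplified, hypothetical model of that logic and not the Nacos code. It ignores weight/adWeight and ":0" keys, treats a server with no recorded beat as unhealthy, uses a plain boolean instead of switchDomain, and takes the current time as a parameter so it can be tested deterministically.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of checkDistroHeartbeat's health-check toggling.
class DistroHealthGuard {
    static final long STABLE_PERIOD = 60_000L;

    boolean healthCheckEnabled = true;      // stand-in for switchDomain's switch
    boolean autoDisabledHealthCheck = false;
    long lastHealthServerMillis = 0L;
    List<String> healthyServers = new ArrayList<>();

    // Returns true when the healthy list changed (where Nacos calls notifyListeners()).
    boolean check(List<String> servers, Map<String, Long> lastBeats, long now,
                  long expiredMillis, float threshold) {
        if (servers.isEmpty()) {
            return false;
        }
        List<String> newHealthy = new ArrayList<>();
        for (String server : servers) {
            Long lastBeat = lastBeats.get(server);
            if (lastBeat != null && now - lastBeat < expiredMillis) {
                newHealthy.add(server);
            }
        }
        Collections.sort(newHealthy);
        float ratio = (float) newHealthy.size() / servers.size();
        // Re-enable only if we disabled it ourselves, the cluster looks healthy
        // again, and the last change is older than STABLE_PERIOD.
        if (autoDisabledHealthCheck && ratio > threshold
                && now - lastHealthServerMillis > STABLE_PERIOD) {
            healthCheckEnabled = true;
            autoDisabledHealthCheck = false;
        }
        if (!healthyServers.equals(newHealthy)) {
            // Every change pauses health checking for a while.
            if (healthCheckEnabled) {
                healthCheckEnabled = false;
                autoDisabledHealthCheck = true;
                lastHealthServerMillis = now;
            }
            healthyServers = newHealthy;
            return true;
        }
        return false;
    }
}
```

The autoDisabledHealthCheck flag serves the same purpose as the comment in the original code describes: if an operator turns health checking off manually, the flag stays false and the automatic re-enable branch never overrides that choice.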

Summary

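  • ServerListManager's init method registers two scheduled tasks: ServerListUpdater and ServerStatusReporter
  • ServerListUpdater implements Runnable. Its run method calls refreshServerList to read the latest server list from configuration and compares it with oldServers to find added and removed servers. If anything changed, notifyListeners calls back each ServerChangeListener's onChangeServerList and onChangeHealthyServerList methods
  • ServerStatusReporter implements Runnable. Its run method first calls checkDistroHeartbeat, then onReceiveServerStatus, and then sends its own status to every other server in the list via synchronizer.send(server.getKey(), msg). Finally, it calls registerServerStatusReporter again to schedule its next run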
