本文主要研究一下nacos的ServerChangeListener

ServerChangeListener

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/servers/ServerChangeListener.java

public interface ServerChangeListener {    /**     * If member list changed, this method is invoked.     *     * @param servers servers after change     */    void onChangeServerList(List<Server> servers);    /**     * If reachable member list changed, this method is invoked.     *     * @param healthyServer reachable servers after change     */    void onChangeHealthyServerList(List<Server> healthyServer);}
  • ServerChangeListener定义了onChangeServerList、onChangeHealthyServerList方法

DistroMapper

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java

@Component("distroMapper")public class DistroMapper implements ServerChangeListener {    private List<String> healthyList = new ArrayList<>();    //......    @Override    public void onChangeServerList(List<Server> latestMembers) {    }    @Override    public void onChangeHealthyServerList(List<Server> latestReachableMembers) {        List<String> newHealthyList = new ArrayList<>();        for (Server server : latestReachableMembers) {            newHealthyList.add(server.getKey());        }        healthyList = newHealthyList;    }    //......}
  • DistroMapper实现了ServerChangeListener接口,其onChangeHealthyServerList会更新自己的healthyList

RaftPeerSet

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java

@Component@DependsOn("serverListManager")public class RaftPeerSet implements ServerChangeListener, ApplicationContextAware {    @Autowired    private ServerListManager serverListManager;    private ApplicationContext applicationContext;    private AtomicLong localTerm = new AtomicLong(0L);    private RaftPeer leader = null;    private Map<String, RaftPeer> peers = new HashMap<>();    private Set<String> sites = new HashSet<>();    private boolean ready = false;    //......    @Override    public void onChangeServerList(List<Server> latestMembers) {        Map<String, RaftPeer> tmpPeers = new HashMap<>(8);        for (Server member : latestMembers) {            if (peers.containsKey(member.getKey())) {                tmpPeers.put(member.getKey(), peers.get(member.getKey()));                continue;            }            RaftPeer raftPeer = new RaftPeer();            raftPeer.ip = member.getKey();            // first time meet the local server:            if (NetUtils.localServer().equals(member.getKey())) {                raftPeer.term.set(localTerm.get());            }            tmpPeers.put(member.getKey(), raftPeer);        }        // replace raft peer set:        peers = tmpPeers;        if (RunningConfig.getServerPort() > 0) {            ready = true;        }        Loggers.RAFT.info("raft peers changed: " + latestMembers);    }    @Override    public void onChangeHealthyServerList(List<Server> latestReachableMembers) {    }    //......}
  • RaftPeerSet实现了ServerChangeListener接口,其onChangeServerList方法会更新peers及ready属性

小结

ServerChangeListener定义了onChangeServerList、onChangeHealthyServerList方法;DistroMapper实现了ServerChangeListener接口,其onChangeHealthyServerList会更新自己的healthyList;RaftPeerSet实现了ServerChangeListener接口,其onChangeServerList方法会更新peers及ready属性

doc

  • ServerChangeListener