Preface
This article takes a look at the ServerChangeListener in nacos.
ServerChangeListener
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/servers/ServerChangeListener.java
public interface ServerChangeListener {

    /**
     * If member list changed, this method is invoked.
     *
     * @param servers servers after change
     */
    void onChangeServerList(List<Server> servers);

    /**
     * If reachable member list changed, this method is invoked.
     *
     * @param healthyServer reachable servers after change
     */
    void onChangeHealthyServerList(List<Server> healthyServer);
}
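- ServerChangeListener defines two callbacks: onChangeServerList, invoked when the member list changes, and onChangeHealthyServerList, invoked when the reachable member list changes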
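To make the callback contract concrete, here is a minimal sketch of how a notifier might push both kinds of change to registered listeners. Everything in it is an assumption made for illustration: DemoServer, DemoServerChangeListener, DemoServerListNotifier and RecordingListener are hypothetical stand-ins, not nacos classes, and the real registration and notification code in nacos may look different.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a cluster member; only the key is modelled.
class DemoServer {
    private final String key;
    DemoServer(String key) { this.key = key; }
    String getKey() { return key; }
}

// Same shape as the interface above, re-declared so the sketch compiles on its own.
interface DemoServerChangeListener {
    void onChangeServerList(List<DemoServer> servers);
    void onChangeHealthyServerList(List<DemoServer> healthyServers);
}

// Hypothetical notifier: keeps the registered listeners and calls them back.
class DemoServerListNotifier {
    private final List<DemoServerChangeListener> listeners = new ArrayList<>();

    void listen(DemoServerChangeListener listener) {
        listeners.add(listener);
    }

    void publishServers(List<DemoServer> servers) {
        for (DemoServerChangeListener listener : listeners) {
            listener.onChangeServerList(servers);
        }
    }

    void publishHealthyServers(List<DemoServer> healthyServers) {
        for (DemoServerChangeListener listener : listeners) {
            listener.onChangeHealthyServerList(healthyServers);
        }
    }
}

// Listener that simply records the keys it was last told about.
class RecordingListener implements DemoServerChangeListener {
    final List<String> allKeys = new ArrayList<>();
    final List<String> healthyKeys = new ArrayList<>();

    @Override
    public void onChangeServerList(List<DemoServer> servers) {
        allKeys.clear();
        for (DemoServer server : servers) {
            allKeys.add(server.getKey());
        }
    }

    @Override
    public void onChangeHealthyServerList(List<DemoServer> healthyServers) {
        healthyKeys.clear();
        for (DemoServer server : healthyServers) {
            healthyKeys.add(server.getKey());
        }
    }
}

class ListenerNotifyDemo {
    public static void main(String[] args) {
        DemoServerListNotifier notifier = new DemoServerListNotifier();
        RecordingListener listener = new RecordingListener();
        notifier.listen(listener);

        DemoServer first = new DemoServer("10.0.0.1:8848");
        DemoServer second = new DemoServer("10.0.0.2:8848");
        notifier.publishServers(Arrays.asList(first, second));
        notifier.publishHealthyServers(Arrays.asList(first));

        System.out.println("all=" + listener.allKeys + " healthy=" + listener.healthyKeys);
    }
}
```

Keeping the two callbacks separate lets an implementation react to only the event it cares about and leave the other method empty, which is what the two implementations in this article do.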
DistroMapper
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java
@Component("distroMapper")
public class DistroMapper implements ServerChangeListener {

    private List<String> healthyList = new ArrayList<>();

    //......

    @Override
    public void onChangeServerList(List<Server> latestMembers) {
    }

    @Override
    public void onChangeHealthyServerList(List<Server> latestReachableMembers) {
        List<String> newHealthyList = new ArrayList<>();
        for (Server server : latestReachableMembers) {
            newHealthyList.add(server.getKey());
        }
        healthyList = newHealthyList;
    }

    //......
}
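- DistroMapper implements the ServerChangeListener interface; its onChangeHealthyServerList builds a new list from the keys of the reachable servers and assigns it to healthyList, while its onChangeServerList is an empty implementation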
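The build-then-assign pattern used for healthyList can be sketched in isolation as follows. HealthyDemoServer and HealthyKeyTracker are hypothetical names, and the volatile modifier and the read-only getter are additions of this sketch; neither appears in the excerpt above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a reachable cluster member.
class HealthyDemoServer {
    private final String key;
    HealthyDemoServer(String key) { this.key = key; }
    String getKey() { return key; }
}

// Hypothetical tracker that mirrors the onChangeHealthyServerList logic above.
class HealthyKeyTracker {
    // volatile is an assumption of this sketch so that the swapped reference
    // is visible to other threads; the excerpt above does not declare it.
    private volatile List<String> healthyList = new ArrayList<>();

    void onChangeHealthyServerList(List<HealthyDemoServer> latestReachableMembers) {
        // Fill a fresh list first; the published list is never modified in place.
        List<String> newHealthyList = new ArrayList<>();
        for (HealthyDemoServer server : latestReachableMembers) {
            newHealthyList.add(server.getKey());
        }
        // Publish the new list with a single reference assignment.
        healthyList = newHealthyList;
    }

    // Read-only view of the list that is current at the time of the call.
    List<String> getHealthyList() {
        return Collections.unmodifiableList(healthyList);
    }
}

class HealthySwapDemo {
    public static void main(String[] args) {
        HealthyKeyTracker tracker = new HealthyKeyTracker();
        tracker.onChangeHealthyServerList(Arrays.asList(
                new HealthyDemoServer("10.0.0.1:8848"),
                new HealthyDemoServer("10.0.0.2:8848")));
        List<String> before = tracker.getHealthyList();

        tracker.onChangeHealthyServerList(Arrays.asList(
                new HealthyDemoServer("10.0.0.2:8848")));

        System.out.println("before=" + before + " after=" + tracker.getHealthyList());
    }
}
```

Because each update fills a fresh list and only then replaces the reference, a caller that obtained the list before the update keeps iterating over a complete, unchanged list instead of a half-updated one.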
RaftPeerSet
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/persistent/raft/RaftPeerSet.java
@Component
@DependsOn("serverListManager")
public class RaftPeerSet implements ServerChangeListener, ApplicationContextAware {

    @Autowired
    private ServerListManager serverListManager;

    private ApplicationContext applicationContext;

    private AtomicLong localTerm = new AtomicLong(0L);

    private RaftPeer leader = null;

    private Map<String, RaftPeer> peers = new HashMap<>();

    private Set<String> sites = new HashSet<>();

    private boolean ready = false;

    //......

    @Override
    public void onChangeServerList(List<Server> latestMembers) {
        Map<String, RaftPeer> tmpPeers = new HashMap<>(8);
        for (Server member : latestMembers) {
            if (peers.containsKey(member.getKey())) {
                tmpPeers.put(member.getKey(), peers.get(member.getKey()));
                continue;
            }

            RaftPeer raftPeer = new RaftPeer();
            raftPeer.ip = member.getKey();

            // first time meet the local server:
            if (NetUtils.localServer().equals(member.getKey())) {
                raftPeer.term.set(localTerm.get());
            }

            tmpPeers.put(member.getKey(), raftPeer);
        }

        // replace raft peer set:
        peers = tmpPeers;

        if (RunningConfig.getServerPort() > 0) {
            ready = true;
        }

        Loggers.RAFT.info("raft peers changed:" + latestMembers);
    }

    @Override
    public void onChangeHealthyServerList(List<Server> latestReachableMembers) {
    }

    //......
}
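- RaftPeerSet implements the ServerChangeListener interface; its onChangeServerList rebuilds peers, reusing the existing RaftPeer for members that are already known and creating a new one for each new member, and sets ready to true once RunningConfig.getServerPort() is greater than 0; its onChangeHealthyServerList is an empty implementation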
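The peer-rebuild step can be reduced to the following sketch, which keeps known peers, creates new ones, and drops members that are no longer listed. DemoPeer and DemoPeerSet are hypothetical simplifications: member keys are passed as plain strings, and the local server key is a constructor argument here rather than something looked up through NetUtils.localServer().

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, stripped-down peer: just an address and a term.
class DemoPeer {
    String ip;
    final AtomicLong term = new AtomicLong(0L);
}

// Hypothetical peer set mirroring the rebuild logic of onChangeServerList above.
class DemoPeerSet {
    private final String localKey;          // assumption: passed in instead of looked up
    private final AtomicLong localTerm;
    private Map<String, DemoPeer> peers = new HashMap<>();

    DemoPeerSet(String localKey, long localTerm) {
        this.localKey = localKey;
        this.localTerm = new AtomicLong(localTerm);
    }

    void onChangeServerList(List<String> latestMemberKeys) {
        Map<String, DemoPeer> tmpPeers = new HashMap<>(8);
        for (String key : latestMemberKeys) {
            DemoPeer existing = peers.get(key);
            if (existing != null) {
                // Known member: keep the same object so its state survives.
                tmpPeers.put(key, existing);
                continue;
            }
            DemoPeer peer = new DemoPeer();
            peer.ip = key;
            // First time the local server is seen: seed it with the local term.
            if (localKey.equals(key)) {
                peer.term.set(localTerm.get());
            }
            tmpPeers.put(key, peer);
        }
        // Members missing from latestMemberKeys are dropped by the swap.
        peers = tmpPeers;
    }

    DemoPeer get(String key) { return peers.get(key); }

    int size() { return peers.size(); }
}

class PeerRebuildDemo {
    public static void main(String[] args) {
        DemoPeerSet peerSet = new DemoPeerSet("10.0.0.1:8848", 5L);
        peerSet.onChangeServerList(Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848"));
        peerSet.onChangeServerList(Arrays.asList("10.0.0.2:8848", "10.0.0.3:8848"));
        System.out.println("peer count after second change: " + peerSet.size());
    }
}
```

Reusing the existing object for a known member matters because a peer carries state such as its term; rebuilding every peer from scratch on each membership change would discard that state.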
Summary
ServerChangeListener defines the onChangeServerList and onChangeHealthyServerList methods. DistroMapper implements the interface, and its onChangeHealthyServerList replaces its own healthyList with the keys of the reachable servers. RaftPeerSet also implements the interface, and its onChangeServerList updates the peers map and the ready flag.
doc
- ServerChangeListener