Preface
This article takes a look at the DistroMapper in nacos.
ServerChangeListener
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/servers/ServerChangeListener.java
public interface ServerChangeListener {

    /**
     * If member list changed, this method is invoked.
     *
     * @param servers servers after change
     */
    void onChangeServerList(List<Server> servers);

    /**
     * If reachable member list changed, this method is invoked.
     *
     * @param healthyServer reachable servers after change
     */
    void onChangeHealthyServerList(List<Server> healthyServer);
}
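- ServerChangeListener defines two callbacks: onChangeServerList, invoked when the member list changes, and onChangeHealthyServerList, invoked when the reachable member list changes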
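To make the callback flow concrete, here is a minimal, stand-alone sketch of how a listener of this shape might be registered and notified. Every type in it (ChangeListener, Manager, Recorder, publishHealthy) is a hypothetical stand-in written for illustration, with plain strings in place of Server objects; it is not the Nacos ServerListManager.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in types for illustration only; not the Nacos classes.
class ListenerSketch {

    // Same two-callback shape as ServerChangeListener, using String keys.
    interface ChangeListener {
        void onChangeServerList(List<String> servers);
        void onChangeHealthyServerList(List<String> healthyServers);
    }

    // Hypothetical manager: stores listeners and pushes the reachable list to them.
    static class Manager {
        private final List<ChangeListener> listeners = new ArrayList<>();

        void listen(ChangeListener listener) {
            listeners.add(listener);
        }

        void publishHealthy(List<String> healthyServers) {
            for (ChangeListener listener : listeners) {
                listener.onChangeHealthyServerList(healthyServers);
            }
        }
    }

    // Listener that keeps a copy of the latest reachable list.
    static class Recorder implements ChangeListener {
        volatile List<String> healthyList = new ArrayList<>();

        @Override
        public void onChangeServerList(List<String> servers) {
            // ignored in this sketch
        }

        @Override
        public void onChangeHealthyServerList(List<String> healthyServers) {
            // swap in a fresh list instead of mutating the old one
            healthyList = new ArrayList<>(healthyServers);
        }
    }

    public static void main(String[] args) {
        Manager manager = new Manager();
        Recorder recorder = new Recorder();
        manager.listen(recorder);
        manager.publishHealthy(Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848"));
        System.out.println(recorder.healthyList);
    }
}
```

Replacing the whole list reference on each notification, rather than editing it in place, means a reader never sees a half-updated list.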
DistroMapper
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java
@Component("distroMapper")
public class DistroMapper implements ServerChangeListener {

    private List<String> healthyList = new ArrayList<>();

    public List<String> getHealthyList() {
        return healthyList;
    }

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private ServerListManager serverListManager;

    /**
     * init server list
     */
    @PostConstruct
    public void init() {
        serverListManager.listen(this);
    }

    public boolean responsible(Cluster cluster, Instance instance) {
        return switchDomain.isHealthCheckEnabled(cluster.getServiceName())
            && !cluster.getHealthCheckTask().isCancelled()
            && responsible(cluster.getServiceName())
            && cluster.contains(instance);
    }

    public boolean responsible(String serviceName) {
        if (!switchDomain.isDistroEnabled() || SystemUtils.STANDALONE_MODE) {
            return true;
        }
        if (CollectionUtils.isEmpty(healthyList)) {
            // means distro config is not ready yet
            return false;
        }
        int index = healthyList.indexOf(NetUtils.localServer());
        int lastIndex = healthyList.lastIndexOf(NetUtils.localServer());
        if (lastIndex < 0 || index < 0) {
            return true;
        }
        int target = distroHash(serviceName) % healthyList.size();
        return target >= index && target <= lastIndex;
    }

    public String mapSrv(String serviceName) {
        if (CollectionUtils.isEmpty(healthyList) || !switchDomain.isDistroEnabled()) {
            return NetUtils.localServer();
        }
        try {
            return healthyList.get(distroHash(serviceName) % healthyList.size());
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("distro mapper failed, return localhost:" + NetUtils.localServer(), e);
            return NetUtils.localServer();
        }
    }

    public int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    @Override
    public void onChangeServerList(List<Server> latestMembers) {
    }

    @Override
    public void onChangeHealthyServerList(List<Server> latestReachableMembers) {
        List<String> newHealthyList = new ArrayList<>();
        for (Server server : latestReachableMembers) {
            newHealthyList.add(server.getKey());
        }
        healthyList = newHealthyList;
    }
}
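- DistroMapper implements the ServerChangeListener interface; its onChangeHealthyServerList method rebuilds healthyList from the keys of the reachable servers, while onChangeServerList is an empty implementation
- It provides two responsible overloads. responsible(Cluster, Instance) returns true only if switchDomain.isHealthCheckEnabled holds for the service, cluster.getHealthCheckTask() is not cancelled, responsible(serviceName) returns true, and the cluster contains the instance. responsible(serviceName) returns true when distro is disabled or the server runs in standalone mode, false when healthyList is empty, and true when the local server is not in healthyList; otherwise it takes distroHash(serviceName) modulo the size of healthyList and checks whether the result falls between the first and last index of the local server
- It also provides the mapSrv method, which likewise takes distroHash(serviceName) modulo the size of healthyList and returns the key of the server at that index; it falls back to NetUtils.localServer() when healthyList is empty, distro is disabled, or an exception is thrown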
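The hash-and-modulo mapping described above can be tried in isolation. The following is a minimal sketch, assuming a made-up three-node list and service name; DistroHashSketch and its static helpers are hypothetical names that copy only the core arithmetic of distroHash, mapSrv and responsible(serviceName), and leave out the SwitchDomain switches, standalone mode and logging.

```java
import java.util.Arrays;
import java.util.List;

// Stand-alone sketch of the mapping arithmetic; not the Nacos class itself.
class DistroHashSketch {

    // Same formula as distroHash in the listing above.
    static int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    // Core of mapSrv: the owner is the entry at hash modulo list size.
    static String mapSrv(List<String> healthyList, String localServer, String serviceName) {
        if (healthyList == null || healthyList.isEmpty()) {
            return localServer;
        }
        return healthyList.get(distroHash(serviceName) % healthyList.size());
    }

    // Core of responsible(serviceName), without the distro and standalone checks.
    static boolean responsible(List<String> healthyList, String localServer, String serviceName) {
        if (healthyList == null || healthyList.isEmpty()) {
            return false; // list not ready yet
        }
        int index = healthyList.indexOf(localServer);
        int lastIndex = healthyList.lastIndexOf(localServer);
        if (lastIndex < 0 || index < 0) {
            return true; // local server is not in the list
        }
        int target = distroHash(serviceName) % healthyList.size();
        return target >= index && target <= lastIndex;
    }

    public static void main(String[] args) {
        // Made-up addresses and service name, for illustration only.
        List<String> healthy = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        String service = "demo-service";
        System.out.println(service + " -> " + mapSrv(healthy, healthy.get(0), service));
        for (String server : healthy) {
            System.out.println(server + " responsible: " + responsible(healthy, server, service));
        }
    }
}
```

When the list has no duplicate entries, exactly one node reports itself responsible for a given service name, and it is the node that mapSrv returns. Taking the remainder by Integer.MAX_VALUE before Math.abs keeps the value away from Integer.MIN_VALUE, whose absolute value does not fit in an int.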
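Summary
ServerChangeListener defines the onChangeServerList and onChangeHealthyServerList methods. DistroMapper implements ServerChangeListener, and its onChangeHealthyServerList method updates healthyList. DistroMapper also provides the responsible and mapSrv methods, both of which rely on distroHash to map a service name onto an index in healthyList.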