A Look at nacos's DistroMapper


This article takes a look at the DistroMapper in nacos.

ServerChangeListener

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/cluster/servers/ServerChangeListener.java

public interface ServerChangeListener {

    /**
     * If member list changed, this method is invoked.
     *
     * @param servers servers after change
     */
    void onChangeServerList(List<Server> servers);

    /**
     * If reachable member list changed, this method is invoked.
     *
     * @param healthyServer reachable servers after change
     */
    void onChangeHealthyServerList(List<Server> healthyServer);
}
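  • ServerChangeListener defines two callbacks: onChangeServerList is invoked when the member list changes, and onChangeHealthyServerList is invoked when the list of reachable members changes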
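To make the callback contract concrete, here is a minimal sketch of a listener that records the keys of the reachable servers. The `Server` class below is a hypothetical stand-in that models only `getKey()`, and `ListenerSketch` and `KeyRecordingListener` are names invented for this illustration; none of them are the nacos classes themselves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ListenerSketch {

    // Hypothetical stand-in for nacos's Server; only the key is modeled.
    static class Server {
        private final String key;

        Server(String key) {
            this.key = key;
        }

        String getKey() {
            return key;
        }
    }

    // Same shape as the ServerChangeListener interface quoted above.
    interface ServerChangeListener {
        void onChangeServerList(List<Server> servers);

        void onChangeHealthyServerList(List<Server> healthyServer);
    }

    // Hypothetical listener that remembers the keys of the reachable servers.
    static class KeyRecordingListener implements ServerChangeListener {
        private volatile List<String> healthyKeys = new ArrayList<>();

        @Override
        public void onChangeServerList(List<Server> servers) {
            // The full member list is ignored in this sketch.
        }

        @Override
        public void onChangeHealthyServerList(List<Server> healthyServer) {
            // Build a fresh list, then swap the reference in one assignment.
            List<String> keys = new ArrayList<>();
            for (Server server : healthyServer) {
                keys.add(server.getKey());
            }
            healthyKeys = keys;
        }

        List<String> getHealthyKeys() {
            return healthyKeys;
        }
    }

    public static void main(String[] args) {
        KeyRecordingListener listener = new KeyRecordingListener();
        listener.onChangeHealthyServerList(Arrays.asList(
                new Server("10.0.0.1:8848"), new Server("10.0.0.2:8848")));
        System.out.println(listener.getHealthyKeys());
    }
}
```

In nacos itself the callbacks are triggered by ServerListManager after a listener registers through `listen`; the `main` method here simply calls the callback by hand.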

DistroMapper

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/DistroMapper.java

@Component("distroMapper")
public class DistroMapper implements ServerChangeListener {

    private List<String> healthyList = new ArrayList<>();

    public List<String> getHealthyList() {
        return healthyList;
    }

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private ServerListManager serverListManager;

    /**
     * init server list
     */
    @PostConstruct
    public void init() {
        serverListManager.listen(this);
    }

    public boolean responsible(Cluster cluster, Instance instance) {
        return switchDomain.isHealthCheckEnabled(cluster.getServiceName())
            && !cluster.getHealthCheckTask().isCancelled()
            && responsible(cluster.getServiceName())
            && cluster.contains(instance);
    }

    public boolean responsible(String serviceName) {
        if (!switchDomain.isDistroEnabled() || SystemUtils.STANDALONE_MODE) {
            return true;
        }

        if (CollectionUtils.isEmpty(healthyList)) {
            // means distro config is not ready yet
            return false;
        }

        int index = healthyList.indexOf(NetUtils.localServer());
        int lastIndex = healthyList.lastIndexOf(NetUtils.localServer());
        if (lastIndex < 0 || index < 0) {
            return true;
        }

        int target = distroHash(serviceName) % healthyList.size();
        return target >= index && target <= lastIndex;
    }

    public String mapSrv(String serviceName) {
        if (CollectionUtils.isEmpty(healthyList) || !switchDomain.isDistroEnabled()) {
            return NetUtils.localServer();
        }

        try {
            return healthyList.get(distroHash(serviceName) % healthyList.size());
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("distro mapper failed, return localhost: " + NetUtils.localServer(), e);

            return NetUtils.localServer();
        }
    }

    public int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    @Override
    public void onChangeServerList(List<Server> latestMembers) {

    }

    @Override
    public void onChangeHealthyServerList(List<Server> latestReachableMembers) {
        List<String> newHealthyList = new ArrayList<>();
        for (Server server : latestReachableMembers) {
            newHealthyList.add(server.getKey());
        }
        healthyList = newHealthyList;
    }
}
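  • DistroMapper implements the ServerChangeListener interface and registers itself with serverListManager in init; its onChangeHealthyServerList method rebuilds healthyList from the keys of the reachable servers, while onChangeServerList is left empty
  • responsible(Cluster, Instance) returns true only if switchDomain.isHealthCheckEnabled holds for the cluster's service, cluster.getHealthCheckTask() is not cancelled, responsible(serviceName) is true, and the cluster contains the instance
  • responsible(String) returns true when distro is disabled or nacos runs in standalone mode, false while healthyList is still empty, and true when the local server is not in healthyList; otherwise it computes distroHash(serviceName) % healthyList.size() and checks whether that target falls between the first and last index of the local server in healthyList
  • mapSrv returns the local server when healthyList is empty or distro is disabled; otherwise it uses distroHash, takes the remainder modulo the size of healthyList, and returns the server key at that index, falling back to the local server if an exception is thrown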
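The index arithmetic behind responsible and mapSrv can be tried out in a standalone sketch. The class below is not the nacos implementation: it keeps the same hash formula and comparison logic as the listing above, but takes the healthy list and the local server address as plain parameters (an assumption made for illustration) instead of reading them from SwitchDomain, ServerListManager, and NetUtils, and it leaves out the distro and standalone switches.

```java
import java.util.Arrays;
import java.util.List;

class DistroMappingSketch {

    // Same formula as distroHash in the DistroMapper listing.
    static int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    // Simplified mapSrv: picks the server whose index equals hash % size.
    static String mapSrv(List<String> healthyList, String serviceName, String localServer) {
        if (healthyList == null || healthyList.isEmpty()) {
            return localServer;
        }
        return healthyList.get(distroHash(serviceName) % healthyList.size());
    }

    // Simplified responsible: true if the target index lies within the
    // first..last positions of the local server in the healthy list.
    static boolean responsible(List<String> healthyList, String serviceName, String localServer) {
        if (healthyList == null || healthyList.isEmpty()) {
            return false; // healthy list not ready yet
        }
        int index = healthyList.indexOf(localServer);
        int lastIndex = healthyList.lastIndexOf(localServer);
        if (index < 0 || lastIndex < 0) {
            return true; // local server is not in the list
        }
        int target = distroHash(serviceName) % healthyList.size();
        return target >= index && target <= lastIndex;
    }

    public static void main(String[] args) {
        // Hypothetical server keys, used only for this example.
        List<String> servers = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        System.out.println(mapSrv(servers, "a", "10.0.0.1:8848"));
        System.out.println(responsible(servers, "a", "10.0.0.1:8848"));
        System.out.println(responsible(servers, "a", "10.0.0.2:8848"));
    }
}
```

For the service name "a", hashCode() is 97 and 97 % 3 is 1, so the service maps to the second entry of the list; only the node whose own key sits at that index considers itself responsible.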

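Summary

ServerChangeListener defines the onChangeServerList and onChangeHealthyServerList methods. DistroMapper implements ServerChangeListener, and its onChangeHealthyServerList method updates healthyList. DistroMapper also provides the responsible and mapSrv methods, both of which use distroHash modulo the size of healthyList to decide which server a service belongs to.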

doc

  • ServerChangeListener
