This article examines the registerInstance method of NacosNamingService.

NacosNamingService

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java

public class NacosNamingService implements NamingService {

    private static final String DEFAULT_PORT = "8080";

    private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);

    //......

    @Override
    public void registerInstance(String serviceName, String ip, int port) throws NacosException {
        registerInstance(serviceName, ip, port, Constants.DEFAULT_CLUSTER_NAME);
    }

    @Override
    public void registerInstance(String serviceName, String groupName, String ip, int port) throws NacosException {
        registerInstance(serviceName, groupName, ip, port, Constants.DEFAULT_CLUSTER_NAME);
    }

    @Override
    public void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException {
        registerInstance(serviceName, Constants.DEFAULT_GROUP, ip, port, clusterName);
    }

    @Override
    public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException {
        Instance instance = new Instance();
        instance.setIp(ip);
        instance.setPort(port);
        instance.setWeight(1.0);
        instance.setClusterName(clusterName);

        registerInstance(serviceName, groupName, instance);
    }

    @Override
    public void registerInstance(String serviceName, Instance instance) throws NacosException {
        registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
    }

    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            long instanceInterval = instance.getInstanceHeartBeatInterval();
            beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);

            beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }

        serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }

    //......
}
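  • For an ephemeral instance (instances are ephemeral by default), registerInstance creates a BeatInfo and passes it to beatReactor.addBeatInfo, which schedules the heartbeat task; it then registers the service through serverProxy.registerService. A non-ephemeral instance skips the heartbeat setup and goes straight to serverProxy.registerService.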
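
The branching described above can be condensed into a small, self-contained sketch. It does not use the Nacos client at all: RegisterFlowSketch, choosePeriod, and register are hypothetical names introduced here for illustration, and the returned strings only label the steps that the real method would take. The default interval and the ternary are copied from the listing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

class RegisterFlowSketch {
    // Mirrors DEFAULT_HEART_BEAT_INTERVAL from the listing (5 seconds, in milliseconds).
    static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);

    // Hypothetical helper: the same ternary the listing uses to fill BeatInfo's period.
    static long choosePeriod(long instanceInterval) {
        return instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval;
    }

    // Hypothetical stand-in for registerInstance: returns the ordered steps taken.
    static List<String> register(boolean ephemeral, long instanceInterval) {
        List<String> steps = new ArrayList<>();
        if (ephemeral) {
            // Only ephemeral instances get a heartbeat scheduled before registration.
            steps.add("addBeatInfo(period=" + choosePeriod(instanceInterval) + ")");
        }
        // Every instance, ephemeral or not, is registered with the server.
        steps.add("registerService");
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(register(true, 0L));    // ephemeral, default period
        System.out.println(register(true, 3000L)); // ephemeral, custom period
        System.out.println(register(false, 0L));   // persistent, no heartbeat
    }
}
```

The point of the sketch is the ordering: the heartbeat is scheduled before the registration request is sent, and only for ephemeral instances.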

BeatReactor

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/beat/BeatReactor.java

public class BeatReactor {

    private ScheduledExecutorService executorService;

    private NamingProxy serverProxy;

    public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();

    public BeatReactor(NamingProxy serverProxy) {
        this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
    }

    public BeatReactor(NamingProxy serverProxy, int threadCount) {
        this.serverProxy = serverProxy;

        executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }

    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }

    public void removeBeatInfo(String serviceName, String ip, int port) {
        NAMING_LOGGER.info("[BEAT] removing beat: {}:{}:{} from beat map.", serviceName, ip, port);
        BeatInfo beatInfo = dom2Beat.remove(buildKey(serviceName, ip, port));
        if (beatInfo == null) {
            return;
        }
        beatInfo.setStopped(true);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }

    private String buildKey(String serviceName, String ip, int port) {
        return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER
            + ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port;
    }

    class BeatTask implements Runnable {

        BeatInfo beatInfo;

        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            if (beatInfo.isStopped()) {
                return;
            }
            long result = serverProxy.sendBeat(beatInfo);
            long nextTime = result > 0 ? result : beatInfo.getPeriod();
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
}
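  • BeatReactor's addBeatInfo method puts the BeatInfo into dom2Beat and then schedules a BeatTask. removeBeatInfo removes the given key from dom2Beat and marks the removed BeatInfo as stopped. BeatTask first checks the stopped flag; if the beat is not stopped, it sends the heartbeat through serverProxy.sendBeat, computes the next delay (the value returned by the server if it is positive, otherwise the BeatInfo's own period), and schedules a new BeatTask on executorService.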
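
BeatTask does not use a fixed-rate schedule: each run schedules the next one, which lets the server adjust the interval and lets a stopped beat end simply by not rescheduling. The following is a minimal sketch of that pattern using only the JDK. SelfReschedulingBeat, nextDelay, and runUntil are hypothetical names, and sendBeat here is a stand-in that just counts calls and returns 0 instead of contacting a server.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SelfReschedulingBeat {
    // Same rule as BeatTask.run(): a positive server reply overrides the local period.
    static long nextDelay(long serverInterval, long period) {
        return serverInterval > 0 ? serverInterval : period;
    }

    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, r -> {
        Thread t = new Thread(r, "beat-sketch");
        t.setDaemon(true); // daemon thread, like the sender thread in BeatReactor
        return t;
    });
    private final AtomicInteger beats = new AtomicInteger();
    private volatile boolean stopped;

    // Hypothetical stand-in for serverProxy.sendBeat: counts the beat, returns 0 (no server interval).
    private long sendBeat(CountDownLatch latch) {
        beats.incrementAndGet();
        latch.countDown();
        return 0L;
    }

    void start(long periodMillis, CountDownLatch latch) {
        executor.schedule(() -> beatOnce(periodMillis, latch), periodMillis, TimeUnit.MILLISECONDS);
    }

    private void beatOnce(long periodMillis, CountDownLatch latch) {
        if (stopped) {
            return; // a stopped beat ends the chain by not rescheduling itself
        }
        long result = sendBeat(latch);
        executor.schedule(() -> beatOnce(periodMillis, latch),
            nextDelay(result, periodMillis), TimeUnit.MILLISECONDS);
    }

    void stop() {
        stopped = true;
        executor.shutdownNow();
    }

    // Runs until `target` beats were sent (or 5 seconds passed) and returns the beat count.
    static int runUntil(int target, long periodMillis) {
        SelfReschedulingBeat beat = new SelfReschedulingBeat();
        CountDownLatch latch = new CountDownLatch(target);
        beat.start(periodMillis, latch);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        beat.stop();
        return beat.beats.get();
    }

    public static void main(String[] args) {
        System.out.println("beats sent: " + runUntil(3, 10L));
    }
}
```

One consequence of this design, visible in both the sketch and the listing, is that the interval is measured from the end of one beat to the start of the next, so a slow sendBeat call delays the following beat rather than causing beats to pile up.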

NamingProxy

nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/net/NamingProxy.java

public class NamingProxy {

    private static final int DEFAULT_SERVER_PORT = 8848;

    private int serverPort = DEFAULT_SERVER_PORT;

    private String namespaceId;

    private String endpoint;

    private String nacosDomain;

    private List<String> serverList;

    private List<String> serversFromEndpoint = new ArrayList<String>();

    private long lastSrvRefTime = 0L;

    private long vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30);

    private Properties properties;

    //......

    public long sendBeat(BeatInfo beatInfo) {
        try {
            if (NAMING_LOGGER.isDebugEnabled()) {
                NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
            }
            Map<String, String> params = new HashMap<String, String>(4);
            params.put("beat", JSON.toJSONString(beatInfo));
            params.put(CommonParams.NAMESPACE_ID, namespaceId);
            params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
            String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, HttpMethod.PUT);
            JSONObject jsonObject = JSON.parseObject(result);

            if (jsonObject != null) {
                return jsonObject.getLong("clientBeatInterval");
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: " + JSON.toJSONString(beatInfo), e);
        }
        return 0L;
    }

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

        final Map<String, String> params = new HashMap<String, String>(9);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));

        reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
    }

    //......
}
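  • NamingProxy's sendBeat method sends a PUT request to the /instance/beat endpoint and returns the clientBeatInterval from the response, or 0 if the response is empty or the request fails. registerService sends a POST request to the /instance endpoint to register the service.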
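
To make the shape of the registration request more concrete, here is a small sketch that builds a subset of the parameters and URL-encodes them as key=value pairs. It is an illustration only: how reqAPI actually transmits the parameters is not shown in the listing, RegisterParamsSketch and its methods are hypothetical, and the first four key strings ("namespaceId", "serviceName", "groupName", "clusterName") are assumptions standing in for the CommonParams constants. The remaining keys are the literals from registerService.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

class RegisterParamsSketch {
    // Builds a subset of the registerService parameters; LinkedHashMap keeps insertion order.
    static Map<String, String> buildParams(String namespaceId, String serviceName, String groupName,
                                           String clusterName, String ip, int port,
                                           double weight, boolean ephemeral) {
        Map<String, String> params = new LinkedHashMap<>();
        // Assumed key names for the CommonParams constants (not confirmed by the listing).
        params.put("namespaceId", namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        params.put("clusterName", clusterName);
        // Literal keys taken from the listing; all values are sent as strings.
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("weight", String.valueOf(weight));
        params.put("ephemeral", String.valueOf(ephemeral));
        return params;
    }

    // Joins the parameters as URL-encoded key=value pairs separated by '&'.
    static String encode(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : params.entrySet()) {
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(enc(e.getKey())).append('=').append(enc(e.getValue()));
        }
        return sb.toString();
    }

    private static String enc(String s) {
        try {
            return URLEncoder.encode(s, "UTF-8");
        } catch (UnsupportedEncodingException ex) {
            throw new IllegalStateException(ex); // UTF-8 is always available on the JVM
        }
    }

    public static void main(String[] args) {
        // All argument values below are made-up sample data.
        Map<String, String> params = buildParams(
            "public", "demo-service", "DEFAULT_GROUP", "DEFAULT", "10.0.0.1", 8080, 1.0, true);
        System.out.println(encode(params));
    }
}
```

Note that every value, including port, weight, and the boolean flags, is converted to a string before sending, exactly as the String.valueOf calls in registerService do.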

Summary

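For an ephemeral instance (the default), NacosNamingService's registerInstance method creates a BeatInfo and passes it to beatReactor.addBeatInfo, which schedules a self-rescheduling heartbeat task; it then registers the service through serverProxy.registerService. On the wire, heartbeats are PUT requests to /instance/beat and registration is a POST request to /instance.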
