乐趣区

聊聊nacos-ServiceManager的registerInstance

本文主要研究一下 nacos ServiceManager 的 registerInstance

ServiceManager

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    //......

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {createEmptyService(namespaceId, serviceName, instance.isEphemeral());

        Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace:" + namespaceId + ", service:" + serviceName);
        }

        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

    public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {createServiceIfAbsent(namespaceId, serviceName, local, null);
    }

    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {Service service = getService(namespaceId, serviceName);
        if (service == null) {Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
            if (local) {putServiceAndInit(service);
            } else {addOrReplaceService(service);
            }
        }
    }

    public Service getService(String namespaceId, String serviceName) {if (serviceMap.get(namespaceId) == null) {return null;}
        return chooseServiceMap(namespaceId).get(serviceName);
    }

    public Map<String, Service> chooseServiceMap(String namespaceId) {return serviceMap.get(namespaceId);
    }

    private void putServiceAndInit(Service service) throws NacosException {putService(service);
        service.init();
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
    }

    public void putService(Service service) {if (!serviceMap.containsKey(service.getNamespaceId())) {synchronized (putServiceLock) {if (!serviceMap.containsKey(service.getNamespaceId())) {serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
                }
            }
        }
        serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
    }

    public void addOrReplaceService(Service service) throws NacosException {consistencyService.put(KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()), service);
    }

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        Service service = getService(namespaceId, serviceName);

        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        consistencyService.put(key, instances);
    }

    public List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }

    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

        Map<String, Instance> oldInstanceMap = new HashMap<>(16);
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());

        for (Instance instance : currentIPs) {map.put(instance.toIPAddr(), instance);
        }
        if (datum != null) {oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
        }

        // use HashMap for deep copy:
        HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
        instanceMap.putAll(oldInstanceMap);

        for (Instance instance : ips) {if (!service.getClusterMap().containsKey(instance.getClusterName())) {Cluster cluster = new Cluster(instance.getClusterName(), service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                    instance.getClusterName(), instance.toJSON());
            }

            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {instanceMap.remove(instance.getDatumKey());
            } else {instanceMap.put(instance.getDatumKey(), instance);
            }

        }

        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {throw new IllegalArgumentException("ip list can not be empty, service:" + service.getName() + ", ip list:"
                + JSON.toJSONString(instanceMap.values()));
        }

        return new ArrayList<>(instanceMap.values());
    }
                                
    //......
}
  • registerInstance 方法首先执行 createEmptyService 在 service 不存在的时候会创建,然后再获取 service,最后执行 addInstance;createEmptyService 的 local 参数取之于 instance.isEphemeral(),它主要是执行 createServiceIfAbsent 方法,其 cluster 参数为 null;它首先通过 getService 方法来获取 service( 从 serviceMap 中获取 ),获取不到则创建,local 为 true 执行 putServiceAndInit,否则执行 addOrReplaceService
  • putServiceAndInit 方法首先执行 putService,然后执行 service.init,然后回调 consistencyService.listen 方法;putService 方法主要是往 serviceMap 添加 service 信息;addOrReplaceService 方法则是执行 consistencyService.put 方法
  • addInstance 方法它会获取 service,然后执行 addIpAddresses,最后执行 consistencyService.put;addIpAddresses 调用的是 updateIpAddresses 方法,其 action 参数为 UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD;updateIpAddresses 方法首先从 consistencyService 获取 datum,然后通过 service.allIPs 方法获取 currentIPs,之后根据 datum 设置 oldInstanceMap,最后放到 instanceMap 中

Service.init

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/Service.java

public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {private static final String SERVICE_NAME_SYNTAX = "[0-9a-zA-Z@\\.:_-]+";

    @JSONField(serialize = false)
    private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);

    private String token;
    private List<String> owners = new ArrayList<>();
    private Boolean resetWeight = false;
    private Boolean enabled = true;
    private Selector selector = new NoneSelector();
    private String namespaceId;

    /**
     * IP will be deleted if it has not send beat for some time, default timeout is 30 seconds.
     */
    private long ipDeleteTimeout = 30 * 1000;

    private volatile long lastModifiedMillis = 0L;

    private volatile String checksum;

    //......

    public void init() {HealthCheckReactor.scheduleCheck(clientBeatCheckTask);

        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);
            entry.getValue().init();
        }
    }

    //......
}
  • init 方法则是通过 HealthCheckReactor.scheduleCheck(clientBeatCheckTask) 调度 ClientBeatCheckTask,然后执行 Cluster 的 init 方法

ClientBeatCheckTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatCheckTask.java

public class ClientBeatCheckTask implements Runnable {

    private Service service;

    public ClientBeatCheckTask(Service service) {this.service = service;}


    @JSONField(serialize = false)
    public PushService getPushService() {return SpringContext.getAppContext().getBean(PushService.class);
    }

    @JSONField(serialize = false)
    public DistroMapper getDistroMapper() {return SpringContext.getAppContext().getBean(DistroMapper.class);
    }

    public GlobalConfig getGlobalConfig() {return SpringContext.getAppContext().getBean(GlobalConfig.class);
    }

    public String taskKey() {return service.getName();
    }

    @Override
    public void run() {
        try {if (!getDistroMapper().responsible(service.getName())) {return;}

            List<Instance> instances = service.allIPs(true);

            // first set health status of instances:
            for (Instance instance : instances) {if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {if (instance.isHealthy()) {instance.setHealthy(false);
                            Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
                                UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            getPushService().serviceChanged(service);
                            SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

            if (!getGlobalConfig().isExpireInstance()) {return;}

            // then remove obsolete instances:
            for (Instance instance : instances) {if (instance.isMarked()) {continue;}

                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(instance));
                    deleteIP(instance);
                }
            }

        } catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }

    }


    private void deleteIP(Instance instance) {

        try {NamingProxy.Request request = NamingProxy.Request.newRequest();
            request.appendParam("ip", instance.getIp())
                .appendParam("port", String.valueOf(instance.getPort()))
                .appendParam("ephemeral", "true")
                .appendParam("clusterName", instance.getClusterName())
                .appendParam("serviceName", service.getName())
                .appendParam("namespaceId", service.getNamespaceId());

            String url = "http://127.0.0.1:" + RunningConfig.getServerPort() + RunningConfig.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();

            // delete instance asynchronously:
            HttpClient.asyncHttpDelete(url, null, null, new AsyncCompletionHandler() {
                @Override
                public Object onCompleted(Response response) throws Exception {if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                            instance.toJSON(), response.getResponseBody(), response.getStatusCode());
                    }
                    return null;
                }
            });

        } catch (Exception e) {Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJSON(), e);
        }
    }
}
  • ClientBeatCheckTask 实现了 Runnable 接口,其 run 方法首先判断是否可以处理该 service,可以的话,则获取 service 下所有的 instances,对于距离上次心跳时间超过 instanceHeartBeatTimeOut 的进行处理,如果还尚未被 marked,且还是 healthy 的更改其 healthy 为 false,然后触发 pushService 的 serviceChanged 方法,最后发布 InstanceHeartbeatTimeoutEvent 事件;最后再次遍历 instances,对于非 marked 而且距离上次心跳时间超过 instanceHeartBeatTimeOut 的进行 deleteIP 操作;deleteIP 方法会执行 nacos 的 delete 请求删除实例信息

小结

  • registerInstance 方法首先执行 createEmptyService 在 service 不存在的时候会创建,然后再获取 service,最后执行 addInstance;createEmptyService 的 local 参数取之于 instance.isEphemeral(),它主要是执行 createServiceIfAbsent 方法,其 cluster 参数为 null;它首先通过 getService 方法来获取 service( 从 serviceMap 中获取 ),获取不到则创建,local 为 true 执行 putServiceAndInit,否则执行 addOrReplaceService
  • putServiceAndInit 方法首先执行 putService,然后执行 service.init,然后回调 consistencyService.listen 方法;putService 方法主要是往 serviceMap 添加 service 信息;addOrReplaceService 方法则是执行 consistencyService.put 方法
  • addInstance 方法它会获取 service,然后执行 addIpAddresses,最后执行 consistencyService.put;addIpAddresses 调用的是 updateIpAddresses 方法,其 action 参数为 UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD;updateIpAddresses 方法首先从 consistencyService 获取 datum,然后通过 service.allIPs 方法获取 currentIPs,之后根据 datum 设置 oldInstanceMap,最后放到 instanceMap 中

doc

  • ServiceManager
退出移动版