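Client registration in Nacos has already been covered; this post looks at how the server handles the registration request.
Client requests are handled in InstanceController, so let's start with its register method.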

InstanceController#register

This method mainly builds the Instance from the request and calls serviceManager's registerInstance method to register the service.

public String register(HttpServletRequest request) throws Exception {
    // Read namespaceId, falling back to the default namespace
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // Read serviceName (required)
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    // Validate the format of serviceName
    NamingUtils.checkServiceNameFormat(serviceName);
    // Build the Instance from the request and validate it
    final Instance instance = parseInstance(request);
    // Register the service
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
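To make the entry point concrete, here is a minimal sketch of the kind of HTTP request that ends up in register. It assumes the Nacos 1.x Open API path /nacos/v1/ns/instance and a server listening on 127.0.0.1:8848; the service name, IP, and port are made-up values, and the class and method names are hypothetical. The sketch only builds the request and does not send it.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class RegisterRequestSketch {

    // Joins the parameters into a query string, URL-encoding each value.
    static String toQuery(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> e.getKey() + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }

    // Builds (but does not send) a registration request.
    // serverAddr and the instance data are assumptions for illustration.
    static HttpRequest buildRegisterRequest(String serverAddr, String serviceName, String ip, int port) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", serviceName); // read with WebUtils.required on the server
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("ephemeral", "true");        // ephemeral instances take the Distro path
        URI uri = URI.create("http://" + serverAddr + "/nacos/v1/ns/instance?" + toQuery(params));
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRegisterRequest("127.0.0.1:8848", "demo-service", "10.0.0.5", 8080);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Because namespaceId is omitted here, the server would fall back to Constants.DEFAULT_NAMESPACE_ID, as the WebUtils.optional call above shows.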

ServiceManager#registerInstance

Checks whether the service has already been registered. If not, it creates a Service and registers it, then adds the instance.

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // If the service has not been registered yet, create a Service and register it
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Fetch the Service from the registered services
    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // Add the instance
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

ServiceManager#createEmptyService

Delegates directly to createServiceIfAbsent.

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
    createServiceIfAbsent(namespaceId, serviceName, local, null);
}

ServiceManager#createServiceIfAbsent

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    // Look up the Service
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        // Not found, so create it
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        // The checksum is used for verification
        service.recalculateChecksum();
        if (cluster != null) {
            // Attach the cluster to the service
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        // Validate the service and cluster names
        service.validate();
        // Put the service into the cache and start heartbeat checks
        putServiceAndInit(service);
        if (!local) {
            // Save through the consistency protocol
            addOrReplaceService(service);
        }
    }
}

ServiceManager#putServiceAndInit

Stores the Service in the serviceMap cache and starts a health check that runs every 5 seconds.

private void putServiceAndInit(Service service) throws NacosException {
    // Store in the serviceMap cache
    putService(service);
    // Health check every 5 seconds, covering both the service and its clusters
    service.init();
    // Register listeners for both the ephemeral and the persistent instance list
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
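The periodic check started by service.init() can be pictured with a small sketch. This is not the Nacos implementation: the class name, the maps, and the 15-second timeout are assumptions chosen for illustration, and only the 5-second period comes from the text above. Each round marks an instance unhealthy if its last heartbeat is too old.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class BeatCheckSketch {

    // Hypothetical timeout for this sketch; the real value is configurable in Nacos.
    static final long HEARTBEAT_TIMEOUT_MS = 15_000;

    // ip:port -> timestamp of the last heartbeat received
    final Map<String, Long> lastBeat = new ConcurrentHashMap<>();
    // ip:port -> current health flag
    final Map<String, Boolean> healthy = new ConcurrentHashMap<>();

    // Records a heartbeat and marks the instance healthy.
    void onBeat(String ipPort, long nowMs) {
        lastBeat.put(ipPort, nowMs);
        healthy.put(ipPort, true);
    }

    // One round of the check: instances whose heartbeat is too old become unhealthy.
    void check(long nowMs) {
        lastBeat.forEach((ipPort, beat) -> {
            if (nowMs - beat > HEARTBEAT_TIMEOUT_MS) {
                healthy.put(ipPort, false);
            }
        });
    }

    // Schedules check() every 5 seconds; the caller shuts the scheduler down.
    ScheduledExecutorService start() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> check(System.currentTimeMillis()), 5, 5, TimeUnit.SECONDS);
        return scheduler;
    }

    public static void main(String[] args) {
        BeatCheckSketch sketch = new BeatCheckSketch();
        sketch.onBeat("10.0.0.5:8080", 0L);
        sketch.check(20_000L); // simulated clock: 20 seconds after the last heartbeat
        System.out.println(sketch.healthy);
    }
}
```

Passing the current time into check() keeps the logic testable without waiting for the scheduler.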

ServiceManager#getService

Reads the Service from the serviceMap cache.

public Service getService(String namespaceId, String serviceName) {
    if (serviceMap.get(namespaceId) == null) {
        return null;
    }
    return chooseServiceMap(namespaceId).get(serviceName);
}

public Map<String, Service> chooseServiceMap(String namespaceId) {
    return serviceMap.get(namespaceId);
}
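The cache behind getService is a two-level map: namespace first, then service name. The sketch below shows that shape together with a create-if-absent step. It is a simplified stand-in, not the Nacos code: the Service class here is a hypothetical two-field placeholder, and computeIfAbsent is swapped in for the explicit null check used above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServiceRegistrySketch {

    // Hypothetical, minimal stand-in for the Nacos Service class.
    static final class Service {
        final String namespaceId;
        final String name;

        Service(String namespaceId, String name) {
            this.namespaceId = namespaceId;
            this.name = name;
        }
    }

    // namespaceId -> (serviceName -> Service)
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    // Returns null when either the namespace or the service is unknown.
    Service getService(String namespaceId, String serviceName) {
        Map<String, Service> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    // Creates the service only on the first call for a given namespace and name.
    Service createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, name -> new Service(namespaceId, name));
    }

    public static void main(String[] args) {
        ServiceRegistrySketch registry = new ServiceRegistrySketch();
        Service first = registry.createServiceIfAbsent("public", "demo-service");
        Service second = registry.createServiceIfAbsent("public", "demo-service");
        System.out.println("same instance: " + (first == second));
    }
}
```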

ServiceManager#addInstance

Saves the instance.

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    // Build the instance list key
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // Get the Service from the serviceMap cache
    Service service = getService(namespaceId, serviceName);

    synchronized (service) {
        // Get all instances of the service, including the new ones
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // Save through the consistency service; this actually calls DistroConsistencyServiceImpl#put
        consistencyService.put(key, instances);
    }
}

ServiceManager#addIpAddresses

Delegates directly to updateIpAddresses.

private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
    return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}

ServiceManager#updateIpAddresses

Gets all instances of the service. Existing (old) data is updated along the way.

public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
        throws NacosException {
    // Fetch the old data
    Datum datum = consistencyService
            .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
    // Get all instances currently held by the service's clusters
    List<Instance> currentIPs = service.allIPs(ephemeral);
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    Set<String> currentInstanceIds = Sets.newHashSet();
    // Index all instances; the key is ip + port
    for (Instance instance : currentIPs) {
        currentInstances.put(instance.toIpAddr(), instance);
        currentInstanceIds.add(instance.getInstanceId());
    }
    // The new data
    Map<String, Instance> instanceMap;
    if (datum != null && null != datum.value) {
        // Old data exists: update health status and heartbeat time from currentInstances
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    } else {
        // No old data: start from an empty map
        instanceMap = new HashMap<>(ips.length);
    }

    for (Instance instance : ips) {
        // If the cluster does not exist, create it and start its health check
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            // Create a new cluster
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            // Start the health check
            cluster.init();
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            instance.getClusterName(), instance.toJson());
        }
        // For a remove action, delete the instance
        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            instanceMap.remove(instance.getDatumKey());
        } else {
            // Add the instance and assign it a unique id
            instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            instanceMap.put(instance.getDatumKey(), instance);
        }
    }

    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException(
                "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                        .toJson(instanceMap.values()));
    }
    // Return all instances
    return new ArrayList<>(instanceMap.values());
}
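Stripped of clusters and instance ids, the merge performed by updateIpAddresses can be sketched as follows. The Instance class, the plain "add"/"remove" action strings, and the update method are hypothetical simplifications. The stored list is first refreshed with the health state of the live in-memory instances, then the changed instances are added or removed by their ip:port key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class InstanceMergeSketch {

    // Hypothetical, minimal stand-in for the Nacos Instance class.
    static final class Instance {
        final String ip;
        final int port;
        boolean healthy;

        Instance(String ip, int port, boolean healthy) {
            this.ip = ip;
            this.port = port;
            this.healthy = healthy;
        }

        String key() {
            return ip + ":" + port;
        }
    }

    // stored:  instance list previously saved by the consistency service
    // current: live in-memory instances, keyed by ip:port
    static List<Instance> update(List<Instance> stored, Map<String, Instance> current,
                                 String action, Instance... changed) {
        Map<String, Instance> merged = new LinkedHashMap<>();
        for (Instance old : stored) {
            Instance live = current.get(old.key());
            if (live != null) {
                old.healthy = live.healthy; // carry over the live health state (mutates the stored object)
            }
            merged.put(old.key(), old);
        }
        for (Instance instance : changed) {
            if ("remove".equals(action)) {
                merged.remove(instance.key());
            } else {
                merged.put(instance.key(), instance);
            }
        }
        return new ArrayList<>(merged.values());
    }

    public static void main(String[] args) {
        Instance stored = new Instance("10.0.0.5", 8080, true);
        Instance live = new Instance("10.0.0.5", 8080, false);
        Instance added = new Instance("10.0.0.6", 8080, true);
        List<Instance> result = update(List.of(stored), Map.of(live.key(), live), "add", added);
        for (Instance instance : result) {
            System.out.println(instance.key() + " healthy=" + instance.healthy);
        }
    }
}
```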

DistroConsistencyServiceImpl#put

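If the key belongs to an ephemeral instance list, the data is put into the dataStore cache. If listeners exist for the key, a task is then added to the notifier's blocking queue.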

public void put(String key, Record value) throws NacosException {
    onPut(key, value);
    // Sync the change to other nodes through the Distro protocol
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}

public void onPut(String key, Record value) {
    // Ephemeral data is put into the dataStore cache
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }

    if (!listeners.containsKey(key)) {
        return;
    }
    // Add a task to the notifier's blocking queue (an ArrayBlockingQueue)
    notifier.addTask(key, DataOperation.CHANGE);
}

DistroConsistencyServiceImpl.Notifier#run

Takes tasks from the blocking queue in a loop and calls the handle method for each one.

@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");

    for (; ; ) {
        try {
            Pair<String, DataOperation> pair = tasks.take();
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}

DistroConsistencyServiceImpl.Notifier#handle

Notifies the listeners of the event.

private void handle(Pair<String, DataOperation> pair) {
    try {
        String datumKey = pair.getValue0();
        DataOperation action = pair.getValue1();

        services.remove(datumKey);

        int count = 0;
        // No listeners for this key, so return
        if (!listeners.containsKey(datumKey)) {
            return;
        }

        for (RecordListener listener : listeners.get(datumKey)) {

            count++;

            try {
                if (action == DataOperation.CHANGE) {
                    // Handle the change event
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }

                if (action == DataOperation.DELETE) {
                    // Handle the delete event
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }

        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                    .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}
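The queue-and-notify pattern used by the Notifier can be reduced to the sketch below. It is an illustration under assumptions, not the Nacos class: NotifierSketch, its Listener interface, and the pending map are hypothetical, tasks carry only a key, and only change events are modelled. A producer adds keys to a bounded blocking queue, and a worker thread takes them and calls every listener registered for that key.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class NotifierSketch implements Runnable {

    // Hypothetical listener; only change events are modelled here.
    interface Listener {
        void onChange(String key);
    }

    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);
    private final Map<String, List<Listener>> listeners = new ConcurrentHashMap<>();
    // Keys that are already queued, so a burst of changes yields a single task.
    private final Map<String, Boolean> pending = new ConcurrentHashMap<>();

    void listen(String key, Listener listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    void addTask(String key) {
        if (pending.putIfAbsent(key, Boolean.TRUE) == null && !tasks.offer(key)) {
            pending.remove(key); // queue full: allow a later retry for this key
        }
    }

    // Notifies every listener of the key and returns how many were called.
    int handle(String key) {
        pending.remove(key);
        int count = 0;
        for (Listener listener : listeners.getOrDefault(key, List.of())) {
            count++;
            try {
                listener.onChange(key);
            } catch (RuntimeException e) {
                System.err.println("listener failed for key " + key + ": " + e);
            }
        }
        return count;
    }

    int queued() {
        return tasks.size();
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                handle(tasks.take()); // blocks until a task is available
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        NotifierSketch notifier = new NotifierSketch();
        CountDownLatch done = new CountDownLatch(1);
        notifier.listen("demo-key", key -> {
            System.out.println("changed: " + key);
            done.countDown();
        });
        Thread worker = new Thread(notifier, "notifier-sketch");
        worker.setDaemon(true);
        worker.start();
        notifier.addTask("demo-key");
        done.await(1, TimeUnit.SECONDS);
        worker.interrupt();
    }
}
```

As in the handle method above, a failure in one listener is logged and does not stop the remaining listeners from being notified.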

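Summary

  1. If the service or cluster does not exist, it is created, and listeners are registered.
  2. Health checks are started for the service and the cluster.
  3. If old service data exists, its health status and heartbeat time are updated.
  4. The data is kept consistent across nodes.
  5. The listeners are invoked.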