Nacos - 客户端注册已经讲过了,那这里讲一下服务端是怎么处理请求的。
处理客户端的请求在InstanceController里,我们看看register方法。
InstanceController#register
这里主要是封装Instance,并调用serviceManager的registerInstance方法进行服务注册。
public String register(HttpServletRequest request) throws Exception { // 获取namespaceId final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); // 获取serviceName final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); // 验证serviceName的合法性 NamingUtils.checkServiceNameFormat(serviceName); // 封装并验证Instance的合法性 final Instance instance = parseInstance(request); // 服务注册 serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok";}
ServiceManager#registerInstance
判断是否已经注册过,如果没有注册,则创建一个Service并注册,然后再添加实例。
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { // 是否曾经注册过,如果没有注册,则创立一个Service并注册 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); // 从注册的服务中取Service Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } // 增加实例 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);}
ServiceManager#createEmptyService
直接调用createServiceIfAbsent方法。
/**
 * Creates the service if it does not exist yet, without attaching a cluster.
 *
 * <p>Pure delegation to {@code createServiceIfAbsent} with a {@code null} cluster.
 *
 * @param namespaceId namespace the service belongs to
 * @param serviceName name of the service
 * @param local       forwarded unchanged as the {@code local} flag
 * @throws NacosException propagated from {@code createServiceIfAbsent}
 */
public void createEmptyService(String namespaceId, String serviceName, boolean local)
        throws NacosException {
    final Cluster noCluster = null;
    createServiceIfAbsent(namespaceId, serviceName, local, noCluster);
}
ServiceManager#createServiceIfAbsent
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { // 获取Service Service service = getService(namespaceId, serviceName); if (service == null) { // 没有获取到则创立 Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); // checksum用于校验的 service.recalculateChecksum(); if (cluster != null) { // 退出到集群 cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } // 验证服务和集群名称的合法性 service.validate(); // 放入缓存并查看心跳 putServiceAndInit(service); if (!local) { // 一致性协定保留 addOrReplaceService(service); } }}
ServiceManager#putServiceAndInit
Service存入serviceMap缓存,并每5秒健康检查
private void putServiceAndInit(Service service) throws NacosException { // 存入serviceMap缓存 putService(service); // 每5秒健康检查,包含service和cluster service.init(); // 增加监听,包含长期和永恒 consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());}
ServiceManager#getService
从serviceMap缓存中取值。
/**
 * Looks up a service by namespace and name in {@code serviceMap}.
 *
 * @param namespaceId namespace to search in
 * @param serviceName name of the service
 * @return the service, or {@code null} when the namespace has no entry or the
 *         namespace's map has no value for {@code serviceName}
 */
public Service getService(String namespaceId, String serviceName) {
    final boolean namespaceKnown = serviceMap.get(namespaceId) != null;
    return namespaceKnown ? chooseServiceMap(namespaceId).get(serviceName) : null;
}

/**
 * Returns the name-to-service map stored for a namespace.
 *
 * @param namespaceId namespace to look up
 * @return whatever {@code serviceMap} holds for the namespace; may be {@code null}
 */
public Map<String, Service> chooseServiceMap(String namespaceId) {
    return serviceMap.get(namespaceId);
}
ServiceManager#addInstance
保存实例
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { // 获取实例key String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); // 从缓存serviceMap获取Service Service service = getService(namespaceId, serviceName); synchronized (service) { // 获取service的所有实例 List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); // 一致性保留,理论调用DistroConsistencyServiceImpl#put consistencyService.put(key, instances); }}
ServiceManager#addIpAddresses
直接调用updateIpAddresses方法
/**
 * Computes the service's instance list after adding the given instances.
 *
 * <p>Delegates to {@code updateIpAddresses} with the action fixed to
 * {@code UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD}.
 *
 * @param service   the service being updated
 * @param ephemeral ephemeral flag forwarded unchanged
 * @param ips       instances to add
 * @return the list returned by {@code updateIpAddresses}
 * @throws NacosException propagated from {@code updateIpAddresses}
 */
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips)
        throws NacosException {
    final String addAction = UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD;
    return updateIpAddresses(service, addAction, ephemeral, ips);
}
ServiceManager#updateIpAddresses
获取service的所有实例,这里会更新老数据
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { // 获取旧数据 Datum datum = consistencyService .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); // 获取集群中所有的实例 List<Instance> currentIPs = service.allIPs(ephemeral); Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size()); Set<String> currentInstanceIds = Sets.newHashSet(); // 遍历所有实例,key为ip+端口 for (Instance instance : currentIPs) { currentInstances.put(instance.toIpAddr(), instance); currentInstanceIds.add(instance.getInstanceId()); } // 定义新数据 Map<String, Instance> instanceMap; if (datum != null && null != datum.value) { // 如果有老数据,通过currentInstances来更新衰弱状态和心跳工夫 instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances); } else { // 没有就创立一个 instanceMap = new HashMap<>(ips.length); } for (Instance instance : ips) { // 不存在,就创立一个Cluster集群,并开启健康检查 if (!service.getClusterMap().containsKey(instance.getClusterName())) { // 从新创立一个 Cluster cluster = new Cluster(instance.getClusterName(), service); // 开启健康检查 cluster.init(); service.getClusterMap().put(instance.getClusterName(), cluster); Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); } // 删除操作的话,就删除实例 if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } else { // 新增实例,设置惟一id instance.setInstanceId(instance.generateInstanceId(currentInstanceIds)); instanceMap.put(instance.getDatumKey(), instance); } } if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException( "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils .toJson(instanceMap.values())); } // 返回所有实例 return new ArrayList<>(instanceMap.values());}
DistroConsistencyServiceImpl#put
如果是临时实例,则加入缓存,并放入阻塞队列。
public void put(String key, Record value) throws NacosException { onPut(key, value); // 一致性服务 distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);}public void onPut(String key, Record value) { // 长期退出dataStore缓存 if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum<Instances> datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } // 在notifier的阻塞队列退出ArrayBlockingQueue notifier.addTask(key, DataOperation.CHANGE);}
DistroConsistencyServiceImpl.Notifier#run
调用handle方法
/**
 * Endless worker loop: takes the next task from {@code tasks} (blocking until one is
 * available) and passes it to {@code handle}.
 *
 * <p>Any {@code Throwable} raised while taking or handling a task is logged and the loop
 * continues with the next task.
 */
@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");

    while (true) {
        try {
            handle(tasks.take());
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}
DistroConsistencyServiceImpl.Notifier#handle
对事件进行通知
private void handle(Pair<String, DataOperation> pair) { try { String datumKey = pair.getValue0(); DataOperation action = pair.getValue1(); services.remove(datumKey); int count = 0; // 没有监听,返回 if (!listeners.containsKey(datumKey)) { return; } for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == DataOperation.CHANGE) { // 解决变更事件 listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } if (action == DataOperation.DELETE) { // 解决删除事件 listener.onDelete(datumKey); continue; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); } } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name()); } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); }}
总结
- 服务、集群如果不存在,则创建,并注册监听事件。
- 开启服务、集群的健康检查。
- 如果有旧服务数据,则更新健康状态和心跳时间。
- 通过一致性服务同步,保证节点间的数据一致性。
- 调用监听。