聊聊nacos-ServiceManager的updateInstance

35次阅读

共计 10785 个字符,预计需要花费 27 分钟才能阅读完成。

本文主要研究一下 nacos ServiceManager 的 updateInstance

ServiceManager

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {

    /**
     * Map<namespace, Map<group::serviceName, Service>>
     */
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);

    private Synchronizer synchronizer = new ServiceStatusSynchronizer();

    private final Lock lock = new ReentrantLock();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private PushService pushService;

    private final Object putServiceLock = new Object();

    //......

    public void updateInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {Service service = getService(namespaceId, serviceName);

        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace:" + namespaceId + ", service:" + serviceName);
        }

        if (!service.allIPs().contains(instance)) {throw new NacosException(NacosException.INVALID_PARAM, "instance not exist:" + instance);
        }

        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

        Service service = getService(namespaceId, serviceName);

        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        consistencyService.put(key, instances);
    }

    public List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }

    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));

        Map<String, Instance> oldInstanceMap = new HashMap<>(16);
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());

        for (Instance instance : currentIPs) {map.put(instance.toIPAddr(), instance);
        }
        if (datum != null) {oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
        }

        // use HashMap for deep copy:
        HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
        instanceMap.putAll(oldInstanceMap);

        for (Instance instance : ips) {if (!service.getClusterMap().containsKey(instance.getClusterName())) {Cluster cluster = new Cluster(instance.getClusterName(), service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                    instance.getClusterName(), instance.toJSON());
            }

            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {instanceMap.remove(instance.getDatumKey());
            } else {instanceMap.put(instance.getDatumKey(), instance);
            }

        }

        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {throw new IllegalArgumentException("ip list can not be empty, service:" + service.getName() + ", ip list:"
                + JSON.toJSONString(instanceMap.values()));
        }

        return new ArrayList<>(instanceMap.values());
    }

    //......
 }
  • updateInstance 会通过 service.allIPs().contains(instance) 校验要更新的 instance 是否存在,不存在则抛出 NacosException,存在则执行 addInstance 方法
  • addInstance 方法它会获取 service,然后执行 addIpAddresses,最后执行 consistencyService.put;addIpAddresses 调用的是 updateIpAddresses 方法,其 action 参数为 UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD
  • updateIpAddresses 方法首先从 consistencyService 获取 datum,然后通过 service.allIPs 方法获取 currentIPs,之后根据 datum 设置 oldInstanceMap,对于 UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE 类型执行删除,其余的 action 则将 instance 方法到 instanceMap 中

DistroConsistencyServiceImpl.put

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {Thread t = new Thread(r);

            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.distro.notifier");

            return t;
        }
    });

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private DataStore dataStore;

    @Autowired
    private TaskDispatcher taskDispatcher;

    @Autowired
    private DataSyncer dataSyncer;

    @Autowired
    private Serializer serializer;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private GlobalConfig globalConfig;

    private boolean initialized = false;

    public volatile Notifier notifier = new Notifier();

    private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

    private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);

    //......

    public void put(String key, Record value) throws NacosException {onPut(key, value);
        taskDispatcher.addTask(key);
    }

    public void onPut(String key, Record value) {if (KeyBuilder.matchEphemeralInstanceListKey(key)) {Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            dataStore.put(key, datum);
        }

        if (!listeners.containsKey(key)) {return;}

        notifier.addTask(key, ApplyAction.CHANGE);
    }
    //......
}
  • DistroConsistencyServiceImpl 的 put 方法会先执行 onPut,然后执行 taskDispatcher.addTask(key);onPut 在判断 key 是 ephemeralInstanceListKey 时会创建一个 Datum,递增其 timestamp,然后放到 dataStore 中,最后调用 notifier.addTask(key, ApplyAction.CHANGE)

Notifier.addTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

    public class Notifier implements Runnable {private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

        public void addTask(String datumKey, ApplyAction action) {if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {return;}
            if (action == ApplyAction.CHANGE) {services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.add(Pair.with(datumKey, action));
        }

        public int getTaskSize() {return tasks.size();
        }

        @Override
        public void run() {Loggers.DISTRO.info("distro notifier started");

            while (true) {
                try {Pair pair = tasks.take();

                    if (pair == null) {continue;}

                    String datumKey = (String) pair.getValue0();
                    ApplyAction action = (ApplyAction) pair.getValue1();

                    services.remove(datumKey);

                    int count = 0;

                    if (!listeners.containsKey(datumKey)) {continue;}

                    for (RecordListener listener : listeners.get(datumKey)) {

                        count++;

                        try {if (action == ApplyAction.CHANGE) {listener.onChange(datumKey, dataStore.get(datumKey).value);
                                continue;
                            }

                            if (action == ApplyAction.DELETE) {listener.onDelete(datumKey);
                                continue;
                            }
                        } catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                        }
                    }

                    if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
                    }
                } catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
    }
  • Notifier 的 addTask 方法对于 action 为 ApplyAction.CHANGE 的且不在 services 当中的会放入到 services 当中,最后添加到 tasks;run 方法会不断从 tasks 取出数据,执行相应的回调

TaskDispatcher.addTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/TaskDispatcher.java

@Component
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    @PostConstruct
    public void init() {for (int i = 0; i < cpuCoreCount; i++) {TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    public void addTask(String key) {taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    public class TaskScheduler implements Runnable {

        private int index;

        private int dataSize = 0;

        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {this.index = index;}

        public void addTask(String key) {queue.offer(key);
        }

        public int getIndex() {return index;}

        @Override
        public void run() {List<String> keys = new ArrayList<>();
            while (true) {

                try {String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);

                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {Loggers.DISTRO.debug("got key: {}", key);
                    }

                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {continue;}

                    if (StringUtils.isBlank(key)) {continue;}

                    if (dataSize == 0) {keys = new ArrayList<>();
                    }

                    keys.add(key);
                    dataSize++;

                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {for (Server member : dataSyncer.getServers()) {if (NetUtils.localServer().equals(member.getKey())) {continue;}
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());

                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }

                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }

                } catch (Exception e) {Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }
}
  • TaskDispatcher 的 addTask 方法会从 taskSchedulerList 获取指定的 TaskScheduler,然后执行其 addTask 方法;TaskScheduler 的 addTask 方法会往 queue 中添加数据,而 run 方法则不断从 queue 取数据,然后通过 dataSyncer 执行 syncTask

SyncTask

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/SyncTask.java

public class SyncTask {

    private List<String> keys;

    private int retryCount;

    private long lastExecuteTime;

    private String targetServer;

    public List<String> getKeys() {return keys;}

    public void setKeys(List<String> keys) {this.keys = keys;}

    public int getRetryCount() {return retryCount;}

    public void setRetryCount(int retryCount) {this.retryCount = retryCount;}

    public long getLastExecuteTime() {return lastExecuteTime;}

    public void setLastExecuteTime(long lastExecuteTime) {this.lastExecuteTime = lastExecuteTime;}

    public String getTargetServer() {return targetServer;}

    public void setTargetServer(String targetServer) {this.targetServer = targetServer;}
}
  • SyncTask 包含了 keys、targetServer 属性,其中 targetServer 用于告诉 DataSyncer 该往哪个 server 执行 sync 操作

小结

  • updateInstance 会通过 service.allIPs().contains(instance) 校验要更新的 instance 是否存在,不存在则抛出 NacosException,存在则执行 addInstance 方法
  • addInstance 方法它会获取 service,然后执行 addIpAddresses,最后执行 consistencyService.put;addIpAddresses 调用的是 updateIpAddresses 方法,其 action 参数为 UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD
  • updateIpAddresses 方法首先从 consistencyService 获取 datum,然后通过 service.allIPs 方法获取 currentIPs,之后根据 datum 设置 oldInstanceMap,对于 UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE 类型执行删除,其余的 action 则将 instance 方法到 instanceMap 中

doc

  • ServiceManager

正文完
 0