序
本文主要研究一下 nacos ServiceManager 的 updateInstance
ServiceManager
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {
/**
* Map<namespace, Map<group::serviceName, Service>>
*/
private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
private LinkedBlockingDeque<ServiceKey> toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024);
private Synchronizer synchronizer = new ServiceStatusSynchronizer();
private final Lock lock = new ReentrantLock();
@Resource(name = "consistencyDelegate")
private ConsistencyService consistencyService;
@Autowired
private SwitchDomain switchDomain;
@Autowired
private DistroMapper distroMapper;
@Autowired
private ServerListManager serverListManager;
@Autowired
private PushService pushService;
private final Object putServiceLock = new Object();
//......
public void updateInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace:" + namespaceId + ", service:" + serviceName);
}
if (!service.allIPs().contains(instance)) {throw new NacosException(NacosException.INVALID_PARAM, "instance not exist:" + instance);
}
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
consistencyService.put(key, instances);
}
public List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}
public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
Map<String, Instance> oldInstanceMap = new HashMap<>(16);
List<Instance> currentIPs = service.allIPs(ephemeral);
Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());
for (Instance instance : currentIPs) {map.put(instance.toIPAddr(), instance);
}
if (datum != null) {oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
}
// use HashMap for deep copy:
HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
instanceMap.putAll(oldInstanceMap);
for (Instance instance : ips) {if (!service.getClusterMap().containsKey(instance.getClusterName())) {Cluster cluster = new Cluster(instance.getClusterName(), service);
cluster.init();
service.getClusterMap().put(instance.getClusterName(), cluster);
Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJSON());
}
if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {instanceMap.remove(instance.getDatumKey());
} else {instanceMap.put(instance.getDatumKey(), instance);
}
}
if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {throw new IllegalArgumentException("ip list can not be empty, service:" + service.getName() + ", ip list:"
+ JSON.toJSONString(instanceMap.values()));
}
return new ArrayList<>(instanceMap.values());
}
//......
}
- updateInstance 会通过 service.allIPs().contains(instance) 校验要更新的 instance 是否存在,不存在则抛出 NacosException,存在则执行 addInstance 方法
- addInstance 方法它会获取 service,然后执行 addIpAddresses,最后执行 consistencyService.put;addIpAddresses 调用的是 updateIpAddresses 方法,其 action 参数为 UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD
- updateIpAddresses 方法首先从 consistencyService 获取 datum,然后通过 service.allIPs 方法获取 currentIPs,之后根据 datum 设置 oldInstanceMap,对于 UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE 类型执行删除,其余的 action 则将 instance 方法到 instanceMap 中
DistroConsistencyServiceImpl.put
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.distro.notifier");
return t;
}
});
@Autowired
private DistroMapper distroMapper;
@Autowired
private DataStore dataStore;
@Autowired
private TaskDispatcher taskDispatcher;
@Autowired
private DataSyncer dataSyncer;
@Autowired
private Serializer serializer;
@Autowired
private ServerListManager serverListManager;
@Autowired
private SwitchDomain switchDomain;
@Autowired
private GlobalConfig globalConfig;
private boolean initialized = false;
public volatile Notifier notifier = new Notifier();
private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();
private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);
//......
public void put(String key, Record value) throws NacosException {onPut(key, value);
taskDispatcher.addTask(key);
}
public void onPut(String key, Record value) {if (KeyBuilder.matchEphemeralInstanceListKey(key)) {Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {return;}
notifier.addTask(key, ApplyAction.CHANGE);
}
//......
}
- DistroConsistencyServiceImpl 的 put 方法会先执行 onPut,然后执行 taskDispatcher.addTask(key);onPut 在判断 key 是 ephemeralInstanceListKey 时会创建一个 Datum,递增其 timestamp,然后放到 dataStore 中,最后调用 notifier.addTask(key, ApplyAction.CHANGE)
Notifier.addTask
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
public class Notifier implements Runnable {private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);
public void addTask(String datumKey, ApplyAction action) {if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {return;}
if (action == ApplyAction.CHANGE) {services.put(datumKey, StringUtils.EMPTY);
}
tasks.add(Pair.with(datumKey, action));
}
public int getTaskSize() {return tasks.size();
}
@Override
public void run() {Loggers.DISTRO.info("distro notifier started");
while (true) {
try {Pair pair = tasks.take();
if (pair == null) {continue;}
String datumKey = (String) pair.getValue0();
ApplyAction action = (ApplyAction) pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {continue;}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {if (action == ApplyAction.CHANGE) {listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == ApplyAction.DELETE) {listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
}
- Notifier 的 addTask 方法对于 action 为 ApplyAction.CHANGE 的且不在 services 当中的会放入到 services 当中,最后添加到 tasks;run 方法会不断从 tasks 取出数据,执行相应的回调
TaskDispatcher.addTask
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/TaskDispatcher.java
@Component
public class TaskDispatcher {
@Autowired
private GlobalConfig partitionConfig;
@Autowired
private DataSyncer dataSyncer;
private List<TaskScheduler> taskSchedulerList = new ArrayList<>();
private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();
@PostConstruct
public void init() {for (int i = 0; i < cpuCoreCount; i++) {TaskScheduler taskScheduler = new TaskScheduler(i);
taskSchedulerList.add(taskScheduler);
GlobalExecutor.submitTaskDispatch(taskScheduler);
}
}
public void addTask(String key) {taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
}
public class TaskScheduler implements Runnable {
private int index;
private int dataSize = 0;
private long lastDispatchTime = 0L;
private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);
public TaskScheduler(int index) {this.index = index;}
public void addTask(String key) {queue.offer(key);
}
public int getIndex() {return index;}
@Override
public void run() {List<String> keys = new ArrayList<>();
while (true) {
try {String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
TimeUnit.MILLISECONDS);
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {Loggers.DISTRO.debug("got key: {}", key);
}
if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {continue;}
if (StringUtils.isBlank(key)) {continue;}
if (dataSize == 0) {keys = new ArrayList<>();
}
keys.add(key);
dataSize++;
if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
(System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {for (Server member : dataSyncer.getServers()) {if (NetUtils.localServer().equals(member.getKey())) {continue;}
SyncTask syncTask = new SyncTask();
syncTask.setKeys(keys);
syncTask.setTargetServer(member.getKey());
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
}
dataSyncer.submit(syncTask, 0);
}
lastDispatchTime = System.currentTimeMillis();
dataSize = 0;
}
} catch (Exception e) {Loggers.DISTRO.error("dispatch sync task failed.", e);
}
}
}
}
}
- TaskDispatcher 的 addTask 方法会从 taskSchedulerList 获取指定的 TaskScheduler,然后执行其 addTask 方法;TaskScheduler 的 addTask 方法会往 queue 中添加数据,而 run 方法则不断从 queue 取数据,然后通过 dataSyncer 执行 syncTask
SyncTask
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/SyncTask.java
public class SyncTask {
private List<String> keys;
private int retryCount;
private long lastExecuteTime;
private String targetServer;
public List<String> getKeys() {return keys;}
public void setKeys(List<String> keys) {this.keys = keys;}
public int getRetryCount() {return retryCount;}
public void setRetryCount(int retryCount) {this.retryCount = retryCount;}
public long getLastExecuteTime() {return lastExecuteTime;}
public void setLastExecuteTime(long lastExecuteTime) {this.lastExecuteTime = lastExecuteTime;}
public String getTargetServer() {return targetServer;}
public void setTargetServer(String targetServer) {this.targetServer = targetServer;}
}
- SyncTask 包含了 keys、targetServer 属性,其中 targetServer 用于告诉 DataSyncer 该往哪个 server 执行 sync 操作
小结
- updateInstance 会通过 service.allIPs().contains(instance) 校验要更新的 instance 是否存在,不存在则抛出 NacosException,存在则执行 addInstance 方法
- addInstance 方法它会获取 service,然后执行 addIpAddresses,最后执行 consistencyService.put;addIpAddresses 调用的是 updateIpAddresses 方法,其 action 参数为 UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD
- updateIpAddresses 方法首先从 consistencyService 获取 datum,然后通过 service.allIPs 方法获取 currentIPs,之后根据 datum 设置 oldInstanceMap,对于 UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE 类型执行删除,其余的 action 则将 instance 方法到 instanceMap 中
doc
- ServiceManager