乐趣区

关于java:Nacos配置中心集群原理及源码分析

Nacos 作为配置核心,必然须要保障服务节点的高可用性,那么 Nacos 是如何实现集群的呢?

上面这个图,示意 Nacos 集群的部署图。

Nacos 集群工作原理

Nacos 作为配置核心的集群构造中,是一种无中心化节点的设计,因为没有主从节点,也没有选举机制,所以为了可能实现热备,就须要减少虚构 IP(VIP)。

Nacos 的数据存储分为两局部

  1. Mysql 数据库存储,所有 Nacos 节点共享同一份数据,数据的正本机制由 Mysql 自身的主从计划来解决,从而保证数据的可靠性。
  2. 每个节点的本地磁盘,会保留一份全量数据,具体门路:/data/program/nacos-1/data/config-data/${GROUP}.

在 Nacos 的设计中,Mysql 是一个核心数据仓库,且认为在 Mysql 中的数据是相对正确的。除此之外,Nacos 在启动时会把 Mysql 中的数据写一份到本地磁盘。

这么设计的益处是能够进步性能,当客户端须要申请某个配置项时,服务端会想 Ian 从磁盘中读取对应文件返回,而磁盘的读取效率要比数据库效率高。

当配置产生变更时:

  1. Nacos 会把变更的配置保留到数据库,而后再写入本地文件。
  2. 接着发送一个 HTTP 申请,给到集群中的其余节点,其余节点收到事件后,从 Mysql 中 dump 刚刚写入的数据到本地文件中。

另外,NacosServer 启动后,会同步启动一个定时工作,每隔 6 小时,会 dump 一次全量数据到本地文件

配置变更同步入口

当配置产生批改、删除、新增操作时,通过公布一个 notifyConfigChange 事件。

@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
        @RequestParam(value = "appName", required = false) String appName,
        @RequestParam(value = "src_user", required = false) String srcUser,
        @RequestParam(value = "config_tags", required = false) String configTags,
        @RequestParam(value = "desc", required = false) String desc,
        @RequestParam(value = "use", required = false) String use,
        @RequestParam(value = "effect", required = false) String effect,
        @RequestParam(value = "type", required = false) String type,
        @RequestParam(value = "schema", required = false) String schema) throws NacosException {
    
   // 省略..
    if (StringUtils.isBlank(betaIps)) {if (StringUtils.isBlank(tag)) {persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
        } else {persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
            ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
        }
    }// 省略
    return true;
}

AsyncNotifyService

配置数据变更事件,专门有一个监听器 AsyncNotifyService,它会解决数据变更后的同步事件。

@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
    this.memberManager = memberManager;
    
    // Register ConfigDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
    
    // Register A Subscriber to subscribe ConfigDataChangeEvent.
    NotifyCenter.registerSubscriber(new Subscriber() {
        
        @Override
        public void onEvent(Event event) {
            // Generate ConfigDataChangeEvent concurrently
            if (event instanceof ConfigDataChangeEvent) {ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                long dumpTs = evt.lastModifiedTs;
                String dataId = evt.dataId;
                String group = evt.group;
                String tenant = evt.tenant;
                String tag = evt.tag;
                Collection<Member> ipList = memberManager.allMembers(); // 失去集群中的 ip 列表
                
                // 构建 NotifySingleTask,并增加到队列中。Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                for (Member member : ipList) { // 遍历集群中的每个节点
                    queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                            evt.isBeta));
                }
                // 异步执行工作 AsyncTask
                ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
            }
        }
        
        @Override
        public Class<? extends Event> subscribeType() {return ConfigDataChangeEvent.class;}
    });
}

AsyncTask

@Override
public void run() {executeAsyncInvoke();
}

private void executeAsyncInvoke() {while (!queue.isEmpty()) {// 遍历队列中的数据,直到数据为空
        NotifySingleTask task = queue.poll(); // 获取 task
        String targetIp = task.getTargetIP(); // 获取指标 ip
        
        if (memberManager.hasMember(targetIp)) { // 如果集群中的 ip 列表蕴含指标 ip
            // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
            // 判断指标 ip 的衰弱状态
            boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); //
            if (unHealthNeedDelay) { // 如果指标服务是非衰弱,则持续增加到队列中,延后再执行。// target ip is unhealthy, then put it in the notification list
                ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                        task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                        0, task.target);
                // get delay time and set fail count to the task
                asyncTaskExecute(task);
            } else {
                // 构建 header
                Header header = Header.newInstance();
                header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
                header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
                if (task.isBeta) {header.addParam("isBeta", "true");
                }
                AuthHeaderUtil.addIdentityToHeader(header);
                // 通过 restTemplate 发动近程调用,如果调用胜利,则执行 AsyncNotifyCallBack 的回调办法
                restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
            }
        }
    }
}

指标节点接管申请

数据同步的申请地址为,task.url=http://192.168.8.16:8848/naco…

@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
        @RequestParam("group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "tag", required = false) String tag) {dataId = dataId.trim();
    group = group.trim();
    String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
    long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
    String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
    String isBetaStr = request.getHeader("isBeta");
    if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
    } else {
        //
        dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
    }
    return true;
}

dumpService.dump 用来实现配置的更新,代码如下

当前任务会被增加到 DumpTaskMgr 中治理。

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
        boolean isBeta) {String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
    dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
    DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}

TaskManager.addTask, 先调用父类去实现工作增加。

@Override
public void addTask(Object key, AbstractDelayTask newTask) {super.addTask(key, newTask);
    MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}

在这种场景设计中,个别都会采纳生产者消费者模式来实现,因而这里不难猜测到,工作会被保留到一个队列中,而后有另外一个线程来执行。

NacosDelayTaskExecuteEngine

TaskManager 的父类是 NacosDelayTaskExecuteEngine,

这个类中有一个成员属性protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;,专门来保留延期执行的工作类型 AbstractDelayTask.

在这个类的构造方法中,初始化了一个延期执行的工作,其中具体的工作是 ProcessRunnable.

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {super(logger);
    tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}

ProcessRunnable

private class ProcessRunnable implements Runnable {
    
    @Override
    public void run() {
        try {processTasks();
        } catch (Throwable e) {getEngineLog().error(e.toString(), e);
        }
    }
}

processTasks

protected void processTasks() {
    // 获取所有的工作
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {continue;}
        // 获取工作处理器,这里返回的是 DumpProcessor
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {getEngineLog().error("processor not found for task, so discarded." + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            // 执行具体任务
            if (!processor.process(task)) {retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {getEngineLog().error("Nacos task execute error :" + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}

DumpProcessor.process

读取数据库的最新数据,而后更新本地缓存和磁盘。

版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic 带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!

退出移动版