紧接上文,咱们剖析了Nacos的客户端代码,

我决定手撕Nacos源码(先撕客户端源码)

明天咱们再来试一下服务端 ,至此就能够Nacos源码就告一段落,欢送大家品鉴。

nacos服务端

注册核心服务端的次要性能包含,接管客户端的服务注册,服务发现,服务下线的性能,然而除了这些和客户端的交互之外,服务端还要做一些更重要的事件,就是咱们经常会在分布式系统中听到的AP和CP,作为一个集群,nacos即实现了AP也实现了CP,其中AP应用的本人实现的Distro协定,而CP是采纳raft协定实现的,这个过程中牵涉到心跳、选主等操作。

咱们来学习一下注册核心服务端接管客户端服务注册的性能。

注册解决

咱们先来学习一下Nacos的工具类WebUtils,该工具类在nacos-core工程下,该工具类是用于解决申请参数转化的,外面提供了2个常被用到的办法required()optional()

required办法通过参数名key,解析HttpServletRequest申请中的参数,并转码为UTF-8编码。optional办法在required办法的根底上减少了默认值,如果获取不到,则返回默认值。

代码如下:

/** * required办法通过参数名key,解析HttpServletRequest申请中的参数,并转码为UTF-8编码。 */public static String required(final HttpServletRequest req, final String key) {    String value = req.getParameter(key);    if (StringUtils.isEmpty(value)) {        throw new IllegalArgumentException("Param '" + key + "' is required.");    }    String encoding = req.getParameter("encoding");    return resolveValue(value, encoding);}/** * optional办法在required办法的根底上减少了默认值,如果获取不到,则返回默认值。 */public static String optional(final HttpServletRequest req, final String key, final String defaultValue) {    if (!req.getParameterMap().containsKey(key) || req.getParameterMap().get(key)[0] == null) {        return defaultValue;    }    String value = req.getParameter(key);    if (StringUtils.isBlank(value)) {        return defaultValue;    }    String encoding = req.getParameter("encoding");    return resolveValue(value, encoding);}

nacos 的 server 与 client应用了http协定来交互,那么在server端必然提供了http接口的入口,并且在core模块看到其依赖了spring boot starter,所以它的http接口由集成了Spring的web服务器反对,简略地说就是像咱们平时写的业务服务一样,有controller层和service层。

以OpenAPI作为入口来学习,咱们找到/nacos/v1/ns/instance服务注册接口,在nacos-naming工程中咱们能够看到InstanceController正是咱们要找的对象,如下图:

解决服务注册,咱们间接找对应的POST办法即可,代码如下:

/** * Register new instance. * 接管客户端注册信息 * @param request http request * @return 'ok' if success * @throws Exception any error during register */@CanDistro@PostMapping@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)public String register(HttpServletRequest request) throws Exception {    //获取namespaceid,该参数是可选参数    final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);    //获取服务名字    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);    //校验服务的名字,服务的名字格局为groupName@@serviceName    NamingUtils.checkServiceNameFormat(serviceName);    //创立实例    final Instance instance = parseInstance(request);    //注册服务    serviceManager.registerInstance(namespaceId, serviceName, instance);    return "ok";}

如上图,该办法次要用于接管客户端注册信息,并且会校验参数是否存在问题,如果不存在问题就创立服务的实例,服务实例创立后将服务实例注册到Nacos中,注册的办法如下:

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {    //判断本地缓存中是否存在该命名空间,如果不存在就创立,之后判断该命名空间下是否    //存在该服务,如果不存在就创立空的服务    //如果实例为空,则创立实例,并且会将创立的实例存入到serviceMap汇合中    createEmptyService(namespaceId, serviceName, instance.isEphemeral());    //从serviceMap汇合中获取创立的实例    Service service = getService(namespaceId, serviceName);    if (service == null) {        throw new NacosException(NacosException.INVALID_PARAM,                "service not found, namespace: " + namespaceId + ", service: " + serviceName);    }    //服务注册,这一步才会把服务的实例信息和服务绑定起来    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);}

注册的办法中会先创立该实例对象,创立前先查看本地缓存是否存在该实例对象,如果不存在就创立,最初注册该服务,并且该服务会和实例信息捆绑到一起。

Distro协定介绍

Distro是阿里巴巴的公有协定, 是一种分布式一致性算法,目前风行的Nacos服务治理框架就采纳了Distro协定。Distro 协定被定位为 长期数据的一致性协定:该类型协定, 不须要把数据存储到磁盘或者数据库,因为长期数据通常和服务器放弃一个session会话, 该会话只有存在,数据就不会失落

Distro 协定保障写必须永远是胜利的,即便可能会产生网络分区。当网络复原时,把各数据分片的数据进行合并。

Distro 协定具备以下特点:

1:专门为了注册核心而发明出的协定;2:客户端与服务端有两个重要的交互,服务注册与心跳发送;3:客户端以服务为维度向服务端注册,注册后每隔一段时间向服务端发送一次心跳,心跳包须要带上注册服务的全副信息,在客户端看来,服务端节点对等,所以申请的节点是随机的;4:客户端申请失败则换一个节点从新发送申请;5:服务端节点都存储所有数据,但每个节点只负责其中一部分服务,在接管到客户端的“写”(注册、心跳、下线等)申请后,服务端节点判断申请的服务是否为本人负责,如果是,则解决,否则交由负责的节点解决;6:每个服务端节点被动发送健康检查到其余节点,响应的节点被该节点视为衰弱节点;7:服务端在接管到客户端的服务心跳后,如果该服务不存在,则将该心跳申请当做注册申请来解决;8:服务端如果长时间未收到客户端心跳,则下线该服务;9:负责的节点在接管到服务注册、服务心跳等写申请后将数据写入后即返回,后盾异步地将数据同步给其余节点;10:节点在收到读申请后间接从本机获取后返回,无论数据是否为最新。

Distro寻址

Distro协定次要用于nacos 服务端节点之间的互相发现,nacos应用寻址机制来实现服务端节点的治理。在Nacos中,寻址模式有三种:

单机模式(StandaloneMemberLookup)文件模式(FileConfigMemberLookup)服务器模式(AddressServerMemberLookup)

三种寻址模式如下图:

1.2.3.1 单机模式

com.alibaba.nacos.core.cluster.lookup.LookupFactory中有创立寻址形式,能够创立集群启动形式、单机启动形式,不同启动形式就决定了不同寻址模式,如果是集群启动,

/** * Create the target addressing pattern. * 创立寻址模式 * @param memberManager {@link ServerMemberManager} * @return {@link MemberLookup} * @throws NacosException NacosException */public static MemberLookup createLookUp(ServerMemberManager memberManager) throws NacosException {    //NacosServer 集群形式启动    if (!EnvUtil.getStandaloneMode()) {        String lookupType = EnvUtil.getProperty(LOOKUP_MODE_TYPE);        //由参数中传入的寻址形式失去LookupType对象        LookupType type = chooseLookup(lookupType);        //抉择寻址形式        LOOK_UP = find(type);        //设置以后寻址形式        currentLookupType = type;    } else {        //NacosServer单机启动        LOOK_UP = new StandaloneMemberLookup();    }    LOOK_UP.injectMemberManager(memberManager);    Loggers.CLUSTER.info("Current addressing mode selection : {}", LOOK_UP.getClass().getSimpleName());    return LOOK_UP;}/*** * 抉择寻址形式 * @param type * @return */private static MemberLookup find(LookupType type) {    //文件寻址模式,也就是配置cluster.conf配置文件将多个节点串联起来,    // 通过配置文件寻找其余节点,以达到和其余节点通信的目标    if (LookupType.FILE_CONFIG.equals(type)) {        LOOK_UP = new FileConfigMemberLookup();        return LOOK_UP;    }    //服务器模式    if (LookupType.ADDRESS_SERVER.equals(type)) {        LOOK_UP = new AddressServerMemberLookup();        return LOOK_UP;    }    // unpossible to run here    throw new IllegalArgumentException();}

单节点寻址模式会间接创立StandaloneMemberLookup对象,而文件寻址模式会创立FileConfigMemberLookup对象,服务器寻址模式会创立AddressServerMemberLookup

1.2.3.2 文件寻址模式

文件寻址模式次要在创立集群的时候,通过cluster.conf来配置集群,程序能够通过监听cluster.conf文件变动实现动静治理节点,FileConfigMemberLookup源码如下:

public class FileConfigMemberLookup extends AbstractMemberLookup {    //创立文件监听器    private FileWatcher watcher = new FileWatcher() {        //文件产生变更事件        @Override        public void onChange(FileChangeEvent event) {            readClusterConfFromDisk();        }        //查看context是否蕴含cluster.conf        @Override        public boolean interest(String context) {            return StringUtils.contains(context, "cluster.conf");        }    };    @Override    public void start() throws NacosException {        if (start.compareAndSet(false, true)) {            readClusterConfFromDisk();            // 应用inotify机制来监督文件更改,并主动触发对cluster.conf的读取            try {                WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);            } catch (Throwable e) {                Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());            }        }    }    @Override    public void destroy() throws NacosException {        WatchFileCenter.deregisterWatcher(EnvUtil.getConfPath(), watcher);    }    private void readClusterConfFromDisk() {        Collection<Member> tmpMembers = new ArrayList<>();        try {            List<String> tmp = EnvUtil.readClusterConf();            tmpMembers = MemberUtil.readServerConf(tmp);        } catch (Throwable e) {            Loggers.CLUSTER                    .error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());        }        afterLookup(tmpMembers);    }}
1.2.3.3 服务器寻址模式

应用地址服务器存储节点信息,会创立AddressServerMemberLookup,服务端定时拉取信息进行治理;

public class AddressServerMemberLookup extends AbstractMemberLookup {    private final GenericType<RestResult<String>> genericType = new GenericType<RestResult<String>>() {    };    public String domainName;    public String addressPort;    public String addressUrl;    public String envIdUrl;    public String addressServerUrl;    private volatile boolean isAddressServerHealth = true;    private int addressServerFailCount = 0;    private int maxFailCount = 12;    private final NacosRestTemplate restTemplate = HttpClientBeanHolder.getNacosRestTemplate(Loggers.CORE);    private volatile boolean shutdown = false;    @Override    public void start() throws NacosException {        if (start.compareAndSet(false, true)) {            this.maxFailCount = Integer.parseInt(EnvUtil.getProperty("maxHealthCheckFailCount", "12"));            initAddressSys();            run();        }    }    /***     * 获取服务器地址     */    private void initAddressSys() {        String envDomainName = System.getenv("address_server_domain");        if (StringUtils.isBlank(envDomainName)) {            domainName = EnvUtil.getProperty("address.server.domain", "jmenv.tbsite.net");        } else {            domainName = envDomainName;        }        String envAddressPort = System.getenv("address_server_port");        if (StringUtils.isBlank(envAddressPort)) {            addressPort = EnvUtil.getProperty("address.server.port", "8080");        } else {            addressPort = envAddressPort;        }        String envAddressUrl = System.getenv("address_server_url");        if (StringUtils.isBlank(envAddressUrl)) {            addressUrl = EnvUtil.getProperty("address.server.url", EnvUtil.getContextPath() + "/" + "serverlist");        } else {            addressUrl = envAddressUrl;        }        addressServerUrl = "http://" + domainName + ":" + addressPort + addressUrl;        envIdUrl = "http://" + domainName + ":" + addressPort + "/env";        Loggers.CORE.info("ServerListService address-server port:" + addressPort);        Loggers.CORE.info("ADDRESS_SERVER_URL:" + addressServerUrl);    }    @SuppressWarnings("PMD.UndefineMagicConstantRule")    private void run() throws NacosException {        // With the address server, you need to perform a synchronous member node pull at startup        // Repeat three times, successfully jump out        boolean success = false;        Throwable ex = null;        int maxRetry = EnvUtil.getProperty("nacos.core.address-server.retry", Integer.class, 5);        for (int i = 0; i < maxRetry; i++) {            try {                //拉取集群节点信息                syncFromAddressUrl();                success = true;                break;            } catch (Throwable e) {                ex = e;                Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));            }        }        if (!success) {            throw new NacosException(NacosException.SERVER_ERROR, ex);        }        //创立定时工作        GlobalExecutor.scheduleByCommon(new AddressServerSyncTask(), 5_000L);    }    @Override    public void destroy() throws NacosException {        shutdown = true;    }    @Override    public Map<String, Object> info() {        Map<String, Object> info = new HashMap<>(4);        info.put("addressServerHealth", isAddressServerHealth);        info.put("addressServerUrl", addressServerUrl);        info.put("envIdUrl", envIdUrl);        info.put("addressServerFailCount", addressServerFailCount);        return info;    }    private void syncFromAddressUrl() throws Exception {        RestResult<String> result = restTemplate                .get(addressServerUrl, Header.EMPTY, Query.EMPTY, genericType.getType());        if (result.ok()) {            isAddressServerHealth = true;            Reader reader = new StringReader(result.getData());            try {                afterLookup(MemberUtil.readServerConf(EnvUtil.analyzeClusterConf(reader)));            } catch (Throwable e) {                Loggers.CLUSTER.error("[serverlist] exception for analyzeClusterConf, error : {}",                        ExceptionUtil.getAllExceptionMsg(e));            }            addressServerFailCount = 0;        } else {            addressServerFailCount++;            if (addressServerFailCount >= maxFailCount) {                isAddressServerHealth = false;            }            Loggers.CLUSTER.error("[serverlist] failed to get serverlist, error code {}", result.getCode());        }    }    // 定时工作    class AddressServerSyncTask implements Runnable {        @Override        public void run() {            if (shutdown) {                return;            }            try {                //拉取服务列表                syncFromAddressUrl();            } catch (Throwable ex) {                addressServerFailCount++;                if (addressServerFailCount >= maxFailCount) {                    isAddressServerHealth = false;                }                Loggers.CLUSTER.error("[serverlist] exception, error : {}", ExceptionUtil.getAllExceptionMsg(ex));            } finally {                GlobalExecutor.scheduleByCommon(this, 5_000L);            }        }    }}

数据同步

Nacos数据同步分为全量同步和增量同步,所谓全量同步就是初始化数据一次性同步,而增量同步是指有数据减少的时候,只同步减少的数据。

全量同步

全量同步流程比较复杂,流程如上图:

1:启动一个定时工作线程DistroLoadDataTask加载数据,调用load()办法加载数据2:调用loadAllDataSnapshotFromRemote()办法从近程机器同步所有的数据3:从namingProxy代理获取所有的数据data4:结构http申请,调用httpGet办法从指定的server获取数据5:从获取的后果result中获取数据bytes6:解决数据processData7:从data反序列化出datumMap8:把数据存储到dataStore,也就是本地缓存dataMap9:监听器不包含key,就创立一个空的service,并且绑定监听器10:监听器listener执行胜利后,就更新data store
工作启动

com.alibaba.nacos.core.distributed.distro.DistroProtocol的构造函数中调用startDistroTask()办法,该办法会执行startVerifyTask()startLoadTask(),咱们重点关注startLoadTask(),该办法代码如下:

/*** * 启动DistroTask */private void startDistroTask() {    if (EnvUtil.getStandaloneMode()) {        isInitialized = true;        return;    }    //启动startVerifyTask,做数据同步校验    startVerifyTask();    //启动DistroLoadDataTask,批量加载数据    startLoadTask();}//启动DistroLoadDataTaskprivate void startLoadTask() {    //解决状态回调对象    DistroCallback loadCallback = new DistroCallback() {        //解决胜利        @Override        public void onSuccess() {            isInitialized = true;        }        //解决失败        @Override        public void onFailed(Throwable throwable) {            isInitialized = false;        }    };    //执行DistroLoadDataTask,是一个多线程    GlobalExecutor.submitLoadDataTask(            new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));}/*** * 启动startVerifyTask * 数据校验 */private void startVerifyTask() {    GlobalExecutor.schedulePartitionDataTimedSync(        new DistroVerifyTask(            memberManager,            distroComponentHolder),        distroConfig.getVerifyIntervalMillis());}
数据如何执行加载

下面办法会调用DistroLoadDataTask对象,而该对象其实是个线程,因而会执行它的run办法,run办法会调用load()办法实现数据全量加载,代码如下:

/*** * 数据加载过程 */@Overridepublic void run() {    try {        //加载数据        load();        if (!checkCompleted()) {            GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());        } else {            loadCallback.onSuccess();            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");        }    } catch (Exception e) {        loadCallback.onFailed(e);        Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);    }}/*** * 加载数据,并同步 * @throws Exception */private void load() throws Exception {    while (memberManager.allMembersWithoutSelf().isEmpty()) {        Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");        TimeUnit.SECONDS.sleep(1);    }    while (distroComponentHolder.getDataStorageTypes().isEmpty()) {        Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");        TimeUnit.SECONDS.sleep(1);    }    //同步数据    for (String each : distroComponentHolder.getDataStorageTypes()) {        if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {            //从近程机器上同步所有数据            loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));        }    }}
数据同步

数据同步会通过Http申请从近程服务器获取数据,并同步到以后服务的缓存中,执行流程如下:

1:loadAllDataSnapshotFromRemote()从近程加载所有数据,并解决同步到本机2:transportAgent.getDatumSnapshot()近程加载数据,通过Http申请执行近程加载3:dataProcessor.processSnapshot()解决数据同步到本地

数据处理残缺逻辑代码如下:loadAllDataSnapshotFromRemote()办法

/*** * 从近程机器上同步所有数据 */private boolean loadAllDataSnapshotFromRemote(String resourceType) {    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);    if (null == transportAgent || null == dataProcessor) {        Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",                resourceType, transportAgent, dataProcessor);        return false;    }    //遍历集群成员节点,不包含本人    for (Member each : memberManager.allMembersWithoutSelf()) {        try {            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());            //从近程节点加载数据,调用http申请接口: distro/datums;            DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());            //解决数据            boolean result = dataProcessor.processSnapshot(distroData);            Loggers.DISTRO                    .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),                            result);            if (result) {                return true;            }        } catch (Exception e) {            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);        }    }    return false;}

近程加载数据代码如下:transportAgent.getDatumSnapshot()办法

/*** * 从namingProxy代理获取所有的数据data,从获取的后果result中获取数据bytes; * @param targetServer target server. * @return */@Overridepublic DistroData getDatumSnapshot(String targetServer) {    try {        //从namingProxy代理获取所有的数据data,从获取的后果result中获取数据bytes;        byte[] allDatum = NamingProxy.getAllData(targetServer);        //将数据封装成DistroData        return new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);    } catch (Exception e) {        throw new DistroException(String.format("Get snapshot from %s failed.", targetServer), e);    }}/** * Get all datum from target server. * NamingProxy.getAllData * 执行HttpGet申请,并获取返回数据 * @param server target server address * @return all datum byte array * @throws Exception exception */public static byte[] getAllData(String server) throws Exception {    //参数封装    Map<String, String> params = new HashMap<>(8);    //组装URL,并执行HttpGet申请,获取后果集    RestResult<String> result = HttpClient.httpGet(            "http://" + server + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL,            new ArrayList<>(), params);    //返回数据    if (result.ok()) {        return result.getData().getBytes();    }    throw new IOException("failed to req API: " + "http://" + server + EnvUtil.getContextPath()            + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL + ". code: " + result.getCode() + " msg: "            + result.getMessage());}

解决数据同步到本地代码如下:dataProcessor.processSnapshot()

/** * 数据处理并更新本地缓存 * @param data * @return * @throws Exception */private boolean processData(byte[] data) throws Exception {    if (data.length > 0) {        //从data反序列化出datumMap        Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);        // 把数据存储到dataStore,也就是本地缓存dataMap        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {            dataStore.put(entry.getKey(), entry.getValue());            //监听器不包含key,就创立一个空的service,并且绑定监听器            if (!listeners.containsKey(entry.getKey())) {                // pretty sure the service not exist:                if (switchDomain.isDefaultInstanceEphemeral()) {                    // create empty service                    //创立一个空的service                    Loggers.DISTRO.info("creating service {}", entry.getKey());                    Service service = new Service();                    String serviceName = KeyBuilder.getServiceName(entry.getKey());                    String namespaceId = KeyBuilder.getNamespace(entry.getKey());                    service.setName(serviceName);                    service.setNamespaceId(namespaceId);                    service.setGroupName(Constants.DEFAULT_GROUP);                    // now validate the service. if failed, exception will be thrown                    service.setLastModifiedMillis(System.currentTimeMillis());                    service.recalculateChecksum();                    // The Listener corresponding to the key value must not be empty                    // 与键值对应的监听器不能为空,这里的监听器类型是 ServiceManager                    RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();                    if (Objects.isNull(listener)) {                        return false;                    }                    //为空的绑定监听器                    listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);                }            }        }        //循环所有datumMap        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {            if (!listeners.containsKey(entry.getKey())) {                // Should not happen:                Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());                continue;            }            try {                //执行监听器的onChange监听办法                for (RecordListener listener : listeners.get(entry.getKey())) {                    listener.onChange(entry.getKey(), entry.getValue().value);                }            } catch (Exception e) {                Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);                continue;            }            // Update data store if listener executed successfully:            // 监听器listener执行胜利后,就更新dataStore            dataStore.put(entry.getKey(), entry.getValue());        }    }    return true;}

到此实现数据全量同步,其实全量同步最终封装的协定还是Http。

增量同步

新增数据应用异步播送同步:

1:DistroProtocol 应用 sync() 办法接管增量数据2:向其余节点公布播送工作  调用 distroTaskEngineHolder 公布提早工作  3:调用 DistroDelayTaskProcessor.process() 办法进行工作投递:将提早工作转换为异步变更工作4:执行变更工作 DistroSyncChangeTask.run() 办法:向指定节点发送音讯  调用 DistroHttpAgent.syncData() 办法发送数据  调用 NamingProxy.syncData() 办法发送数据  5:异样工作调用 handleFailedTask() 办法进行解决  调用 DistroFailedTaskHandler 解决失败工作  调用 DistroHttpCombinedKeyTaskFailedHandler 将失败工作从新投递成提早工作。
增量数据入口

咱们回到服务注册,服务注册的InstanceController.register()就是数据入口,它会调用ServiceManager.registerInstance(),执行数据同步的时候,调用addInstance(),在该办法中会执行DistroConsistencyServiceImpl.put(),该办法是增量同步的入口,会调用distroProtocol.sync()办法,代码如下:

/*** * 数据保留 * @param key   key of data, this key should be globally unique * @param value value of data * @throws NacosException */@Overridepublic void put(String key, Record value) throws NacosException {    //将数据存入到dataStore中    onPut(key, value);    //应用distroProtocol同步数据    distroProtocol.sync(        new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX),        DataOperation.CHANGE,        globalConfig.getTaskDispatchPeriod() / 2);}

sync()办法会执行工作公布,代码如下:

public void sync(DistroKey distroKey, DataOperation action, long delay) {    //向除了本人外的所有节点播送    for (Member each : memberManager.allMembersWithoutSelf()) {        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),                each.getAddress());        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);        //从distroTaskEngineHolder获取延时执行引擎,并将distroDelayTask工作增加进来        //执行延时工作公布        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);        if (Loggers.DISTRO.isDebugEnabled()) {            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());        }    }}
增量同步操作

提早工作对象咱们能够从DistroTaskEngineHolder构造函数中得悉是DistroDelayTaskProcessor,代码如下:

/*** * 构造函数指定工作处理器 * @param distroComponentHolder */public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) {    DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder);    //指定工作处理器defaultDelayTaskProcessor    delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor);}

它提早执行的时候会执行process办法,该办法正是执行数据同步的中央,它会执行DistroSyncChangeTask工作,代码如下:

/*** * 工作处理过程 * @param task     task. * @return */@Overridepublic boolean process(NacosTask task) {    if (!(task instanceof DistroDelayTask)) {        return true;    }    DistroDelayTask distroDelayTask = (DistroDelayTask) task;    DistroKey distroKey = distroDelayTask.getDistroKey();    if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {        //将提早工作变更成异步工作,异步工作对象是一个线程        DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);        //将工作增加到NacosExecuteTaskExecuteEngine中,并执行        distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);        return true;    }    return false;}

DistroSyncChangeTask本质上是工作的开始,它本身是一个线程,所以会执行它的run办法,而run办法这是数据同步操作,代码如下:

/*** * 执行数据同步 */@Overridepublic void run() {    Loggers.DISTRO.info("[DISTRO-START] {}", toString());    try {        //获取本地缓存数据        String type = getDistroKey().getResourceType();        DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());        distroData.setType(DataOperation.CHANGE);        //向其余节点同步数据        boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());        if (!result) {            handleFailedTask();        }        Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);    } catch (Exception e) {        Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);        handleFailedTask();    }}

数据同步会执行调用syncData,该办法其实就是通过Http协定将数据发送到其余节点实现数据同步,代码如下:

/*** * 向其余节点同步数据 * @param data         data * @param targetServer target server * @return */@Overridepublic boolean syncData(DistroData data, String targetServer) {    if (!memberManager.hasMember(targetServer)) {        return true;    }    //获取数据字节数组    byte[] dataContent = data.getContent();    //通过Http协定同步数据    return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());}

最初:肯定要跟着讲师所给的源码自行走一遍!!!

本文由传智教育博学谷 - 狂野架构师教研团队公布,转载请注明出处!

如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源