Nacos - NacosNamingService初始化中提到NacosNamingService初始化会初始化EventDispatcher、NamingProxy、BeatReactor、HostReactor。其中EventDispatcher曾经说了,NamingProxy的定时工作次要是默认每30毫秒更新服务器地址、默认每5毫秒登陆获取token等信息,这里过了。BeatReactor初始化的时候并没有开启定时工作,前面来说,那只剩下HostReactor了。
咱们看看他的构造函数,会创立一个FailoverReactor和PushReceiver对象。

public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor,        String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) {    // 其余略    this.failoverReactor = new FailoverReactor(this, cacheDir);    this.pushReceiver = new PushReceiver(this);}

FailoverReactor

FailoverReactor的构造函数,会调用他的init办法:

public FailoverReactor(HostReactor hostReactor, String cacheDir) {    //其余略    this.init();}

在init里会有三个工作:

  1. FailoverReactor.SwitchRefresher,默认每5秒检测是否开启故障转移,如果开启,则把文件数据读入serviceMap。
  2. FailoverReactor.DiskFileWriter,默认每天把服务信息写入本地。
  3. 创立10秒后调用DiskFileWriter#run,检测本地缓存文件,如果没有则创立缓存文件。
public void init() {            executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);        executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);        // backup file on startup if failover directory is empty.    executorService.schedule(new Runnable() {        //其余略    }, 10000L, TimeUnit.MILLISECONDS);}

FailoverReactor.SwitchRefresher,默认每5秒检测是否开启故障转移,如果开启,则把文件数据读入serviceMap。

class SwitchRefresher implements Runnable {           long lastModifiedMillis = 0L;        @Override    public void run() {        try {            // 其余略                switchParams.put("failover-mode", "true");            NAMING_LOGGER.info("failover-mode is on");            // 故障转移的时候调用FailoverFileReader#run            new FailoverFileReader().run();        } catch (Throwable e) {            NAMING_LOGGER.error("[NA] failed to read failover switch.", e);        }    }}class FailoverFileReader implements Runnable {        @Override    public void run() {        Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);                BufferedReader reader = null;        try {            // 其余略            // 读取文件信息,赋值给dom,存入domMap            for (File file : files) {                // 其余略                ServiceInfo dom = new ServiceInfo(file.getName());                                // 其余略                dom = JacksonUtils.toObj(json, ServiceInfo.class);                            if (!CollectionUtils.isEmpty(dom.getHosts())) {                    domMap.put(dom.getKey(), dom);                }            }        } catch (Exception e) {            NAMING_LOGGER.error("[NA] failed to read cache file", e);        }        // domMap的值赋值给serviceMap        if (domMap.size() > 0) {            serviceMap = domMap;        }    }}

PushReceiver

PushReceiver实现了Runnable接口,在构造函数中把本人放入了线程池。

public PushReceiver(HostReactor hostReactor) {    try {        //其余略        this.executorService.execute(this);    } catch (Exception e) {        NAMING_LOGGER.error("[NA] init udp socket failed", e);    }}

在run中,通过while始终监听UDP数据,并依据不同的type进行解决数据,解决后响应申请。

@Overridepublic void run() {    while (!closed) {        try {                        // byte[] is initialized with 0 full filled by default            byte[] buffer = new byte[UDP_MSS];            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);            // 获取UDP数据            udpSocket.receive(packet);                        String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());                        PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);            String ack;            // 依据不同的type进行解决数据            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {                hostReactor.processServiceJson(pushPacket.data);                                // send ack to server                ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":"                        + "\"\"}";            } else if ("dump".equals(pushPacket.type)) {                // dump data to server                ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":"                        + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))                        + "\"}";            } else {                // do nothing send ack only                ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime                        + "\", \"data\":" + "\"\"}";            }            // 响应申请            udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,                    packet.getSocketAddress()));        } catch (Exception e) {            NAMING_LOGGER.error("[NA] error while receiving push data", e);        }    }}

总结

HostReactor的创立工作包含每5秒检测是否开启故障转移,如果开启,则把文件数据读入serviceMap、每天把服务信息写入本地、检测本地缓存文件,如果没有则创立缓存文件、监听UDP申请。