乐趣区

关于nacos:Nacos-HostReactor的创建

Nacos – NacosNamingService 初始化中提到 NacosNamingService 初始化会初始化 EventDispatcher、NamingProxy、BeatReactor、HostReactor。其中 EventDispatcher 曾经说了,NamingProxy 的定时工作次要是默认每 30 毫秒更新服务器地址、默认每 5 毫秒登陆获取 token 等信息,这里过了。BeatReactor 初始化的时候并没有开启定时工作,前面来说,那只剩下 HostReactor 了。
咱们看看他的构造函数,会创立一个 FailoverReactor 和 PushReceiver 对象。

public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor,
        String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) {
    // 其余略
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    this.pushReceiver = new PushReceiver(this);
}

FailoverReactor

FailoverReactor 的构造函数,会调用他的 init 办法:

public FailoverReactor(HostReactor hostReactor, String cacheDir) {
    // 其余略
    this.init();}

在 init 里会有三个工作:

  1. FailoverReactor.SwitchRefresher, 默认每 5 秒检测是否开启故障转移,如果开启,则把文件数据读入 serviceMap。
  2. FailoverReactor.DiskFileWriter, 默认每天把服务信息写入本地。
  3. 创立 10 秒后调用 DiskFileWriter#run,检测本地缓存文件,如果没有则创立缓存文件。
public void init() {executorService.scheduleWithFixedDelay(new SwitchRefresher(), 0L, 5000L, TimeUnit.MILLISECONDS);
    
    executorService.scheduleWithFixedDelay(new DiskFileWriter(), 30, DAY_PERIOD_MINUTES, TimeUnit.MINUTES);
    
    // backup file on startup if failover directory is empty.
    executorService.schedule(new Runnable() {// 其余略}, 10000L, TimeUnit.MILLISECONDS);
}

FailoverReactor.SwitchRefresher, 默认每 5 秒检测是否开启故障转移,如果开启,则把文件数据读入 serviceMap。

class SwitchRefresher implements Runnable {       
    long lastModifiedMillis = 0L;    
    @Override
    public void run() {
        try {
            // 其余略    
            switchParams.put("failover-mode", "true");
            NAMING_LOGGER.info("failover-mode is on");
            // 故障转移的时候调用 FailoverFileReader#run
            new FailoverFileReader().run();
        } catch (Throwable e) {NAMING_LOGGER.error("[NA] failed to read failover switch.", e);
        }
    }
}

class FailoverFileReader implements Runnable {
    
    @Override
    public void run() {Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);
        
        BufferedReader reader = null;
        try {
            // 其余略
            // 读取文件信息,赋值给 dom,存入 domMap
            for (File file : files) {
                // 其余略
                ServiceInfo dom = new ServiceInfo(file.getName());                
                // 其余略
                dom = JacksonUtils.toObj(json, ServiceInfo.class);            
                if (!CollectionUtils.isEmpty(dom.getHosts())) {domMap.put(dom.getKey(), dom);
                }
            }
        } catch (Exception e) {NAMING_LOGGER.error("[NA] failed to read cache file", e);
        }
        // domMap 的值赋值给 serviceMap
        if (domMap.size() > 0) {serviceMap = domMap;}
    }
}

PushReceiver

PushReceiver 实现了 Runnable 接口,在构造函数中把本人放入了线程池。

public PushReceiver(HostReactor hostReactor) {
    try {
        // 其余略
        this.executorService.execute(this);
    } catch (Exception e) {NAMING_LOGGER.error("[NA] init udp socket failed", e);
    }
}

在 run 中,通过 while 始终监听 UDP 数据,并依据不同的 type 进行解决数据,解决后响应申请。

@Override
public void run() {while (!closed) {
        try {// byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // 获取 UDP 数据
            udpSocket.receive(packet);
            
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data:" + json + "from" + packet.getAddress().toString());
            
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            // 依据不同的 type 进行解决数据
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {hostReactor.processServiceJson(pushPacket.data);
                
                // send ack to server
                ack = "{\"type\": \"push-ack\""+", \"lastRefTime\":\""+ pushPacket.lastRefTime +"\", \"data\":"
                        + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\""+", \"lastRefTime\": \""+ pushPacket.lastRefTime +"\", \"data\":"
                        + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
                        + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\""+", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":"+"\"\"}";
            }
            // 响应申请
            udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                    packet.getSocketAddress()));
        } catch (Exception e) {NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}

总结

HostReactor 的创立工作包含每 5 秒检测是否开启故障转移,如果开启,则把文件数据读入 serviceMap、每天把服务信息写入本地、检测本地缓存文件,如果没有则创立缓存文件、监听 UDP 申请。

退出移动版