Preface
This article takes a look at the PushReceiver in the nacos client.
PushReceiver
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/PushReceiver.java
public class PushReceiver implements Runnable {

    private ScheduledExecutorService executorService;

    private static final int UDP_MSS = 64 * 1024;

    private DatagramSocket udpSocket;

    private HostReactor hostReactor;

    public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            udpSocket = new DatagramSocket();

            executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });

            executorService.execute(this);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] init udp socket failed", e);
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

                udpSocket.receive(packet);

                String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

                PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    hostReactor.processServiceJSON(pushPacket.data);

                    // send ack to server
                    ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
                } else if ("dump".equals(pushPacket.type)) {
                    // dump data to server
                    ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\""
                        + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                        + "\"}";
                } else {
                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
                }

                udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                    ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }

    public static class PushPacket {
        public String type;
        public long lastRefTime;
        public String data;
    }

    public int getUDPPort() {
        return udpSocket.getLocalPort();
    }
}
- PushReceiver implements the Runnable interface. Its constructor creates the udpSocket and a single-thread ScheduledThreadPoolExecutor, then submits itself to that executor
- Its run method loops forever around udpSocket.receive(packet), parses the received bytes into a PushPacket, and then branches on pushPacket.type
- When pushPacket.type is dom or service it calls hostReactor.processServiceJSON(pushPacket.data); when pushPacket.type is dump it serializes hostReactor.getServiceInfoMap() into the ack; in every case an ack is sent back to the sender (a sketch of this exchange follows below)
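To make the UDP exchange concrete, here is a minimal, hypothetical sender; it is not part of nacos, and the class name FakePushServer plus the sample payload are made up for illustration. It sends an uncompressed "service" packet to the port that PushReceiver reports via getUDPPort() and reads back the push-ack (IoUtils.tryDecompress should pass an uncompressed payload through unchanged; whether processServiceJSON accepts the sample data depends on ServiceInfo.validate(), which is not the point here).
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the server side of the push protocol (illustration only).
public class FakePushServer {

    public static void main(String[] args) throws Exception {
        // assume the client's PushReceiver reported this UDP port (getUDPPort()) when it subscribed
        int clientUdpPort = Integer.parseInt(args[0]);

        // a minimal "service" push packet; "data" carries the ServiceInfo JSON as a string,
        // which is what processServiceJSON(pushPacket.data) parses on the client side
        String push = "{\"type\":\"service\",\"lastRefTime\":" + System.currentTimeMillis()
            + ",\"data\":\"{\\\"name\\\":\\\"demo\\\",\\\"hosts\\\":[]}\"}";
        byte[] payload = push.getBytes(StandardCharsets.UTF_8);

        try (DatagramSocket socket = new DatagramSocket()) {
            socket.send(new DatagramPacket(payload, payload.length,
                new InetSocketAddress("127.0.0.1", clientUdpPort)));

            // PushReceiver answers with a push-ack to packet.getSocketAddress(), i.e. this socket
            byte[] buffer = new byte[64 * 1024];
            DatagramPacket ack = new DatagramPacket(buffer, buffer.length);
            socket.setSoTimeout(5000);
            socket.receive(ack);
            System.out.println("ack: " + new String(ack.getData(), 0, ack.getLength(), StandardCharsets.UTF_8));
        }
    }
}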
HostReactor
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java
public class HostReactor {

    private static final long DEFAULT_DELAY = 1000L;

    private static final long UPDATE_HOLD_INTERVAL = 5000L;

    private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();

    private Map<String, ServiceInfo> serviceInfoMap;

    private Map<String, Object> updatingMap;

    private PushReceiver pushReceiver;

    private EventDispatcher eventDispatcher;

    private NamingProxy serverProxy;

    private FailoverReactor failoverReactor;

    private String cacheDir;

    private ScheduledExecutorService executor;

    public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir) {
        this(eventDispatcher, serverProxy, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
    }

    public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
                       boolean loadCacheAtStart, int pollingThreadCount) {

        executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.client.naming.updater");
                return thread;
            }
        });

        this.eventDispatcher = eventDispatcher;
        this.serverProxy = serverProxy;
        this.cacheDir = cacheDir;
        if (loadCacheAtStart) {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
        } else {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
        }

        this.updatingMap = new ConcurrentHashMap<String, Object>();
        this.failoverReactor = new FailoverReactor(this, cacheDir);
        this.pushReceiver = new PushReceiver(this);
    }

    //......

    public ServiceInfo processServiceJSON(String json) {
        ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
        if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
            //empty or error push, just ignore
            return oldService;
        }

        boolean changed = false;

        if (oldService != null) {
            if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                NAMING_LOGGER.warn("out of date data received, old-t:" + oldService.getLastRefTime()
                    + ", new-t:" + serviceInfo.getLastRefTime());
            }

            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

            Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
            for (Instance host : oldService.getHosts()) {
                oldHostMap.put(host.toInetAddr(), host);
            }

            Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
            for (Instance host : serviceInfo.getHosts()) {
                newHostMap.put(host.toInetAddr(), host);
            }

            Set<Instance> modHosts = new HashSet<Instance>();
            Set<Instance> newHosts = new HashSet<Instance>();
            Set<Instance> remvHosts = new HashSet<Instance>();

            List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                newHostMap.entrySet());
            for (Map.Entry<String, Instance> entry : newServiceHosts) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(),
                    oldHostMap.get(key).toString())) {
                    modHosts.add(host);
                    continue;
                }

                if (!oldHostMap.containsKey(key)) {
                    newHosts.add(host);
                }
            }

            for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                if (newHostMap.containsKey(key)) {
                    continue;
                }

                if (!newHostMap.containsKey(key)) {
                    remvHosts.add(host);
                }
            }

            if (newHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service:"
                    + serviceInfo.getKey() + "->" + JSON.toJSONString(newHosts));
            }

            if (remvHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service:"
                    + serviceInfo.getKey() + "->" + JSON.toJSONString(remvHosts));
            }

            if (modHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service:"
                    + serviceInfo.getKey() + "->" + JSON.toJSONString(modHosts));
            }

            serviceInfo.setJsonFromServer(json);

            if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
                eventDispatcher.serviceChanged(serviceInfo);
                DiskCache.write(serviceInfo, cacheDir);
            }
        } else {
            changed = true;
            NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service:" + serviceInfo.getKey()
                + "->" + JSON.toJSONString(serviceInfo.getHosts()));
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            eventDispatcher.serviceChanged(serviceInfo);
            serviceInfo.setJsonFromServer(json);
            DiskCache.write(serviceInfo, cacheDir);
        }

        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());

        if (changed) {
            NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service:" + serviceInfo.getKey()
                + "->" + JSON.toJSONString(serviceInfo.getHosts()));
        }

        return serviceInfo;
    }

    public Map<String, ServiceInfo> getServiceInfoMap() {
        return serviceInfoMap;
    }

    //......
}
- The processServiceJSON method compares the received serviceInfo against the local copy to decide whether anything changed, and when it did, it stores the new serviceInfo and invokes eventDispatcher.serviceChanged(serviceInfo) as well as DiskCache.write(serviceInfo, cacheDir); the host diff it computes is sketched below
- The HostReactor constructor has a loadCacheAtStart parameter (false by default); when it is true, serviceInfoMap is initialized from the local cache files via DiskCache.read(this.cacheDir)
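The added/modified/removed classification in processServiceJSON boils down to a two-way map comparison keyed by ip:port. Below is a standalone, simplified sketch of that diff, with instances reduced to plain strings and all names and values made up; it mirrors the loops over newHostMap and oldHostMap shown above.
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Simplified stand-in for the host diff inside processServiceJSON (illustration only).
public class HostDiffSketch {

    public static void main(String[] args) {
        Map<String, String> oldHosts = new HashMap<>();
        oldHosts.put("10.0.0.1:8080", "weight=1");
        oldHosts.put("10.0.0.2:8080", "weight=1");

        Map<String, String> newHosts = new HashMap<>();
        newHosts.put("10.0.0.1:8080", "weight=2"); // same key, different payload -> modified
        newHosts.put("10.0.0.3:8080", "weight=1"); // key only present in the new map -> added

        Set<String> added = new HashSet<>();
        Set<String> modified = new HashSet<>();
        Set<String> removed = new HashSet<>();

        for (Map.Entry<String, String> entry : newHosts.entrySet()) {
            String key = entry.getKey();
            if (oldHosts.containsKey(key) && !Objects.equals(entry.getValue(), oldHosts.get(key))) {
                modified.add(key);
            } else if (!oldHosts.containsKey(key)) {
                added.add(key);
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                removed.add(key); // key only present in the old map -> removed
            }
        }

        // HostReactor sets changed = true whenever any of the three sets is non-empty,
        // which is also when it calls eventDispatcher.serviceChanged and DiskCache.write
        System.out.println("added=" + added + ", modified=" + modified + ", removed=" + removed);
    }
}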
DiskCache
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/cache/DiskCache.java
public class DiskCache {

    public static void write(ServiceInfo dom, String dir) {
        try {
            makeSureCacheDirExists(dir);

            File file = new File(dir, dom.getKeyEncoded());
            if (!file.exists()) {
                // add another !file.exists() to avoid conflicted creating-new-file from multi-instances
                if (!file.createNewFile() && !file.exists()) {
                    throw new IllegalStateException("failed to create cache file");
                }
            }

            StringBuilder keyContentBuffer = new StringBuilder("");

            String json = dom.getJsonFromServer();
            if (StringUtils.isEmpty(json)) {
                json = JSON.toJSONString(dom);
            }

            keyContentBuffer.append(json);

            //Use the concurrent API to ensure the consistency.
            ConcurrentDiskUtil.writeFileContent(file, keyContentBuffer.toString(), Charset.defaultCharset().toString());
        } catch (Throwable e) {
            NAMING_LOGGER.error("[NA] failed to write cache for dom:" + dom.getName(), e);
        }
    }

    public static String getLineSeparator() {
        return System.getProperty("line.separator");
    }

    public static Map<String, ServiceInfo> read(String cacheDir) {
        Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16);

        BufferedReader reader = null;
        try {
            File[] files = makeSureCacheDirExists(cacheDir).listFiles();
            if (files == null || files.length == 0) {
                return domMap;
            }

            for (File file : files) {
                if (!file.isFile()) {
                    continue;
                }

                String fileName = URLDecoder.decode(file.getName(), "UTF-8");

                if (!(fileName.endsWith(Constants.SERVICE_INFO_SPLITER + "meta") || fileName.endsWith(
                    Constants.SERVICE_INFO_SPLITER + "special-url"))) {
                    ServiceInfo dom = new ServiceInfo(fileName);
                    List<Instance> ips = new ArrayList<Instance>();
                    dom.setHosts(ips);

                    ServiceInfo newFormat = null;

                    try {
                        String dataString = ConcurrentDiskUtil.getFileContent(file,
                            Charset.defaultCharset().toString());
                        reader = new BufferedReader(new StringReader(dataString));

                        String json;
                        while ((json = reader.readLine()) != null) {
                            try {
                                if (!json.startsWith("{")) {
                                    continue;
                                }

                                newFormat = JSON.parseObject(json, ServiceInfo.class);

                                if (StringUtils.isEmpty(newFormat.getName())) {
                                    ips.add(JSON.parseObject(json, Instance.class));
                                }
                            } catch (Throwable e) {
                                NAMING_LOGGER.error("[NA] error while parsing cache file:" + json, e);
                            }
                        }
                    } catch (Exception e) {
                        NAMING_LOGGER.error("[NA] failed to read cache for dom:" + file.getName(), e);
                    } finally {
                        try {
                            if (reader != null) {
                                reader.close();
                            }
                        } catch (Exception e) {
                            //ignore
                        }
                    }

                    if (newFormat != null && !StringUtils.isEmpty(newFormat.getName())
                        && !CollectionUtils.isEmpty(newFormat.getHosts())) {
                        domMap.put(dom.getKey(), newFormat);
                    } else if (!CollectionUtils.isEmpty(dom.getHosts())) {
                        domMap.put(dom.getKey(), dom);
                    }
                }
            }
        } catch (Throwable e) {
            NAMING_LOGGER.error("[NA] failed to read cache file", e);
        }

        return domMap;
    }

    private static File makeSureCacheDirExists(String dir) {
        File cacheDir = new File(dir);
        if (!cacheDir.exists() && !cacheDir.mkdirs()) {
            throw new IllegalStateException("failed to create cache dir:" + dir);
        }

        return cacheDir;
    }
}
- DiskCache's write method writes the serviceInfo into the dir folder, using serviceInfo.getKeyEncoded() as the file name; its read method walks the files under the dir folder, parses each one into a ServiceInfo, collects them into domMap, and returns domMap (a simplified sketch of this layout follows below)
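The on-disk layout is easy to mimic with plain JDK I/O. Here is a simplified, hypothetical stand-in (the class name NaiveServiceCache, the sample key, and the directory are made up): one file per service key, where the file name is the URL-encoded key and the file body is the JSON, which is essentially what write/read above do. The real DiskCache additionally writes through ConcurrentDiskUtil for consistency and skips file names ending in Constants.SERVICE_INFO_SPLITER + "meta" or "special-url".
import java.io.IOException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

// JDK-only sketch of the one-file-per-service-key cache layout (illustration only).
public class NaiveServiceCache {

    public static void write(Path dir, String serviceKey, String json) throws IOException {
        Files.createDirectories(dir);                             // makeSureCacheDirExists(dir)
        String fileName = URLEncoder.encode(serviceKey, "UTF-8"); // stand-in for dom.getKeyEncoded()
        Files.write(dir.resolve(fileName), json.getBytes(StandardCharsets.UTF_8));
    }

    public static Map<String, String> read(Path dir) throws IOException {
        Map<String, String> result = new HashMap<>();
        if (!Files.isDirectory(dir)) {
            return result;
        }
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path file : files) {
                String key = URLDecoder.decode(file.getFileName().toString(), "UTF-8");
                result.put(key, new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Paths.get(System.getProperty("java.io.tmpdir"), "nacos-cache-demo");
        write(dir, "DEFAULT_GROUP@@demo-service", "{\"name\":\"demo-service\",\"hosts\":[]}");
        System.out.println(read(dir)); // {DEFAULT_GROUP@@demo-service={"name":"demo-service","hosts":[]}}
    }
}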
Summary
- PushReceiver implements the Runnable interface. Its constructor creates the udpSocket and a single-thread ScheduledThreadPoolExecutor, then submits itself to that executor
- Its run method loops forever around udpSocket.receive(packet), parses the received bytes into a PushPacket, and then branches on pushPacket.type
- When pushPacket.type is dom or service it calls hostReactor.processServiceJSON(pushPacket.data); when pushPacket.type is dump it serializes hostReactor.getServiceInfoMap() into the ack; in every case an ack is sent back to the sender
doc
- PushReceiver