序
本文主要研究一下 nacos server 的 PushService
PushService
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
@Autowired
private SwitchDomain switchDomain;
private ApplicationContext applicationContext;
private static final long ACK_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10L);
private static final int MAX_RETRY_TIMES = 1;
private static volatile ConcurrentMap<String, Receiver.AckEntry> ackMap
= new ConcurrentHashMap<String, Receiver.AckEntry>();
private static ConcurrentMap<String, ConcurrentMap<String, PushClient>> clientMap
= new ConcurrentHashMap<String, ConcurrentMap<String, PushClient>>();
private static volatile ConcurrentHashMap<String, Long> udpSendTimeMap = new ConcurrentHashMap<String, Long>();
public static volatile ConcurrentHashMap<String, Long> pushCostMap = new ConcurrentHashMap<String, Long>();
private static int totalPush = 0;
private static int failedPush = 0;
private static ConcurrentHashMap<String, Long> lastPushMillisMap = new ConcurrentHashMap<>();
private static DatagramSocket udpSocket;
private static Map<String, Future> futureMap = new ConcurrentHashMap<>();
private static ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.push.retransmitter");
return t;
}
});
private static ScheduledExecutorService udpSender = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {Thread t = new Thread(r);
t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.push.udpSender");
return t;
}
});
static {
try {udpSocket = new DatagramSocket();
Receiver receiver = new Receiver();
Thread inThread = new Thread(receiver);
inThread.setDaemon(true);
inThread.setName("com.alibaba.nacos.naming.push.receiver");
inThread.start();
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {removeClientIfZombie();
} catch (Throwable e) {Loggers.PUSH.warn("[NACOS-PUSH] failed to remove client zombie");
}
}
}, 0, 20, TimeUnit.SECONDS);
} catch (SocketException e) {Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}
//......
public static void removeClientIfZombie() {
int size = 0;
for (Map.Entry<String, ConcurrentMap<String, PushClient>> entry : clientMap.entrySet()) {ConcurrentMap<String, PushClient> clientConcurrentMap = entry.getValue();
for (Map.Entry<String, PushClient> entry1 : clientConcurrentMap.entrySet()) {PushClient client = entry1.getValue();
if (client.zombie()) {clientConcurrentMap.remove(entry1.getKey());
}
}
size += clientConcurrentMap.size();}
if (Loggers.PUSH.isDebugEnabled()) {Loggers.PUSH.debug("[NACOS-PUSH] clientMap size: {}", size);
}
}
//......
}
- PushService 实现了 ApplicationContextAware、ApplicationListener<ServiceChangeEvent> 接口;它有两个 ScheduledExecutorService,一个用于 retransmitter,一个用于 udpSender;其 static 代码块创建了一个 daemon 线程执行 Receiver,同时注册了一个每隔 20 秒执行一次的定时任务 removeClientIfZombie,它会遍历 clientMap,移除 zombie 的 client
Receiver
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
public static class Receiver implements Runnable {
@Override
public void run() {while (true) {byte[] buffer = new byte[1024 * 64];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
try {udpSocket.receive(packet);
String json = new String(packet.getData(), 0, packet.getLength(), Charset.forName("UTF-8")).trim();
AckPacket ackPacket = JSON.parseObject(json, AckPacket.class);
InetSocketAddress socketAddress = (InetSocketAddress) packet.getSocketAddress();
String ip = socketAddress.getAddress().getHostAddress();
int port = socketAddress.getPort();
if (System.nanoTime() - ackPacket.lastRefTime > ACK_TIMEOUT_NANOS) {Loggers.PUSH.warn("ack takes too long from {} ack json: {}", packet.getSocketAddress(), json);
}
String ackKey = getACKKey(ip, port, ackPacket.lastRefTime);
AckEntry ackEntry = ackMap.remove(ackKey);
if (ackEntry == null) {
throw new IllegalStateException("unable to find ackEntry for key:" + ackKey
+ ", ack json:" + json);
}
long pushCost = System.currentTimeMillis() - udpSendTimeMap.get(ackKey);
Loggers.PUSH.info("received ack: {} from: {}:, cost: {} ms, unacked: {}, total push: {}",
json, ip, port, pushCost, ackMap.size(), totalPush);
pushCostMap.put(ackKey, pushCost);
udpSendTimeMap.remove(ackKey);
} catch (Throwable e) {Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", e);
}
}
}
//......
public static class AckPacket {
public String type;
public long lastRefTime;
public String data;
}
}
- Receiver 实现了 Runnable 接口,其 run 方法使用 while true 循环来执行 udpSocket.receive,之后解析 AckPacket,从 ackMap 移除该 ackKey,更新 pushCostMap,同时从 udpSendTimeMap 移除该 ackKey
PushClient
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
/**
 * A subscriber that service changes are pushed to: the subscribed service and clusters,
 * the client's UDP address, and the time of its last refresh.
 */
public class PushClient {

    private String namespaceId;
    private String serviceName;
    private String clusters;
    private String agent;
    private String tenant;
    private String app;
    private InetSocketAddress socketAddr;
    private DataSource dataSource;
    private Map<String, String[]> params;

    /** Wall-clock time (ms) of construction or of the last {@link #refresh()}. */
    public long lastRefTime = System.currentTimeMillis();

    public PushClient(String namespaceId,
                      String serviceName,
                      String clusters,
                      String agent,
                      InetSocketAddress socketAddr,
                      DataSource dataSource,
                      String tenant,
                      String app) {
        this.namespaceId = namespaceId;
        this.serviceName = serviceName;
        this.clusters = clusters;
        this.agent = agent;
        this.socketAddr = socketAddr;
        this.dataSource = dataSource;
        this.tenant = tenant;
        this.app = app;
    }

    /**
     * Creates a client that only knows its address; every other field, including
     * {@code serviceName} and {@code clusters}, stays null.
     */
    public PushClient(InetSocketAddress socketAddr) {
        this.socketAddr = socketAddr;
    }

    public Map<String, String[]> getParams() {
        return params;
    }

    public void setParams(Map<String, String[]> params) {
        this.params = params;
    }

    public DataSource getDataSource() {
        return dataSource;
    }

    /**
     * @return true when more time has passed since {@code lastRefTime} than the push cache
     *         interval that {@code switchDomain} reports for this service
     */
    public boolean zombie() {
        // NOTE(review): switchDomain is not declared in this class; presumably it is the field of the enclosing PushService - confirm.
        return System.currentTimeMillis() - lastRefTime > switchDomain.getPushCacheMillis(serviceName);
    }

    @Override
    public String toString() {
        return "serviceName:" + serviceName
            + ", clusters:" + clusters
            + ", ip:" + socketAddr.getAddress().getHostAddress()
            + ", port:" + socketAddr.getPort()
            + ", agent:" + agent;
    }

    public String getAgent() {
        return agent;
    }

    /** @return the client address formatted as {@code ip:port} */
    public String getAddrStr() {
        return socketAddr.getAddress().getHostAddress() + ":" + socketAddr.getPort();
    }

    public String getIp() {
        return socketAddr.getAddress().getHostAddress();
    }

    @Override
    public int hashCode() {
        return Objects.hash(serviceName, clusters, socketAddr);
    }

    /**
     * Two clients are equal when service name, clusters and socket address all match.
     * The comparison is null-safe, consistent with {@link #hashCode()}, because the
     * single-argument constructor leaves {@code serviceName} and {@code clusters} null.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof PushClient)) {
            return false;
        }

        PushClient other = (PushClient) obj;

        return Objects.equals(serviceName, other.serviceName)
            && Objects.equals(clusters, other.clusters)
            && Objects.equals(socketAddr, other.socketAddr);
    }

    public String getClusters() {
        return clusters;
    }

    public void setClusters(String clusters) {
        this.clusters = clusters;
    }

    public String getNamespaceId() {
        return namespaceId;
    }

    public void setNamespaceId(String namespaceId) {
        this.namespaceId = namespaceId;
    }

    public String getServiceName() {
        return serviceName;
    }

    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    public String getTenant() {
        return tenant;
    }

    public void setTenant(String tenant) {
        this.tenant = tenant;
    }

    public String getApp() {
        return app;
    }

    public void setApp(String app) {
        this.app = app;
    }

    public InetSocketAddress getSocketAddr() {
        return socketAddr;
    }

    /** Marks the client as alive by resetting {@code lastRefTime} to the current time. */
    public void refresh() {
        lastRefTime = System.currentTimeMillis();
    }
}
- PushClient 封装了要推送的目标服务地址等信息,它提供了 zombie 方法来判断目标服务是否 zombie,它判断距离 lastRefTime 的时间差是否超过 switchDomain 指定的该 serviceName 的 PushCacheMillis(默认为 10 秒),超过则判定为 zombie
PushService.onApplicationEvent
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
//......
@Override
public void onApplicationEvent(ServiceChangeEvent event) {Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
Future future = udpSender.schedule(new Runnable() {
@Override
public void run() {
try {Loggers.PUSH.info(serviceName + "is changed, add it to push queue.");
ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {return;}
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
for (PushClient client : clients.values()) {if (client.zombie()) {Loggers.PUSH.debug("client is zombie:" + client.toString());
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie:" + client.toString());
continue;
}
Receiver.AckEntry ackEntry;
Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
}
if (compressData != null) {ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
client.getServiceName(), client.getAddrStr(), client.getAgent(), (ackEntry == null ? null : ackEntry.key));
udpPush(ackEntry);
}
} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
} finally {futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
//......
public void serviceChanged(Service service) {
// merge some change events to reduce the push frequency:
if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {return;}
this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}
//......
}
- onApplicationEvent 会处理 ServiceChangeEvent,它会注册一个延时任务并将该 future 放入 futureMap;该延时任务会从 clientMap 获取指定 namespaceId, serviceName 的 clients;然后遍历 clients 判断是否是 zombie,如果是的话则移除该 client,否则创建 Receiver.AckEntry,然后执行 udpPush(ackEntry),最后从 futureMap 移除该 future;serviceChanged 方法提供给外部调用发布 ServiceChangeEvent
PushService.udpPush
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
@Component
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
//......
public static class Receiver implements Runnable {
//......
public static class AckEntry {public AckEntry(String key, DatagramPacket packet) {
this.key = key;
this.origin = packet;
}
public void increaseRetryTime() {retryTimes.incrementAndGet();
}
public int getRetryTimes() {return retryTimes.get();
}
public String key;
public DatagramPacket origin;
private AtomicInteger retryTimes = new AtomicInteger(0);
public Map<String, Object> data;
}
//......
}
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {if (ackEntry == null) {Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
return null;
}
if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return ackEntry;
}
try {if (!ackMap.containsKey(ackEntry.key)) {totalPush++;}
ackMap.put(ackEntry.key, ackEntry);
udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
Loggers.PUSH.info("send udp packet:" + ackEntry.key);
udpSocket.send(ackEntry.origin);
ackEntry.increaseRetryTime();
executorService.schedule(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS),
TimeUnit.MILLISECONDS);
return ackEntry;
} catch (Exception e) {Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}",
ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return null;
}
}
//......
}
- udpPush 方法会根据 Receiver.AckEntry 的信息进行判断,如果其重试次数大于 MAX_RETRY_TIMES 则终止 push,将其从 ackMap、udpSendTimeMap 中移除;如果可以重试则将其 ackEntry.key 放入 ackMap 及 udpSendTimeMap,然后执行 udpSocket.send(ackEntry.origin) 及 ackEntry.increaseRetryTime(),并注册 Retransmitter 的延时任务;如果出现异常则将其从 ackMap、udpSendTimeMap 移除
Retransmitter
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/push/PushService.java
/**
 * Delayed task that pushes an entry again if its key is still present in {@code ackMap},
 * i.e. the entry has neither been acknowledged nor given up on.
 */
public static class Retransmitter implements Runnable {

    Receiver.AckEntry ackEntry;

    public Retransmitter(Receiver.AckEntry ackEntry) {
        this.ackEntry = ackEntry;
    }

    @Override
    public void run() {
        boolean stillPending = ackMap.containsKey(ackEntry.key);
        if (!stillPending) {
            return;
        }

        Loggers.PUSH.info("retry to push data, key:" + ackEntry.key);
        udpPush(ackEntry);
    }
}
- Retransmitter 实现了 Runnable 接口,其 run 方法在 ackMap 包含 ackEntry.key 的条件下执行 udpPush 重试
小结
- PushService 实现了 ApplicationContextAware、ApplicationListener<ServiceChangeEvent> 接口
- 其 static 代码块创建了一个 daemon 线程执行 Receiver,同时注册了一个定时任务执行 removeClientIfZombie,它会遍历 clientMap,移除 zombie 的 client
- 其 onApplicationEvent 会处理 ServiceChangeEvent,它会注册一个延时任务并将该 future 放入 futureMap;该延时任务会从 clientMap 获取指定 namespaceId, serviceName 的 clients;然后遍历 clients 判断是否是 zombie,如果是的话则移除该 client,否则创建 Receiver.AckEntry,然后执行 udpPush(ackEntry),最后从 futureMap 移除该 future;serviceChanged 方法提供给外部调用发布 ServiceChangeEvent
doc
- PushService