序
本文主要研究一下 NacosNamingService 的 subscribe 及 unsubscribe
NacosNamingService
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java
public class NacosNamingService implements NamingService {
private static final String DEFAULT_PORT = "8080";
private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
/**
* Each Naming instance should have different namespace.
*/
private String namespace;
private String endpoint;
private String serverList;
private String cacheDir;
private String logName;
private HostReactor hostReactor;
private BeatReactor beatReactor;
private EventDispatcher eventDispatcher;
private NamingProxy serverProxy;
//......
@Override
public void subscribe(String serviceName, EventListener listener) throws NacosException {subscribe(serviceName, new ArrayList<String>(), listener);
}
@Override
public void subscribe(String serviceName, String groupName, EventListener listener) throws NacosException {subscribe(serviceName, groupName, new ArrayList<String>(), listener);
}
@Override
public void subscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException {subscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener);
}
@Override
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {eventDispatcher.addListener(hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ",")), StringUtils.join(clusters, ","), listener);
}
@Override
public void unsubscribe(String serviceName, EventListener listener) throws NacosException {unsubscribe(serviceName, new ArrayList<String>(), listener);
}
@Override
public void unsubscribe(String serviceName, String groupName, EventListener listener) throws NacosException {unsubscribe(serviceName, groupName, new ArrayList<String>(), listener);
}
@Override
public void unsubscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException {unsubscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener);
}
@Override
public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {eventDispatcher.removeListener(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener);
}
//......
}
- subscribe 方法执行 eventDispatcher.addListener;unsubscribe 方法执行 eventDispatcher.removeListener
EventDispatcher
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/EventDispatcher.java
public class EventDispatcher {
private ExecutorService executor = null;
private BlockingQueue<ServiceInfo> changedServices = new LinkedBlockingQueue<ServiceInfo>();
private ConcurrentMap<String, List<EventListener>> observerMap
= new ConcurrentHashMap<String, List<EventListener>>();
public EventDispatcher() {executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
thread.setDaemon(true);
return thread;
}
});
executor.execute(new Notifier());
}
public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {NAMING_LOGGER.info("[LISTENER] adding" + serviceInfo.getName() + "with" + clusters + "to listener map");
List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
observers.add(listener);
observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
if (observers != null) {observers.add(listener);
}
serviceChanged(serviceInfo);
}
public void removeListener(String serviceName, String clusters, EventListener listener) {NAMING_LOGGER.info("[LISTENER] removing" + serviceName + "with" + clusters + "from listener map");
List<EventListener> observers = observerMap.get(ServiceInfo.getKey(serviceName, clusters));
if (observers != null) {Iterator<EventListener> iter = observers.iterator();
while (iter.hasNext()) {EventListener oldListener = iter.next();
if (oldListener.equals(listener)) {iter.remove();
}
}
if (observers.isEmpty()) {observerMap.remove(ServiceInfo.getKey(serviceName, clusters));
}
}
}
public List<ServiceInfo> getSubscribeServices() {List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();
for (String key : observerMap.keySet()) {serviceInfos.add(ServiceInfo.fromKey(key));
}
return serviceInfos;
}
public void serviceChanged(ServiceInfo serviceInfo) {if (serviceInfo == null) {return;}
changedServices.add(serviceInfo);
}
private class Notifier implements Runnable {
@Override
public void run() {while (true) {
ServiceInfo serviceInfo = null;
try {serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
} catch (Exception ignore) { }
if (serviceInfo == null) {continue;}
try {List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
if (!CollectionUtils.isEmpty(listeners)) {for (EventListener listener : listeners) {List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));
}
}
} catch (Exception e) {NAMING_LOGGER.error("[NA] notify error for service:"
+ serviceInfo.getName() + ", clusters:" + serviceInfo.getClusters(), e);
}
}
}
}
public void setExecutor(ExecutorService executor) {
ExecutorService oldExecutor = this.executor;
this.executor = executor;
oldExecutor.shutdown();}
}
- EventDispatcher 的构造器创建了 executor,并执行 Notifier;Notifier 使用一个 while true 循环不断执行 changedServices.poll(5, TimeUnit.MINUTES) 拉取 serviceInfo,拉取到的话会从 observerMap 取出对应的 EventListener 列表,然后挨个回调 listener.onEvent 方法
- addListener 方法则是往 observerMap 创建或添加 observers,然后执行 serviceChanged 方法;removeListener 则是从 observerMap 移除指定的 listener,如果指定 key 的 listener 列表为空则删除该 key
- serviceChanged 方法会往 changedServices 添加 serviceInfo;之后 Notifier 异步线程可用拉取信息执行 listener.onEvent 回调
小结
NacosNamingService 的 subscribe 方法执行 eventDispatcher.addListener;unsubscribe 方法执行 eventDispatcher.removeListener
doc
- NacosNamingService