咱们在Nacos - NacosNamingService初始化提过,NacosNamingService对象创立的时候,会创立一个EventDispatcher对象。EventDispatcher的构造方法如下,创立一个线程池,而后放入Notifier工作。
public EventDispatcher() { this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener"); thread.setDaemon(true); return thread; } }); this.executor.execute(new Notifier());}
Notifier的Runnable的类,所以放入线程池的时候,会执行run办法。他次要是从阻塞队列changedServices取出ServiceInfo,而后依据ServiceInfo的key取出他对应的EventListener汇合,再执行EventListener的onEvent办法。
@Overridepublic void run() { while (!closed) { ServiceInfo serviceInfo = null; try { // changedServices是LinkedBlockingQueue,从阻塞队列取值 serviceInfo = changedServices.poll(5, TimeUnit.MINUTES); } catch (Exception ignore) { } // 没取值,从新从阻塞队列取 if (serviceInfo == null) { continue; } try { // 从observerMap取到EventListener汇合 List<EventListener> listeners = observerMap.get(serviceInfo.getKey()); if (!CollectionUtils.isEmpty(listeners)) { for (EventListener listener : listeners) { // 执行onEvent办法 List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts()); listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts)); } } } catch (Exception e) { NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e); } }}
在Nacos - 启动中,提到NacosWatch实例化的时候,就会调用namingService.subscribe,他会调用EventDispatcher#addListener办法,在这里会把监听放入observerMap的map里,而后调用serviceChanged办法。
public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) { NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map"); List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>()); observers.add(listener); observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers); if (observers != null) { observers.add(listener); } serviceChanged(serviceInfo);}
当serviceChanged被调用的时候,就会往阻塞队列存入ServiceInfo。下面曾经晓得了有个循环工作始终从阻塞队列changedServices取值,这个值就是这么来的。
public void serviceChanged(ServiceInfo serviceInfo) { if (serviceInfo == null) { return; } changedServices.add(serviceInfo);}
总结
这个类有两个比拟重要的成员,一个是observerMap,他的key是serviceInfo.getKey(),value是EventListener汇合。一个是changedServices,寄存serviceInfo的LinkedBlockingQueue阻塞队列。这两个成员的关联关系通过serviceInfo.getKey()维持。
当调用addListener的时候,就会把serviceInfo存入到changedServices,以及serviceInfo.getKey()和EventListener汇合存入到observerMap。
while(true)中,因为阻塞队列changedServices有值,就会从中拿到serviceInfo,再通过serviceInfo.getKey()拿到observerMap对应的EventListener汇合,而后执行EventListener汇合的EventListener.onEvent办法。