关于nacos:Nacos-事件的注册取消与监听EventDispatcher

47次阅读

共计 2544 个字符,预计需要花费 7 分钟才能阅读完成。

咱们在 Nacos – NacosNamingService 初始化提过,NacosNamingService 对象创立的时候,会创立一个 EventDispatcher 对象。EventDispatcher 的构造方法如下,创立一个线程池,而后放入 Notifier 工作。

public EventDispatcher() {this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
            thread.setDaemon(true);
            
            return thread;
        }
    });
    this.executor.execute(new Notifier());
}

Notifier 的 Runnable 的类,所以放入线程池的时候,会执行 run 办法。他次要是从阻塞队列 changedServices 取出 ServiceInfo,而后依据 ServiceInfo 的 key 取出他对应的 EventListener 汇合,再执行 EventListener 的 onEvent 办法。

@Override
public void run() {while (!closed) {
        ServiceInfo serviceInfo = null;
        try {
            // changedServices 是 LinkedBlockingQueue,从阻塞队列取值
            serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
        } catch (Exception ignore) { }
        // 没取值,从新从阻塞队列取
        if (serviceInfo == null) {continue;}
        
        try {
            // 从 observerMap 取到 EventListener 汇合
            List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
            
            if (!CollectionUtils.isEmpty(listeners)) {for (EventListener listener : listeners) {
                    // 执行 onEvent 办法
                    List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                    listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                            serviceInfo.getClusters(), hosts));
                }
            }
            
        } catch (Exception e) {NAMING_LOGGER.error("[NA] notify error for service:" + serviceInfo.getName() + ", clusters:"
                    + serviceInfo.getClusters(), e);
        }
    }
}

在 Nacos – 启动中,提到 NacosWatch 实例化的时候,就会调用 namingService.subscribe,他会调用 EventDispatcher#addListener 办法,在这里会把监听放入 observerMap 的 map 里,而后调用 serviceChanged 办法。

public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {NAMING_LOGGER.info("[LISTENER] adding" + serviceInfo.getName() + "with" + clusters + "to listener map");
    List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
    observers.add(listener);
    
    observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
    if (observers != null) {observers.add(listener);
    }
    
    serviceChanged(serviceInfo);
}

当 serviceChanged 被调用的时候,就会往阻塞队列存入 ServiceInfo。下面曾经晓得了有个循环工作始终从阻塞队列 changedServices 取值,这个值就是这么来的。

public void serviceChanged(ServiceInfo serviceInfo) {if (serviceInfo == null) {return;}
    
    changedServices.add(serviceInfo);
}

总结

这个类有两个比拟重要的成员,一个是 observerMap,他的 key 是 serviceInfo.getKey(),value 是 EventListener 汇合。一个是 changedServices,寄存 serviceInfo 的 LinkedBlockingQueue 阻塞队列。这两个成员的关联关系通过 serviceInfo.getKey() 维持。
当调用 addListener 的时候,就会把 serviceInfo 存入到 changedServices,以及 serviceInfo.getKey() 和 EventListener 汇合存入到 observerMap。
while(true) 中,因为阻塞队列 changedServices 有值,就会从中拿到 serviceInfo,再通过 serviceInfo.getKey() 拿到 observerMap 对应的 EventListener 汇合,而后执行 EventListener 汇合的 EventListener.onEvent 办法。

正文完
 0