关于zookeeper:ZookeeperApach-Curator-框架源码分析初始化过程一Ver-430

7次阅读

共计 31399 个字符,预计需要花费 79 分钟才能阅读完成。

介绍

Curator 是 netflix 公司开源的一套 zookeeper 客户端,目前是 Apache 的顶级我的项目。和 ZK 的原生客户端相比,Curator 的抽象层次要更高,同时简化了 ZK 的罕用性能开发量,比方 Curator 自带连贯重试、重复注册 Watcher、NodeExistsException 异样解决等等。

依据官网的介绍,咱们能够理解到它是一个用于分布式的 Java 客户端 API 工具。它基于high-level API,领有它能够更简略易懂的指挥 vZookeeper 实现分布式平安利用程序开发。

Curator 由一系列的模块形成,对于个别开发者而言,罕用的是 curator-frameworkcurator-recipes,上面对此顺次介绍。

Curator 当然也包含许多扩大,比方 服务发现 Java 8 异步 DSL

Apache Curator is a Java/JVM client library for [Apache ZooKeeper](https://zookeeper.apache.org/), a distributed coordination service.

Apache Curator includes a high-level API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.

用官网的介绍来说就是:guava 之于 java 就像 curator 之于 zookeeper

ZK 版本反对

Curator 目前最新的版本为 5.X 的版本,曾经不反对 ZK 的 3.4.X 以及之前的版本,这里通过思考最终抉择了 ZK 的 3.5.10 版本。

5.X 对于 Curator 做了不少破坏性的改变,不兼容的起因如下:

  • 旧的 ListenerContainer 类曾经被移除,以防止 Guava 类透露。
  • ConnectionHandlingPolicy 和相干类已被删除
  • Reaper 和 ChildReaper 类 / 食谱已被删除。您应该改用 ZooKeeper 容器节点。
  • newPersistentEphemeralNode()和 newPathChildrenCache()已从 GroupMember 中移除。
  • ServiceCacheBuilder< T> executorService(CloseableExecutorService executorService)已从 ServiceCacheBuilder 中移除。
  • ServiceProviderBuilder< T> executorService(CloseableExecutorService executorService)已从 ServiceProviderBuilder 中移除。
  • static boolean shouldRetry(int rc)已从 RetryLoop 中移除。
  • static boolean isRetryException(Throwable exception)已从 RetryLoop 中移除。

官网地址

Apache Curator –

下载地址

Curator Maven 相干地址:https://mvnrepository.com/artifact/org.apache.curator

Curator jar 包下载地址:https://cwiki.apache.org/confluence/display/CURATOR/Releases

疾速开始

ZK 集群部署

学习之前须要应用 ZK 搭建集群环境,不便 Debug 的时候调试代码。这部分搭建过程放到另一篇:

[[【Zookeeper】基于 3 台 linux 虚拟机搭建 zookeeper 集群]](https://segmentfault.com/a/1190000043925962)

Maven 依赖引入

上面是对应的 Zookeeper 和 Curator 的版本抉择。

<curator.version>4.3.0</curator.version>  
<zookeeper.version>3.5.10</zookeeper.version>
<dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>${curator.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>${curator.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>${zookeeper.version}</version>
    </dependency>

构建入门实例

Curator 最为外围和弱小并且罕用性能是分布式锁。在入门 demo 中能够看到整个 Curator 依附 CuratorFrameworkFactory 构建,应用 Curator 进行分布式加锁解锁操作,只须要为所连贯的每个 ZooKeeper 集群提供一个 CuratorFramework 对象。

CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy)

下面的办法将会应用默认值创立与 ZooKeeper 集群的连贯,惟一须要关注的是重试策略。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();

依据参数值能够大抵理解到,这里应用的策略是 指数的形式递增距离尝试重试工夫,并且最终重试三次

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
CuratorFramework client =  
        CuratorFrameworkFactory.newClient("192.168.0.1;192.168.0.2;192.168.0.3", retryPolicy);  
client.start();  
// 此处就获取到 zk 的一个连贯实例。//.....

领有了 CuratorFramework 实例之后,就能够间接通过 API 调用操作 ZK。上面咱们看一下重点以及应用最多的分布式锁的操作局部:

client.create().forPath("/my/path", myData)

这样的间接调用还有个益处是对于 ZK 的操作 client 实例如果碰到网络抖动等状况会主动重试。

可重入锁(偏心锁)案例代码

上面是官网可重入锁的 Demo 应用代码。

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if (lock.acquire(maxWait, waitUnit) ) 
{
    try 
    {// do some work inside of the critical section here}
    finally
    {lock.release();
    }
}

这里革新一下即可简略应用。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
CuratorFramework client =  
        CuratorFrameworkFactory.newClient("192.168.0.1,192.168.0.2,192.168.0.3", retryPolicy);  
client.start();  
// 此处就获取到 zk 的一个连贯实例。//.....  
client.create().forPath("/my/path", "Test".getBytes());  
InterProcessMutex lock = new InterProcessMutex(client, "/test/myLock");  
lock.acquire();  
try {  
    // do some work inside of the critical section here  
    Thread.sleep(3000);  
} finally {lock.release();  
}

初始化过程流程图

初始化过程流程图全图如下。上面将会一步步拆解这幅图是如何拼凑的。

drawio 源文件和图片地址如下:
链接:https://pan.baidu.com/s/18PoMjkp11LztmNB3XgZ0qw?pwd=4bug

提取码:4bug 

初始化源码剖析

直奔源码剖析局部,本文次要介绍和 Curator 初始化、外部的 告诉机制 以及 会话治理 局部。

CuratorFramework 初始化过程

初始化过程流程图

CuratorFramework 初始化过程上面截图这一部分,红色局部为集体认为绝对比拟重要的对象和变量。

CuratorFrameworkFactory.newClient() 代码剖析

Curator 当中默认应用偏心锁的策略去获取锁,多个客户端会依照排队的程序挨个获取锁,上面咱们通过代码进行验证。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
CuratorFramework client =  
        CuratorFrameworkFactory.newClient("192.168.19.100:2181,192.168.19.101:2181,192.168.19.102:2181", retryPolicy);

在获取分布式锁之前咱们须要先连贯 ZK 集群,整个过程通过两行代码实现,首先须要确定连贯 ZK 的重试策略,接着通过 CuratorFrameworkFactory 构建 Curator 实例即可,Curator 外部依据 ZK 原生客户端做了一层封装,开发者应用过程中不须要关注。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
CuratorFramework client =  CuratorFrameworkFactory.newClient("192.168.0.1,192.168.0.2,192.168.0.3", retryPolicy);

下面是简略的模板代码。ExponentialBackoffRetry 构建重试策略为依照指数增长重试工夫,比方第一次 1 秒,第二次 2 秒,第三次 4 秒,第四次 8 秒 …..

接着是利用 CuratorFrameworkFactory 构建实例。

return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);

这里强调一下两个常量 DEFAULT_SESSION_TIMEOUT_MS(默认的会话超时工夫)、DEFAULT_CONNECTION_TIMEOUT_MS(默认的连贯超时工夫),作用是传入指定的重试策略默认参数。

private static final int DEFAULT_SESSION_TIMEOUT_MS
    = Integer.getInteger("curator-default-session-timeout", 60 * 1000)
private static final int DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);

咱们进一步进入构造方法,这里用了建造者模式。

return builder().  
    connectString(connectString).  
    sessionTimeoutMs(sessionTimeoutMs).  
    connectionTimeoutMs(connectionTimeoutMs).  
    retryPolicy(retryPolicy).  
    build();

实际上调用的是 CuratorFrameworkImpl 实例。这里把 CuratorFrameworkFactorythis援用逸出给 CuratorFrameworkImpl 对象。

return new CuratorFrameworkImpl(this);

CuratorFrameworkImpl 构造方法的内容比拟多,这里次要说一下 CuratorZookeeperClient 这个对象,相当于 ZK 原生客户端的封装对象。

其余组件内容和 Curator 的各种告诉治理和会话治理等等性能无关。

  
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)  
{ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());  
    this.client = new CuratorZookeeperClient  
        (  
            localZookeeperFactory,  
            builder.getEnsembleProvider(),  
            builder.getSessionTimeoutMs(),  
            builder.getConnectionTimeoutMs(),  
            builder.getWaitForShutdownTimeoutMs(),  
            new Watcher()  
            {  
                @Override  
                public void process(WatchedEvent watchedEvent)  
                {CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);  
                    processEvent(event);  
                }  
            },  
            builder.getRetryPolicy(),  
            builder.canBeReadOnly(),  
            builder.getConnectionHandlingPolicy());  
  // 用于判断连贯断开和连贯超时的状态,设置 curator 的连贯状态,并通过 connectionStateManager 触发连贯事件状态告诉
    internalConnectionHandler = new StandardInternalConnectionHandler();
     
    // 接管事件的告诉。后盾线程操作事件和连贯状态事件会触发 
    listeners = new ListenerContainer<CuratorListener>();  
    
    // 当后盾线程产生异样或者 handler 产生异样的时候会触发
    unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();  
    // 后盾线程执行的操作队列
    backgroundOperations = new DelayQueue<OperationAndData<?>>();  
    forcedSleepOperations = new LinkedBlockingQueue<>();  
    // 命名空间
    namespace = new NamespaceImpl(this, builder.getNamespace());  

// 线程工厂办法,初始化后盾线程池时会应用
    threadFactory = getThreadFactory(builder);  

maxCloseWaitMs = builder.getMaxCloseWaitMs();  

// 负责连贯状态变动时的告诉
    connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());  
    compressionProvider = builder.getCompressionProvider();  
    aclProvider = builder.getAclProvider();  
    
    //CuratorFrameworkImpl 的状态,调用 start 办法之前为 LATENT,调用 start 办法之后为 STARTED , 调用 close()办法之后为 STOPPEDstate = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);  
    useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable(); 
    // 谬误连贯策略 
    connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");  
    schemaSet = Preconditions.checkNotNull(builder.getSchemaSet(), "schemaSet cannot be null");  
    zk34CompatibilityMode = builder.isZk34CompatibilityMode();  
  
    byte[] builderDefaultData = builder.getDefaultData();  
    defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];  
    authInfos = buildAuths(builder);  
    
  // 有保障的执行删除操作,其实是一直尝试直到删除胜利,通过递归调用实现
    failedDeleteManager = new FailedDeleteManager(this);  
    
    // 有保障的执行删除 watch 操作
    failedRemoveWatcherManager = new FailedRemoveWatchManager(this);  
    
    namespaceFacadeCache = new NamespaceFacadeCache(this);  
    
  // 服务端可用节点的检测器,第一次连贯和重连胜利之后都会触发从新获取服务端列表
    ensembleTracker = zk34CompatibilityMode ? null : new EnsembleTracker(this, builder.getEnsembleProvider());  
  
    runSafeService = makeRunSafeService(builder);

newClient的目标是构建 ZK 连贯实例,包含一系列附加外围组件:后盾操作、连贯事件、异样监控、容器,命名空间、负载平衡等等。

CuratorZookeeperClient 初始化过程

CuratorZookeeperClient 初始化过程流程图

CuratorZookeeperClient 初始化过程图如下:

CuratorZookeeperClient 初始化代码剖析

下面提到,CuratorFrameworkImpl 的初始化过程中有一段比拟重要的 CuratorZookeeperClient 客户端初始化过程,上面就来看看这个 CuratorZookeeperClient 初始化过程干了啥。

public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
            int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
            RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
    {

        // StandardConnectionHandler 当收到 Disconnect 事件后,如果在规定工夫内没有重连到服务器,则会被动触发 Expired 事件
        this.connectionHandlingPolicy = connectionHandlingPolicy;
        if (sessionTimeoutMs < connectionTimeoutMs)
        {log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
        }
        // 重连策略
        retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
        ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");

        this.connectionTimeoutMs = connectionTimeoutMs;
        this.waitForShutdownTimeoutMs = waitForShutdownTimeoutMs;
        // //curator 注册到原生客户端上的 defaultWatcher, 会收到和连贯状态无关的事件告诉等,负责超时重连
        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);

        //  重试策略设置
        setRetryPolicy(retryPolicy);
    }

ConnectionState是 curator 注册到原生客户端上的defaultWatcher,它会收到和连贯状态无关的事件告诉等,负责超时重连操作等。

上面看下 ConnectionState 的构造方法。

ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)  
{  
    this.ensembleProvider = ensembleProvider;  
    this.sessionTimeoutMs = sessionTimeoutMs;  
    this.connectionTimeoutMs = connectionTimeoutMs;  
    this.tracer = tracer;  
    this.connectionHandlingPolicy = connectionHandlingPolicy;  
    if (parentWatcher != null)  
    {  
        // 因为 defaultWatcher 只能有一个,通过 parentWatchers 可实现 defaultWatcher 接到事件告诉时 parentWatchers 的回调
        parentWatchers.offer(parentWatcher);  
    }  
  
    handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);  
}

parentWatchers 应用了并发平安队列 ConcurrentLinkedQueue,这个队列的作用能够如下:

ConcurrentLinkedQueue:一个基于链接节点的 无界限程平安队列 。此队列依照 FIFO( 先进先出 )准则对元素进行排序。队列的 头部 是队列中 工夫最长的元素 。队列的尾部 是队列中工夫最短的元素。 新的元素插入到队列的尾部,队列获取操作从队列头部取得元素。当多个线程共享拜访一个公共 collection 时,ConcurrentLinkedQueue 是一个失当的抉择。此队列不容许应用 null 元素。

private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();

ConnectionStateManager 初始化过程

ConnectionStateManager 初始化过程流程图

ConnectionStateManager 次要是持有 Client 援用,通过连贯状态治理工程创立构建监听器,以及构建只容许一个线程执行的线程池。

Curator 的设计记录是一个客户端永远只有一个线程负责工作。

ConnectionStateManager 初始化代码剖析

在 Curator 框架初始化代码中蕴含了 ConnectionStateManager 初始化,它次要负责状态保护和连贯状态变更告诉。

// 负责连贯状态变动时的告诉
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerManagerFactory());

从初始化代码能够看到,如果要监听状态扭转,须要注册一个监听器。相干的注册形式在“告诉治理”局部进行介绍,这里咱们来看下相干的成员变量以及初始化办法。

// 连贯状态事件告诉队列
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);

// 须要告诉的 listeners 
private final UnaryListenerManager<ConnectionStateListener> listeners;

//ConnectionStateManager 的运行状态 
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);

org.apache.curator.framework.state.ConnectionStateManager#ConnectionStateManager(org.apache.curator.framework.CuratorFramework, java.util.concurrent.ThreadFactory, int, int, org.apache.curator.framework.state.ConnectionStateListenerManagerFactory)

/**
Params:
client – the client 

threadFactory – thread factory to use or null for a default 

sessionTimeoutMs – the ZK session timeout in milliseconds 

sessionExpirationPercent – percentage of negotiated session timeout to use when simulating a session timeout. 0 means don't simulate at all 

managerFactory – manager factory to use
*/
public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerManagerFactory managerFactory)  
{  
    this.client = client;  
    this.sessionTimeoutMs = sessionTimeoutMs;  
    this.sessionExpirationPercent = sessionExpirationPercent;  
    if (threadFactory == null)  
    {threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");  
    }  
    // 事件队列解决线程池
    service = Executors.newSingleThreadExecutor(threadFactory);  
    // 构建监听器队列
    listeners = managerFactory.newManager(client);  
}

CuratorFrameworkImpl 启动过程

启动过程的次要工作是启动 ConnectionStateManager,同时负责连贯事件的告诉筹备,接着是启动 CuratorZookeeperClient,建设服务端会话连贯等操作,最初通过开启一个独自的线程监听执行后台任务队列,这个线程的工作是一直从工作队列取出元素并且执行。

CuratorFrameworkImpl 启动过程流程图

客户端连贯 client.start();

调用 start 办法的代码如下:

client.start();  

通过上面 CAS 操作将以后状态更新为 STARTED,同时依据 if 逻辑能够得悉 start() 办法不容许反复调用,这和 JDK 的 Thread 设计思路比拟类似,Thread 同样只容许执行一次 start() 办法。

CAS 操作胜利则构建连贯监听器监听异样连贯状态,监听器中判断以后客户端是否曾经连贯或者正在重连,如果是则logAsErrorConnectionErrors=true

client.start(); 外部逻辑如下,这个办法的代码都比较简单,具体能够参考正文了解。

public void start()  
{log.info("Starting");  
    // 应用 CAS 把以后的运行状态切换为 STARTED,状态切换之后不可逆
    // LATENT:CuratorFramework.start() has not yet been called
    // STARTED: CuratorFramework.start() has been called
    if (!state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )  
    {throw new IllegalStateException("Cannot be started more than once");  
    }  
  
    try  
    {// ordering dependency - must be called before client.start()  
        // 程序依赖 - 必须在 client.start()之前调用。connectionStateManager.start(); 
        // 构建连贯监听器,监听异样连贯状态
        final ConnectionStateListener listener = new ConnectionStateListener()  
        {  
            @Override  
            public void stateChanged(CuratorFramework client, ConnectionState newState)  
            {  
                // CONNECTED:为第一次胜利连贯到服务器而发送。留神:对于任何一个 CuratorFramework 实例只会收到其中一条信息。// RECONNECTED:一个暂停的、失落的或只读的连贯已被从新建设
                // RECONNECTED:A suspended, lost, or read-only connection has been re-established
                // 如果曾经连贯或者正在重连
                if (ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState)  
                {logAsErrorConnectionErrors.set(true);  
                }  
            }  
  
            @Override  
            public boolean doNotDecorate()  
            {return true;}  
        };  
        // 注册监听器
        this.getConnectionStateListenable().addListener(listener);  
        // 全局启动开发设置为 true,ConnectionState 状态更新
        client.start();  
        // 构建线程池
        executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);         // 执行具备返回值的 Callable 工作
        executorService.submit(new Callable<Object>()  
        {  
            @Override  
            public Object call() throws Exception  
            {  
                // 要害局部:挂起后盾操作
                backgroundOperationsLoop();  
                return null;            
            }  
        });  
          
        if (ensembleTracker != null)  
        {ensembleTracker.start();  
        }  
  
        log.info(schemaSet.toDocumentation());  
    }  
    catch (Exception e)  
    {ThreadUtils.checkInterrupted(e);  
        handleBackgroundOperationException(null, e);  
    }  
}

咱们持续看要害局部backgroundOperationsLoop();

后盾轮询操作指令 backgroundOperationsLoop()

这里再介绍 backgroundOperationsLoop() 办法,依据名称得悉这是一个后盾循环,后台任务的整体流程如下:

private void backgroundOperationsLoop()  
{  
    try  
    {while ( state.get() == CuratorFrameworkState.STARTED )  
        {  
            OperationAndData<?> operationAndData;  
            try            
            {operationAndData = backgroundOperations.take();  
                if (debugListener != null)  
                {debugListener.listen(operationAndData);  
                }  
                // 执行后盾操作
                performBackgroundOperation(operationAndData);  
            }  
            catch (InterruptedException e)  
            {  
                // 在这里中断异样会被吞掉。// swallow the interrupt as it's only possible from either a background  
                // operation and, thus, doesn't apply to this loop or the instance                // is being closed in which case the while test will get it            }  
        }  
    }  
    finally  
    {log.info("backgroundOperationsLoop exiting");  
    }  
}

OperationAndData 实现了 Delayed 接口用于实现阻塞队列提早重试。

下面的解决逻辑如下:

  1. 判断以后是否为 STARTED 状态,始终循环。
  2. 从阻塞队列 BlockingQueue 当中弹出操作指令对象,在初始化代码中能够得悉是一个 DelayQueue,提早并发平安阻塞队列,OperationAndData 对象毫无疑问实现了Delayed 接口。
backgroundOperations = new DelayQueue<OperationAndData<?>>();
  1. 判断 Debug 监听器是否存在,如果存在则监听OperationAndData
  2. 执行后盾操作 performBackgroundOperation,他的工作是从阻塞队列一直获取数据操作OperationAndData 对象调用callPerformBackgroundOperation 办法,
  3. 如果无奈失常连贯 ZK 集群,此时会 else 并且进入到重连判断逻辑,如果符合条件,则增加到阻塞队列的当中期待下一次重试。(留神这里是被动重试,同步操作)
void performBackgroundOperation(OperationAndData<?> operationAndData)
    {
        try
        {if ( !operationAndData.isConnectionRequired() || client.isConnected())
            {operationAndData.callPerformBackgroundOperation();
            }
            else
            {
                // 容许重连或者超时这样的状况产生
                client.getZooKeeper();  // important - allow connection resets, timeouts, etc. to occur
        
                // 如果连贯超时,则跑出 CuratorConnectionLossException 异样
                if (operationAndData.getElapsedTimeMs() >= client.getConnectionTimeoutMs())
                {throw new CuratorConnectionLossException();
                }
                // 如果没有超时,则推入到 forcedSleepOperations 强制睡眠后期待重连
                sleepAndQueueOperation(operationAndData);
            }
        }
        catch (Throwable e)
        {
            // 查看线程中断
            ThreadUtils.checkInterrupted(e);

            /**
             * Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
             * when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
             * and callbacks need to get invoked, etc.
             */
             /*
             修复报告为 CURATOR-52 的边缘案例。当初始(或之前失败的)连贯无奈从新建设时,ConnectionState.checkTimeouts()会抛出 KeeperException.ConnectionLossException。这须要通过重试策略运行,回调须要被调用,等等。*/
             // 连贯失落异样解决
            if (e instanceof CuratorConnectionLossException)
            {WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
                CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null, null);
                // 如果重连次数
                if (checkBackgroundRetry(operationAndData, event) )
                {
                    // 推送到 backgroundOperations 队列尝试重连
                    queueOperation(operationAndData);
                }
                else
                {
                    // 放弃重连
                    logError("Background retry gave up", e);
                }
            }
            else
            {
                // 否则须要解决后盾操作异样
                handleBackgroundOperationException(operationAndData, e);
            }
        }
    }

这里顺带介绍下后盾决定是否重试的判断逻辑,次要是依据用户传输的重试策略执行对应的重试逻辑判断,是十分经典的 策略模式 实现。

client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData)

operationAndData.callPerformBackgroundOperation();

operationAndData 继承了DelayQueue,使用多态个性领有不同实现,外部只有一行代码:

void callPerformBackgroundOperation() throws Exception  
{operation.performBackgroundOperation(this);  
}

operation.performBackgroundOperation(this); 对应 org.apache.curator.framework.imps.BackgroundOperation#performBackgroundOperation

BackgroundOperation 后盾操作有很多具体的实现,对应了 ZK 常见操作。传递的 this 就是 operationAndData 对象。

会话治理

Client 连贯过程的连贯状态都是通过 ConnectionState 进行治理的,它会负责尝试超时重连的操作,ConnectionStateManager 会负责连贯状态的扭转和告诉,ConnectionHandlingPolicy 则对应了连贯超时策略的触发。

在后面的后盾轮询队列操作指令对象过程中,也容许在超时工夫内尝试重连,那么 Curator 是如何进行客户端 会话状态告诉 以及 会话超时重连 的?

连贯事件监听和状态变更 org.apache.curator.ConnectionState#process

org.apache.curator.ConnectionState#process 的代码能够得悉,连贯状态相干的事件类型为Watcher.Event.EventType.None,会告诉到所有的 Wathcer。

其中 ConnectionState 作为 defaultWatcher,它的事件回调如下:

public void process(WatchedEvent event)  
{if ( LOG_EVENTS)  
    {log.debug("ConnectState watcher:" + event);  
    }  
  
    if (event.getType() == Watcher.Event.EventType.None )  
    {  
    //isConnected:客户以后的连贯状态,true 示意已连贯(SyncConnected 和 ConnectedReadOnly 状态)boolean wasConnected = isConnected.get(); 
        // 依据 org.apache.zookeeper.Watcher.Event.KeeperState 进行状态判断。boolean newIsConnected = checkState(event.getState(), wasConnected);  
        if (newIsConnected != wasConnected)  
        {  
            // / 如果连贯状态产生扭转,则更新
            isConnected.set(newIsConnected);  
            connectionStartMs = System.currentTimeMillis();  
            if (newIsConnected)  
            {  
                
            // 重连,更新会话超时协商工夫
            // NegotiatedSessionTimeoutMs(协商会话超时)。lastNegotiatedSessionTimeoutMs.set(handleHolder.getNegotiatedSessionTimeoutMs());  
                log.debug("Negotiated session timeout:" + lastNegotiatedSessionTimeoutMs.get());  
            }  
        }  
    }  

    // 告诉 parentWatchers, 留神初始化的时候其实传入了一个 parentWatcher, 会调用 CuratorFrameworkImpl.processEvent
    for (Watcher parentWatcher : parentWatchers)  
    {OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());  
        parentWatcher.process(event);  
        trace.commit();}  
}

最初一段正文提到能够看到遍历 parentWatchers 并且调用 process 办法。这里实际上默认会有个 Watcher,那就是在初始化的时候默认会注册一个 Watch 作为 parentWatcher 传入。

  this.client = new CuratorZookeeperClient  
        (  
            localZookeeperFactory,  
            builder.getEnsembleProvider(),  
            builder.getSessionTimeoutMs(),  
            builder.getConnectionTimeoutMs(),  
            builder.getWaitForShutdownTimeoutMs(),  
            new Watcher()  
            {  
                @Override  
                public void process(WatchedEvent watchedEvent)  
                {CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);  
                    // 留神初始化的时候其实传入了一个 parentWatcher, 会调用 CuratorFrameworkImpl.processEvent
                    processEvent(event);  
                }  
            },  
            builder.getRetryPolicy(),  
            builder.canBeReadOnly(),  
            builder.getConnectionHandlingPolicy()); 

这部分告诉事件回调在下文会再次提到,这里简略无关印象即可。

连贯状态检查和解决 org.apache.curator.ConnectionState#checkState

连贯状态检查和解决在 org.apache.curator.ConnectionState#checkState 办法中进行。

boolean newIsConnected = checkState(event.getState(), wasConnected); 
private boolean checkState(Event.KeeperState state, boolean wasConnected)  
{  
    boolean isConnected = wasConnected;  
    boolean checkNewConnectionString = true;  
    switch (state)  
    {  
    default:  
    case Disconnected:  
    {  
        isConnected = false;  
        break;    }  
  
    case SyncConnected:  
    case ConnectedReadOnly:  
    {  
        isConnected = true;  
        break;    }  
    // 拜访权限异样
    case AuthFailed:  
    {  
        isConnected = false;  
        log.error("Authentication failed");  
        break;    }  
  
    case Expired:  
    {  
        isConnected = false;  
        checkNewConnectionString = false;  
        handleExpiredSession();  
        break;    }  
  
    case SaslAuthenticated:  
    {  
        // NOP  
        break;  
    }  
    }  
    // the session expired is logged in handleExpiredSession, so not log here  
    // 会话过期被记录在 handleExpiredSession 中,所以不记录在这里。if (state != Event.KeeperState.Expired) {new EventTrace(state.toString(), tracer.get(), getSessionId()).commit();}  
  
    if (checkNewConnectionString)  
    {  
        // 如果服务端列表发生变化,则更新
        String newConnectionString = handleHolder.getNewConnectionString();  
        if (newConnectionString != null)  
        {handleNewConnectionString(newConnectionString);  
        }  
    }  
  
    return isConnected;  
}

下面依据不同连贯状态判断连贯是否异样,返回后果为 true 则示意连贯是失常的,当会话超时过期 Expired 时,会调用 handleExpiredSession 进行 reset 操作(会话 被动重连),这里对于非连贯超时的状态进行工夫追踪。

留神重连策略 RetryPolicy这个策略在被动和被动重连中均会调用。

parentWatchers 注册和回调

产生状态变更的办法最初局部是告诉所有的 parentWatchers,上面来看看这个循环干了什么事件。

再次强调初始化的时候传入了一个 parentWatcher,会调用CuratorFrameworkImpl.processEvent 办法,当初来看看这部分是如何注册和回调的。

// 告诉 parentWatchers, 留神初始化的时候其实传入了一个 parentWatcher, 会调用 CuratorFrameworkImpl.processEvent
    for (Watcher parentWatcher : parentWatchers)  
    {OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());  
        parentWatcher.process(event);  
        trace.commit();}  

咱们间接看看这个默认的 Watcher 回调CuratorFrameworkImpl#processEvent(event) 相干代码逻辑。

new Watcher()  
{  
    @Override  
    public void process(WatchedEvent watchedEvent)  
    {CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
        // 处理事件  
        processEvent(event);  
    }  
},

processEvent(event)相干逻辑如下,首先对于状态变更判断,状态如果呈现变更则告诉到所有注册在 CuratorListener 上的监听器。

private void processEvent(final CuratorEvent curatorEvent)  
{if ( curatorEvent.getType() == CuratorEventType.WATCHED )  
    {  
        // 状态转换
        validateConnection(curatorEvent.getWatchedEvent().getState());  
    }  
      // 告诉所有注册的 CuratorListener
    listeners.forEach(new Function<CuratorListener, Void>()  
    {  
        @Override  
        public Void apply(CuratorListener listener)  
        {  
            try  
            {OperationTrace trace = client.startAdvancedTracer("EventListener");  
                // 接管回调事件
                listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);  
                trace.commit();}  
            catch (Exception e)  
            {ThreadUtils.checkInterrupted(e);  
                logError("Event listener threw exception", e);  
            }  
            return null;  
        }  
    });  
}

其中validateConnection 负责连贯状态的转换代码。

org.apache.curator.framework.imps.CuratorFrameworkImpl#validateConnection

void validateConnection(Watcher.Event.KeeperState state)  
{if ( state == Watcher.Event.KeeperState.Disconnected)  
    {internalConnectionHandler.suspendConnection(this);  
    }  
    else if (state == Watcher.Event.KeeperState.Expired)  
    {connectionStateManager.addStateChange(ConnectionState.LOST);  
    }  
    else if (state == Watcher.Event.KeeperState.SyncConnected)  
    {internalConnectionHandler.checkNewConnection(this);  
        connectionStateManager.addStateChange(ConnectionState.RECONNECTED);  
        unSleepBackgroundOperations();}  
    else if (state == Watcher.Event.KeeperState.ConnectedReadOnly)  
    {internalConnectionHandler.checkNewConnection(this);  
        connectionStateManager.addStateChange(ConnectionState.READ_ONLY);  
    }  
}

能够看到理论的状态变更是依附 ConnectionStateManager 组件负责的,ZK 的原生客户端状态和 Curator 包装的状态对应 表如下:

此外还须要留神每一个 if 判断的最初一行代码中有一个增加 ConnectionState 的操作,这个操作的意义是告诉所有注册到 listenersConnectionStateListener

connectionStateManager.addStateChange(ConnectionState.READ_ONLY);

至于怎么告诉的会在下文介绍。

告诉机制

告诉是干什么?其实就是在事件产生的时候,及时回调注册的 Listenrner 监听器 对应的回调函数。Curator 针对不同组件设计了不同的监听器注册和回调。

// 自定义监听器 CuratorListener
client.getCuratorListenable().addListener((_fk, e) -> {if (e.getType().equals(CuratorEventType.WATCHED)) {log.info("测试");
    }

});
ConnectionStateListener connectionStateListener = (client1, newState) -> {
    //Some details
    log.info("newState =>"+ newState);
};

能够注册的监听器形式如下:

  • 一次性 Watch 告诉
  • 注册 CuratorListener 告诉
  • 注册 ConnectionStateListener 告诉
  • 注册 UnhandledErrorListener 告诉
  • 后盾线程操作实现时的回调告诉
  • 缓存机制,屡次注册

一次性 Watch 告诉

每次都须要重复通过上面的办法从新注册。这里波及到 NodeCache 的相干组件,因为目前并没有介绍相干的前置代码,这里临时跳过介绍。

client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);

注册 CuratorListener 告诉

实现形式很简略,就是把监听器注册到 CuratorFrameworkImpl.listeners 这个容器当中,后盾线程实现操作告诉该监听器容器的所有监听器。

比方异步的形式在 ZK 下面创立门路会触发 CuratorEventType.CREATE 事件,还有就是连贯状态事件触发的时候 parentWatcher 也会回调这些 listeners,比方上面的代码:

/**
 * connect ZK, register watchers
 */
public CuratorFramework mkClient(Map conf, List<String> servers, Object port,
                                 String root, final WatcherCallBack watcher) {CuratorFramework fk = Utils.newCurator(conf, servers, port, root);

    // 自定义监听器 CuratorListener
    fk.getCuratorListenable().addListener(new CuratorListener() {
        @Override
        public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception {if (e.getType().equals(CuratorEventType.WATCHED)) {WatchedEvent event = e.getWatchedEvent();

                watcher.execute(event.getState(), event.getType(), event.getPath());
            }

        }
    });

    fk.start();
    return fk;
}

org.apache.curator.framework.imps.CuratorFrameworkImpl#processEvent

processEvent 办法总会进行注册的 CuratorListener 回调操作。

private void processEvent(final CuratorEvent curatorEvent)
    {if ( curatorEvent.getType() == CuratorEventType.WATCHED )
        {validateConnection(curatorEvent.getWatchedEvent().getState());
        }

        listeners.forEach(new Function<CuratorListener, Void>()
        {
            @Override
            public Void apply(CuratorListener listener)
            {
                try
                {OperationTrace trace = client.startAdvancedTracer("EventListener");
                    listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
                    trace.commit();}
                catch (Exception e)
                {ThreadUtils.checkInterrupted(e);
                    logError("Event listener threw exception", e);
                }
                return null;
            }
        });
    }

具体回调则是有各种执行构建实现器实现的,这一块深究比较复杂,这里有个概念后续有须要查看相干实现即可。

注册 ConnectionStateListener 告诉

如果增加 ConnectionStateListener 监听器,则在连贯状态产生扭转时,会收到告诉。

ConnectionStateListener connectionStateListener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {//Some details}
    };
client.getConnectionStateListenable().addListener(connectionStateListener);

ConnectionStateListener 监听器的事件回调产生在 ConnectionStateManager 当中,然而后面咱们只介绍了如何初始化,上面扩大介绍回调 ConnectionStateListener 的局部

ConnectionStateManager 如何回调 ConnectionStateListener?

org.apache.curator.framework.imps.CuratorFrameworkImpl#validateConnection

下面解说 会话机制 的时候,提到了最初有一个增加 ConnectionState 的操作,这里将介绍收到 ConnectionState 变更之后如何回调注册在本人身上的监听器。

void validateConnection(Watcher.Event.KeeperState state)  
{  
    // ......
    else if (state == Watcher.Event.KeeperState.Expired)  
    {connectionStateManager.addStateChange(ConnectionState.LOST);  
    }  
    else if (state == Watcher.Event.KeeperState.SyncConnected)  
    {unSleepBackgroundOperations();  
    }  
    else if (state == Watcher.Event.KeeperState.ConnectedReadOnly)  
    {connectionStateManager.addStateChange(ConnectionState.READ_ONLY);  
    }  
}

具体解决在上面这个办法中实现。

org.apache.curator.framework.state.ConnectionStateManager#processEvents

private void processEvents()
    {while ( state.get() == State.STARTED )
        {
            try
            {int useSessionTimeoutMs = getUseSessionTimeoutMs();
                long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
                long pollMaxMs = useSessionTimeoutMs - elapsedMs;

                final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
                if (newState != null)
                {if ( listeners.isEmpty() )
                    {log.warn("There are no ConnectionStateListeners registered.");
                    }
                    // 要害局部,当呈现状态变更进行回调监听器告诉
                    listeners.forEach(listener -> listener.stateChanged(client, newState));
                }
                else if (sessionExpirationPercent > 0)
                {synchronized(this)
                    {checkSessionExpiration();
                    }
                }
            }
            catch (InterruptedException e)
            {
                // swallow the interrupt as it's only possible from either a background
                //  吞下中断,因为它只可能来自后盾操作
                
                // operation and, thus, doesn't apply to this loop or the instance
                // is being closed in which case the while test will get it
                // 如果实例在敞开有可能走到这一块代码
            }
        }
    }

下面内容重要的其实就一行代码:

listeners.forEach(listener -> listener.stateChanged(client, newState));

这个 processEvents 是怎么回调的?其实在之前画的 CuratorFrameworkImpl 启动过程流程图中就有展现。

ConnectionStateManager 当中有一个 ExecutorService 线程池,翻看代码能够得悉他的实现是 SingleThreadScheduledExecutor,这里含意显著就是独自开启一个线程轮询这一段代码查看 listener,状态变更告诉注册在 ConnectionStateManager 上的监听器。

注册 UnhandledErrorListener 告诉

同理注册到 CuratorFrameworkImpl.unhandledErrorListeners 当中,当后盾线程操作产生异样或者 handler 产生异样的时候会触发。

注册形式


/**
 * connect ZK, register watchers
 */
public CuratorFramework mkClient(Map conf, List<String> servers, Object port,
                                 String root, final WatcherCallBack watcher) {CuratorFramework fk = Utils.newCurator(conf, servers, port, root);

    // 自定义监听器 UnhandledErrorListener
   fk.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
        @Override
        public void unhandledError(String msg, Throwable error) {
            String errmsg = "Unrecoverable zookeeper error, halting process:" + msg;
            LOG.error(errmsg, error);
            JStormUtils.halt_process(1, "Unrecoverable zookeeper error");

        }
    });

   
    fk.start();
    return fk;
}

如何触发?

触发的相干代码在 org.apache.curator.framework.imps.CuratorFrameworkImpl#logError 办法中,留神这里的 apply 办法解决。

void logError(String reason, final Throwable e)  
{  
    // 省略其余无关代码
    unhandledErrorListeners.forEach(new Function<UnhandledErrorListener, Void>()  
    {  
        @Override  
        public Void apply(UnhandledErrorListener listener)  
        {listener.unhandledError(localReason, e);  
            return null;        
            }  
    });  

    // 省略无关代码
}

后盾线程操作实现时的回调告诉

对于不同操作比方 setData,能够通过链式调用的形式传入回调函数 callback,操作实现之后会执行回调函数实现回调操作。

 public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception {
        // this is another method of getting notification of an async completion
        client.setData().inBackground(callback).forPath(path, payload);
    }

缓存机制,屡次注册

Curator 的缓存机制是一块比拟大的部头,Curator 的缓存形式包含:

  • Path Cache
  • Node Cache
  • Tree Cache

缓存在应用之前会和服务端的节点数据进行比照,当数据不统一时,会通过 watch 机制触发回调刷新本地缓存,同时再次注册 Watch,每次重连会注册新的 Watcher,保障 Watcher 永远不失落。

小结

通过告诉机制和会话治理两个局部,咱们理解到:

  • 客户端告诉 是同步实现。
  • connectionStateManager.listeners是由 外部的线程池 做异步告诉
  • CuratorFrameworkImpl.listeners 对于连贯状态的告诉,与 watcher 告诉线程为 同步 ,由后盾线程告诉时为 异步
  • watcher 注册过多可能导致重连之后 watcher 失落。

写到最初

本节介绍了 Curator 的根底应用,从源码角度剖析了 Curator 组件的初始化过程,并且简略剖析会话治理和告诉机制的相干源码调用。

上面是本文波及到的源码解说汇总的一副总图。集体源码剖析过程如果有存在谬误或者疑难欢送反馈和探讨。

最初是整个 demo 代码:


@Slf4j
public class CuratorTestExample {public static void main(String[] args) throws Exception {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
                CuratorFrameworkFactory.newClient("192.168.19.100:2181,192.168.19.101:2181,192.168.19.102:2181", retryPolicy);

        // 连贯 ZK, 开启连贯

        // 自定义监听器 CuratorListener
        client.getCuratorListenable().addListener((_fk, e) -> {if (e.getType().equals(CuratorEventType.WATCHED)) {log.info("测试");
            }

        });
        ConnectionStateListener connectionStateListener = (client1, newState) -> {
            //Some details
            log.info("newState =>"+ newState);
        };
        // 11:31:17.026 [Curator-ConnectionStateManager-0] INFO com.zxd.interview.zkcurator.CuratorTestExample - newState => CONNECTED
        client.getConnectionStateListenable().addListener(connectionStateListener);
        client.start();
        // 此处就获取到 zk 的一个连贯实例。//.....
        // 创立 znode,如果有必要须要创立父目录
        client.create().creatingParentsIfNeeded().withProtection().forPath("/my/path", "Test".getBytes());
        InterProcessMutex lock = new InterProcessMutex(client, "/my/path");
        lock.acquire();
        try {
            // do some work inside of the critical section here
            Thread.sleep(1000);
        } finally {lock.release();
        }

    }
}

举荐浏览

ZK 客户端 Curator 应用详解 – 知乎 (zhihu.com)

https://cloud.tencent.com/developer/article/1648976?areaSourc…

Curator 目录监听 | Ravitn Blog (donaldhan.github.io)

正文完
 0