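Introduction
Curator is a ZooKeeper client originally open-sourced by Netflix and now a top-level Apache project. Compared with the native ZK client, Curator works at a higher level of abstraction and cuts the development effort for common ZK features: it ships with connection retries, repeated Watcher registration, handling of NodeExistsException, and so on.
According to the official site, it is a Java client API toolkit for distributed systems. It is built around a high-level API, which makes driving ZooKeeper simpler and more approachable when developing safe distributed applications.
Curator consists of a series of modules. For most developers the commonly used ones are curator-framework and curator-recipes, which are introduced in turn below.
Curator also includes a number of extensions, such as service discovery and a Java 8 asynchronous DSL.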
Apache Curator is a Java/JVM client library for [Apache ZooKeeper](https://zookeeper.apache.org/), a distributed coordination service.
Apache Curator includes a high-level API framework and utilities to make using Apache ZooKeeper much easier and more reliable. It also includes recipes for common use cases and extensions such as service discovery and a Java 8 asynchronous DSL.
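As the official site puts it: Guava is to Java what Curator is to ZooKeeper.
ZK version support
The latest Curator line is 5.x, which no longer supports ZK 3.4.x and earlier. After weighing the options, ZK 3.5.10 was chosen here. Note that the Maven section below pins Curator 4.3.0.
Curator 5.x introduced a number of breaking changes. The incompatibilities are:
- The old ListenerContainer classes have been removed to avoid leaking Guava classes.
- ConnectionHandlingPolicy and related classes have been removed.
- The Reaper and ChildReaper classes/recipes have been removed. Use ZooKeeper container nodes instead.
- newPersistentEphemeralNode() and newPathChildrenCache() have been removed from GroupMember.
- ServiceCacheBuilder<T> executorService(CloseableExecutorService executorService) has been removed from ServiceCacheBuilder.
- ServiceProviderBuilder<T> executorService(CloseableExecutorService executorService) has been removed from ServiceProviderBuilder.
- static boolean shouldRetry(int rc) has been removed from RetryLoop.
- static boolean isRetryException(Throwable exception) has been removed from RetryLoop.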
Official site
Apache Curator –
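Downloads
Curator on Maven: https://mvnrepository.com/artifact/org.apache.curator
Curator jar downloads: https://cwiki.apache.org/confluence/display/CURATOR/Releases
Quick start
ZK cluster deployment
Before starting, set up a ZK cluster so that code is easy to step through when debugging. The setup process is covered in a separate article:
[【Zookeeper】基于3台linux虚拟机搭建zookeeper集群 (setting up a ZooKeeper cluster on three Linux virtual machines)](https://segmentfault.com/a/1190000043925962)
Adding the Maven dependencies
The Zookeeper and Curator versions chosen are as follows.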
<properties>
    <curator.version>4.3.0</curator.version>
    <zookeeper.version>3.5.10</zookeeper.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>${curator.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>${curator.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>${zookeeper.version}</version>
    </dependency>
</dependencies>
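Building a first example
Curator's most central, powerful and commonly used feature is the distributed lock. The starter demo shows that everything in Curator is built through CuratorFrameworkFactory. To lock and unlock with Curator, you only need one CuratorFramework object per ZooKeeper cluster you connect to.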
CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy)
The method above creates a connection to the ZooKeeper cluster using default values. The only thing you need to decide is the retry policy.
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
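From the arguments you can roughly tell that this policy retries at exponentially growing intervals, starting from a 1000 ms base, and gives up after three retries.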
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient("192.168.0.1,192.168.0.2,192.168.0.3", retryPolicy);
client.start();
// At this point we hold a connection instance to ZK.
//.....
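Once you have a CuratorFramework instance, you can operate on ZK directly through its API. Below we look at the key part, which is also the most used: distributed lock operations.
client.create().forPath("/my/path", myData);
Another benefit of calling the API directly like this is that the client instance automatically retries ZK operations when it runs into network jitter and similar problems.
Reentrant lock (fair lock) example
Below is the official demo code for the reentrant lock.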
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
{
    try
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}
With a small change it is ready for simple use.
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient("192.168.0.1,192.168.0.2,192.168.0.3", retryPolicy);
client.start();
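// At this point we hold a connection instance to ZK.
//.....
// Create the znode together with any missing parents (a plain create() fails when the parent /my does not exist)
client.create().creatingParentsIfNeeded().forPath("/my/path", "Test".getBytes());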
InterProcessMutex lock = new InterProcessMutex(client, "/test/myLock");
lock.acquire();
try {
// do some work inside of the critical section here
Thread.sleep(3000);
} finally {
lock.release();
}
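Initialization flowchart
The complete initialization flowchart is shown below. The following sections break down, step by step, how the diagram is put together.
The drawio source file and the image are available here:
Link: https://pan.baidu.com/s/18PoMjkp11LztmNB3XgZ0qw?pwd=4bug
Extraction code: 4bug
Initialization source code analysis
Straight to the source code. This article mainly covers Curator's initialization, its internal notification mechanism, and session management.
CuratorFramework initialization
Initialization flowchart
The CuratorFramework initialization corresponds to the part of the diagram in the screenshot below. The parts in red are the objects and variables I consider relatively important.
CuratorFrameworkFactory.newClient() code analysis
By default Curator acquires locks fairly: multiple clients obtain the lock one by one in queue order. Below we verify this through the code.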
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient("192.168.19.100:2181,192.168.19.101:2181,192.168.19.102:2181", retryPolicy);
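Before acquiring a distributed lock we first need to connect to the ZK cluster. The whole process takes two lines of code: first decide the retry policy for connecting to ZK, then build the Curator instance through CuratorFrameworkFactory. Internally Curator wraps the native ZK client in a layer that developers do not need to care about while using it.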
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.0.1,192.168.0.2,192.168.0.3", retryPolicy);
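The above is simple template code. ExponentialBackoffRetry builds a retry policy whose wait between retries grows exponentially, for example 1 second the first time, 2 seconds the second, 4 seconds the third, 8 seconds the fourth, and so on. These figures are illustrative: as far as I can tell, Curator randomizes the actual sleep within an exponentially growing bound.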
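The backoff idea can be sketched with the JDK alone. This is a simplified model, not Curator's ExponentialBackoffRetry: the class BackoffSketch and its method names are made up for illustration, and the randomized-slot formula is an assumption about how such a policy is commonly implemented.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Simplified model of a randomized exponential backoff; not Curator's class. */
final class BackoffSketch {
    private BackoffSketch() {}

    /** Exclusive upper bound of the sleep before retry number retryCount (0-based). */
    static long upperBoundMs(long baseSleepMs, int retryCount) {
        return baseSleepMs * (1L << (retryCount + 1));
    }

    /** Picks a sleep in [baseSleepMs, upperBoundMs), in multiples of baseSleepMs. */
    static long sleepMs(long baseSleepMs, int retryCount) {
        int slots = 1 << (retryCount + 1);
        return baseSleepMs * Math.max(1, ThreadLocalRandom.current().nextInt(slots));
    }

    /** A retry is allowed only while retryCount is below maxRetries. */
    static boolean allowRetry(int retryCount, int maxRetries) {
        return retryCount < maxRetries;
    }

    public static void main(String[] args) {
        // Mirrors new ExponentialBackoffRetry(1000, 3): 1000 ms base, at most 3 retries.
        for (int retry = 0; allowRetry(retry, 3); retry++) {
            System.out.printf("retry %d: sleep %d ms (bound %d ms)%n",
                    retry, sleepMs(1000, retry), upperBoundMs(1000, retry));
        }
    }
}
```

The bound doubles on every retry while the actual sleep is picked at random below it, which keeps many clients from retrying in lockstep.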
Next, CuratorFrameworkFactory builds the instance.
return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
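Two constants deserve attention here: DEFAULT_SESSION_TIMEOUT_MS (the default session timeout) and DEFAULT_CONNECTION_TIMEOUT_MS (the default connection timeout). They supply the default timeouts that are passed along together with the specified retry policy.
private static final int DEFAULT_SESSION_TIMEOUT_MS
= Integer.getInteger("curator-default-session-timeout", 60 * 1000);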
private static final int DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
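Integer.getInteger reads a JVM system property and falls back to the given default, so both timeouts can be overridden with -D flags. A minimal sketch, assuming a hypothetical DefaultTimeoutSketch class; unlike Curator's static final fields, which are read once at class load, these methods re-read the property on every call so the effect is easy to observe.

```java
/** Shows how Integer.getInteger resolves a system property with a fallback. */
final class DefaultTimeoutSketch {
    private DefaultTimeoutSketch() {}

    /** Session timeout: the system property if set, otherwise 60 seconds. */
    static int sessionTimeoutMs() {
        return Integer.getInteger("curator-default-session-timeout", 60 * 1000);
    }

    /** Connection timeout: the system property if set, otherwise 15 seconds. */
    static int connectionTimeoutMs() {
        return Integer.getInteger("curator-default-connection-timeout", 15 * 1000);
    }

    public static void main(String[] args) {
        // Try: java -Dcurator-default-session-timeout=30000 DefaultTimeoutSketch
        System.out.println("session timeout ms: " + sessionTimeoutMs());
        System.out.println("connection timeout ms: " + connectionTimeoutMs());
    }
}
```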
Stepping further in, we reach the construction code, which uses the builder pattern.
return builder().
connectString(connectString).
sessionTimeoutMs(sessionTimeoutMs).
connectionTimeoutMs(connectionTimeoutMs).
retryPolicy(retryPolicy).
build();
build() actually creates a CuratorFrameworkImpl instance. Here the Builder passes its own this reference to the CuratorFrameworkImpl constructor.
return new CuratorFrameworkImpl(this);
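The CuratorFrameworkImpl constructor does a lot. The main thing to note here is the CuratorZookeeperClient object, which wraps the native ZK client.
The other components relate to Curator's notification management, session management and similar features.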
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
this.client = new CuratorZookeeperClient
(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
builder.getWaitForShutdownTimeoutMs(),
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
processEvent(event);
}
},
builder.getRetryPolicy(),
builder.canBeReadOnly(),
builder.getConnectionHandlingPolicy()
);
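// Decides when the connection counts as disconnected or timed out, sets Curator's connection state, and triggers connection state notifications through connectionStateManager
internalConnectionHandler = new StandardInternalConnectionHandler();
// Receives event notifications; triggered by background-thread operation events and connection state events
listeners = new ListenerContainer<CuratorListener>();
// Triggered when the background thread or a handler throws an exception
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
// Queue of operations executed by the background thread
backgroundOperations = new DelayQueue<OperationAndData<?>>();
forcedSleepOperations = new LinkedBlockingQueue<>();
// Namespace
namespace = new NamespaceImpl(this, builder.getNamespace());
// Thread factory, used when the background thread pool is initialized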
threadFactory = getThreadFactory(builder);
maxCloseWaitMs = builder.getMaxCloseWaitMs();
// Handles notifications when the connection state changes
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerDecorator());
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
// State of CuratorFrameworkImpl: LATENT before start() is called, STARTED after start(), STOPPED after close()
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
// Connection state error policy
connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
schemaSet = Preconditions.checkNotNull(builder.getSchemaSet(), "schemaSet cannot be null");
zk34CompatibilityMode = builder.isZk34CompatibilityMode();
byte[] builderDefaultData = builder.getDefaultData();
defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
authInfos = buildAuths(builder);
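// Guaranteed delete: keeps retrying until the delete succeeds, implemented through recursive calls
failedDeleteManager = new FailedDeleteManager(this);
// Guaranteed removal of watches
failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
// Tracks the available server nodes; the first connection and every successful reconnect trigger a fresh fetch of the server list
ensembleTracker = zk34CompatibilityMode ? null : new EnsembleTracker(this, builder.getEnsembleProvider());
runSafeService = makeRunSafeService(builder);
}
The purpose of newClient is to build the ZK connection instance together with a set of core supporting components: background operations, connection events, error monitoring, containers, namespaces, load balancing, and so on.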
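CuratorZookeeperClient initialization
CuratorZookeeperClient initialization flowchart
The CuratorZookeeperClient initialization flow is shown in the figure below:
CuratorZookeeperClient initialization code analysis
As mentioned above, CuratorFrameworkImpl's initialization contains an important step: initializing the CuratorZookeeperClient. Let's see what that initialization does.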
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher,
RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
{
// On a Disconnect event, StandardConnectionHandler proactively triggers an Expired event if it has not reconnected to the server within the allowed time
this.connectionHandlingPolicy = connectionHandlingPolicy;
if ( sessionTimeoutMs < connectionTimeoutMs )
{
log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
}
// Retry policy
retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");
this.connectionTimeoutMs = connectionTimeoutMs;
this.waitForShutdownTimeoutMs = waitForShutdownTimeoutMs;
// The defaultWatcher that Curator registers on the native client; it receives connection-state event notifications and handles reconnecting after a timeout
state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
// Set the retry policy
setRetryPolicy(retryPolicy);
}
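ConnectionState is the defaultWatcher that Curator registers on the native client. It receives connection-state event notifications and takes care of reconnecting after timeouts, among other things.
Now let's look at the ConnectionState constructor.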
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy)
{
this.ensembleProvider = ensembleProvider;
this.sessionTimeoutMs = sessionTimeoutMs;
this.connectionTimeoutMs = connectionTimeoutMs;
this.tracer = tracer;
this.connectionHandlingPolicy = connectionHandlingPolicy;
if ( parentWatcher != null )
{
// There can be only one defaultWatcher; parentWatchers lets the defaultWatcher call back the parent watchers when it receives an event notification
parentWatchers.offer(parentWatcher);
}
handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
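parentWatchers is a thread-safe ConcurrentLinkedQueue, which is described as follows:
ConcurrentLinkedQueue: an unbounded thread-safe queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is the element that has been on the queue the longest time; the tail is the element that has been on the queue the shortest time. New elements are inserted at the tail, and retrieval operations obtain elements at the head. A ConcurrentLinkedQueue is an appropriate choice when many threads share access to a common collection. This queue does not permit null elements.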
private final Queue<Watcher> parentWatchers = new ConcurrentLinkedQueue<Watcher>();
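The fan-out from the single default watcher to the parent watchers can be modeled with the JDK alone. This is a minimal sketch, not Curator's ConnectionState: ParentWatcherSketch is a hypothetical class, and a plain Consumer<String> stands in for ZooKeeper's Watcher and WatchedEvent types.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Models one default watcher that forwards every event to its parent watchers. */
final class ParentWatcherSketch {
    // Thread-safe FIFO queue: watchers are called back in registration order.
    private final Queue<Consumer<String>> parentWatchers = new ConcurrentLinkedQueue<>();

    /** Registers a parent watcher; null is rejected by ConcurrentLinkedQueue itself. */
    void addParentWatcher(Consumer<String> watcher) {
        parentWatchers.offer(watcher);
    }

    /** Plays the role of the default watcher's process(): forward the event to everyone. */
    void process(String event) {
        for (Consumer<String> parentWatcher : parentWatchers) {
            parentWatcher.accept(event);
        }
    }

    public static void main(String[] args) {
        ParentWatcherSketch state = new ParentWatcherSketch();
        state.addParentWatcher(e -> System.out.println("framework saw " + e));
        state.addParentWatcher(e -> System.out.println("extra watcher saw " + e));
        state.process("Disconnected");
    }
}
```

Iterating a ConcurrentLinkedQueue while other threads add watchers is safe, which is why it suits a watcher list that can change at any time.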
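ConnectionStateManager initialization
ConnectionStateManager initialization flowchart
ConnectionStateManager mainly holds a reference to the client, creates the listener manager through the connection state listener manager factory, and builds a thread pool that allows only one thread to run.
Curator is designed so that, for each client, a single thread is always responsible for this work.
ConnectionStateManager initialization code analysis
The Curator framework's initialization code includes the initialization of ConnectionStateManager, which is mainly responsible for maintaining state and notifying connection state changes.
// Handles notifications when the connection state changes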
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerManagerFactory());
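As the initialization code shows, to observe state changes you need to register a listener. How to register is covered in the "notification mechanism" section; here we look at the relevant fields and the constructor.
// Queue of connection state events awaiting notification
private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
// The listeners to notify
private final UnaryListenerManager<ConnectionStateListener> listeners;
// Running state of ConnectionStateManager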
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
org.apache.curator.framework.state.ConnectionStateManager#ConnectionStateManager(org.apache.curator.framework.CuratorFramework, java.util.concurrent.ThreadFactory, int, int, org.apache.curator.framework.state.ConnectionStateListenerManagerFactory)
/**
Params:
client – the client
threadFactory – thread factory to use or null for a default
sessionTimeoutMs – the ZK session timeout in milliseconds
sessionExpirationPercent – percentage of negotiated session timeout to use when simulating a session timeout. 0 means don't simulate at all
managerFactory – manager factory to use
*/
public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerManagerFactory managerFactory)
{
this.client = client;
this.sessionTimeoutMs = sessionTimeoutMs;
this.sessionExpirationPercent = sessionExpirationPercent;
if ( threadFactory == null )
{
threadFactory = ThreadUtils.newThreadFactory("ConnectionStateManager");
}
// Thread pool that processes the event queue
service = Executors.newSingleThreadExecutor(threadFactory);
// Build the listener manager
listeners = managerFactory.newManager(client);
}
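CuratorFrameworkImpl startup
Startup first starts ConnectionStateManager, which prepares connection event notification. Next it starts CuratorZookeeperClient, which establishes the session with the server. Finally it launches a dedicated thread that services the background operation queue: this thread keeps taking elements off the queue and executing them.
CuratorFrameworkImpl startup flowchart
Client connection: client.start();
The start method is called as follows: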
client.start();
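The CAS operation below updates the current state to STARTED. From the if logic we can also tell that start() must not be called more than once. This is similar to how the JDK's Thread is designed: a Thread's start() may likewise run only once.
If the CAS succeeds, a connection listener is built to watch the connection state. Inside it, if the new state is CONNECTED or RECONNECTED, logAsErrorConnectionErrors is set to true.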
client.start();
The internal logic is shown below. The code of this method is fairly simple; the comments explain the details.
public void start()
{
log.info("Starting");
// Use CAS to switch the running state to STARTED; the switch is irreversible
// LATENT:CuratorFramework.start() has not yet been called
// STARTED: CuratorFramework.start() has been called
if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
{
throw new IllegalStateException("Cannot be started more than once");
}
try
{
// ordering dependency - must be called before client.start()
connectionStateManager.start();
// Build a connection listener that watches the connection state
final ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
// CONNECTED: sent for the first successful connection to the server. Note: any given CuratorFramework instance receives this message only once.
// RECONNECTED: a suspended, lost, or read-only connection has been re-established
// Once connected or reconnected
if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
{
logAsErrorConnectionErrors.set(true);
}
}
@Override
public boolean doNotDecorate()
{
return true;
}
};
// Register the listener
this.getConnectionStateListenable().addListener(listener);
// Set the global started flag to true and start the ConnectionState
client.start();
// Build the thread pool
executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
// Submit a Callable task, which can return a value
executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
// Key part: run the background operations loop
backgroundOperationsLoop();
return null;
}
});
if ( ensembleTracker != null )
{
ensembleTracker.start();
}
log.info(schemaSet.toDocumentation());
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleBackgroundOperationException(null, e);
}
}
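The start-once guard at the top of start() is a small compare-and-set idiom worth seeing in isolation. A minimal sketch using only the JDK; StartOnceSketch and its State enum are made-up names that mirror, but are not, Curator's CuratorFrameworkState.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Models a lifecycle whose start() may succeed only once. */
final class StartOnceSketch {
    enum State { LATENT, STARTED, STOPPED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    /** Only the LATENT -> STARTED transition is allowed; anything else fails. */
    void start() {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
    }

    /** Moves STARTED -> STOPPED; there is no way back to LATENT. */
    void close() {
        state.compareAndSet(State.STARTED, State.STOPPED);
    }

    State state() {
        return state.get();
    }

    public static void main(String[] args) {
        StartOnceSketch client = new StartOnceSketch();
        client.start();
        try {
            client.start(); // second call loses the CAS
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because compareAndSet is atomic, two threads racing on start() cannot both win, so no lock is needed around the state check.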
Let's continue with the key part, backgroundOperationsLoop().
The background polling loop: backgroundOperationsLoop()
As its name suggests, backgroundOperationsLoop() is a background loop. The overall flow of the background task is:
private void backgroundOperationsLoop()
{
try
{
while ( state.get() == CuratorFrameworkState.STARTED )
{
OperationAndData<?> operationAndData;
try
{
operationAndData = backgroundOperations.take();
if ( debugListener != null )
{
debugListener.listen(operationAndData);
}
// Execute the background operation
performBackgroundOperation(operationAndData);
}
catch ( InterruptedException e )
{
// swallow the interrupt as it's only possible from either a background
// operation and, thus, doesn't apply to this loop or the instance
// is being closed in which case the while test will get it
}
}
}
finally
{
log.info("backgroundOperationsLoop exiting");
}
}
OperationAndData implements the Delayed interface, which is what lets the blocking queue retry operations after a delay.
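How a Delayed element and a DelayQueue cooperate can be shown with the JDK alone. This is a sketch under stated assumptions: DelayedOpSketch is a hypothetical stand-in for OperationAndData, and its fixed delay replaces whatever sleep time the real class derives from the retry policy.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** A queued operation that only becomes available after its delay has passed. */
final class DelayedOpSketch implements Delayed {
    final String name;
    private final long readyAtNanos;

    DelayedOpSketch(String name, long delayMs) {
        this.name = name;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    /** Remaining delay; zero or negative means the element may be taken. */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /** Orders elements so the one that becomes ready first sits at the head. */
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    /** Queues the operations, then drains them; take() blocks until each delay expires. */
    static String[] drainInReadyOrder(DelayedOpSketch... ops) {
        DelayQueue<DelayedOpSketch> queue = new DelayQueue<>();
        for (DelayedOpSketch op : ops) {
            queue.add(op);
        }
        String[] order = new String[ops.length];
        try {
            for (int i = 0; i < order.length; i++) {
                order[i] = queue.take().name;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining", e);
        }
        return order;
    }
}
```

Calling drainInReadyOrder with a 200 ms "retry-later" operation followed by a 0 ms "run-now" operation returns them in ready order, not insertion order. A loop built on take() therefore gets delayed retries without any explicit sleeping.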
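The processing logic above is as follows:
- Keep looping for as long as the current state is STARTED.
- Take an operation object from the BlockingQueue. From the initialization code we know this is a DelayQueue, a thread-safe blocking queue with delays, so OperationAndData necessarily implements the Delayed interface.
backgroundOperations = new DelayQueue<OperationAndData<?>>();
- If a debug listener is present, let it observe the OperationAndData.
- Run performBackgroundOperation. Its job is to call callPerformBackgroundOperation on each OperationAndData object taken from the blocking queue.
- If the ZK cluster cannot be reached, execution falls into the else branch and the reconnect check. If the conditions are met, the operation is put back on the blocking queue to wait for the next retry. (Note that this is an active, synchronous retry.)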
void performBackgroundOperation(OperationAndData<?> operationAndData)
{
try
{
if ( !operationAndData.isConnectionRequired() || client.isConnected() )
{
operationAndData.callPerformBackgroundOperation();
}
else
{
// Allow reconnects, timeouts and the like to happen
client.getZooKeeper(); // important - allow connection resets, timeouts, etc. to occur
// On connection timeout, throw CuratorConnectionLossException
if ( operationAndData.getElapsedTimeMs() >= client.getConnectionTimeoutMs() )
{
throw new CuratorConnectionLossException();
}
// Not timed out yet: push onto forcedSleepOperations to sleep, then wait for the reconnect
sleepAndQueueOperation(operationAndData);
}
}
catch ( Throwable e )
{
// Check for thread interruption
ThreadUtils.checkInterrupted(e);
/**
* Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
* when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
* and callbacks need to get invoked, etc.
*/
// Handle the connection loss exception
if ( e instanceof CuratorConnectionLossException )
{
WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null, null);
// Ask the retry policy whether another retry is allowed
if ( checkBackgroundRetry(operationAndData, event) )
{
// Push back onto the backgroundOperations queue to retry
queueOperation(operationAndData);
}
else
{
// Give up retrying
logError("Background retry gave up", e);
}
}
else
{
// Otherwise handle it as a background operation exception
handleBackgroundOperationException(operationAndData, e);
}
}
}
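While we're here, a word on how the background thread decides whether to retry: it applies whichever retry policy the user supplied, a textbook use of the strategy pattern.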
client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData)
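The strategy pattern behind that allowRetry call can be sketched without Curator. Everything here is illustrative: the nested Policy interface is a simplified stand-in for RetryPolicy (the real method takes a third sleeper argument), and nTimes and untilElapsed are made-up factory names.

```java
/** Models a retry decision that is delegated to an interchangeable policy object. */
final class RetryStrategySketch {
    private RetryStrategySketch() {}

    /** The strategy: given the retries so far and the time spent, may we retry again? */
    interface Policy {
        boolean allowRetry(int retryCount, long elapsedMs);
    }

    /** Strategy 1: allow a fixed number of retries. */
    static Policy nTimes(int maxRetries) {
        return (retryCount, elapsedMs) -> retryCount < maxRetries;
    }

    /** Strategy 2: allow retries until a time budget is used up. */
    static Policy untilElapsed(long maxElapsedMs) {
        return (retryCount, elapsedMs) -> elapsedMs < maxElapsedMs;
    }

    /** Counts the retries a policy grants to an operation that fails every time. */
    static int retriesGranted(Policy policy, long msPerAttempt) {
        int retryCount = 0;
        long elapsedMs = 0;
        // The caller never inspects the policy's type; it only asks the question.
        while (policy.allowRetry(retryCount, elapsedMs)) {
            retryCount++;
            elapsedMs += msPerAttempt;
        }
        return retryCount;
    }

    public static void main(String[] args) {
        System.out.println(retriesGranted(nTimes(3), 100));
        System.out.println(retriesGranted(untilElapsed(200), 100));
    }
}
```

The retry loop stays the same whichever policy is plugged in, which is what lets users swap retry behavior without touching the background thread.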
operationAndData.callPerformBackgroundOperation();
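OperationAndData implements Delayed (it is the element type of the DelayQueue, not a subclass of it) and relies on polymorphism to reach different implementations. The method body is a single line: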
void callPerformBackgroundOperation() throws Exception
{
operation.performBackgroundOperation(this);
}
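operation.performBackgroundOperation(this); corresponds to org.apache.curator.framework.imps.BackgroundOperation#performBackgroundOperation
BackgroundOperation has many concrete implementations, one for each common ZK operation. The this that is passed in is the OperationAndData object.
Session management
During a client connection, the connection state is managed by ConnectionState, which takes care of reconnecting after a timeout. ConnectionStateManager handles connection state changes and their notification, and ConnectionHandlingPolicy determines when the connection timeout handling is triggered.
The background polling loop above also allows reconnect attempts within the timeout. So how does Curator notify clients of session state and reconnect after a session timeout?
Connection event listening and state changes: org.apache.curator.ConnectionState#process
From the code of org.apache.curator.ConnectionState#process we can see that connection-state events have the type Watcher.Event.EventType.None and are delivered to all Watchers.
ConnectionState acts as the defaultWatcher, and its event callback is: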
public void process(WatchedEvent event)
{
if ( LOG_EVENTS )
{
log.debug("ConnectState watcher: " + event);
}
if ( event.getType() == Watcher.Event.EventType.None )
{
// isConnected: the client's current connection state; true means connected (SyncConnected or ConnectedReadOnly)
boolean wasConnected = isConnected.get();
// Decide the new state from org.apache.zookeeper.Watcher.Event.KeeperState
boolean newIsConnected = checkState(event.getState(), wasConnected);
if ( newIsConnected != wasConnected )
{
// If the connection state changed, update it
isConnected.set(newIsConnected);
connectionStartMs = System.currentTimeMillis();
if ( newIsConnected )
{
// Connected again: refresh the negotiated session timeout
lastNegotiatedSessionTimeoutMs.set(handleHolder.getNegotiatedSessionTimeoutMs());
log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
}
}
}
// Notify the parentWatchers. Note that a parentWatcher was passed in during initialization; it calls CuratorFrameworkImpl.processEvent
for ( Watcher parentWatcher : parentWatchers )
{
OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
parentWatcher.process(event);
trace.commit();
}
}
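The last comment points at the loop that walks parentWatchers and calls process on each. There is in fact always one Watcher here by default: the one registered during initialization and passed in as parentWatcher.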
this.client = new CuratorZookeeperClient
(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
builder.getWaitForShutdownTimeoutMs(),
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
// This is the parentWatcher passed in during initialization; it calls CuratorFrameworkImpl.processEvent
processEvent(event);
}
},
builder.getRetryPolicy(),
builder.canBeReadOnly(),
builder.getConnectionHandlingPolicy()
);
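This event callback comes up again below; a rough impression is enough for now.
Connection state checking and handling: org.apache.curator.ConnectionState#checkState
Connection state checking and handling happens in the org.apache.curator.ConnectionState#checkState method.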
boolean newIsConnected = checkState(event.getState(), wasConnected);
private boolean checkState(Event.KeeperState state, boolean wasConnected)
{
boolean isConnected = wasConnected;
boolean checkNewConnectionString = true;
switch ( state )
{
default:
case Disconnected:
{
isConnected = false;
break;
}
case SyncConnected:
case ConnectedReadOnly:
{
isConnected = true;
break;
}
// Authentication failure
case AuthFailed:
{
isConnected = false;
log.error("Authentication failed");
break;
}
case Expired:
{
isConnected = false;
checkNewConnectionString = false;
handleExpiredSession();
break;
}
case SaslAuthenticated:
{
// NOP
break;
}
}
// the session expired is logged in handleExpiredSession, so not log here
if (state != Event.KeeperState.Expired) {
new EventTrace(state.toString(), tracer.get(), getSessionId()).commit();
}
if ( checkNewConnectionString )
{
// If the server list has changed, update it
String newConnectionString = handleHolder.getNewConnectionString();
if ( newConnectionString != null )
{
handleNewConnectionString(newConnectionString);
}
}
return isConnected;
}
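The method above decides from the connection state whether the connection is healthy; a return value of true means it is. When the session expires (Expired), handleExpiredSession is called to perform a reset, which reconnects the session. For every state other than Expired, an EventTrace is committed for tracing.
Note that the RetryPolicy is consulted in both active and passive reconnection.
parentWatchers registration and callbacks
The last part of the state-change method notifies all parentWatchers. Let's see what that loop does.
To stress it again: a parentWatcher is passed in during initialization, and it calls the CuratorFrameworkImpl.processEvent method. Now let's look at how it is registered and called back.
// Notify the parentWatchers. Note that a parentWatcher was passed in during initialization; it calls CuratorFrameworkImpl.processEvent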
for ( Watcher parentWatcher : parentWatchers )
{
OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
parentWatcher.process(event);
trace.commit();
}
Let's go straight to the logic behind this default Watcher callback, CuratorFrameworkImpl#processEvent(event).
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
// Process the event
processEvent(event);
}
},
The logic of processEvent(event) is as follows. For WATCHED events it first translates the connection state, and then it notifies every registered CuratorListener.
private void processEvent(final CuratorEvent curatorEvent)
{
if ( curatorEvent.getType() == CuratorEventType.WATCHED )
{
// Translate the state
validateConnection(curatorEvent.getWatchedEvent().getState());
}
// Notify all registered CuratorListeners
listeners.forEach(new Function<CuratorListener, Void>()
{
@Override
public Void apply(CuratorListener listener)
{
try
{
OperationTrace trace = client.startAdvancedTracer("EventListener");
// Deliver the event to the listener callback
listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
trace.commit();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
logError("Event listener threw exception", e);
}
return null;
}
});
}
validateConnection is the code that translates the connection state.
org.apache.curator.framework.imps.CuratorFrameworkImpl#validateConnection
void validateConnection(Watcher.Event.KeeperState state)
{
if ( state == Watcher.Event.KeeperState.Disconnected )
{
internalConnectionHandler.suspendConnection(this);
}
else if ( state == Watcher.Event.KeeperState.Expired )
{
connectionStateManager.addStateChange(ConnectionState.LOST);
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
unSleepBackgroundOperations();
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
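As you can see, the actual state changes are handled by the ConnectionStateManager component. The mapping between the native ZK client states and the states Curator wraps them in can be read off the branches above.
Also note the connectionStateManager.addStateChange(...) call in these branches. Its purpose is to notify every ConnectionStateListener registered in listeners.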
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
How the notification is delivered is covered below.
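The state translation in validateConnection can be summarized as a small JDK-only model. Two points are assumptions rather than something the listing above shows: that suspendConnection ends in a SUSPENDED state, and that the first positive state is reported as CONNECTED instead of RECONNECTED (the code comments and the demo log in this article suggest the latter, but the substitution itself happens outside the code quoted here). All class and enum names are made up for this sketch.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Models how native ZK states are translated into Curator-style connection states. */
final class StateMappingSketch {
    enum ZkState { Disconnected, Expired, SyncConnected, ConnectedReadOnly }
    enum CuratorState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    // Flips to true once the first successful connection has been reported.
    private final AtomicBoolean initialConnectSent = new AtomicBoolean(false);

    CuratorState translate(ZkState zkState) {
        switch (zkState) {
            case Disconnected:
                return CuratorState.SUSPENDED; // assumption: result of suspendConnection
            case Expired:
                return CuratorState.LOST;
            case ConnectedReadOnly:
                return CuratorState.READ_ONLY;
            case SyncConnected:
            default:
                // Assumption: the very first connection is announced as CONNECTED,
                // every later one as RECONNECTED.
                return initialConnectSent.compareAndSet(false, true)
                        ? CuratorState.CONNECTED
                        : CuratorState.RECONNECTED;
        }
    }

    public static void main(String[] args) {
        StateMappingSketch mapping = new StateMappingSketch();
        for (ZkState s : new ZkState[] {
                ZkState.SyncConnected, ZkState.Disconnected, ZkState.SyncConnected, ZkState.Expired }) {
            System.out.println(s + " -> " + mapping.translate(s));
        }
    }
}
```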
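Notification mechanism
What is notification for? It simply means calling back the callback function of each registered listener promptly when an event occurs. Curator provides different listener registration and callback paths for different components.
// Custom CuratorListener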
client.getCuratorListenable().addListener((_fk, e) -> {
if (e.getType().equals(CuratorEventType.WATCHED)) {
log.info("test");
}
});
ConnectionStateListener connectionStateListener = (client1, newState) -> {
//Some details
log.info("newState => "+ newState);
};
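The available ways to register listeners are:
- One-shot Watch notifications
- Registering a CuratorListener
- Registering a ConnectionStateListener
- Registering an UnhandledErrorListener
- Callbacks when a background-thread operation completes
- The cache mechanism, which registers repeatedly
One-shot Watch notifications
A one-shot watch has to be registered again every time through the method below. This touches on NodeCache and related components; since the code they depend on has not been introduced yet, they are skipped for now.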
client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
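Registering a CuratorListener
The implementation is simple: the listener is registered in the CuratorFrameworkImpl.listeners container, and when the background thread completes an operation it notifies every listener in that container.
For example, creating a path on ZK asynchronously triggers a CuratorEventType.CREATE event. In addition, when a connection state event fires, the parentWatcher also calls back these listeners, as in the code below: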
/**
* connect ZK, register watchers
*/
public CuratorFramework mkClient(Map conf, List<String> servers, Object port,
String root, final WatcherCallBack watcher) {
CuratorFramework fk = Utils.newCurator(conf, servers, port, root);
// Custom CuratorListener
fk.getCuratorListenable().addListener(new CuratorListener() {
@Override
public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception {
if (e.getType().equals(CuratorEventType.WATCHED)) {
WatchedEvent event = e.getWatchedEvent();
watcher.execute(event.getState(), event.getType(), event.getPath());
}
}
});
fk.start();
return fk;
}
org.apache.curator.framework.imps.CuratorFrameworkImpl#processEvent
The processEvent method always calls back the registered CuratorListeners.
private void processEvent(final CuratorEvent curatorEvent)
{
if ( curatorEvent.getType() == CuratorEventType.WATCHED )
{
validateConnection(curatorEvent.getWatchedEvent().getState());
}
listeners.forEach(new Function<CuratorListener, Void>()
{
@Override
public Void apply(CuratorListener listener)
{
try
{
OperationTrace trace = client.startAdvancedTracer("EventListener");
listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
trace.commit();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
logError("Event listener threw exception", e);
}
return null;
}
});
}
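The actual callbacks are made by the various operation builder implementations. Digging into them gets complicated; it is enough to know the concept here and look up the relevant implementation when needed.
Registering a ConnectionStateListener
If you add a ConnectionStateListener, it is notified whenever the connection state changes.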
ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
//Some details
}
};
client.getConnectionStateListenable().addListener(connectionStateListener);
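ConnectionStateListener callbacks are made inside ConnectionStateManager, but so far we have only covered how it is initialized. The following extends that with the part that calls back ConnectionStateListener.
How does ConnectionStateManager call back a ConnectionStateListener?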
org.apache.curator.framework.imps.CuratorFrameworkImpl#validateConnection
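The session management section noted the addStateChange call that enqueues a ConnectionState. This part explains how ConnectionStateManager, after receiving a ConnectionState change, calls back the listeners registered on it.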
void validateConnection(Watcher.Event.KeeperState state)
{
// ......
else if ( state == Watcher.Event.KeeperState.Expired )
{
connectionStateManager.addStateChange(ConnectionState.LOST);
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
unSleepBackgroundOperations();
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
The actual handling is done in the following method.
org.apache.curator.framework.state.ConnectionStateManager#processEvents
private void processEvents()
{
while ( state.get() == State.STARTED )
{
try
{
int useSessionTimeoutMs = getUseSessionTimeoutMs();
long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
long pollMaxMs = useSessionTimeoutMs - elapsedMs;
final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
if ( newState != null )
{
if ( listeners.isEmpty() )
{
log.warn("There are no ConnectionStateListeners registered.");
}
// Key part: call back the listeners when a state change arrives
listeners.forEach(listener -> listener.stateChanged(client, newState));
}
else if ( sessionExpirationPercent > 0 )
{
synchronized(this)
{
checkSessionExpiration();
}
}
}
catch ( InterruptedException e )
{
// swallow the interrupt as it's only possible from either a background
// operation and, thus, doesn't apply to this loop or the instance
// is being closed in which case the while test will get it
}
}
}
The one line that really matters above is:
listeners.forEach(listener -> listener.stateChanged(client, newState));
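How does processEvents get run? The CuratorFrameworkImpl startup flowchart drawn earlier already shows it.
ConnectionStateManager holds an ExecutorService; the constructor shown earlier creates it with Executors.newSingleThreadExecutor. The intent is clear: a single dedicated thread runs this loop, polling the event queue and notifying the listeners registered on ConnectionStateManager of each state change.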
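The single-thread event loop described above can be modeled with JDK classes only. This is a simplified sketch, not ConnectionStateManager itself: StateEventLoopSketch is a hypothetical class, states are plain strings, and the queue size and the 100 ms poll timeout are arbitrary values chosen for the example.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Models a state manager: producers enqueue states, one thread notifies listeners. */
final class StateEventLoopSketch implements AutoCloseable {
    private final BlockingQueue<String> eventQueue = new ArrayBlockingQueue<>(25);
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
    private final ExecutorService service = Executors.newSingleThreadExecutor();
    private volatile boolean started;

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void start() {
        started = true;
        service.execute(this::processEvents);
    }

    /** Called from any thread; returns false if the queue is full. */
    boolean addStateChange(String newState) {
        return eventQueue.offer(newState);
    }

    /** Runs on the single worker thread: poll the queue, then call back every listener. */
    private void processEvents() {
        while (started) {
            try {
                String newState = eventQueue.poll(100, TimeUnit.MILLISECONDS);
                if (newState != null) {
                    listeners.forEach(listener -> listener.accept(newState));
                }
            } catch (InterruptedException e) {
                // close() interrupts the poll; the loop condition then ends the loop
            }
        }
    }

    @Override
    public void close() {
        started = false;
        service.shutdownNow();
    }

    /** Small driver: returns the states one listener received, in order. */
    static List<String> runDemo() {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        try (StateEventLoopSketch manager = new StateEventLoopSketch()) {
            manager.addListener(state -> {
                received.add(state);
                done.countDown();
            });
            manager.start();
            manager.addStateChange("CONNECTED");
            manager.addStateChange("SUSPENDED");
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```

Because only one thread drains the queue, listeners see state changes strictly in the order they were enqueued, and a slow listener delays later notifications rather than reordering them.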
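Registering an UnhandledErrorListener
Likewise, these listeners are registered in CuratorFrameworkImpl.unhandledErrorListeners and are triggered when a background-thread operation or a handler throws an exception.
Registration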
/**
* connect ZK, register watchers
*/
public CuratorFramework mkClient(Map conf, List<String> servers, Object port,
String root, final WatcherCallBack watcher) {
CuratorFramework fk = Utils.newCurator(conf, servers, port, root);
// Custom UnhandledErrorListener
fk.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() {
@Override
public void unhandledError(String msg, Throwable error) {
String errmsg = "Unrecoverable zookeeper error, halting process: " + msg;
LOG.error(errmsg, error);
JStormUtils.halt_process(1, "Unrecoverable zookeeper error");
}
});
fk.start();
return fk;
}
How is it triggered?
The triggering code is in the org.apache.curator.framework.imps.CuratorFrameworkImpl#logError method; note what the apply method does.
void logError(String reason, final Throwable e)
{
// Unrelated code omitted
unhandledErrorListeners.forEach(new Function<UnhandledErrorListener, Void>()
{
@Override
public Void apply(UnhandledErrorListener listener)
{
listener.unhandledError(localReason, e);
return null;
}
});
// Unrelated code omitted
}
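Callbacks when a background-thread operation completes
For operations such as setData, a callback function can be passed in through the fluent call chain. Once the operation completes, the callback is invoked.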
public static void setDataAsyncWithCallback(CuratorFramework client, BackgroundCallback callback, String path, byte[] payload) throws Exception {
// this is another method of getting notification of an async completion
client.setData().inBackground(callback).forPath(path, payload);
}
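The shape of such a background callback can be modeled without a ZK server. This is a sketch under stated assumptions, not Curator's BackgroundCallback: BackgroundCallbackSketch, its in-memory node map and the two result codes are stand-ins invented for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Models an operation that runs on a background thread and reports via a callback. */
final class BackgroundCallbackSketch {
    static final int OK = 0;          // illustrative result codes for this sketch
    static final int NO_NODE = -101;

    interface Callback {
        void processResult(String path, int resultCode);
    }

    private final Map<String, byte[]> nodes = new ConcurrentHashMap<>();
    private final ExecutorService background = Executors.newSingleThreadExecutor();

    void create(String path, byte[] data) {
        nodes.put(path, data);
    }

    /** Returns immediately; the callback fires later on the background thread. */
    void setDataInBackground(String path, byte[] payload, Callback callback) {
        background.execute(() -> {
            // replace() only succeeds when the path already exists
            int resultCode = nodes.replace(path, payload) != null ? OK : NO_NODE;
            callback.processResult(path, resultCode);
        });
    }

    void close() {
        background.shutdown();
    }

    /** Small driver: creates existingPath, updates targetPath, returns the result code. */
    static int demo(String existingPath, String targetPath) {
        BackgroundCallbackSketch client = new BackgroundCallbackSketch();
        client.create(existingPath, new byte[0]);
        AtomicInteger result = new AtomicInteger(Integer.MIN_VALUE);
        CountDownLatch done = new CountDownLatch(1);
        client.setDataInBackground(targetPath, "v2".getBytes(StandardCharsets.UTF_8), (path, rc) -> {
            result.set(rc);
            done.countDown();
        });
        try {
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            client.close();
        }
        return result.get();
    }
}
```

The caller learns about success or failure through the result code passed to the callback, not through a return value or an exception on the calling thread.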
The cache mechanism, which registers repeatedly
Curator's cache mechanism is a sizeable topic of its own. Curator's caches include:
- Path Cache
- Node Cache
- Tree Cache
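Before it is used, a cache is compared with the node data on the server. When the data differs, the watch mechanism triggers a callback that refreshes the local cache and registers the Watch again. Every reconnect also registers a new Watcher, so the Watcher is never lost.
Summary
From the notification mechanism and session management sections we learned that:
- Client notifications are delivered synchronously.
- connectionStateManager.listeners are notified asynchronously by an internal thread pool.
- CuratorFrameworkImpl.listeners are notified of connection state synchronously on the watcher notification thread, and asynchronously when the notification comes from the background thread.
- Registering too many watchers may cause watchers to be lost after a reconnect.
Closing remarks
This article introduced the basic use of Curator, analyzed the initialization of Curator's components from the source code, and briefly walked through the source-level call paths of session management and the notification mechanism.
Below is an overall diagram that gathers the source code walkthroughs in this article. If you find mistakes or have questions about my reading of the source, feedback and discussion are welcome.
Finally, the complete demo code: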
@Slf4j
public class CuratorTestExample {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient("192.168.19.100:2181,192.168.19.101:2181,192.168.19.102:2181", retryPolicy);
// Connect to ZK (the connection is opened by client.start() below)
// Custom CuratorListener
client.getCuratorListenable().addListener((_fk, e) -> {
if (e.getType().equals(CuratorEventType.WATCHED)) {
log.info("test");
}
});
ConnectionStateListener connectionStateListener = (client1, newState) -> {
//Some details
log.info("newState => "+ newState);
};
// 11:31:17.026 [Curator-ConnectionStateManager-0] INFO com.zxd.interview.zkcurator.CuratorTestExample - newState => CONNECTED
client.getConnectionStateListenable().addListener(connectionStateListener);
client.start();
// At this point we hold a connection instance to ZK.
//.....
// Create the znode, creating parent nodes if needed
client.create().creatingParentsIfNeeded().withProtection().forPath("/my/path", "Test".getBytes());
InterProcessMutex lock = new InterProcessMutex(client, "/my/path");
lock.acquire();
try {
// do some work inside of the critical section here
Thread.sleep(1000);
} finally {
lock.release();
}
}
}
Recommended reading
A detailed guide to using the ZK client Curator – Zhihu (zhihu.com)
https://cloud.tencent.com/developer/article/1648976?areaSourc…
Curator directory listening | Ravitn Blog (donaldhan.github.io)