关于java:Zookeeper客户端之-Curator

63次阅读

共计 5719 个字符,预计需要花费 15 分钟才能阅读完成。

Apache Curator 是一个比较完善的 ZooKeeper 客户端框架,通过封装的一套高级 API 简化了 ZooKeeper 的操作。Curator 次要解决了三类问题:

  • 封装 ZooKeeper client 与 ZooKeeper server 之间的连贯解决
  • 提供了一套 Fluent 格调的操作 API
  • 提供 ZooKeeper 各种利用场景 (recipe,比方:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等) 的形象封装

Curator 次要从以下几个方面升高了 zk 应用的复杂性:

  • 重试机制: 提供可插拔的重试机制, 它将给捕捉所有可复原的异样配置一个重试策略,并且外部也提供了几种规范的重试策略(比方指数弥补)
  • 连贯状态监控: Curator 初始化之后会始终对 zk 连贯进行监听,一旦发现连贯状态发生变化将会作出相应的解决
  • zk 客户端实例治理:Curator 会对 zk 客户端到 server 集群的连贯进行治理,并在须要的时候重建 zk 实例,保障与 zk 集群连贯的可靠性
  • 各种应用场景反对:Curator 实现了 zk 反对的大部分应用场景(甚至包含 zk 本身不反对的场景),这些实现都遵循了 zk 的最佳实际,并思考了各种极其状况

(一):客户端连贯的创立:

  curator 的操作客户端是:CuratorFramework。其连贯的建设形式如下:
      @Bean
      public CuratorFramework curatorFramework() {RetryForever forever = new RetryForever(500);
          CuratorFramework framework=  CuratorFrameworkFactory.builder().connectString(zkUrl)
            .connectionTimeoutMs(60000)
            .sessionTimeoutMs(120000)
            .retryPolicy(forever).build();
         framework.start();
         return framework;
      }

其参数除了连贯字符串之外,还有如下是三个参数:
1:连贯超时工夫:如果配置了 curator-default-connection-timeout 参数,则取该参数值。默认值是 15 秒
2:会话超时工夫:如果配置了 curator-default-session-timeout 参数,则取该参数值。默认值是 60 秒
3:重试策略。curator 提供了如下重试策略:
3.1:RetryForever。始终重试,参数是重试间隔时间,单位毫秒
3.2:RetryOneTime。重试一次,参数是重试之间的间隔时间,单位毫秒。
3.3:RetryNTimes。重试 N 次, 参数是重试次数和重试之间的间隔时间,单位毫秒。
3.4:ExponentialBackoffRetry。参数为最大重试次数(默认值 29),最大休眠工夫,根本休眠工夫。其休眠工夫不确定。
3.5:BoundedExponentialBackoffRetry。该类继承了 ExponentialBackoffRetry。

(二):Watch 机制

默认状况下,在操作 zookeeper 的命令中,应用 usingWatcher() 办法调用的监听器是一次性的。Curator 提供了 Cache 机制,一次注册监听器即可。Curator 提供了如下三种 watch。(carator 版本不一样,可能会有所不同)1:NodeCache。提供的 Listener 是 NodeCacheListener,其监听了节点数据的变动。2:PathChildrenCache。提供的 Listener 是 PathChildrenCacheListener。其监听子节点的变动。3:TreeCache。提供的 Listener 是 TreeCacheListener。其监听了节点数据和子节点的变动。应用例子如下:
    NodeCache nodeCache = new NodeCache(cutator, "/file/cache");
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
     public void nodeChanged() throws Exception {System.out.println("path: /file/cache changed!!!!!!");
     }
    });
    nodeCache.start();
    TreeCache treeCache = new TreeCache(cutator, "/file/cache");
    treeCache.getListenable().addListener(new TreeCacheListener() {
        @Override
     public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {System.out.println("TREE CACHE:" + event.toString());
     }
    });
    treeCache.start();

(三):分布式锁
curator 提供了几种分布式锁。其有如下几种:
InterProcessMultiLock
InterProcessMutex
InterProcessReadWriteLock
InterProcessSemaphoreMutex
InterProcessSemaphoreV2。
curator 提供的基于 zookeeper 的分布式锁和 redis 提供的分布式锁有如下不一样:
1:curator 的分布式锁无过期工夫,redis 的分布式锁个别会设置过期工夫。
2:curator 的分布式锁在服务进行或者重启后,会开释,如果死锁能够应用这个方法解锁。而 redis 锁则不行。
3:curator 分布式锁会防止羊群效应。
4:curator 分布式锁是可重入锁。

curator 锁的应用流程大略如下:
1:获取锁时,创立长期程序节点。
2:判断以后创立的节点是不是首节点,如果是首节点,则认为锁获取胜利。
3:如果不是首节点,则对上一个节点增加监听器(watcher),而后以后线程 wait。
4:当上一个节点删除时,监听器触发,获取锁胜利
5:锁开释的时候,会删除呈现的长期程序节点

// 构造函数,参数是连贯客户端和锁门路
public InterProcessMutex(CuratorFramework client, String path)
{this(client, path, new StandardLockInternalsDriver());
}

// 获取锁
private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    /*
 Note on concurrency: a given lockData instance can be only acted on by a single thread so locking isn't necessary */
 // 判断是否以后线程获取到锁,如果是以后线程,则返回获取胜利
 Thread currentThread = Thread.currentThread();
 LockData lockData = threadData.get(currentThread);
 if (lockData != null)
 {lockData.lockCount.incrementAndGet();
     return true; 
 }
    // 尝试获取锁,如果获取到,就缓存在内存中。String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
 if (lockPath != null)
 {LockData newLockData = new LockData(currentThread, lockPath);
    threadData.put(currentThread, newLockData);
    return true; 
 }
    return false;
}

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{final long startMillis = System.currentTimeMillis();
 final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
 final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
 int retryCount = 0;
 String          ourPath = null;
 boolean hasTheLock = false;
 boolean isDone = false;
 while (!isDone)
 {
     isDone = true;
     try {
            // 在该门路下创立长期程序节点。ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // 判断以后创立的节点是否为序号最小的节点,如果不是,则对上一个节点创立监听器 watcher,而后期待
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch (KeeperException.NoNodeException e) {
            // gets thrown by StandardLockInternalsDriver when it can't find the lock node
            // this can happen when the session expires, etc. So, if the retry allows, just try it all again 
            // 如果抛出异样提醒节点不存在,则认为是会话超时,则进行重试。if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis,              RetryLoop.getDefaultRetrySleeper()) ){isDone = false;} else {throw e;}
        }
    }
    if (hasTheLock){return ourPath;}
    return null;
   }

应用例子如下:

public void run() {
    try {InterProcessMutex lock = new InterProcessMutex(client, path);
        lock.acquire(); // 获取锁,能够设置超时工夫
        System.out.println("thread-" + idx + "get the lock!!!");
        Random random = new Random();
        int time = random.nextInt(10);
        TimeUnit.MILLISECONDS.sleep(time * 1000);
        System.out.println("thread-" + idx + "release the lock!!!");
        lock.release(); // 开释锁} catch (Exception e) {e.printStackTrace();
    }
}

(四):选举用法
zookeeper 是提供了 2 个类用于抉择,别离如下:
LeaderLatch:这里提供的选举是同步的
LeaderSelector:这里提供的选举是异步的,提供一个 LeaderSelectorListener 用于接管选举后果。
用法例子如下:

@Override
public void run() {LeaderSelector leader = new LeaderSelector(client, path, new LeaderSelectorListener() {
        @Override
 public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) { }
        @Override
 public void takeLeadership(CuratorFramework client) throws Exception {System.out.println("LeaderSelector Thread-" + idx + "is the leader!!!!!!");
 }
    });
 leader.autoRequeue(); // 重新加入抢的序列
 leader.start();}


public void run() {
    try {LeaderLatch leader = new LeaderLatch(client, path);
 leader.addListener(new LeaderLatchListener() {
            @Override
 public void isLeader() {System.out.println("(Listener)Thread-" + idx + "is the leader!!!!!!");
 }
            @Override
 public void notLeader() {System.out.println("(Listener)Thread-" + idx + "is not the leader!!!!!!");
 }
        });
 leader.start();
 leader.await(); // 阻塞直到获取到 Leader 身份
 if (leader.hasLeadership()) {System.out.println("Thread-" + idx + "is the leader!!!!!!");
 } else {System.out.println("Thread-" + idx + "is not the leader!!!!!!");
 }
    } catch (Exception e) {e.printStackTrace();
 }
}

正文完
 0