1. Zookeeper概述

Zookeeper(后续简称ZK)是一个分布式的,开放源码的分布式应用程序协调服务,通常以集群模式运行,其协调能力能够了解为是基于观察者设计模式来实现的;ZK服务会应用Znode存储使用者的数据,并将这些数据以树形目录的模式来组织治理,反对使用者以观察者的角色指定本人关注哪些节点\数据的变更,当这些变更产生时,ZK会告诉其观察者;为满足本篇指标所需,着重介绍以下几个要害个性:

  • 数据组织:数据节点以树形目录(相似文件系统)组织治理,每一个节点中都会保留数据信息和节点信息。

  • 集群模式:通常是由3、5个基数实例组成集群,当超过半数服务实例失常工作就能对外提供服务,既能防止单点故障,又尽量高可用,每个服务实例都有一个数据备份,以实现数据全局统一

  • 程序更新:更新申请都会转由leader执行,来自同一客户端的更新将依照发送的程序被写入到ZK,解决写申请创立Znode时,Znode名称后会被调配一个全局惟一的递增编号,能够通过顺序号推断申请的程序,利用这个个性能够实现高级协调服务

  • 监听机制:给某个节点注册监听器,该节点一旦产生变更(例如更新或者删除),监听者就会收到一个Watch Event,能够感知到节点\数据的变更

  • 长期节点:session链接断开长期节点就没了,不能创立子节点(很要害)

ZK的分布式锁正是基于以上个性来实现的,简略来说是:

  • 长期节点:用于撑持异常情况下的锁主动开释能力
  • 程序节点:用于撑持偏心锁获取锁和排队期待的能力
  • 监听机制:用于撑持抢锁能力
  • 集群模式:用于撑持锁服务的高可用

2. 加解锁的流程形容

  1. 创立一个永恒节点作为锁节点(/lock2)
  2. 试图加锁的客户端在指定锁名称节点(/lock2)下,创立长期程序子节点
  3. 获取锁节点(/lock2)下所有子节点
  4. 对所获取的子节点按节点自增序号从小到大排序
  5. 判断本人是不是第一个子节点,若是,则获取锁
  6. 若不是,则监听比该节点小的那个节点的删除事件(这种只监听前一个节点的形式防止了惊群效应)
  7. 若是阻塞申请锁,则申请锁的操作可减少阻塞期待
  8. 若监听事件失效(阐明前节点开释了,能够尝试去获取锁),则回到第3步从新进行判断,直到获取到锁
  9. 解锁时,将第一个子节点删除开释

3. ZK分布式锁的能力

可能读者是单篇浏览,这里引入上一篇《分布式锁上-初探》中的一些内容,一个分布式锁应具备这样一些性能特点:

  • 互斥性:在同一时刻,只有一个客户端能持有锁
  • 安全性:防止死锁,如果某个客户端取得锁之后解决工夫超过最大约定工夫,或者持锁期间产生了故障导致无奈被动开释锁,其持有的锁也可能被其余机制正确开释,并保障后续其它客户端也能加锁,整个解决流程持续失常执行
  • 可用性:也被称作容错性,分布式锁须要有高可用能力,防止单点故障,当提供锁的服务节点故障(宕机)时不影响服务运行,这里有两种模式:一种是分布式锁服务本身具备集群模式,遇到故障能主动切换复原工作;另一种是客户端向多个独立的锁服务发动申请,当某个锁服务故障时依然能够从其余锁服务读取到锁信息(Redlock)
  • 可重入性:对同一个锁,加锁和解锁必须是同一个线程,即不能把其余线程程持有的锁给开释了
  • 高效灵便:加锁、解锁的速度要快;反对阻塞和非阻塞;反对偏心锁和非偏心锁

基于上文的内容,这里简略总结一下ZK的能力矩阵(其它分布式锁的状况会在后续文章中补充):

对于性能不太高的一种说法

因为每次在创立锁和开释锁的过程中,都要动态创建、销毁长期节点来实现锁性能。ZK中创立和删除节点只能通过Leader服务器来执行,而后Leader服务器还须要将数据同步到所有的Follower机器上,这样频繁的网络通信,性能的短板是十分突出的。在高性能,高并发的场景下,不倡议应用ZooKeeper的分布式锁。

因为ZooKeeper的高可用个性,在并发量不是太高的场景,也举荐应用ZK的分布式锁。

4. InterProcessMutex 应用示例

Zookeeper 客户端框架 Curator 提供的 InterProcessMutex 是分布式锁的一种实现,acquire 办法阻塞|非阻塞获取锁,release 办法开释锁,另外还提供了可撤销、可重入性能。
4.1 接口介绍
// 获取互斥锁public void acquire() throws Exception;// 在给定的工夫内获取互斥锁public boolean acquire(long time, TimeUnit unit) throws Exception;// 开释锁解决public void release() throws Exception;// 如果以后线程获取了互斥锁,则返回trueboolean isAcquiredInThisProcess();
4.2 pom依赖
<dependency>  <groupId>org.apache.logging.log4j</groupId>  <artifactId>log4j-core</artifactId>  <version>2.8.2</version></dependency><dependency>  <groupId>org.apache.zookeeper</groupId>  <artifactId>zookeeper</artifactId>  <version>3.5.7</version></dependency><dependency>  <groupId>org.apache.curator</groupId>  <artifactId>curator-framework</artifactId>  <version>4.3.0</version></dependency><dependency>  <groupId>org.apache.curator</groupId>  <artifactId>curator-recipes</artifactId>  <version>4.3.0</version></dependency><dependency>  <groupId>org.apache.curator</groupId>  <artifactId>curator-client</artifactId>  <version>4.3.0</version></dependency>
4.3 示例
package com.atguigu.case3;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorLockTest {    public static void main(String[] args) {        // 创立分布式锁1        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");        // 创立分布式锁2        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");        new Thread(new Runnable() {            @Override            public void run() {                try {                    lock1.acquire();                    System.out.println("线程1 获取到锁");                    lock1.acquire();                    System.out.println("线程1 再次获取到锁");                    Thread.sleep(5 * 1000);                    lock1.release();                    System.out.println("线程1 开释锁");                    lock1.release();                    System.out.println("线程1  再次开释锁");                } catch (Exception e) {                    e.printStackTrace();                }            }        }).start();        new Thread(new Runnable() {            @Override            public void run() {                try {                    lock2.acquire();                    System.out.println("线程2 获取到锁");                    lock2.acquire();                    System.out.println("线程2 再次获取到锁");                    Thread.sleep(5 * 1000);                    lock2.release();                    System.out.println("线程2 开释锁");                    lock2.release();                    System.out.println("线程2  再次开释锁");                } catch (Exception e) {                    e.printStackTrace();                }            }        }).start();    }    private static CuratorFramework getCuratorFramework() {        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("xxx:2181,xxx:2181,xxx:2181")                .connectionTimeoutMs(2000)                .sessionTimeoutMs(2000)                .retryPolicy(policy).build();        // 启动客户端        client.start();        System.out.println("zookeeper 启动胜利");        return client;    }}

5. DIY一个阉割版的分布式锁

通过这个实例对照第2节内容来了解加解锁的流程,以及如何防止惊群效应。

package com.rock.case2;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import java.io.IOException;import java.util.List;import java.util.concurrent.CountDownLatch;/** * zk 分布式锁 v1版本: * 实现性能 : *      1. 防止了惊群效应 * 缺失性能: *      1. 超时管制 *      2. 读写锁 *      3. 重入管制 */public class DistributedLock {    private String connectString;    private int sessionTimeout;    private ZooKeeper zk;    private CountDownLatch connectLatch = new CountDownLatch(1);    private CountDownLatch waitLatch = new CountDownLatch(1);    private String waitPath;    private String currentNode;    private String LOCK_ROOT_PATH;    private static String NODE_PREFIX = "w";    public DistributedLock(String connectString, int sessionTimeout, String lockName) {        //TODO:数据校验        this.connectString = connectString;        this.sessionTimeout = sessionTimeout;        this.LOCK_ROOT_PATH = lockName;    }    public void init() throws IOException, KeeperException, InterruptedException {        // 建联        zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {            // connectLatch  连贯上zk后  开释            if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {                connectLatch.countDown();            }        });        connectLatch.await();// 期待zk失常连贯后        // 判断锁名称节点是否存在        Stat stat = zk.exists(LOCK_ROOT_PATH, false);        if (stat == null) {            // 创立一下锁名称节点            try {                zk.create(LOCK_ROOT_PATH, LOCK_ROOT_PATH.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);            } catch (KeeperException e) {                //并发创立抵触疏忽。                if (!e.code().name().equals("NODEEXISTS")) {                    throw e;                }            }        }    }    /**     * 待补充性能:     * 1. 超时设置     * 2. 读写辨别     * 3. 重入管制     */    public void zklock() throws KeeperException, InterruptedException {        if (!tryLock()) {            waitLock();            zklock();        }    }    /**     *     */    private void waitLock() throws KeeperException, InterruptedException {        try {            zk.getData(waitPath, new Watcher() {                @Override                public void process(WatchedEvent watchedEvent) {                    // waitLatch  须要开释                    if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {                        waitLatch.countDown();                    }                }            }, new Stat());            // 期待监听            waitLatch.await();        } catch (KeeperException.NoNodeException e) {            //如果期待的节点曾经被革除了,不等了,再尝试去抢锁            return;        }    }    private boolean tryLock() throws KeeperException, InterruptedException {        currentNode = zk.create(LOCK_ROOT_PATH + "/" + NODE_PREFIX, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);        // 判断创立的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点        List<String> children = zk.getChildren(LOCK_ROOT_PATH, false);        // 如果children 只有一个值,那就间接获取锁; 如果有多个节点,须要判断,谁最小        if (children.size() == 1) {            return true;        } else {            String thisNode = currentNode.substring(LOCK_ROOT_PATH.length() + 1);            // 通过w00000000获取该节点在children汇合的地位            int index = children.indexOf(thisNode);            if (index == 0) {                //本人就是第一个节点                return true;            }            // 须要监听  他前一个节点变动            waitPath = LOCK_ROOT_PATH + "/" + children.get(index - 1);        }        return false;    }    // 解锁    public void unZkLock() {        // 删除节点        try {            zk.delete(this.currentNode, -1);        } catch (InterruptedException e) {            e.printStackTrace();        } catch (KeeperException e) {            e.printStackTrace();        }    }}