关于zookeeper:zookeeper分布式锁代码实现

前言

这里是zookeeper响应式编程的第二篇——自定义分布式锁,第一篇zookeeper分布式注册配置核心见如下链接:
https://segmentfault.com/a/11…

分布式锁

因为在分布式系统中,任意2个节点的线程须要获取同一个资源的时候就须要锁来维持程序的正确运行,然而呢,如果应用JVM提供的锁只能锁住本人,因为这是2台主机,这就引入了分布式锁的概念。也即是说须要一把锁在主机的里面,而很多框架都能够实现分布式锁,比方redis,mysql和zookeeper,目前最为不便的是应用zookeeper,因为就其高可用性和对立视图而言就比其余的技术不便很多。

对于zookeeper做分布式锁的剖析过程如下:首先对于2台主机抢占同一把锁的时候,只能有一台主机胜利抢占,那么有可能呈现取得锁的主机“挂了”,那么咱们能够应用长期节点解决该问题,那么在一个主机胜利抢占该锁之后,如果它开释了锁,其余主机是如何晓得它曾经胜利开释了呢?第一种形式就是能够采纳被动轮询的形式一直检测该锁是否曾经被开释,然而这种形式有提早,并且在主机特地多的时候多台主机轮询一把锁会造成zookeeper很大的压力。第二种形式就是应用watch机制,这种形式能够解决提早的问题,然而在取得锁的主机开释锁的时候,zookeeper会回调哪些所有没有抢到锁的线程,而后那些主机又会发动强锁的操作,会造成较大的通信压力。第三种形式就能够应用watche机制+序列节点,而后让每一个长期序列节点都watch前一个节点,这样只有一个编号最小的能力取得锁,并且在开释锁后会只告诉前面的一个主机。

代码实现

首选咱们须要在编写配置核心的Utils工具类,并且创立TestLock类实现分布式锁。而后咱们开拓10个线程模仿多台主机抢占锁的过程,根本流程就是抢占锁,而后执行业务代码(这里应用睡眠来代替),最初再开释锁。

Utils代码如下:
package org.qzx.config;

import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 1:02 下午
 * @Description: org.qzx.config
 * @version: 1.0
 */
public class Utils {
    // zookeeper对象
    private static ZooKeeper zooKeeper;
    // 连贯地址
    private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test";
    private static DefaultWatcher defaultWatcher = new DefaultWatcher();
    // 锁
    private static CountDownLatch latch = new CountDownLatch(1);

    public static ZooKeeper getZooKeeper() throws Exception{
        zooKeeper = new ZooKeeper(address,3000,defaultWatcher);
        defaultWatcher.setLatch(latch);
        latch.await();
        return zooKeeper;
    }
}
TestLock大体框架如下:
package org.qzx.lock;

import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 3:54 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class TestLock {
    private ZooKeeper zooKeeper;

    @Before
    public void conn() throws Exception {
        zooKeeper = Utils.getZooKeeper();
    }

    @After
    public void close() throws InterruptedException {
        zooKeeper.close();
    }

    @Test
    public void TestLock(){
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                try {
                    // 抢占锁

                    // 业务代码
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+"is woorking!!!");
                    // 开释锁
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }).start();
        }
    }
}

咱们在这里提供另外一个工具类能够为每一个线程实现抢锁和开释锁的过程,同时因为抢占的锁实际上是zookeeper的长期序列节点,所以必定会应用wather和回调机制,这里就把这个工具类叫做MyWatcherAndCallBack,该类提供抢占锁、开释锁,节点变动回调办法。其大体框架如下:

package org.qzx.lock;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 4:03 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class MyWatcherAndCallBack implements Watcher {
    private ZooKeeper zooKeeper;

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    @Override
    public void process(WatchedEvent event) {

    }

    // 抢占锁
    public void tryLock(){
        
    }

    // 开释锁
    public void unlock(){

    }
}
TestLock的代码也能够进行略微的批改
package org.qzx.lock;

import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 3:54 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class TestLock {
    private ZooKeeper zooKeeper;

    @Before
    public void conn() throws Exception {
        zooKeeper = Utils.getZooKeeper();
    }

    @After
    public void close() throws InterruptedException {
        zooKeeper.close();
    }

    @Test
    public void TestLock(){
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack();
                myWatcherAndCallBack.setZooKeeper(zooKeeper);
                try {
                    // 抢占锁
                    myWatcherAndCallBack.tryLock();
                    // 业务代码
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+"is woorking!!!");
                    // 开释锁
                    myWatcherAndCallBack.unlock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
这样框架就曾经搭建结束,接下来就是编写具体的抢占锁和开释的逻辑代码了。

首先对于抢占锁的过程肯定是阻塞的,直到抢占胜利的时候才会接着往下走,这里应用CountDownLatch实现。而每一个线程都会创立属于本人的长期序列节点作为本人的锁,不过只有编号最小的那个才会取得被对应的线程所占有,其余的线程在创立节点后都会阻塞。这里为了不便看到那些线程创立了哪些锁,将线程的名字作为数据写入到节点中。而后咱们在创立节点的回调函数中输入以后线程的名字和节点的名字,目标是为了测验代码写到当初是否正确。

MyWatcherAndCallBack代码如下:
package org.qzx.lock;

import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 3:54 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class TestLock {
    private ZooKeeper zooKeeper;

    @Before
    public void conn() throws Exception {
        zooKeeper = Utils.getZooKeeper();
    }

    @After
    public void close() throws InterruptedException {
        zooKeeper.close();
    }

    @Test
    public void TestLock(){
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack();
                myWatcherAndCallBack.setThreadName(Thread.currentThread().getName());
                myWatcherAndCallBack.setZooKeeper(zooKeeper);
                try {
                    // 抢占锁
                    myWatcherAndCallBack.tryLock();
                    // 业务代码
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+"is woorking!!!");
                    // 开释锁
                    myWatcherAndCallBack.unlock();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        while (true){

        }
    }
}

为了避免主线程执行太快导致回调函数还没有执行结束就完结,在TestLock办法最初加上死循环进行阻塞。

@Test
public void TestLock(){
    for (int i = 0; i < 10; i++) {
        new Thread(()->{
            MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack();
            myWatcherAndCallBack.setThreadName(Thread.currentThread().getName());
            myWatcherAndCallBack.setZooKeeper(zooKeeper);
            try {
                // 抢占锁
                myWatcherAndCallBack.tryLock();
                // 业务代码
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName()+"is woorking!!!");
                // 开释锁
                myWatcherAndCallBack.unlock();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
    while (true){

    }
}

启动测试,在测试前记得创立工作目录/test,后果如下:

能够看到节点创立的是有序的,然而线程是无序的。
接下来在创立节点胜利的回调函数中,咱们就须要获取锁了,应用getChildren办法取得工作目录下的孩子节点,也就是创立的长期序列节点,该办法不须要应用watch机制,因为不须要监测父节点,同时对于其回调对象咱们也是同样封装在MyWatcherAndCallBack中。最初因为创立节点的名字在前面会用到,应用pathName属性保留以后线程创立的节点名字。

MyWatcherAndCallBack批改后代码如下:
package org.qzx.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 4:03 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback {
    private ZooKeeper zooKeeper;
    private CountDownLatch latch = new CountDownLatch(1);
    private String threadName;
    private String pathName;

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    // Watcher
    @Override
    public void process(WatchedEvent event) {

    }

    // 抢占锁
    public void tryLock(){
        try {
            // 创立一个长期序列节点作为锁
            zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    this,"abc");
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 开释锁
    public void unlock(){

    }

    // StringCallback
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name!=null){
            System.out.println(threadName+"------>"+name);
            pathName = name;//相似于/lcok0000000000
            zooKeeper.getChildren("/",false,this,"aaa");
        }
    }

    //Children2Callback
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        
    }
}

在getChildren的回调办法中,以后线程肯定胜利创立了节点,并且能够看到所有在它之前创立的节点。那么咱们当初遍历输入所有的children中的所有节点,目标是为了看到以后线程所看到的所有节点是无序的,这样就为前面须要排序提供了必要性。批改的局部代码如下:

//Children2Callback
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
    System.out.println(threadName+"能看到的节点如下:");
    for (String child : children) {
        System.out.println(child);
    }
}

输入后果为:

能够看到第一个创立节点的线程为Thread-6,并且看到的节点都是无序的。并且节点的名字少了个/。
接下来就是对于以后线程,得判断它创立的锁是不是第一个,所以咱们先对children进行排序,而后再获取以后锁在children的地位,如果是第一个阐明该线程能够取得锁,执行latch.countdown(),这样该线程就能够去执行相应的工作了。如果不是第一个,那么就得判断前一个锁是否曾经开释,判断的办法为exists,如果后面一个节点不存在了,阐明曾经开释,对于exists办法有可能会呈现还没有胜利监控到前一个节点就呈现开释锁的状况,也就是exists执行失败了,没能监控前一个节点,那么阐明锁曾经开释,以后线程所须要进行的操作不在watcher中执行而是在回调函数中中执行,所以在这里exists的回调函数是必须的。

MyWatcherAndCallBack批改后的代码如下:
package org.qzx.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 4:03 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    private ZooKeeper zooKeeper;
    private CountDownLatch latch = new CountDownLatch(1);
    private String threadName;
    private String pathName;

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    // Watcher
    @Override
    public void process(WatchedEvent event) {

    }

    // 抢占锁
    public void tryLock(){
        try {
            // 创立一个长期序列节点作为锁
            zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    this,"abc");
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 开释锁
    public void unlock(){

    }

    // StringCallback
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name!=null){
            System.out.println(threadName+"------>"+name);
            pathName = name;//相似于/lcok0000000000
            zooKeeper.getChildren("/",false,this,"aaa");
        }
    }

    //Children2Callback
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        Collections.sort(children);
        int index = children.indexOf(pathName.substring(1));
        if(index==0){
            // 以后线程创立的锁是第一个,能够取得
            latch.countDown();
        }else {
            // 不是第一个,得判断前一个锁是否曾经开释
            zooKeeper.exists("/"+children.get(index-1),this,this,"azz");
        }
    }

    // StatCallback
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {

    }
}

接下来须要对前一把锁的开释事件做解决,首先是在节点删除后,会触发节点删除工夫,在该线程中会做出响应,具体做法就是要么间接latch.coutdown()取得锁或者通过getChildren判断以后锁是否是第一个了,是第一个就取得锁。同时对于以后线程取得锁后开释锁进行解决,间接对其创立的节点进行删除即可。

MyWatcherAndCallBack批改后的代码如下:
package org.qzx.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 4:03 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    private ZooKeeper zooKeeper;
    private CountDownLatch latch = new CountDownLatch(1);
    private String threadName;
    private String pathName;

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    // Watcher
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                // 前一把锁被删除,以后线程取得锁
//                zooKeeper.getChildren("/",false,this,"aaa");
                latch.countDown();
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
        }
    }

    // 抢占锁
    public void tryLock(){
        try {
            // 创立一个长期序列节点作为锁
            zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    this,"abc");
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 开释锁
    public void unlock() throws KeeperException, InterruptedException {
        zooKeeper.delete(pathName,-1);// -1代表疏忽版本号
    }

    // StringCallback
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name!=null){
            System.out.println(threadName+"------>"+name);
            pathName = name;//相似于/lcok0000000000
            zooKeeper.getChildren("/",false,this,"aaa");
        }
    }

    //Children2Callback
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        Collections.sort(children);
        int index = children.indexOf(pathName.substring(1));
        if(index==0){
            // 以后线程创立的锁是第一个,能够取得
            latch.countDown();
        }else {
            // 不是第一个,得判断前一个锁是否曾经开释
            zooKeeper.exists("/"+children.get(index-1),this,this,"azz");
        }
    }

    // StatCallback
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {

    }
}

咱们当初运行程序能够看到每一个线程都能够取得锁并且程序执行。

对于上述代码,存在一个问题,以后去除主线程业务代码中睡眠的操作,就会呈现只有一个线程能够胜利取得锁并且执行响应的操作,其余线程会呈现相似于死锁的景象,然而这里不是死锁。这里的起因是执行速度太快了,很快就把以后线程取得的锁删除了,那么前面的线程在执行完排序,监控后面的锁就会呈现失败的状况,这里的一种解决办法就是在exists的回调函数中针对节点不存在,也就是stat==null的时候,从新调用getChildren办法判断以后是否是第一把锁,如果是就会执行。

MyWatcherAndCallBack批改后的代码如下:
package org.qzx.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 4:03 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    private ZooKeeper zooKeeper;
    private CountDownLatch latch = new CountDownLatch(1);
    private String threadName;
    private String pathName;

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    // Watcher
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                // 前一把锁被删除,以后线程取得锁
//                zooKeeper.getChildren("/",false,this,"aaa");
                latch.countDown();
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
        }
    }

    // 抢占锁
    public void tryLock(){
        try {
            // 创立一个长期序列节点作为锁
            zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    this,"abc");
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 开释锁
    public void unlock() throws KeeperException, InterruptedException {
        zooKeeper.delete(pathName,-1);
    }

    // StringCallback
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name!=null){
            System.out.println(threadName+"------>"+name);
            pathName = name;//相似于/lcok0000000000
            zooKeeper.getChildren("/",false,this,"aaa");
        }
    }

    //Children2Callback
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        Collections.sort(children);
        int index = children.indexOf(pathName.substring(1));
        if(index==0){
            // 以后线程创立的锁是第一个,能够取得
            latch.countDown();
        }else {
            // 不是第一个,得判断前一个锁是否曾经开释
            zooKeeper.exists("/"+children.get(index-1),this,this,"azz");
        }
    }

    // StatCallback
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        if(stat==null){
            // 监控失败,主动获取锁
            zooKeeper.getChildren("/",false,this,"aaa");
        }
    }
}

在主线程的睡眠操作去除掉后,程序运行的后果如下:

能够看到所有线程又从新失常运行了。
到此,zookeeper自定义分布式锁的小demo就编写结束。对于zookeeper分布式锁的所有代码整顿如下。

Utils类:
package org.qzx.lock;

import org.apache.zookeeper.ZooKeeper;
import org.qzx.config.DefaultWatcher;

import java.util.concurrent.CountDownLatch;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 1:02 下午
 * @Description: org.qzx.config
 * @version: 1.0
 */
public class Utils {
    // zookeeper对象
    private static ZooKeeper zooKeeper;
    // 连贯地址
    private static String address = "10.211.55.5:2181,10.211.55.8:2181,10.211.55.9:2181,10.211.55.10:2181/test";
    private static DefaultWatcher defaultWatcher = new DefaultWatcher();
    // 锁
    private static CountDownLatch latch = new CountDownLatch(1);

    public static ZooKeeper getZooKeeper() throws Exception{
        zooKeeper = new ZooKeeper(address,3000,defaultWatcher);
        defaultWatcher.setLatch(latch);
        latch.await();
        return zooKeeper;
    }
}
TestLock类:
package org.qzx.lock;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 3:54 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class TestLock {
    private ZooKeeper zooKeeper;

    @Before
    public void conn() throws Exception {
        zooKeeper = Utils.getZooKeeper();
    }

    @After
    public void close() throws InterruptedException {
        zooKeeper.close();
    }

    @Test
    public void TestLock(){
        for (int i = 0; i < 10; i++) {
            new Thread(()->{
                MyWatcherAndCallBack myWatcherAndCallBack = new MyWatcherAndCallBack();
                myWatcherAndCallBack.setThreadName(Thread.currentThread().getName());
                myWatcherAndCallBack.setZooKeeper(zooKeeper);
                try {
                    // 抢占锁
                    myWatcherAndCallBack.tryLock();
                    // 业务代码
//                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName()+"is woorking!!!");
                    // 开释锁
                    myWatcherAndCallBack.unlock();
                } catch (InterruptedException | KeeperException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        while (true){

        }
    }
}
MyWatcherAndCallBack类:
package org.qzx.lock;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Auther: qzx
 * @Date: 2020/10/29 - 10 - 29 - 4:03 下午
 * @Description: org.qzx.lock
 * @version: 1.0
 */
public class MyWatcherAndCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    private ZooKeeper zooKeeper;
    private CountDownLatch latch = new CountDownLatch(1);
    private String threadName;
    private String pathName;

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    // Watcher
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                // 前一把锁被删除,以后线程取得锁
//                zooKeeper.getChildren("/",false,this,"aaa");
                latch.countDown();
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
        }
    }

    // 抢占锁
    public void tryLock(){
        try {
            // 创立一个长期序列节点作为锁
            zooKeeper.create("/lcok",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                    this,"abc");
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 开释锁
    public void unlock() throws KeeperException, InterruptedException {
        zooKeeper.delete(pathName,-1);
    }

    // StringCallback
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if(name!=null){
            System.out.println(threadName+"------>"+name);
            pathName = name;//相似于/lcok0000000000
            zooKeeper.getChildren("/",false,this,"aaa");
        }
    }

    //Children2Callback
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        Collections.sort(children);
        int index = children.indexOf(pathName.substring(1));
        if(index==0){
            // 以后线程创立的锁是第一个,能够取得
            latch.countDown();
        }else {
            // 不是第一个,得判断前一个锁是否曾经开释
            zooKeeper.exists("/"+children.get(index-1),this,this,"azz");
        }
    }

    // StatCallback
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        if(stat==null){
            // 监控失败,主动获取锁
            zooKeeper.getChildren("/",false,this,"aaa");
        }
    }
}

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理