乐趣区

Zookeeper分布式入门ZK分布式锁的简单实现

Zookeeper 分布式入门——ZK 分布式锁的简单实现
使用 InterProcessMutex 实现

InterProcessMutex 基于 Zookeeper 实现了分布式的公平可重入互斥锁,类似于单个 JVM 进程内的 ReentrantLock

1. 初始化 InterProcessMutex

private static InterProcessMutex mutex = new InterProcessMutex(client, “/curator/lock”);

2. 获取锁

    // 获得了锁
    public static boolean acquire(long time, TimeUnit unit){
        try {return mutex.acquire(time,unit);
        } catch (Exception e) {e.printStackTrace();
            return false;
        }
    }

3. 释放锁

    // 释放锁
    public static void release(){
        try {mutex.release();
        } catch (Exception e) {e.printStackTrace();
        }
    }

测试:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
 * 基于 curator 的 zookeeper 分布式锁
 * 这里我们开启 5 个线程,每个线程获取锁的最大等待时间为 5 秒,为了模拟具体业务场景,方法中设置 4 秒等待时间。* 
 */
public class CuratorUtil {
    private static String address = "192.168.12.101:2181";
    
    public static void main(String[] args) {
        //1、重试策略:初试时间为 1s 重试 3 次
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); 
        //2、通过工厂创建连接
        CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);
        //3、开启连接
        client.start();
        //4 分布式锁
        final InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock"); 
        // 读写锁
        //InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/readwriter");
        
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        
        for (int i = 0; i < 5; i++) {fixedThreadPool.submit(new Runnable() {
                @Override
                public void run() {
                    boolean flag = false;
                    try {
                        // 尝试获取锁,最多等待 5 秒
                        flag = mutex.acquire(5, TimeUnit.SECONDS);
                        Thread currentThread = Thread.currentThread();
                        if(flag){System.out.println("线程"+currentThread.getId()+"获取锁成功");
                        }else{System.out.println("线程"+currentThread.getId()+"获取锁失败");
                        }
                        // 模拟业务逻辑,延时 4 秒
                        Thread.sleep(4000);
                    } catch (Exception e) {e.printStackTrace();
                    } finally{if(flag){
                            try {mutex.release();
                            } catch (Exception e) {e.printStackTrace();
                            }
                        }
                    }
                }
            });
        }
    }
}

这里我们开启 5 个线程,每个线程获取锁的最大等待时间为 5 秒,为了模拟具体业务场景,方法中设置 4 秒等待时间。开始执行 main 方法,通过 ZooInspector 监控 /curator/lock 下的节点如下图:

对,没错,设置 4 秒的业务处理时长就是为了观察生成了几个顺序节点。果然如案例中所述,每个线程都会生成一个节点并且还是有序的。观察控制台,我们会发现只有两个线程获取锁成功,另外三个线程超时获取锁失败会自动删除节点。线程执行完毕我们刷新一下 /curator/lock 节点,发现刚才创建的五个子节点已经不存在了。

退出移动版