Zookeeper 分布式入门——ZK 分布式锁的简单实现
使用 InterProcessMutex 实现
InterProcessMutex 基于 Zookeeper 实现了分布式的公平可重入互斥锁,类似于单个 JVM 进程内的 ReentrantLock
1. 初始化 InterProcessMutex
private static InterProcessMutex mutex = new InterProcessMutex(client, “/curator/lock”);
2. 获取锁
// 获得了锁
public static boolean acquire(long time, TimeUnit unit){
try {return mutex.acquire(time,unit);
} catch (Exception e) {e.printStackTrace();
return false;
}
}
3. 释放锁
// 释放锁
public static void release(){
try {mutex.release();
} catch (Exception e) {e.printStackTrace();
}
}
测试:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* 基于 curator 的 zookeeper 分布式锁
* 这里我们开启 5 个线程,每个线程获取锁的最大等待时间为 5 秒,为了模拟具体业务场景,方法中设置 4 秒等待时间。*
*/
public class CuratorUtil {
private static String address = "192.168.12.101:2181";
public static void main(String[] args) {
//1、重试策略:初试时间为 1s 重试 3 次
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//2、通过工厂创建连接
CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);
//3、开启连接
client.start();
//4 分布式锁
final InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
// 读写锁
//InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/readwriter");
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {fixedThreadPool.submit(new Runnable() {
@Override
public void run() {
boolean flag = false;
try {
// 尝试获取锁,最多等待 5 秒
flag = mutex.acquire(5, TimeUnit.SECONDS);
Thread currentThread = Thread.currentThread();
if(flag){System.out.println("线程"+currentThread.getId()+"获取锁成功");
}else{System.out.println("线程"+currentThread.getId()+"获取锁失败");
}
// 模拟业务逻辑,延时 4 秒
Thread.sleep(4000);
} catch (Exception e) {e.printStackTrace();
} finally{if(flag){
try {mutex.release();
} catch (Exception e) {e.printStackTrace();
}
}
}
}
});
}
}
}
这里我们开启 5 个线程,每个线程获取锁的最大等待时间为 5 秒,为了模拟具体业务场景,方法中设置 4 秒等待时间。开始执行 main 方法,通过 ZooInspector 监控 /curator/lock 下的节点如下图:
对,没错,设置 4 秒的业务处理时长就是为了观察生成了几个顺序节点。果然如案例中所述,每个线程都会生成一个节点并且还是有序的。观察控制台,我们会发现只有两个线程获取锁成功,另外三个线程超时获取锁失败会自动删除节点。线程执行完毕我们刷新一下 /curator/lock 节点,发现刚才创建的五个子节点已经不存在了。