共计 3596 个字符,预计需要花费 9 分钟才能阅读完成。
一、前言
分布式锁在理论工作中的利用还是比拟多的,其实现形式也有很多种,常见的有 基于数据库锁、基于 zookeeper、基于 redis
的,明天咱们来讲下基于 redis 实现的分布式锁。
redisson 是一个 redis 客户端框架,提供了 分布式锁
的性能个性,这里咱们通过解析 redisson 的源码来剖析它是如何基于 redis 来实现分布式锁的?
二、源码解析
2.1 样例代码
上面是一个分布式锁的简略样例代码
// 初始化配置,创立 Redisson 客户端
Config config = new Config();
config.setCodec(new JsonJacksonCodec())
.useSingleServer()
.setAddress("redis://192.168.10.131:6379");
RedissonClient client = Redisson.create(config);
// 获取分布式锁
RLock lock = client.getLock("myLock");
lock.lock();
System.out.println(Thread.currentThread().getId() + ": 获取到分布式锁");
try {Thread.sleep(60 * 1000);
} catch (Exception e) {e.printStackTrace();
} finally {
// 解锁
lock.unlock();}
下面的样例代码比较简单,通过 redisson 客户端获取一个分布式锁,该分布式锁的 key 为 myLock,睡眠 60 秒之后开释锁。这里比拟重要的是 lock()
办法,该办法是获取锁的具体步骤,所以接下来具体解析一下该办法
2.2 整体流程
获取锁的流程图如下
具体流程为:
- 第一次尝试获取锁,如果获取到锁,间接返回。如果未获取到锁,返回锁的残余过期工夫 ttl
- 当未获取到锁时,订阅频道 redisson_lock__channel:{myLock}(
订阅该频道的作用是,当该分布式锁被其余拥有者所开释时,会往该订阅频道发送一个解锁音讯 UNLOCK_MESSAGE,这时以后期待该分布式锁的线程会中断期待,并再次尝试获取锁
) - 开启死循环,尝试获取锁,如果未获取到锁,拿到锁的残余过期工夫,并期待该锁的残余过期工夫(
两头过程中如果订阅频道有解锁音讯 UNLOCK_MESSAGE,会提前中断期待,持续循环
),直到获取锁,退出循环 - 获取到锁之后,勾销订阅频道
源码如下
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId = Thread.currentThread().getId();
// 1、第一次尝试获取锁,ttl 为 null,示意获取到锁,间接 return
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {return;}
// 2、订阅频道 redisson_lock__channel:{myLock}
CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
pubSub.timeout(future);
RedissonLockEntry entry;
if (interruptibly) {entry = commandExecutor.getInterrupted(future);
} else {entry = commandExecutor.get(future);
}
try {
// 3、开启循环
while (true) {
// 再次尝试获取锁,ttl 为 null,示意获取到锁,退出循环
ttl = tryAcquire(-1, leaseTime, unit, threadId);
if (ttl == null) {break;}
// 如果 ttl 大于等于 0
if (ttl >= 0) {
try {
// 期待 ttl 工夫 或者 接管到解锁音讯
entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {...}
} else { // 如果 ttl 小于 0,阐明该锁未设置过期工夫,期待接管解锁音讯
if (interruptibly) {entry.getLatch().acquire();} else {entry.getLatch().acquireUninterruptibly();}
}
}
} finally {// 退出订阅频道 redisson_lock__channel:{myLock}
unsubscribe(entry, threadId);
}
}
2.3 锁的获取
那么如何示意以后线程获取到锁?
redisson 中的分布式锁本质上是个 hash 构造的数据,假如锁的名称为 myLock,那么当某个线程获取到锁之后,会在这个 hash 构造里设置一个 hashkey,其为 【连贯管理器 id】:【线程 id】
,如下图
redisson 通过执行 lua 脚本来获取锁,lua 脚本如下
// 如果锁不存在,则胜利获取到锁,设置锁的过期工夫,并返回 nil
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 如果锁已存在,判断是否是以后线程曾经获取到,如果是,对应的值加 1
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
// 否则示意未获取到锁,返回锁的过期工夫
return redis.call('pttl', KEYS[1]);
该 lua 脚本的次要作用是
- 如果锁不存在,则胜利获取到锁,设置锁的过期工夫(
lockWatchdogTimeout 默认是 30 秒
),并返回 nil - 如果锁已存在,判断是否是以后线程曾经获取到,如果是,对应的值加 1
- 否则示意未获取到锁,返回锁的过期工夫
这里的第一步为什么要设置锁的过期工夫?其实是为了当锁的拥有者挂了之后,防止锁始终存在,导致其余利用永远无奈获取到锁
。
2.4 锁续期
那么既然锁设置了过期工夫,那很天然地想到,如果在锁过期的这段时间内,领有锁的线程还未执行完业务逻辑,这时锁主动过期,导致其余利用也获取到了锁,从而产生逻辑谬误。所以引入了 锁续期
。
当获取到锁时,redisson 会启动一个看门狗,该看门狗每隔 lockWatchdogTimeout / 3 秒续期一次锁(假如 lockWatchdogTimeout 默认为 30 秒,则每隔 10 秒续期锁)
,源码如下
private void renewExpiration() {
...
// 1、创立一个 10 秒后执行的提早工作
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
...
// 2、执行续期锁的 lua 脚本
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
...
// 3、res 为 true,代表锁续期胜利,从新调用该办法,持续创立提早工作
// false 示意锁续期失败
if (res) {renewExpiration();
} else {cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
}
续期锁的 lua 脚本如下:
// 如果锁存在这个 hashkey,从新设置锁的过期工夫
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end;
return 0;
到这里,redisson 实现分布式锁的源码解析就完结了。
三、总结
redisson 的源码中大量应用了异步编程,这导致浏览源码的难度系数较高,这里我也只是大略整顿了一下,有问题的同学能够相互讨论一下或自行查阅源码。