Redis 实现分布式锁+执行lua脚本
本篇来看看Redis 实现分布式锁的 步步演进过程 ,包含 setnx -> set -> 过期工夫 -> 误删锁 -> uuid管制锁误删-> lua脚本管制删锁的原子性
分布式锁,即分布式系统中的锁。在单体利用中咱们通过锁解决的是管制共享资源拜访的问题,而分布式锁,就是解决了分布式系统中管制共享资源拜访的问题。与单体利用不同的是,分布式系统中竞争共享资源的最小粒度从线程升级成了过程。
假如当初在redis中有 5000个 iphone14商品
上面来通过扣减这5000个iphone的案例 来一步步欠缺分布式的实现,让咱们可能更加了解这些改良的起因.
set iphone14 5000
1.无 分布式锁管制
无锁管制的时候 多个线程获取会同时获取到库存 而后进行扣减 会导致并发问题
String stock = stringRedisTemplate.opsForValue().get("iphone14");
if (stock != null) {
// 3.比拟并且扣减库存
long stockCount = Long.parseLong(stock);
if (stockCount > 0) {
// 4.设置库存
stringRedisTemplate.opsForValue().set("iphone14", String.valueOf(--stockCount));
}
}
ab测试后发现重大的超卖问题
ab -c 100 -n 5000 http://127.0.0.1:10010/deduct
2.应用 setnx 命令
能够通过 redis 的 setnx 命令 来增加锁, 这个命令的意思是 如果key不存在 才设置
, 这就模仿了如果他人没抢到锁我就加锁的意思
上面是应用 setnx 命令实现的分布式 看看会有什么问题?
public void deduct() {
// 1.获取redis 锁
while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent("lock", "lockvalue"))) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
// 2.获取库存
String stock = stringRedisTemplate.opsForValue().get("iphone14");
if (stock != null) {
// 3.比拟并且扣减库存
long stockCount = Long.parseLong(stock);
if (stockCount > 0) {
// 4.设置库存
stringRedisTemplate.opsForValue().set("iphone14", String.valueOf(--stockCount));
}
}
} finally {
// 5.开释锁
stringRedisTemplate.delete("lock");
}
问题 (没有设置过期工夫)
能够看到在finally 外面咱们进行了开释锁的操作, 然而如果还没执行finally这里就宕机了
那这个lock 锁会始终存在就导致没有开释 产生了死锁, 所以须要给锁增加一个过期工夫
改良一
while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent("lock", "lockvalue"))) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//给锁 增加过期工夫 ???问题: 刚刚要执行这行代码的时候 就宕机了 就会产生死锁
stringRedisTemplate.expire("lock",30,TimeUnit.SECONDS);
这里尽管在获取锁后 去给锁设置了过期工夫, 然而如果 刚刚获取锁就宕机了 也会导致死锁
咱们会发现须要 在设置锁的时候同时设置过期工夫, 须要这2个操作是原子性的
, 那么 setnx 命令就不适合了须要应用 set 命令
3. 应用 set 命令代替setnx (保障原子性)
从2.6.12版本开始,redis 为SET命令减少了一系列选项:
EX seconds 设置key的过期工夫,单位是秒
PX milliseconds 设置key的过期工夫,单位是毫秒
NX 只有键key不存在时,能力设置key的值
XX 只有键key存在时,能力设置key的值
set 命令是反对 NX XX 判断的 , NX 代表不存在才设置
, 并且同时反对 设置过期工夫
对应到 redistemplate 的办法就是 setifAbsent 是NX setIfPresent 对应XX
//保障了 设置key 和 过期工夫 两条命令的 的原子性
while (Boolean.FALSE.equals(
stringRedisTemplate.opsForValue().setIfAbsent("lock", "lockvalue", 3, TimeUnit.SECONDS))) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
// 2.获取库存
String stock = stringRedisTemplate.opsForValue().get("iphone14");
if (stock != null) {
// 3.比拟并且扣减库存
long stockCount = Long.parseLong(stock);
if (stockCount > 0) {
// 4.设置库存
stringRedisTemplate.opsForValue().set("iphone14", String.valueOf(--stockCount));
}
}
} finally {
// 5.开释锁
stringRedisTemplate.delete("lock");
}
问题(误删锁)
从下面代码能够发现 如果某个线程获取了锁 并且在执行业务的因为某些起因执行较慢 导致锁曾经到了过期工夫 主动开释了
, 那么 其余线程会获取到锁, 而后第一个线程执行实现后 又会去删除锁,而此时的锁曾经是其余线程的锁
, 导致了误删锁的状况
4. 增加UUID 避免误删锁
这里简略演示通过增加 uuid 来避免误删其余线程的锁
String uuid = UUID.randomUUID().toString();
while (Boolean.FALSE.equals(
stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS))) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
.....
finally {
// 5.开释锁 对锁进行了 uuid 的判断 判断是否是本人的锁
String lockValue = stringRedisTemplate.opsForValue().get("lock");
if (lockValue.equals(uuid)) {
stringRedisTemplate.delete("lock");
}
}
问题(误删锁 原子性问题)
能够发现还是有问题 , 尽管咱们比拟了 锁是不是本人的, 然而 还是有可能在刚刚比拟实现 equals 进去后 外面锁生效了 又被其余线程增加了新的锁
, 此时还是会有误删的可能
5. Lua 脚本 管制删除锁 的原子性
redis给lua 脚本留了口子通过 eval 命令运行lua脚本来 原子性执行lua脚本里的逻辑
lua 脚本多的就不介绍了, 只须要晓得看懂上面的代码即可
if 10 > 11
then return '10>11'
elseif 10 > 9
then return '10>9'
else
return 'nil'
end
同时 在lua里也能够调用 redis的命令 通过 redis.call(‘set’, ‘name’, ‘johnny’)
好了不介绍了 上面来看看 如何通过lua脚本来管制 删除锁的原子性
5.1 lua 脚本删除锁逻辑
if redis.call('get' , 'lock') == uuid
then
redis.call('del' , 'lock')
return 1
else
return 0
end
如果 此时 redis中 lock 锁的 uuid = 91dbc829-d44e-4f03-96d8-95a06f3ff975
转化成 能够执行的
if redis.call('get' , KEYS[1]) == ARGV[1] then redis.call('del' , KEYS[1]) return 1 else return 0 end 1 lock 91dbc829-d44e-4f03-96d8-95a06f3ff975**
5.2 redistemplate 执行lua脚本实现
private final String deleteLockLua =
"if redis.call('get' , KEYS[1]) == ARGV[1] then redis.call('del' , KEYS[1]) return 1 else return 0 end";
public void deduct() {
// 1.获取redis 锁
String uuid = UUID.randomUUID().toString();
while (Boolean.FALSE.equals(
stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS))) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
// 2.获取库存
String stock = stringRedisTemplate.opsForValue().get("iphone14");
if (stock != null) {
// 3.比拟并且扣减库存
long stockCount = Long.parseLong(stock);
if (stockCount > 0) {
// 4.设置库存
stringRedisTemplate.opsForValue().set("iphone14", String.valueOf(--stockCount));
}
}
} finally {
//通过 DefaultRedisScript 来执行 lua脚本
DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
//Boolean 对应 lua脚本返回的 0 1
redisScript.setResultType(Boolean.class);
//指定须要执行的 lua脚本
redisScript.setScriptText(deleteLockLua);
// 5.开释锁
//留神 须要提供 List<K> keys, Object... args 代表 keys 和 ARGV
stringRedisTemplate.execute(redisScript, Collections.singletonList("lock"), uuid);
}
}
压测一下发现失常管制了库存
总结
本篇次要一步步演进手写redis分布式锁的实现, 包含 setnx -> set -> 过期工夫 -> 误删锁 -> uuid管制锁误删-> lua脚本管制删锁的原子性等等.. 其实目前还有问题, 包含 锁续期问题
以及 redis 可重入锁的问题
有机会在欠缺吧
须要留神 redis 中如何应用 lua脚本的
, 因为一些原子性操作就是须要lua脚本来管制 包含 redission 框架也是通过lua脚本实现的.
1. 分布式锁
排他性:setnx
避免死锁 须要设置过期工夫
过期工夫 和 设置key 又不是原子性 所以须要缓存 set key v ex 20 nx 命令 ,
避免误删 增加uuid
删除又不是原子性 所以 引入 lua 脚本 解决误删原子问题
if redis.call(‘get’ , ‘lock’) == uuid
then
redis.call(‘del’ , ‘lock’)
return 1
else
return 0
end
可重入解锁
if redis.call(‘hexists’, ‘lock’, ‘uuid’) == 0
then
return nil
elseif redis.call(‘hincrby’ , ‘lock’ , ‘uuid’, -1) == 0
then
return redis.call(‘del’, ‘lock’)
else
return 0
end
可重入加锁
if redis.call(‘exists’,’lock’) == 0 or redis.call(‘hexists’, ‘lock’, ‘uuid’) == 1 then redis.call(‘hincrby’, ‘uuid’, 1) redis.call(‘expire’, ‘lock’, 30) return 1 else return 0 end
欢送大家拜访 集体博客 Johnny小屋
欢送关注集体公众号