关于java:Redis-实现分布式锁执行lua脚本

8次阅读

共计 5385 个字符,预计需要花费 14 分钟才能阅读完成。

Redis 实现分布式锁 + 执行 lua 脚本

本篇来看看 Redis 实现分布式锁的 步步演进过程 , 包含 setnx -> set -> 过期工夫 -> 误删锁 -> uuid 管制锁误删 -> lua 脚本管制删锁的原子性

分布式锁,即分布式系统中的锁。在单体利用中咱们通过锁解决的是 管制共享资源拜访 的问题,而分布式锁,就是解决了 分布式系统中管制共享资源拜访 的问题。与单体利用不同的是,分布式系统中竞争共享资源的最小粒度从线程升级成了过程。

假如当初在redis 中有 5000 个 iphone14 商品 上面来通过扣减这 5000 个 iphone 的案例 来一步步欠缺分布式的实现, 让咱们可能更加了解这些改良的起因.

set iphone14 5000

1. 无 分布式锁管制

无锁管制的时候 多个线程获取会同时获取到库存 而后进行扣减 会导致并发问题

String stock = stringRedisTemplate.opsForValue().get("iphone14");
if (stock != null) {
    // 3. 比拟并且扣减库存
    long stockCount = Long.parseLong(stock);
    if (stockCount > 0) {
        // 4. 设置库存
        stringRedisTemplate.opsForValue().set("iphone14", String.valueOf(--stockCount));
    }
}

ab 测试后发现重大的超卖问题

ab -c 100 -n 5000 http://127.0.0.1:10010/deduct

2. 应用 setnx 命令

能够通过 redis 的 setnx 命令 来增加锁, 这个命令的意思是 如果 key 不存在 才设置, 这就模仿了如果他人没抢到锁我就加锁的意思

上面是应用 setnx 命令实现的分布式 看看会有什么问题?

    public void deduct() {
        // 1. 获取 redis 锁

        while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent("lock", "lockvalue"))) {
            try {TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
      try {
        // 2. 获取库存
        String stock = stringRedisTemplate.opsForValue().get("iphone14");
        if (stock != null) {
          // 3. 比拟并且扣减库存
          long stockCount = Long.parseLong(stock);
          if (stockCount > 0) {
            // 4. 设置库存
            stringRedisTemplate.opsForValue().set("iphone14", String.valueOf(--stockCount));
          }
        }
      } finally {
        // 5. 开释锁
        stringRedisTemplate.delete("lock");
      }

问题 (没有设置过期工夫)

能够看到在 finally 外面咱们进行了开释锁的操作, 然而如果还没执行 finally 这里就宕机了 那这个 lock 锁会始终存在就导致没有开释 产生了死锁, 所以须要给锁增加一个过期工夫

改良一

        while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent("lock", "lockvalue"))) {
            try {TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
        // 给锁 增加过期工夫  ??? 问题: 刚刚要执行这行代码的时候 就宕机了 就会产生死锁
        stringRedisTemplate.expire("lock",30,TimeUnit.SECONDS);

这里尽管在获取锁后 去给锁设置了过期工夫, 然而 如果 刚刚获取锁就宕机了 也会导致死锁

咱们会发现须要 在设置锁的时候同时设置过期工夫, 须要这 2 个操作是原子性的 , 那么 setnx 命令就不适合了 须要应用 set 命令

3. 应用 set 命令代替 setnx (保障原子性)

2.6.12 版本 开始,redis 为 SET 命令减少了一系列选项:

EX seconds 设置 key 的过期工夫,单位是秒

PX milliseconds 设置 key 的过期工夫,单位是毫秒

NX 只有键 key 不存在时,能力设置 key 的值

XX 只有键 key 存在时,能力设置 key 的值

set 命令是反对 NX XX 判断的 , NX 代表不存在才设置, 并且同时反对 设置过期工夫

对应到 redistemplate 的办法就是 setifAbsent 是 NX setIfPresent 对应 XX

       // 保障了 设置 key 和 过期工夫 两条命令的 的原子性 
        while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent("lock", "lockvalue", 3, TimeUnit.SECONDS))) {
            try {TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
        try {
            // 2. 获取库存
            String stock = stringRedisTemplate.opsForValue().get("iphone14");
            if (stock != null) {
                // 3. 比拟并且扣减库存
                long stockCount = Long.parseLong(stock);
                if (stockCount > 0) {
                    // 4. 设置库存
                    stringRedisTemplate.opsForValue().set("iphone14", String.valueOf(--stockCount));
                }
            }
        } finally {
            // 5. 开释锁
            stringRedisTemplate.delete("lock");
        }

问题(误删锁)

从下面代码能够发现 如果某个线程获取了锁 并且在执行业务的因为某些起因 执行较慢 导致锁曾经到了过期工夫 主动开释了 , 那么 其余线程会获取到锁, 而后第一个线程执行实现后 又会去删除锁, 而此时的锁曾经是其余线程的锁 , 导致了误删锁的状况

4. 增加 UUID 避免误删锁

这里简略演示通过增加 uuid 来避免误删其余线程的锁

  String uuid = UUID.randomUUID().toString();
        while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS))) {
            try {TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
   .....
             finally {
            // 5. 开释锁  对锁进行了 uuid 的判断 判断是否是本人的锁
            String lockValue = stringRedisTemplate.opsForValue().get("lock");
            if (lockValue.equals(uuid)) {stringRedisTemplate.delete("lock");
            }
        }

问题(误删锁 原子性问题)

能够发现还是有问题 , 尽管咱们比拟了 锁是不是本人的, 然而 还是 有可能在刚刚比拟实现 equals 进去后 外面锁生效了 又被其余线程增加了新的锁, 此时还是会有误删的可能

5. Lua 脚本 管制删除锁 的原子性

redis 给 lua 脚本留了口子通过 eval 命令运行 lua 脚本来 原子性执行 lua 脚本里的逻辑

lua 脚本多的就不介绍了, 只须要晓得看懂上面的代码即可

 if  10 > 11 
 then return '10>11'
 elseif 10 > 9
 then return '10>9'
 else
      return 'nil'
 end

同时 在 lua 里也能够调用 redis 的命令 通过 redis.call(‘set’, ‘name’, ‘johnny’)

好了不介绍了 上面来看看 如何通过 lua 脚本来管制 删除锁的原子性

5.1 lua 脚本删除锁逻辑
if redis.call('get' , 'lock') == uuid 
then 
   redis.call('del' , 'lock')
   return 1
else 
  return 0 
end

如果 此时 redis 中 lock 锁的 uuid = 91dbc829-d44e-4f03-96d8-95a06f3ff975

转化成 能够执行的

if redis.call('get' , KEYS[1]) == ARGV[1] then  redis.call('del' , KEYS[1])  return 1 else  return 0 end 1 lock 91dbc829-d44e-4f03-96d8-95a06f3ff975** 

5.2 redistemplate 执行 lua 脚本实现
 private final String deleteLockLua =
            "if redis.call('get', KEYS[1]) == ARGV[1] then  redis.call('del', KEYS[1])  return 1 else  return 0 end";

    public void deduct() {
        // 1. 获取 redis 锁
        String uuid = UUID.randomUUID().toString();
        while (Boolean.FALSE.equals(stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS))) {
            try {TimeUnit.MILLISECONDS.sleep(50);
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
        try {
            // 2. 获取库存
            String stock = stringRedisTemplate.opsForValue().get("iphone14");
            if (stock != null) {
                // 3. 比拟并且扣减库存
                long stockCount = Long.parseLong(stock);
                if (stockCount > 0) {
                    // 4. 设置库存
                    stringRedisTemplate.opsForValue().set("iphone14", String.valueOf(--stockCount));
                }
            }
        } finally {
                        // 通过 DefaultRedisScript 来执行 lua 脚本
            DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
            //Boolean 对应 lua 脚本返回的 0 1 
            redisScript.setResultType(Boolean.class);
            // 指定须要执行的 lua 脚本
            redisScript.setScriptText(deleteLockLua);
            // 5. 开释锁
            // 留神 须要提供 List<K> keys, Object... args 代表 keys 和 ARGV
            stringRedisTemplate.execute(redisScript, Collections.singletonList("lock"), uuid);
        }
    }

压测一下发现失常管制了库存

总结

本篇次要一步步演进手写 redis 分布式锁的实现, 包含 setnx -> set -> 过期工夫 -> 误删锁 -> uuid 管制锁误删 -> lua 脚本管制删锁的原子性等等.. 其实目前还有问题, 包含 锁续期问题 以及 redis 可重入锁的问题 有机会在欠缺吧

须要留神 redis 中如何应用 lua 脚本的, 因为一些原子性操作就是须要 lua 脚本来管制 包含 redission 框架也是通过 lua 脚本实现的.

1. 分布式锁

排他性:setnx

避免死锁 须要设置过期工夫

过期工夫 和 设置 key 又不是原子性 所以须要缓存 set key v ex 20 nx 命令 ,

避免误删 增加 uuid

删除又不是原子性 所以 引入 lua 脚本 解决误删原子问题

if redis.call(‘get’ , ‘lock’) == uuid

then

redis.call(‘del’ , ‘lock’)

return 1

else

return 0

end

可重入解锁

if redis.call(‘hexists’, ‘lock’, ‘uuid’) == 0

then

return nil

elseif redis.call(‘hincrby’ , ‘lock’ , ‘uuid’, -1) == 0

then

return redis.call(‘del’, ‘lock’)

else

​ return 0

end

可重入加锁

if redis.call(‘exists’,’lock’) == 0 or redis.call(‘hexists’, ‘lock’, ‘uuid’) == 1 then redis.call(‘hincrby’, ‘uuid’, 1) redis.call(‘expire’, ‘lock’, 30) return 1 else return 0 end

欢送大家拜访 集体博客 Johnny 小屋
欢送关注集体公众号

正文完
 0