乐趣区

关于redis:Redis分布式锁正确打开方式

作者:京东保险 张江涛

1、为什么要有分布式锁?

JUC 提供的锁机制,能够保障在同一个 JVM 过程中同一时刻只有一个线程执行操作逻辑;

多服务多节点的状况下,就意味着有多个 JVM 过程,要做到这样,就须要有一个中间人;

分布式锁就是用来保障在同一时刻,仅有一个 JVM 过程中的一个线程在执行操作逻辑;

换句话说,JUC 的锁和分布式锁都是一种爱护系统资源的措施。尽可能将并发带来的不确定性转换为同步的确定性;

2、分布式锁个性(五大个性 十分重要)

个性 1:互斥性。在任意时刻,只有一个客户端能持有锁。

个性 2:不会产生死锁。即便有一个客户端在持有锁的期间解体而没有被动解锁,也能保障后续其余客户端能加锁。

个性 3:解铃还须系铃人。加锁和解锁必须是同一个客户端(线程),客户端本人不能把他人加的锁给解了。

个性 4:可重入性。同一个现线程曾经获取到锁,可再次获取到锁。

个性 5:具备容错性。只有大部分的分布式锁节点失常运行,客户端就能够加锁和解锁。

2-1 常见分布式锁的三种实现形式

1. 数据库锁;2. 基于 ZooKeeper 的分布式锁;3. 基于 Redis 的分布式锁。

2-2 本文咱们次要聊 redis 实现 分布式 锁:

一个 setnx 就行了?value 没意义?还有人认为 incr 也能够?再加个超时工夫就行了?

3、分布式锁个性 2 之不会产生死锁

很多线程去上锁,谁锁胜利谁就有权力执行操作逻辑,其余线程要么间接走抢锁失败的逻辑,要么自旋尝试抢锁;

• 比方说 A 线程竞争到了锁,开始执行操作逻辑(代码逻辑演示中,应用 Jedis 客户端为例);

public static void doSomething() {
    // RedisLock 是封装好的一个类
    RedisLock redisLock = new RedisLock(jedis); // 创立 jedis 实例的代码省略,不是重点
    try {redisLock.lock(); // 上锁
        
        // 解决业务
        System.out.println(Thread.currentThread().getName() + "线程解决业务逻辑中...");
        Thread.sleep(2000);
        System.out.println(Thread.currentThread().getName() + "线程解决业务逻辑结束");
        
        redisLock.unlock(); // 开释锁} catch (Exception e) {e.printStackTrace();
    }
}

• 失常状况下,A 线程执行完操作逻辑后,应该将锁开释。如果说执行过程中抛出异样,程序不再持续走失常的开释锁流程,没有开释锁怎么办?所以咱们想到:

开释锁的流程肯定要在 finally{} 块中执行,当然,上锁的流程肯定要在 finally{} 对应的 try{} 块中,否则 finally{} 就没用了,如下:

public static void doSomething() {RedisLock redisLock = new RedisLock(jedis); // 创立 jedis 实例的代码省略,不是重点
    try {redisLock.lock(); // 上锁,必须在 try{}中
        
        // 解决业务
        System.out.println(Thread.currentThread().getName() + "线程解决业务逻辑中...");
        Thread.sleep(2000);
        System.out.println(Thread.currentThread().getName() + "线程解决业务逻辑结束");
    } catch (Exception e) {e.printStackTrace();
    } finally {redisLock.unlock(); // 在 finally{} 中开释锁}
}

写法留神:redisLock.lock(); 上分布式锁,必须在 try{} 中。

在 JAVA 多线程中 lock.lock(); 单机多线程 加锁操作须要在 try{} 之前。

3-1 redisLock.unlock() 放在 finally{} 块中就行了吗?还须要设置超时工夫

如果在执行 try{} 中逻辑的时候,程序呈现了 System.exit(0); 或者 finally{} 中执行异样,比方说连贯不上 redis-server 了;或者还未执行到 finally{}的时候,JVM 过程挂掉了,服务宕机;这些状况都会导致没有胜利开释锁,别的线程始终拿不到锁,怎么办?如果我的零碎因为一个节点影响,别的节点也都无奈失常提供服务了,那我的零碎也太弱了。所以咱们想到必须要将危险升高,能够给锁设置一个超时工夫,比方说 1 秒,即使产生了上边的状况,那我的锁也会在 1 秒之后主动开释,其余线程就能够获取到锁,接班干活了;

public static final String lock_key = "zjt-lock";
 
     public void lock() {while (!tryLock()) {
            try {Thread.sleep(50); // 在 while 中自旋,如果说读者想设置一些自旋次数,期待最大时长等本人去扩大,不是此处的重点
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
        
        System.out.println("线程:" + threadName + ",占锁胜利!★★★");
     }
 
      private boolean tryLock() {SetParams setParams = new SetParams();
        setParams.ex(1); // 超时工夫 1s
        setParams.nx();  // nx
        String response = jedis.set(lock_key, "", setParams); // 转换为 redis 命令就是:set zjt-key"" ex 1 nx
        return "OK".equals(response);
     }

留神,上锁的时候,设置 key 和设置超时工夫这两个操作要是 原子性 的,要么都执行,要么都不执行。

Redis 原生反对:

// http://redis.io/commands/set.html
SET key value [EX seconds] [PX milliseconds] [NX|XX]

不要在代码里边分两次调用:

set k v
exipre k time

3-2 锁的超时工夫该怎么计算?

方才假如的超时工夫 1s 是怎么计算的?这个工夫该设多少适合呢?

锁中的业务逻辑的执行工夫,个别是咱们在测试环境进行屡次测试,而后在压测环境多轮压测之后,比方说计算出均匀的执行工夫是 200ms,锁的超时工夫放大 3 - 5 倍,比方这里咱们设置为 1s,为啥要放大,因为如果锁的操作逻辑中有网络 IO 操作,线上的网络不会总一帆风顺,咱们要给网络抖动留有缓冲工夫。另外,如果你设置 10s,果然产生了宕机,那意味着这 10s 两头,你的这个分布式锁的服务全副节点都是不可用的,这个和你的业务以及零碎的可用性有挂钩,要掂量,要谨慎(后边 3 -13 会再具体聊)。那如果一个节点宕机之后能够告诉 redis-server 开释锁吗?留神,我是宕机,不可控力,断电了兄弟,告诉不了的。

回头一想,如果我是优雅停机呢,我不是 kill -9,也不是断电,这样仿佛能够去做一些编码去开释锁,你能够参考下 JVM 的钩子、Dubbo 的优雅停机、或者 linux 过程级通信技术来做这件事件。当然也能够手动停服务后,手动删除掉 redis 中的锁。

4、分布式锁个性 3: 解铃还须系铃人

如果说 A 线程在执行操作逻辑的过程中,别的线程间接进行了开释锁的操作,是不是就出问题了?

什么?别的线程没有取得锁却间接执行了开释锁??当初是 A 线程上的锁,那必定只能 A 线程开释锁呀!别的线程开释锁算怎么回事?联想 ReentrantLock 中的 isHeldByCurrentThread()办法,所以咱们想到,必须在锁上加个标记,只有上锁的线程 A 线程晓得,相当于是一个密语,也就是说开释锁的时候,首先先把密语和锁上的标记进行匹配,如果匹配不上,就没有权力开释锁;

   private boolean tryLock() {SetParams setParams = new SetParams();
        setParams.ex(1); // 超时工夫 1s
        setParams.nx();  // nx
        String response = jedis.set(lock_key, "", setParams); // 转换为 redis 命令就是:set zjt_key"" ex 1 nx
        return "OK".equals(response);
    }
  
    // 别的线程间接调用开释锁操作,分布式锁解体!public void unlock() {jedis.del(encode(lock_key));
        System.out.println("线程:" + threadName + "开释锁胜利!☆☆☆");
    }
 
     private byte[] encode(String param) {return param.getBytes();
    }

4-1 这个密语 value(约定)设置成什么呢?

很多同学说设置成一个 UUID 就行了,上锁之前,在该线程代码中生成一个 UUID,将这个作为秘钥,存在锁键的 value 中,开释锁的时候,用这个进行校验,因为只有上锁的线程晓得这个秘钥,别的线程是不晓得的。这个可行吗,当然可行。

   String releaseLock_lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1] \n" + 
                "then\n" + 
                "return redis.call(\"del\", KEYS[1])\n" + 
                "else\n" + 
                "return 0\n" + 
                "end";
    
    private boolean tryLock(String uuid) {SetParams setParams = new SetParams();
        setParams.ex(1); // 超时工夫 1s
        setParams.nx();  // nx
        String response = jedis.set(lock_key, uuid, setParams); // 转换为 redis 命令就是:set zjt-key "" ex 1 nx
        return "OK".equals(response);
    }
 
     public void unlock(String uuid) {List<byte[]> keys = Arrays.asList(encode(lock_key));
        List<byte[]> args = Arrays.asList(encode(uuid));
           
           // 应用 lua 脚本,保障原子性
        long eval = (Long) jedis.eval(encode(releaseLock_lua), keys, args);
        if (eval == 1) {System.out.println("线程:" + threadName + "开释锁胜利!☆☆☆");
        } else {System.out.println("线程:" + threadName + "开释锁失败!该线程未持有锁!!!");
        }
        
    }
 
     private byte[] encode(String param) {return param.getBytes();
    }

为什么应用 lua 脚本?因为保障原子性

因为是两个操作,如果分两步那就是:

get k // 进行秘钥 value 的比对
del k // 比对胜利后,删除 k 

如果第一步比对胜利后,第二步还没来得及执行的时候,锁到期,而后紧接着别的线程获取到锁,里边的 uuid 曾经变了,也就是说持有锁的线程曾经不是该线程了,此时再执行第二步的删除锁操作,必定是谬误的了。

5. 分布式锁个性 4 之可重入性

作为一把锁,咱们在应用 synchronized、ReentrantLock 的时候是不是有可重入性?

那咱们这把分布式锁该如何实现可重入呢?如果 A 线程的锁办法逻辑中调用了 x()办法,x()办法中也须要获取这把锁,依照这个逻辑,x()办法中的锁应该重入进去即可,那是不是须要将方才生成的这个 UUID 秘钥传递给 x()办法?怎么传递?用参数传递就会侵入业务代码

5-1 不侵入业务代码实现可重入:Thread-Id

咱们次要是想给上锁的 A 线程设置一个只有它本人晓得的秘钥,把思路时钟往回拨,想想:

线程自身的 id(Thread.currentThread().getId())是不是就是一个惟一标识呢?咱们把秘钥 value 设置为线程的 id 不就行了。

   String releaseLock_lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1] \n" + 
                "then\n" + 
                "return redis.call(\"del\", KEYS[1])\n" + 
                "else\n" + 
                "return 0\n" + 
                "end";
    String addLockLife_lua = "if redis.call(\"exists\", KEYS[1]) == 1\n" + 
                "then\n" + 
                "return redis.call(\"expire\", KEYS[1], ARGV[1])\n" + 
                "else\n" + 
                "return 0\n" + 
                "end";
        
     public void lock() {
             // 判断是否可重入
        if (isHeldByCurrentThread()) {return;}
        
        while (!tryLock()) {
            try {Thread.sleep(50); // 自旋
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
        
        System.out.println("线程:" + threadName + ",占锁胜利!★★★");
    }
 
   // 是否是以后线程占有锁,同时将超时工夫从新设置,这个很重要,同样也是原子操作
     private boolean isHeldByCurrentThread() {List<byte[]> keys = Arrays.asList(encode(lock_key));
        List<byte[]> args = Arrays.asList(encode(String.valueOf(threadId)), encode(String.valueOf(1)));
        
        long eval = (Long) jedis.eval(encode(addLockLife_lua), keys, args);
        return eval == 1;
    }
    
    private boolean tryLock(String uuid) {SetParams setParams = new SetParams();
        setParams.ex(1); // 超时工夫 1s
        setParams.nx();  // nx
        String response = jedis.set(lock_key, String.valueOf(threadId), setParams); // 转换为 redis 命令就是:set zjt-key xxx ex 1 nx
        return "OK".equals(response);
    }
 
     public void unlock(String uuid) {List<byte[]> keys = Arrays.asList(encode(lock_key));
        List<byte[]> args = Arrays.asList(encode(String.valueOf(threadId)));
           
        // 应用 lua 脚本,保障原子性
        long eval = (Long) jedis.eval(encode(releaseLock_lua), keys, args);
        if (eval == 1) {System.out.println("线程:" + threadName + "开释锁胜利!☆☆☆");
        } else {System.out.println("线程:" + threadName + "开释锁失败!该线程未持有锁!!!");
        }
        
    }
 
     private byte[] encode(String param) {return param.getBytes();
    }

5-2 Thread-Id 真能行吗?不行。

想想,咱们说一个 Thread 的 id 是惟一 的,是在同一个 JVM 过程中,是在一个操作系统中,也就是在一个机器中。而事实是,咱们的部署是集群部署,多个实例节点,那意味着会存在这样一种状况,S1 机器上的线程上锁胜利,此时锁中秘钥 value 是线程 id=1,如果说同一时间 S2 机器中,正好线程 id= 1 的线程尝试取得这把锁,比对秘钥发现胜利,后果也重入了这把锁,也开始执行逻辑,此时,咱们的分布式锁解体!怎么解决?咱们只须要在每个节点中保护不同的标识即可,怎么保护呢?利用启动的时候,应用 UUID 生成一个惟一标识 APP_ID,放在内存中(或者应用 zookeeper 去调配机器 id 等等)。此时,咱们的 秘钥 value 这样存即可:APP_ID+ThreadId

   // static 变量,final 润饰,加载在内存中,JVM 过程生命周期中不变
   private static final String APP_ID = UUID.randomUUID().toString();

    String releaseLock_lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1] \n" + 
                "then\n" + 
                "return redis.call(\"del\", KEYS[1])\n" + 
                "else\n" + 
                "return 0\n" + 
                "end";
    String addLockLife_lua = "if redis.call(\"exists\", KEYS[1]) == 1\n" + 
                "then\n" + 
                "return redis.call(\"expire\", KEYS[1], ARGV[1])\n" + 
                "else\n" + 
                "return 0\n" + 
                "end";
        
     public void lock() {
             // 判断是否可重入
        if (isHeldByCurrentThread()) {return;}
        
        while (!tryLock()) {
            try {Thread.sleep(50); // 自旋
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
        
        System.out.println("线程:" + threadName + ",占锁胜利!★★★");
    }
 
    // 是否是以后线程占有锁,同时将超时工夫从新设置,这个很重要,同样也是原子操作
     private boolean isHeldByCurrentThread() {List<byte[]> keys = Arrays.asList(encode(lock_key));
        List<byte[]> args = Arrays.asList(encode(APP_ID + String.valueOf(threadId)), encode(String.valueOf(1)));
        
        long eval = (Long) jedis.eval(encode(addLockLife_lua), keys, args);
        return eval == 1;
    }
    
    private boolean tryLock(String uuid) {SetParams setParams = new SetParams();
        setParams.ex(1); // 超时工夫 1s
        setParams.nx();  // nx
        String response = jedis.set(lock_key, APP_ID + String.valueOf(threadId), setParams); // 转换为 redis 命令就是:set zjt-key xxx ex 1 nx
        return "OK".equals(response);
    }
 
     public void unlock(String uuid) {List<byte[]> keys = Arrays.asList(encode(lock_key));
        List<byte[]> args = Arrays.asList(encode(APP_ID + String.valueOf(threadId)));
           
           // 应用 lua 脚本,保障原子性
        long eval = (Long) jedis.eval(encode(releaseLock_lua), keys, args);
        if (eval == 1) {System.out.println("线程:" + threadName + "开释锁胜利!☆☆☆");
        } else {System.out.println("线程:" + threadName + "开释锁失败!该线程未持有锁!!!");
        }
        
    }
 
     private byte[] encode(String param) {return param.getBytes();
    }

5-3 APP_ID(实例惟一标识)+ ThreadId 还是 UUID 好呢?

持续听我说,如果 A 线程执行逻辑两头开启了一个子线程执行工作,这个子线程工作中也须要重入这把锁,因为子线程获取到的线程 id 不一样,导致重入失败。那意味着须要将这个秘钥持续传递给子线程,JUC 中 InheritableThreadLocal 派上用场,然而感觉怪怪的,因为线程间传递的是父线程的 id。

微服务中多服务间调用的话能够借用零碎本身有的 traceId 作为秘钥即可。比方 sgm 中的 traceId 或者 利用 RPC 框架的隐式传参

「至于抉择哪种 value 的形式,依据理论的零碎设计 + 业务场景,抉择最合适的即可,没有最好,只有最合适。」

5-4、锁重入的超时工夫怎么设置?

留神,咱们上边的次要注意力在怎么重入进去,而咱们这是分布式锁,要思考的事件还有很多,重入进去后,超时工夫轻易设吗?

比方说 A 线程在锁办法中调用了 x()办法,而 x()办法中也有获取锁的逻辑,如果 A 线程获取锁后,执行过程中,到 x()办法时,这把锁是要重入进去的,然而请留神,这把锁的超时工夫如果小于第一次上锁的工夫,比方说 A 线程设置的超时工夫是 1s,在 100ms 的时候执行到 x()办法中,而 x()办法中设置的超时工夫是 100ms,那么意味着 100ms 之后锁就开释了,而这个时候我的 A 线程的主办法还没有执行完呢!却被重入锁设置的工夫搞坏了!这个怎么搞?

如果说我在内存中设置一个这把锁设置过的最大的超时工夫,重入的时候判断下传进来的工夫,我重入时 expire 的时候始终设置成最大的工夫,而不是由重入锁随便升高锁工夫导致上一步的主锁呈现问题

放在内存中行吗?咱们上边举例中,调用的 x()办法是在一个 JVM 中,如果是调用近程的一个 RPC 服务呢(像这种调用的话就须要将秘钥 value 通过 RpcContext 传递过来了)到另一个节点的服务中进行锁重入,这个工夫仍然是要用以后设置过锁的最大工夫的,所以这个 最大的工夫要存在 redis 中而非 JVM 内存中

通过这一步的剖析,咱们的重入 lua 脚本就批改为这样了:

    ADD_LOCK_LIFE("if redis.call(\"get\", KEYS[1]) == ARGV[1]\n" +     // 判断是否是锁持有者
                "then\n" + 
                "local thisLockMaxTimeKeepKey=KEYS[1] .. \":maxTime\"\n" +  // 记录锁最大工夫的 key 是:锁名字:maxTime
                "local nowTime=tonumber(ARGV[2])\n" +  // 以后传参进来的 time
                "local maxTime=redis.call(\"incr\", thisLockMaxTimeKeepKey)\n" + // 取出以后锁设置的最大的超时工夫,如果这个放弃工夫的 key 不存在返回的是字符串 nil,这里为了 lua 脚本的易读性,用 incr 操作,这样读出来的都是 number 类型的操作
                "local bigerTime=maxTime\n" + // 长期变量 bigerTime=maxTime
                "if nowTime>maxTime-1\n" +    // 如果传参进来的工夫 > 记录的最大工夫
                "then\n" + 
                "bigerTime=nowTime\n" + // 则更新 bigerTime
                "redis.call(\"set\", thisLockMaxTimeKeepKey, tostring(bigerTime))\n" + // 设置超时工夫为最大的 time,是最平安的
                "else \n" + 
                "redis.call(\"decr\", thisLockMaxTimeKeepKey)\n" + // 以后传参 time<maxTime,将方才那次 incr 减回来
                "end\n" + 
                "return redis.call(\"expire\", KEYS[1], tostring(bigerTime))\n" + // 从新设置超时工夫为以后锁过的最大的 time
                "else\n" + 
                "return 0\n" + 
                "end"),

其实,还有另外一种计划比较简单,就是锁的超时工夫 = 第一次上锁的工夫 + 前面所有重入锁的工夫。也就是(expire = 主 ttl + 重入 exipre),这种计划是放大的思维,一放大就又有上边提到过的一个问题:expire 太大怎么办,参考上边。

5-5、重入锁的办法中间接执行 unlock?思考重入次数

A 线程执行一共须要 500ms,执行中须要调用 x()办法,x()办法中有一个重入锁,执行用了 50ms,而后执行完后,x()办法的 finally{} 块中将锁进行开释。

为啥能开释掉?因为秘钥我有,匹配胜利了我就间接开释了。

这当然是有问题的,所以咱们要通过 锁重入次数 来进行开释锁时候的判断,也就是说上锁的时候须要多保护一个 key 来保留以后锁的重入次数,如果执行开释锁时,先进行重入次数 -1,- 1 后如果是 0,能够间接 del,如果 >0,阐明还有重入的锁在,不能间接 del。

5-6 思考如何存储锁的属性(锁的 key 重入次数 key 最大超时工夫 key)?

目前为止,算上上一步中设置最大超时工夫的 key,加上这一步重入次数的 key,加上锁自身的 key,曾经有 3 个 key,须要留神的事件是,这三个 key 的超时工夫是都要设置的!为什么?如果说重入次数的 key 没有设置超时工夫,服务 A 节点中在一个 JVM 中重入了 5 次后,调用一次 RPC 服务,RPC 服务中同样重入锁,此时,锁重入次数是 6,这个时候 A 服务宕机,就意味着无论怎样,这把锁不可能开释了,这个分布式锁提供的残缺能力,全线不可用了!

所以,这几个 key 是要设置超时工夫的!怎么设置?我上一个锁要保护这么多 key 的超时工夫?太简单了吧,多则乱,则容易出问题。怎么办?咱们想一下,是不是最大超时工夫的 key 和重入次数的 key,都 从属于锁 ,它们都是 锁的属性,如果锁不在了,谈它们就毫无意义,这个时候用什么存储呢?redis 的 hash 数据结构,就能够做,key 是锁,里边的 hashKey 别离是锁的属性,hashValue 是属性值,超时工夫只设置锁自身 key 就能够了。这个时候,咱们的锁的数据结构就要扭转一下了。

6、如何解决过期工夫确定和业务执行时长不确定性的问题:看门狗机制

3- 2 中设置超时工夫那里,咱们预估锁办法执行工夫是 200ms,咱们放大 5 倍后,设置超时工夫是 1s(过期工夫确定 )。假想一下,如果生产环境中,锁办法中的 IO 操作,极其状况下超时重大,比方说 IO 就耗费了 2s( 业务执行时长不确定),那就意味着,在这次 IO 还没有完结的时候,我这把锁曾经到期开释掉了,就意味着别的线程趁虚而入,分布式锁解体!

咱们要做的是一把分布式锁,想要的目标是同一时刻只有一个线程持有锁,作为服务而言,这个锁当初不论是被哪个线程上锁胜利了,我服务应该保障这个线程执行的安全性,怎么办?锁续命(看门狗机制)。什么意思,一旦这把锁呈现了上锁操作,就意味着这把锁开始投入使用,这时我的服务中须要有一个 daemon 线程定时去守护我的锁的安全性,怎么守护?比如说锁超时工夫设置的是 1s,那么我这个定时工作是每隔 300ms 去 redis 服务端做一次查看,如果我还持有,你就给我续命,就像 session 会话的沉闷机制一样。看个例子,我上锁时候超时工夫设置的是 1s,理论办法执行工夫是 3s,这两头我的定时线程每隔 300ms 就会去把这把锁的超时工夫从新设置为 1s,每隔 300ms 一次,胜利将锁续命胜利。

public class RedisLockIdleThreadPool {private String threadAddLife_lua = "if redis.call(\"exists\", KEYS[1]) == 1\n" + 
                "then\n" + 
                "return redis.call(\"expire\", KEYS[1], ARGV[1])\n" + 
                "else\n" + 
                "return 0\n" + 
                "end";
 
    private volatile ScheduledExecutorService scheduledThreadPool;
    
    public RedisLockIdleThreadPool() {if (scheduledThreadPool == null) {synchronized (this) {if (scheduledThreadPool == null) {scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(); // 我这样创立线程池是为了代码的易读性,大家务必应用 ThreadPoolExecutor 去创立
                    
                    scheduledThreadPool.scheduleAtFixedRate(() -> {addLife();
                    }, 0, 300, TimeUnit.MILLISECONDS);
                }
            }
        }
    }
    
    private void addLife() {
            // ... 省略 jedis 的初始化过程
            
        List<byte[]> keys = Arrays.asList(RedisLock.lock_key.getBytes());
        List<byte[]> args = Arrays.asList(String.valueOf(1).getBytes());
        
        jedis.eval(threadAddLife_lua.getBytes(), keys, args);
    }
    
}

这就行吗?还不行!

为啥?想一下,如果每个服务中都像这样去续命锁,如果说 A 服务还在执行过程中的时候,还没有执行完,就是说还没有手动开释锁的时候,宕机,此时 redis 中锁还在有效期。服务 B 也始终在续命这把锁,此时这把锁始终在续命,然而 B 的这个续命始终续的是 A 过后设的锁,这不是扯吗?我本人在一直续命,导致我的服务上始终获取不到锁,实际上 A 曾经宕机了呀!该开释了,不应该去续命了,这不是我服务 B 该干的活!

续命的前提是,得判断是不是以后过程持有的锁,也就是咱们的 APP_ID,如果不是就不进行续命。

续命锁的 lua 脚本产生扭转,如下:

    THREAD_ADD_LIFE("local v=redis.call(\"get\", KEYS[1]) \n" +     // get key
                "if v==false \n" +  // 如果不存在 key,读出后果 v 是 false
                "then \n" +         // 不存在不解决
                "else \n" + 
                "local match = string.find(v, ARGV[1]) \n" + // 存在,判断是否能和 APP_ID 匹配,匹配不上时 match 是 nil
                "if match==\"nil\"\n" + 
                "then \n" + 
                "else  \n" + 
                "return redis.call(\"expire\", KEYS[1], ARGV[2]) \n" + // 匹配上了返回的是索引地位,如果匹配上了意味着就是以后过程占有的锁,就延长时间
                "end \n" + 
                "end")

6-1 锁在我手里,我挂了,这 … 没救。只能期待锁超时开释

即使设置了一个很正当的 expire,比方 10s,然而线上如果真呈现了 A 节点刚拿到锁就宕机了,那其余节点也只能干等 10s,之后能力拿到锁。次要还是业务能不能承受。而如果是 To C 的业务中,大部分场景无奈承受的,因为可能会导致用户散失。所以咱们须要另外 一个监控服务 ,定时去 监控 redis 中锁的获得者的衰弱状态,如果获取者超过 n 次无奈通信,由监控服务负责将锁摘除掉,让别的线程持续去获取到锁去干活。

退出移动版