by emanjusaka from https://www.emanjusaka.top/2024/03/redisson-distributed-lock 彼岸花开可奈何
本文欢送分享与聚合,全文转载请留下原文地址。

实现分布式锁通常有三种形式:数据库、Redis 和 Zookeeper。咱们比拟罕用的是通过 Redis 和 Zookeeper 实现分布式锁。Redisson 框架中封装了通过 Redis 实现的分布式锁,上面咱们剖析一下它的具体实现。

关键点

  1. 原子性

    要么都胜利,要么都失败

  2. 过期工夫

    如果锁还没来得及开释就遇到了服务宕机,就会呈现死锁的问题。给 Redis 的 key 设置过期工夫,即便服务宕机了超过设置的过期工夫锁会主动进行开释。

  3. 锁续期

    因为给锁设置了过期工夫而咱们的业务逻辑具体要执行多长时间可能是变动和不确定的,如果设定了一个固定的过期工夫,可能会导致业务逻辑还没有执行完,锁被开释了的问题。锁续期能保障锁是在业务逻辑执行完才被开释。

  4. 正确开释锁

    保障开释本人持有的锁,不能呈现 A 开释了 B 持有锁的状况。

Redis 实现分布式锁的几种部署形式

  1. 单机

    在这种部署形式中,Redis 的所有实例都部署在同一台服务器上。这种部署形式简单易行,但存在单点故障的危险。如果 Redis 实例宕机,则所有分布式锁都将生效。

  2. 哨兵

    在这种部署形式中,Redis 的多个实例被配置为哨兵。哨兵负责监控 Redis 实例的状态,并在主实例宕机时主动选举一个新的主实例。这种部署形式能够提供更高的可用性和容错性。

  3. 集群

    在这种部署形式中,Redis 的多个实例被配置为一个集群。集群中的每个实例都是平等的,并且能够解决读写操作。这种部署形式能够提供最高的可用性和容错性。

  4. 红锁

    搞几个独立的 Master,比方 5 个,而后挨个加锁,只有超过一半以上(这里是 5/2+1=3 个)就代表加锁胜利,而后开释锁的时候也逐台开释。

应用形式

  1. 引入依赖

    <!--        pom.xml文件--><dependency>    <groupId>org.redisson</groupId>    <artifactId>redisson-spring-boot-starter</artifactId>    <version>3.17.7</version></dependency>

    版本依赖:

    redisson-spring-data module nameSpring Boot version
    redisson-spring-data-161.3.y
    redisson-spring-data-171.4.y
    redisson-spring-data-181.5.y
    redisson-spring-data-2x2.x.y
    redisson-spring-data-3x3.x.y
  1. yml配置

    spring:  redis:    redisson:      config:        singleServerConfig:          address: redis://127.0.0.1:6379          database: 0          password: null          timeout: 3000
  2. 间接注入应用

    package top.emanjusaka;import org.redisson.api.RLock;import org.redisson.api.RedissonClient;import org.springframework.stereotype.Service;import javax.annotation.Resource;import java.util.concurrent.TimeUnit;/** * @Author emanjusaka www.emanjusaka.top * @Date 2024/2/28 16:41 * @Version 1.0 */@Servicepublic class Lock {    @Resource    private RedissonClient redissonClient;    public void lock() {        // 写入redis的key值        String lockKey = "lock-test";        // 获取一个Rlock锁对象        RLock lock = redissonClient.getLock(lockKey);        // 获取锁,并为其设置过期工夫为10s        lock.lock(10, TimeUnit.SECONDS);        try {            // 执行业务逻辑....            System.out.println("获取锁胜利!");        } finally {            // 开释锁            lock.unlock();            System.out.println("开释锁胜利!");        }    }}

底层分析

lock()

要害代码

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {        return commandExecutor.syncedEval(getRawName(), LongCodec.INSTANCE, command,                "if ((redis.call('exists', KEYS[1]) == 0) " +                       "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +                        "return nil; " +                    "end; " +                    "return redis.call('pttl', KEYS[1]);",                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));    }
  • RFuture<T>:示意返回一个异步后果对象,其中泛型参数 T 示意后果的类型。
  • tryLockInnerAsync 办法承受一下参数:

    • waitTime:等待时间,用于指定在获取锁时的最大等待时间。
    • leaseTime:租约工夫,用于指定锁的持有工夫
    • unit:工夫单位,用于将 leaseTime 转换为毫秒
    • threadId:线程 ID,用于标识以后线程
    • command:Redis 命令对象,用于执行 Redis 操作
  • 办法体中的代码应用 Lua 脚本来实现分布式锁的逻辑。

    • if ((redis.call('exists', KEYS[1]) == 0) or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)): 如果键不存在或者哈希表中曾经存在对应的线程ID,则执行以下操作:

      • redis.call('hincrby', KEYS[1], ARGV[2], 1): 将哈希表中对应线程ID的值加1。
      • redis.call('pexpire', KEYS[1], ARGV[1]): 设置键的过期工夫为租约工夫。
      • return nil: 返回nil示意胜利获取锁。
    • else: 如果键存在且哈希表中不存在对应的线程ID,则执行以下操作:

      • return redis.call('pttl', KEYS[1]): 返回键的残余生存工夫。
  • commandExecutor.syncedEval:示意同步执行 Redis 命令
  • LongCodec.INSTANCE:用于编码和解码长整型数据
  • Collections.singletonList(getRawName()):创立一个只蕴含一个元素的列表,元素为锁的名称
  • unit.toMillis(leaseTime):将租约工夫转换为毫秒
  • getLockName(threadId):依据线程 ID 生成锁的名称
// 省去了那些无关重要的代码private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {    long threadId = Thread.currentThread().getId();    // tryAcquire就是下面剖析的lua残缺脚本    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);    // 返回null就代表上锁胜利。    if (ttl == null) {        return;    }    // 如果没胜利,也就是锁的剩余时间不是null的话,那么就执行上面的逻辑    // 其实就是说 如果有锁(锁剩余时间不是null),那就死循环期待从新抢锁。    try {        while (true) {            // 从新抢锁            ttl = tryAcquire(-1, leaseTime, unit, threadId);            // 抢锁胜利就break退出循环            if (ttl == null) {                break;            }            // 省略一些代码        }    } finally {}}

下面代码实现了一个分布式锁的性能。它应用了Lua脚本来尝试获取锁,并在胜利获取锁后返回锁的剩余时间(ttl)。如果获取锁失败,则进入一个死循环,一直尝试从新获取锁,直到胜利为止。

unlock()

要害代码

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,              "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +                        "return nil;" +                    "end; " +                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +                    "if (counter > 0) then " +                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +                        "return 0; " +                    "else " +                        "redis.call('del', KEYS[1]); " +                        "redis.call(ARGV[4], KEYS[2], ARGV[1]); " +                        "return 1; " +                    "end; " +                    "return nil;",                Arrays.asList(getRawName(), getChannelName()),                LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), getSubscribeService().getPublishCommand());    }
  • RFuture<Boolean>: 示意返回一个异步后果对象,其中泛型参数Boolean示意后果的类型。
  • unlockInnerAsync办法承受以下参数:

    • threadId: 线程ID,用于标识以后线程。
  • 办法体中的代码应用Lua脚本来实现分布式锁的解锁逻辑。以下是对Lua脚本的解释:

    • if (redis.call('hexists', KEYS[1], ARGV[3]) == 0): 如果哈希表中不存在对应的线程ID,则返回nil示意无奈解锁。
    • local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1): 将哈希表中对应线程ID的值减1,并将后果赋值给变量counter。
    • if (counter > 0): 如果counter大于0,示意还有其余线程持有锁,执行以下操作:

      • redis.call('pexpire', KEYS[1], ARGV[2]): 设置键的过期工夫为租约工夫。
      • return 0: 返回0示意锁依然被其余线程持有。
    • else: 如果counter等于0,示意以后线程是最初一个持有锁的线程,执行以下操作:

      • redis.call('del', KEYS[1]): 删除键,开释锁。
      • redis.call(ARGV[4], KEYS[2], ARGV[1]): 调用发布命令,告诉其余线程锁曾经开释。
      • return 1: 返回1示意胜利开释锁。
    • return nil: 如果后面的条件都不满足,返回nil示意无奈解锁。
  • evalWriteAsync办法用于执行Lua脚本并返回异步后果对象。
  • getRawName(): 获取锁的名称。
  • LongCodec.INSTANCE: 用于编码和解码长整型数据。
  • RedisCommands.EVAL_BOOLEAN: 指定Lua脚本的返回类型为布尔值。
  • Arrays.asList(getRawName(), getChannelName()): 创立一个蕴含两个元素的列表,元素别离为锁的名称和频道名称。
  • LockPubSub.UNLOCK_MESSAGE: 公布音讯的内容。
  • internalLockLeaseTime: 锁的租约工夫。
  • getLockName(threadId): 依据线程ID生成锁的名称。
  • getSubscribeService().getPublishCommand(): 获取发布命令。

锁续期

watchDog

外围工作流程是定时监测业务是否执行完结,没完结的话在看你这个锁是不是快到期了(超过锁的三分之一工夫),那就从新续期。这样避免如果业务代码没执行完,锁却过期了所带来的线程不平安问题。

Redisson 的 watchDog 机制底层不是调度线程池,而是间接用的 netty 事件轮。

Redisson的WatchDog机制是用于主动续期分布式锁和监控对象生命周期的一种机制,确保了分布式环境下锁的正确性和资源的及时开释。

  1. 主动续期:当Redisson客户端获取了一个分布式锁后,会启动一个WatchDog线程。这个线程负责在锁行将到期时主动续期,保障持有锁的线程能够继续执行工作。默认状况下,锁的初始超时工夫是30秒,每10秒钟WatchDog会查看一次锁的状态,如果锁仍然被持有,它会将锁的过期工夫从新设置为30秒。
  2. 参数配置:能够通过设置lockWatchdogTimeout参数来调整WatchDog查看锁状态的频率和续期的超时工夫。这个参数默认值是30000毫秒(即30秒),实用于那些没有明确指定leaseTimeout参数的加锁申请。
  3. 重连机制:除了锁主动续期外,WatchDog机制还用作Redisson客户端的主动重连性能。当客户端与Redis服务器失去连贯时,WatchDog会主动尝试从新连贯,从而复原服务的失常运作。
  4. 资源管理:WatchDog也负责监控Redisson对象的生命周期,例如分布式锁。当对象的生命周期到期时,WatchDog会将其从Redis中删除,防止过期数据占用过多内存空间。
  5. 异步加锁:在加锁的过程中,WatchDog会在RedissonLock#tryAcquireAsync办法中发挥作用,该办法是进行异步加锁的逻辑所在。通过这种形式,加锁操作不会阻塞以后线程,进步了零碎的性能。
本文原创,满腹经纶,如有纰漏,欢送斧正。如果本文对您有所帮忙,欢送点赞,并期待您的反馈交换,独特成长。
原文地址: https://www.emanjusaka.top/2024/03/redisson-distributed-lock
微信公众号:emanjusaka的编程栈