关于分布式:最强分布式锁工具Redisson

34次阅读

共计 21451 个字符,预计需要花费 54 分钟才能阅读完成。

一、Redisson 概述

什么是 Redisson?

Redisson 是一个在 Redis 的根底上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 罕用对象,还提供了许多分布式服务。

其中包含(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson 提供了应用 Redis 的最简略和最便捷的办法。

Redisson 的主旨是促成使用者对 Redis 的关注拆散(Separation of Concern),从而让使用者可能将精力更集中地放在解决业务逻辑上。

一个基于 Redis 实现的分布式工具,有根本分布式对象和高级又形象的分布式服务,为每个试图再造分布式轮子的程序员带来了大部分分布式问题的解决办法。

Redisson 和 Jedis、Lettuce 有什么区别?倒也不是雷锋和雷锋塔

Redisson 和它俩的区别就像一个用鼠标操作图形化界面,一个用命令行操作文件。Redisson 是更高层的形象,Jedis 和 Lettuce 是 Redis 命令的封装。

  • Jedis 是 Redis 官网推出的用于通过 Java 连贯 Redis 客户端的一个工具包,提供了 Redis 的各种命令反对
  • Lettuce 是一种可扩大的线程平安的 Redis 客户端,通信框架基于 Netty,反对高级的 Redis 个性,比方哨兵,集群,管道,主动从新连贯和 Redis 数据模型。Spring Boot 2.x 开始 Lettuce 已取代 Jedis 成为首选 Redis 的客户端。
  • Redisson 是架设在 Redis 根底上,通信基于 Netty 的综合的、新型的中间件,企业级开发中应用 Redis 的最佳范本

Jedis 把 Redis 命令封装好,Lettuce 则进一步有了更丰盛的 Api,也反对集群等模式。然而两者也都点到为止,只给了你操作 Redis 数据库的脚手架,而 Redisson 则是基于 Redis、Lua 和 Netty 建设起了成熟的分布式解决方案,甚至 redis 官网都举荐的一种工具集。

二、分布式锁

分布式锁怎么实现?

分布式锁是并发业务下的刚需,尽管实现形形色色:ZooKeeper 有 Znode 程序节点,数据库有表级锁和乐 / 乐观锁,Redis 有 setNx,然而必由之路,最终还是要回到互斥上来,本篇介绍 Redisson,那就以 redis 为例。

怎么写一个简略的 Redis 分布式锁?

以 Spring Data Redis 为例,用 RedisTemplate 来操作 Redis(setIfAbsent 曾经是 setNx + expire 的合并命令),如下

// 加锁
public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
}
// 解锁,避免删错他人的锁,以 uuid 为 value 校验是否本人的锁
public void unlock(String lockName, String uuid) {if(uuid.equals(redisTemplate.opsForValue().get(lockName)){redisTemplate.opsForValue().del(lockName);    }
}

// 构造
if(tryLock){// todo}finally{unlock;}

简略 1.0 版本实现,聪慧的小张一眼看出,这是锁没错,但 get 和 del 操作非原子性,并发一旦大了,无奈保障过程平安。于是小张提议,用 Lua 脚本

Lua 脚本是什么?

Lua 脚本是 redis 曾经内置的一种轻量玲珑语言,其执行是通过 redis 的 eval/evalsha 命令来运行,把操作封装成一个 Lua 脚本,如论如何都是一次执行的原子操作。

于是 2.0 版本通过 Lua 脚本删除

lockDel.lua 如下

if redis.call('get', KEYS[1]) == ARGV[1] 
    then 
 -- 执行删除操作
        return redis.call('del', KEYS[1]) 
    else 
 -- 不胜利,返回 0
        return 0 
end

delete 操作时执行 Lua 命令

// 解锁脚本
DefaultRedisScript<Object> unlockScript = new DefaultRedisScript();
unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lockDel.lua")));

// 执行 lua 脚本解锁
redisTemplate.execute(unlockScript, Collections.singletonList(keyName), value);

2.0 仿佛更像一把锁,但如同又短少了什么,小张一拍脑袋,synchronized 和 ReentrantLock 都很丝滑,因为他们都是可重入锁,一个线程屡次拿锁也不会死锁,咱们须要可重入。

怎么保障可重入?

重点就是,同一个线程屡次获取同一把锁是容许的,不会造成死锁,这一点 synchronized 偏差锁提供了很好的思路,synchronized 的实现重入是在 JVM 层面,JAVA 对象头 MARK WORD 中便藏有线程 ID 和计数器来对以后线程做重入判断,防止每次 CAS。

当一个线程拜访同步块并获取锁时,会在对象头和栈帧中的锁记录里存储偏差的线程 ID,当前该线程在进入和退出同步块时不须要进行 CAS 操作来加锁和解锁,只需简略测试一下对象头的 Mark Word 里是否存储着指向以后线程的偏差锁。如果测试胜利,示意线程曾经取得了锁。如果测试失败,则须要再测试一下 Mark Word 中偏差锁标记是否设置成 1:没有则 CAS 竞争;设置了,则 CAS 将对象头偏差锁指向以后线程。

再保护一个计数器,同个线程进入则自增 1,来到再减 1,直到为 0 能力开释

可重入锁

仿造该计划,咱们需革新 Lua 脚本:

1. 须要存储 说名称 lockName、取得该锁的 线程 id和对应线程的 进入次数 count

2. 加锁

每次线程获取锁时,判断是否已存在该锁

不存在

设置 hash 的 key 为线程 id,value 初始化为 1

设置过期工夫

返回获取锁胜利 true

存在

持续判断是否存在以后线程 id 的 hash key

存在,线程 key 的 value + 1,重入次数减少 1,设置过期工夫

不存在,返回加锁失败

3. 解锁

每次线程来解锁时,判断是否已存在该锁

存在

是否有该线程的 id 的 hash key,有则减 1,无则返回解锁失败

减 1 后,判断残余 count 是否为 0,为 0 则阐明不再须要这把锁,执行 del 命令删除

1. 存储构造

为了不便保护这个对象,咱们用 Hash 构造来存储这些字段。Redis 的 Hash 相似 Java 的 HashMap,适宜存储对象。

hset lockname1 threadId 1

设置一个名字为 lockname1 的 hash 构造,该 hash 构造 key 为threadId,值 value 为1

hget lockname1 threadId

获取 lockname1 的 threadId 的值

存储构造为

lockname 锁名称
    key1:threadId   惟一键,线程 id
    value1:count     计数器,记录该线程获取锁的次数

redis 中的构造

2. 计数器的加减

当同一个线程获取同一把锁时,咱们须要对对应线程的计数器 count 做加减

判断一个 redis key 是否存在,能够用 exists,而判断一个 hash 的 key 是否存在,能够用 hexists

而 redis 也有 hash 自增的命令 hincrby

每次自增 1 时 hincrby lockname1 threadId 1,自减 1 时 hincrby lockname1 threadId -1

3. 解锁的判断

当一把锁不再被须要了,每次解锁一次,count 减 1,直到为 0 时,执行删除

综合上述的存储构造和判断流程,加锁和解锁 Lua 如下

加锁 lock.lua

local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];

-- lockname 不存在
if(redis.call('exists', key) == 0) then
    redis.call('hset', key, threadId, '1');
    redis.call('expire', key, releaseTime);
    return 1;
end;

-- 以后线程已 id 存在
if(redis.call('hexists', key, threadId) == 1) then
    redis.call('hincrby', key, threadId, '1');
    redis.call('expire', key, releaseTime);
    return 1;
end;
return 0;

解锁 unlock.lua

local key = KEYS[1];
local threadId = ARGV[1];

-- lockname、threadId 不存在
if (redis.call('hexists', key, threadId) == 0) then
    return nil;
end;

-- 计数器 -1
local count = redis.call('hincrby', key, threadId, -1);

-- 删除 lock
if (count == 0) then
    redis.call('del', key);
    return nil;
end;

代码

/**
 * @description 原生 redis 实现分布式锁
 **/
@Getter
@Setter
public class RedisLock {

    private RedisTemplate redisTemplate;
    private DefaultRedisScript<Long> lockScript;
    private DefaultRedisScript<Object> unlockScript;

    public RedisLock(RedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        // 加载加锁的脚本
        lockScript = new DefaultRedisScript<>();
        this.lockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lock.lua")));
        this.lockScript.setResultType(Long.class);
        // 加载开释锁的脚本
        unlockScript = new DefaultRedisScript<>();
        this.unlockScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));
    }

    /**
     * 获取锁
     */
    public String tryLock(String lockName, long releaseTime) {
        // 存入的线程信息的前缀
        String key = UUID.randomUUID().toString();

        // 执行脚本
        Long result = (Long) redisTemplate.execute(
                lockScript,
                Collections.singletonList(lockName),
                key + Thread.currentThread().getId(),
                releaseTime);

        if (result != null && result.intValue() == 1) {return key;} else {return null;}
    }

    /**
     * 解锁
     * @param lockName
     * @param key
     */
    public void unlock(String lockName, String key) {
        redisTemplate.execute(unlockScript,
                Collections.singletonList(lockName),
                key + Thread.currentThread().getId()
                );
    }
}

至此曾经实现了一把分布式锁,合乎互斥、可重入、防死锁的根本特点。

谨严的小张感觉尽管当个一般互斥锁,曾经稳稳够用,可是业务里总是又很多非凡状况的,比方 A 过程在获取到锁的时候,因业务操作工夫太长,锁开释了然而业务还在执行,而此刻 B 过程又能够失常拿到锁做业务操作,两个过程操作就会存在仍旧有共享资源的问题

而且如果负责贮存这个分布式锁的Redis 节点宕机当前,而且这个锁正好处于锁住的状态时,这个锁会呈现锁死的状态

小张不是杠精,因为库存操作总有这样那样的非凡。

所以咱们心愿在这种状况时,能够缩短锁的 releaseTime 提早开释锁来直到实现业务冀望后果,这种一直缩短锁过期工夫来保障业务执行实现的操作就是锁续约。

读写拆散也是常见,一个读多写少的业务为了性能,经常是有读锁和写锁的。

而此刻的扩大曾经超出了一把简略轮子的复杂程度,光是解决续约,就够小张喝一壶,何况在性能(锁的最大等待时间)、优雅(有效锁申请)、重试(失败重试机制)等方面还要下功夫钻研。

在小张苦思冥想时,旁边的小白凑过去看了看小张,很好奇,都 2021 年了,为什么不间接用 redisson 呢?

Redisson 就有这把你要的锁。

三、Redisson 分布式锁

号称简略的 Redisson 分布式锁的应用姿态是什么?

1. 依赖

<!-- 原生,本章应用 -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
</dependency>

<!-- 另一种 Spring 集成 starter,本章未应用 -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.13.6</version>
</dependency>

2. 配置

@Configuration
public class RedissionConfig {@Value("${spring.redis.host}")
    private String redisHost;

    @Value("${spring.redis.password}")
    private String password;

    private int port = 6379;

    @Bean
    public RedissonClient getRedisson() {Config config = new Config();
        config.useSingleServer().
                setAddress("redis://" + redisHost + ":" + port).
                setPassword(password);
        config.setCodec(new JsonJacksonCodec());
        return Redisson.create(config);
    }
}

3. 启用分布式锁

@Resource
private RedissonClient redissonClient;

RLock rLock = redissonClient.getLock(lockName);
try {boolean isLocked = rLock.tryLock(expireTime, TimeUnit.MILLISECONDS);
    if (isLocked) {// TODO}
    } catch (Exception e) {rLock.unlock();
    }

简洁明了,只须要一个 RLock,既然举荐 Redisson,就往里面看看他是怎么实现的。

四、RLock

RLock 是 Redisson 分布式锁的最外围接口,继承了 concurrent 包的 Lock 接口和本人的 RLockAsync 接口,RLockAsync 的返回值都是 RFuture,是 Redisson 执行异步实现的外围逻辑,也是 Netty 施展的次要阵地。

RLock 如何加锁?

从 RLock 进入,找到 RedissonLock 类,找到 tryLock 办法再递进到干事的 tryAcquireOnceAsync 办法,这是加锁的次要代码(版本不一此处实现有差异,和最新 3.15.x 有肯定出入,然而外围逻辑仍然未变。此处以 3.13.6 为例)

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1L) {return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        } else {RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {if (ttlRemaining) {this.scheduleExpirationRenewal(threadId);
                    }

                }
            });
            return ttlRemainingFuture;
        }
    }

此处呈现 leaseTime 工夫判断的 2 个分支,实际上就是加锁时是否设置过期工夫,未设置过期工夫(-1)时则会有 watchDog锁续约 (下文),一个注册了加锁事件的续约工作。咱们先来看有过期工夫tryLockInnerAsync 局部,

evalWriteAsync 是 eval 命令执行 lua 的入口

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {this.internalLockLeaseTime = unit.toMillis(leaseTime);
        return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
    }

这里揭开真面目,eval 命令执行 Lua 脚本的中央,此处的 Lua 脚本开展

-- 不存在该 key 时
if (redis.call('exists', KEYS[1]) == 0) then 
  -- 新增该锁并且 hash 中该线程 id 对应的 count 置 1
  redis.call('hincrby', KEYS[1], ARGV[2], 1); 
  -- 设置过期工夫
  redis.call('pexpire', KEYS[1], ARGV[1]); 
  return nil; 
end; 

-- 存在该 key 并且 hash 中线程 id 的 key 也存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
  -- 线程重入次数 ++
  redis.call('hincrby', KEYS[1], ARGV[2], 1); 
  redis.call('pexpire', KEYS[1], ARGV[1]); 
  return nil; 
end; 
return redis.call('pttl', KEYS[1]);

和后面咱们写自定义的分布式锁的脚本简直统一,看来 redisson 也是一样的实现,具体参数剖析:

// keyName
KEYS[1] = Collections.singletonList(this.getName())
// leaseTime
ARGV[1] = this.internalLockLeaseTime
// uuid+threadId 组合的惟一值
ARGV[2] = this.getLockName(threadId)

总共 3 个参数实现了一段逻辑:

判断该锁是否曾经有对应 hash 表存在,

• 没有对应的 hash 表:则 set 该 hash 表中一个 entry 的 key 为锁名称,value 为 1,之后设置该 hash 表生效工夫为 leaseTime

• 存在对应的 hash 表:则将该 lockName 的 value 执行 + 1 操作,也就是计算进入次数,再设置生效工夫 leaseTime

• 最初返回这把锁的 ttl 剩余时间

也和上述自定义锁没有区别

既然如此,那解锁的步骤也必定有对应的 - 1 操作,再看 unlock 办法,同样查找办法名,一路到

protected RFuture<Boolean> unlockInnerAsync(long threadId) {return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), new Object[]{LockPubSub.unlockMessage, this.internalLockLeaseTime, this.getLockName(threadId)});
    }

掏出 Lua 局部

-- 不存在 key
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then 
  return nil;
end;
-- 计数器 -1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
if (counter > 0) then 
  -- 过期工夫重设
  redis.call('pexpire', KEYS[1], ARGV[2]); 
  return 0; 
else
  -- 删除并公布解锁音讯
  redis.call('del', KEYS[1]); 
  redis.call('publish', KEYS[2], ARGV[1]); 
  return 1;
end; 
return nil;

该 Lua KEYS 有 2 个 Arrays.asList(getName(), getChannelName())

name 锁名称
channelName,用于 pubSub 公布音讯的 channel 名称

ARGV 变量有三个 LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)

LockPubSub.UNLOCK_MESSAGE,channel 发送音讯的类别,此处解锁为 0
internalLockLeaseTime,watchDog 配置的超时工夫,默认为 30s
lockName 这里的 lockName 指的是 uuid 和 threadId 组合的惟一值

步骤如下:

1. 如果该锁不存在则返回 nil;

2. 如果该锁存在则将其线程的 hash key 计数器 -1,

3. 计数器 counter>0,重置下生效工夫,返回 0;否则,删除该锁,公布解锁音讯 unlockMessage,返回 1;

其中 unLock 的时候应用到了 Redis 公布订阅 PubSub 实现音讯告诉。

而订阅的步骤就在 RedissonLock 的加锁入口的 lock 办法里

long threadId = Thread.currentThread().getId();
        Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
        if (ttl != null) {
            // 订阅
            RFuture<RedissonLockEntry> future = this.subscribe(threadId);
            if (interruptibly) {this.commandExecutor.syncSubscriptionInterrupted(future);
            } else {this.commandExecutor.syncSubscription(future);
            }
            // 省略

当锁被其余线程占用时,通过监听锁的开释告诉(在其余线程通过 RedissonLock 开释锁时,会通过公布订阅 pub/sub 性能发动告诉),期待锁被其余线程开释,也是为了防止自旋的一种罕用效率伎俩。

1. 解锁音讯

为了一探到底告诉了什么,告诉后又做了什么,进入 LockPubSub。

这里只有一个显著的监听办法 onMessage,其订阅和信号量的开释都在父类 PublishSubscribe,咱们只关注监听事件的实际操作

protected void onMessage(RedissonLockEntry value, Long message) {
        Runnable runnableToExecute;
        if (message.equals(unlockMessage)) {
            // 从监听器队列取监听线程执行监听回调
            runnableToExecute = (Runnable)value.getListeners().poll();
            if (runnableToExecute != null) {runnableToExecute.run();
            }
            // getLatch()返回的是 Semaphore,信号量,此处是开释信号量
            // 开释信号量后会唤醒期待的 entry.getLatch().tryAcquire 去再次尝试申请锁
            value.getLatch().release();
        } else if (message.equals(readUnlockMessage)) {while(true) {runnableToExecute = (Runnable)value.getListeners().poll();
                if (runnableToExecute == null) {value.getLatch().release(value.getLatch().getQueueLength());
                    break;
                }
                runnableToExecute.run();}
        }
    }

发现一个是 默认解锁音讯 ,一个是 读锁解锁音讯,因为 redisson 是有提供读写锁的,而读写锁读读状况和读写、写写状况互斥状况不同,咱们只看下面的默认解锁音讯 unlockMessage 分支

LockPubSub 监听最终执行了 2 件事

  1. runnableToExecute.run() 执行监听回调
  2. value.getLatch().release(); 开释信号量

Redisson 通过 LockPubSub 监听解锁音讯,执行监听回调和开释信号量告诉期待线程能够从新抢锁。

这时再回来看 tryAcquireOnceAsync 另一分支

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1L) {return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        } else {RFuture<Boolean> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {if (ttlRemaining) {this.scheduleExpirationRenewal(threadId);
                    }

                }
            });
            return ttlRemainingFuture;
        }
    }

能够看到,无超时工夫时,在执行加锁操作后,还执行了一段费解的逻辑

ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e == null) {if (ttlRemaining) {this.scheduleExpirationRenewal(threadId);
                    }

                }
            })                   }                 }             }) 

此处波及到 Netty 的 Future/Promise-Listener 模型,Redisson 中简直全副以这种形式通信(所以说 Redisson 是基于 Netty 通信机制实现的),了解这段逻辑能够试着先了解

在 Java 的 Future 中,业务逻辑为一个 Callable 或 Runnable 实现类,该类的 call()或 run()执行结束意味着业务逻辑的完结,在 Promise 机制中,能够在业务逻辑中人工设置业务逻辑的胜利与失败,这样更加不便的监控本人的业务逻辑。

这块代码的外表意义就是,在执行异步加锁的操作后,加锁胜利则依据加锁实现返回的 ttl 是否过期来确认是否执行一段定时工作。

这段定时工作的就是 watchDog 的外围。

2. 锁续约

查看 RedissonLock.this.scheduleExpirationRenewal(threadId)

private void scheduleExpirationRenewal(long threadId) {RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
        RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
        if (oldEntry != null) {oldEntry.addThreadId(threadId);
        } else {entry.addThreadId(threadId);
            this.renewExpiration();}

    }

private void renewExpiration() {RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
        if (ee != null) {Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {public void run(Timeout timeout) throws Exception {RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                    if (ent != null) {Long threadId = ent.getFirstThreadId();
                        if (threadId != null) {RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                            future.onComplete((res, e) -> {if (e != null) {RedissonLock.log.error("Can't update lock "+ RedissonLock.this.getName() +" expiration", e);
                                } else {if (res) {RedissonLock.this.renewExpiration();
                                    }

                                }
                            });
                        }
                    }
                }
            }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
            ee.setTimeout(task);
        }
    }

拆分来看,这段间断嵌套且简短的代码实际上做了几步

• 增加一个 netty 的 Timeout 回调工作,每(internalLockLeaseTime / 3)毫秒执行一次,执行的办法是 renewExpirationAsync

• renewExpirationAsync 重置了锁超时工夫,又注册一个监听器,监听回调又执行了 renewExpiration

renewExpirationAsync 的 Lua 如下

protected RFuture<Boolean> renewExpirationAsync(long threadId) {return this.commandExecutor.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.getName()), new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
    }

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then 
  redis.call('pexpire', KEYS[1], ARGV[1]); 
  return 1; 
end; 
return 0;

从新设置了超时工夫。

Redisson 加这段逻辑的目标是什么?

目标是为了某种场景下保障业务不影响,如工作执行超时但未完结,锁曾经开释的问题。

当一个线程持有了一把锁,因为并未设置超时工夫 leaseTime,Redisson 默认配置了 30S,开启 watchDog,每 10S 对该锁进行一次续约,维持 30S 的超时工夫,直到工作实现再删除锁。

这就是 Redisson 的 锁续约 ,也就是WatchDog 实现的基本思路。

3. 流程概括

通过整体的介绍,流程简略概括:

A、B 线程争抢一把锁,A 获取到后,B 阻塞

B 线程阻塞时并非被动 CAS,而是 PubSub 形式订阅该锁的播送音讯

A 操作实现开释了锁,B 线程收到订阅音讯告诉

B 被唤醒开始持续抢锁,拿到锁

具体加锁解锁流程总结如下图:

五、偏心锁

以上介绍的可重入锁是非偏心锁,Redisson 还基于 Redis 的队列(List)和 ZSet 实现了偏心锁

偏心的定义是什么?

偏心就是依照客户端的申请先来后到排队来获取锁,先到先得,也就是 FIFO,所以队列和容器程序编排必不可少

FairSync

回顾 JUC 的 ReentrantLock 偏心锁的实现

/**
 * Sync object for fair locks
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

AQS 曾经提供了整个实现,是否偏心取决于实现类取出节点逻辑是否程序取

AbstractQueuedSynchronizer 是用来构建锁或者其余同步组件的根底框架,通过内置 FIFO 队列来实现资源获取线程的排队工作,他本身没有实现同步接口,仅仅定义了若干同步状态获取和开释的办法来供自定义同步组件应用(上图),反对独占和共享获取,这是基于模版办法模式的一种设计,给偏心 / 非偏心提供了土壤。

咱们用 2 张图来简略解释 AQS 的期待流程(出自《JAVA 并发编程的艺术》)

一张是同步队列(FIFO 双向队列)治理 获取同步状态失败(抢锁失败)的线程援用、期待状态和前驱后继节点的流程图

一张是 独占式获取同步状态的总流程 ,外围 acquire(int arg) 办法调用流程

能够看出锁的获取流程

AQS 保护一个同步队列,获取状态失败的线程都会退出到队列中进行自旋,移出队列或进行自旋的条件是前驱节点为头节点切胜利获取了同步状态。

而比拟另一段非偏心锁类 NonfairSync 能够发现,管制偏心和非偏心的要害代码,在于 hasQueuedPredecessors 办法。

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);
    }
}

NonfairSync 缩小了了 hasQueuedPredecessors 判断条件,该办法的作用就是

查看同步队列中以后节点是否有前驱节点,如果有比以后线程更早申请获取锁则返回 true。

保障每次都取队列的第一个节点(线程)来获取锁,这就是偏心规定

为什么 JUC 以默认非偏心锁呢?

因为当一个线程申请锁时,只有获取来同步状态即胜利获取。在此前提下,刚开释的线程再次获取同步状态的几率会十分大,使得其余线程只能在同步队列中期待。但这样带来的益处是,非偏心锁大大减少了零碎线程上下文的切换开销。

可见偏心的代价是性能与吞吐量。

Redis 里没有 AQS,然而有 List 和 zSet,看看 Redisson 是怎么实现偏心的。

RedissonFairLock

RedissonFairLock 用法仍然很简略

RLock fairLock = redissonClient.getFairLock(lockName);

fairLock.lock();

RedissonFairLock 继承自 RedissonLock,同样一路向下找到加锁实现办法tryLockInnerAsync

这里有 2 段简短的 Lua,然而 Debug 发现,偏心锁的入口在 command == RedisCommands.EVAL_LONG 之后,此段 Lua 较长,参数也多,咱们着重剖析 Lua 的实现规定

参数

-- lua 中的几个参数
KEYS = Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName)
KEYS[1]: lock_name, 锁名称                   
KEYS[2]: "redisson_lock_queue:{xxx}"  线程队列
KEYS[3]: "redisson_lock_timeout:{xxx}"  线程 id 对应的超时汇合

ARGV =  internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime
ARGV[1]: "{leaseTime}" 过期工夫
ARGV[2]: "{Redisson.UUID}:{threadId}"   
ARGV[3] = 以后工夫 + 线程等待时间:(10:00:00)+ 5000 毫秒 = 10:00:05
ARGV[4] = 以后工夫(10:00:00)部署服务器工夫,非 redis-server 服务器工夫

偏心锁实现的 Lua 脚本

-- 1. 死循环革除过期 key
while true do 
  -- 获取头节点
    local firstThreadId2 = redis.call('lindex', KEYS[2], 0);
    -- 首次获取必空跳出循环
  if firstThreadId2 == false then 
    break;
  end;
  -- 革除过期 key
  local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));
  if timeout <= tonumber(ARGV[4]) then
    redis.call('zrem', KEYS[3], firstThreadId2);
    redis.call('lpop', KEYS[2]);
  else
    break;
  end;
end;

-- 2. 不存在该锁 &&(不存在线程期待队列 || 存在线程期待队列而且第一个节点就是此线程 ID),加锁局部次要逻辑
if (redis.call('exists', KEYS[1]) == 0) and 
  ((redis.call('exists', KEYS[2]) == 0)  or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then
  -- 弹出队列中线程 id 元素,删除 Zset 中该线程 id 对应的元素
  redis.call('lpop', KEYS[2]);
  redis.call('zrem', KEYS[3], ARGV[2]);
  local keys = redis.call('zrange', KEYS[3], 0, -1);
  -- 遍历 zSet 所有 key,将 key 的超时工夫(score) - 以后工夫 ms
  for i = 1, #keys, 1 do 
    redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);
  end;
    -- 加锁设置锁过期工夫
  redis.call('hset', KEYS[1], ARGV[2], 1);
  redis.call('pexpire', KEYS[1], ARGV[1]);
  return nil;
end;

-- 3. 线程存在,重入判断
if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then
  redis.call('hincrby', KEYS[1], ARGV[2],1);
  redis.call('pexpire', KEYS[1], ARGV[1]);
  return nil;
end;

-- 4. 返回以后线程残余存活工夫
local timeout = redis.call('zscore', KEYS[3], ARGV[2]);
    if timeout ~= false then
  -- 过期工夫 timeout 的值在下方设置,此处的减法算出的仍旧是以后线程的 ttl
  return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);
end;

-- 5. 尾节点残余存活工夫
local lastThreadId = redis.call('lindex', KEYS[2], -1);
local ttl;
-- 尾节点不空 && 尾节点非以后线程
if lastThreadId ~= false and lastThreadId ~= ARGV[2] then
  -- 计算队尾节点残余存活工夫
  ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);
else
  -- 获取 lock_name 残余存活工夫
  ttl = redis.call('pttl', KEYS[1]);
end;

-- 6. 开端排队
-- zSet 超时工夫(score),尾节点 ttl + 以后工夫 + 5000ms + 以后工夫,无则新增,有则更新
-- 线程 id 放入队列尾部排队,无则插入,有则不再插入
local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);
if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then
  redis.call('rpush', KEYS[2], ARGV[2]);
end;
return ttl;

1. 偏心锁加锁步骤

通过以上 Lua,能够发现,lua 操作的要害构造是列表(list)和有序汇合(zSet)。

其中 list 保护了一个期待的线程队列 redisson_lock_queue:{xxx},zSet 保护了一个线程超时状况的有序汇合 redisson_lock_timeout:{xxx},只管 lua 较长,然而能够拆分为 6 个步骤

1. 队列清理

  • 保障队列中只有未过期的期待线程

2. 首次加锁

  • hset 加锁,pexpire 过期工夫

3. 重入判断

  • 此处同可重入锁 lua

4. 返回 ttl

5. 计算尾节点 ttl

  • 初始值为锁的残余过期工夫

6. 开端排队

  • ttl + 2 * currentTime + waitTime 是 score 的默认值计算公式

2. 模仿

如果模仿以下程序,就会明了 redisson 偏心锁整个加锁流程

假如 t1 10:00:00 < t2 10:00:10 < t3 10:00:20

t1:当线程 1 首次获取锁

1. 期待队列无头节点,跳出死循环 ->2

2. 不存在该锁 && 不存在线程期待队列 成立

2.1 lpop 和 zerm、zincrby 都是有效操作,只有加锁失效,阐明是首次加锁,加锁后返回 nil

加锁胜利,线程 1 获取到锁,完结

t2:线程 2 尝试获取锁(线程 1 未开释锁)

1. 期待队列无头节点,跳出死循环 ->2

2. 不存在该锁 不成立 ->3

3. 非重入线程 ->4

4.score 无值 ->5

5. 尾节点为空,设置 ttl 初始值为 lock_name 的 ttl -> 6

6. 依照 ttl + waitTime + currentTime + currentTime 来设置 zSet 超时工夫 score,并且退出期待队列,线程 2 为头节点

score = 20S + 5000ms + 10:00:10 + 10:00:10 = 10:00:35 + 10:00:10

t3:线程 3 尝试获取锁(线程 1 未开释锁)

1. 期待队列有头节点

1.1 未过期 ->2

2. 不存在该锁不成立 ->3

3. 非重入线程 ->4

4.score 无值 ->5

5. 尾节点不为空 && 尾节点线程为 2,非以后线程

5.1 取出之前设置的 score,减去以后工夫:ttl = score – currentTime ->6

6. 依照 ttl + waitTime + currentTime + currentTime 来设置 zSet 超时工夫 score,并且退出期待队列

score = 10S + 5000ms + 10:00:20 + 10:00:20 = 10:00:35 + 10:00:20

如此一来,三个须要争夺一把锁的线程,实现了一次排队,在 list 中排列他们期待线程 id,在 zSet 中寄存过期工夫(便于排列优先级)。其中返回 ttl 的线程 2 客户端、线程 3 客户端将会始终按肯定距离自旋反复执行该段 Lua,尝试加锁,如此一来便和 AQS 有了殊途同归之处。

而当线程 1 开释锁之后(这里仍旧有通过 Pub/Sub 公布解锁音讯,告诉其余线程获取)

10:00:30 线程 2 尝试获取锁(线程 1 已开释锁)

1. 期待队列有头节点,未过期 ->2

2. 不存在该锁 & 期待队列头节点是以后线程 成立

2.1 删除以后线程的队列信息和 zSet 信息,超时工夫为:

线程 2 10:00:35 + 10:00:10 – 10:00:30 = 10:00:15

线程 3 10:00:35 + 10:00:20 – 10:00:30 = 10:00:25

2.2 线程 2 获取到锁,从新设置过期工夫

加锁胜利,线程 2 获取到锁,完结

排队构造如图

偏心锁的开释脚本和重入锁相似,多了一步加锁结尾的清理过期 key 的 while true 逻辑,在此不再开展篇幅形容。

由上能够看出,Redisson 偏心锁的玩法相似于提早队列的玩法,外围都在 Redis 的 List 和 zSet 构造的搭配,但又借鉴了 AQS 实现,在定时判断头节点上一模一样(watchDog),保障了锁的竞争偏心和互斥。并发场景下,lua 脚本里,zSet 的 score 很好地解决了程序插入的问题,排列好优先级。

并且为了避免因异样而退出的线程无奈清理,每次申请都会判断头节点的过期状况给予清理,最初开释时通过 CHANNEL 告诉订阅线程能够来获取锁,反复一开始的步骤,顺利交接到下一个程序线程。

六、总结

Redisson 整体实现分布式加解锁流程的实现稍显简单,作者 Rui Gu 对 Netty 和 JUC、Redis 钻研深刻,利用了很多高级个性和语义,值得深刻学习,本次介绍也只是单机 Redis 下锁实现。

Redisson 也提供了多机状况下的联锁 MultiLock:

https://github.com/redisson/r… 分布式锁和同步器 #81- 可重入锁 reentrant-lock

和官网举荐的红锁 RedLock:

https://github.com/redisson/r… 分布式锁和同步器 #84- 红锁 redlock

所以,当你真的须要分布式锁时,无妨先来 Redisson 里找找。

正文完
 0