


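  • 1. The existing authorization project used Spring's RedisLockRegistry to implement distributed locking. When the authorization service was migrated to Reactive programming, a Reactive implementation of the distributed lock was needed (Reference [1]).
  • 2. The original RedisLockRegistry relies on a Lua script together with the thread ID.
  • 3. The main goal is to keep the existing business logic unchanged after the migration while still guarding against concurrency problems.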


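  • 1. Reactive programming differs from the traditional model: in Reactor-Netty's event-loop environment, the thread ID can no longer be used to tell lock holders apart. A Redis Lua script can still be used for concurrency control.
  • 2. Under contention, the traditional while(true) {... break} plus Thread.sleep approach to waiting for the lock and checking its state is no longer an option. Waiting has to be expressed in a Reactive way instead.
  • 3. The end result is a locking scheme that stays largely consistent with RedisLockRegistry while fitting the Reactive environment.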
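To make the second point concrete, here is a minimal sketch of waiting for a lock without a blocking loop. It is not the project's code: to stay self-contained it uses only JDK classes (CompletableFuture and a ScheduledExecutorService) instead of Reactor, and the class name NonBlockingRetry, the acquire method and the synchronous attempt supplier are all hypothetical stand-ins for a real asynchronous Redis call.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: polls a lock attempt on a timer instead of sleeping in a loop. */
final class NonBlockingRetry {

    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "lock-retry-timer");
                t.setDaemon(true); // do not keep the JVM alive for the timer
                return t;
            });

    /**
     * Runs {@code attempt} every {@code interval} until it returns true or
     * {@code timeout} elapses. Completes with true on success, false on timeout.
     */
    static CompletableFuture<Boolean> acquire(Supplier<Boolean> attempt,
                                              Duration interval,
                                              Duration timeout) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        long deadline = System.nanoTime() + timeout.toNanos();
        schedule(attempt, interval, deadline, result, 0L);
        return result;
    }

    private static void schedule(Supplier<Boolean> attempt, Duration interval, long deadline,
                                 CompletableFuture<Boolean> result, long delayNanos) {
        TIMER.schedule(() -> {
            try {
                if (Boolean.TRUE.equals(attempt.get())) {
                    result.complete(true);
                } else if (System.nanoTime() + interval.toNanos() - deadline >= 0) {
                    result.complete(false); // the next attempt would fall past the deadline
                } else {
                    schedule(attempt, interval, deadline, result, interval.toNanos());
                }
            } catch (RuntimeException e) {
                result.completeExceptionally(e);
            }
        }, delayNanos, TimeUnit.NANOSECONDS);
    }
}
```

The caller gets a future back immediately and no thread is parked between attempts, which is the same idea the Reactive implementation expresses with repeatWhenEmpty and a timeout.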




  • 1. Lock-handling Lua script
private static final String OBTAIN_LOCK_SCRIPT = "local lockSet = redis.call('SETNX', KEYS[1], ARGV[1])\n" +
            "if lockSet == 1 then\n" +
            "redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
            "return true\n" +
            "else\n" +
            "return false\n" +
            "end";
  • 2. Core snippet for obtaining the lock
/**
 * Executes the Redis script to obtain the lock.
 * @return true if the lock was obtained, false otherwise
 */
private Mono<Boolean> obtainLock() {
    return Mono.from(ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate
            // ...
            )
            .map(success -> {
                boolean result = Boolean.TRUE.equals(success);
                if (result) {
                    this.lockedAt = System.currentTimeMillis();
                }
                return result;
            });
}
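The behaviour of the SETNX plus PEXPIRE script can be illustrated with a small in-memory model. This is only a sketch of the semantics, not a Redis client: the class SetNxExpireModel, its obtain method and the injected clock are hypothetical, and the synchronized keyword stands in for the atomicity that Redis gives a Lua script.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** In-memory stand-in for the two Redis commands used by the script. */
final class SetNxExpireModel {

    private static final class Entry {
        final String value;
        final long expiresAtMillis;

        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();
    private final LongSupplier clock; // injected so expiry can be tested deterministically

    SetNxExpireModel(LongSupplier clock) {
        this.clock = clock;
    }

    /** Mirrors the script: set the key only if absent, then attach a TTL to it. */
    synchronized boolean obtain(String key, String value, long ttlMillis) {
        long now = clock.getAsLong();
        Entry current = store.get(key);
        if (current != null && current.expiresAtMillis > now) {
            return false; // key still exists: SETNX would return 0
        }
        store.put(key, new Entry(value, now + ttlMillis)); // SETNX returned 1, then PEXPIRE
        return true;
    }
}
```

A second caller is rejected while the key is alive and succeeds once the TTL has passed, which is why a holder that runs longer than the expiry can lose the lock without noticing.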
  • 3. Core snippet for releasing the lock
/**
 * Removes the Redis lock key.
 * @return
 */
private Mono<Boolean> removeLockKey() {
    return Mono.just(ReactiveRedisDistributedLockRegistry.this.unlinkAvailable)
            .filter(unlink -> unlink)
            .flatMap(unlink -> ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate
                    .unlink(this.lockKey)
                    .doOnError(throwable -> {
                        ReactiveRedisDistributedLockRegistry.this.unlinkAvailable = false;
                        if (log.isDebugEnabled()) {
                            log.debug("The UNLINK command has failed (not supported on the Redis server?); " +
                                    "falling back to the regular DELETE command", throwable);
                        } else {
                            log.warn("The UNLINK command has failed (not supported on the Redis server?); " +
                                    "falling back to the regular DELETE command: " + throwable.getMessage());
                        }
                    })
                    .onErrorResume(throwable -> ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate.delete(this.lockKey)))
            // ...
}
  • 4. Snippet for checking whether the lock is held
/**
 * Checks whether the lock is held by this process.
 * @return
 */
Mono<Boolean> isAcquiredInThisProcess() {
    return ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate.opsForValue()
            // ...
}
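Because thread IDs are not available, one common way to decide ownership is to store a unique token as the lock value and compare it on every check. The sketch below assumes that approach; whether the project does exactly this is not confirmed by the text. The class OwnerTokenLock, its methods and the Map standing in for Redis are hypothetical.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical lock handle that identifies its owner by a random token, not a thread ID. */
final class OwnerTokenLock {

    private final Map<String, String> store; // stands in for Redis string values
    private final String key;
    private final String token = UUID.randomUUID().toString();

    OwnerTokenLock(Map<String, String> store, String key) {
        this.store = store;
        this.key = key;
    }

    /** Succeeds only if nobody holds the key yet. */
    boolean tryAcquire() {
        return store.putIfAbsent(key, token) == null;
    }

    /** True when the stored value is this handle's own token. */
    boolean isAcquiredByMe() {
        return token.equals(store.get(key));
    }

    /** Removes the key only if it still carries this handle's token. */
    boolean release() {
        return store.remove(key, token);
    }
}
```

Two handles on the same key then behave like two competing processes: only the one whose token is stored passes the ownership check and is allowed to release.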
  • 5. Base lock interface definition
public interface ReactiveDistributedLock {

    /**
     * Get the lock key.
     * @return
     */
    String getLockKey();

    /**
     * Try to acquire the lock once. The lock is acquired for a preconfigured duration.
     * @return true if the lock was acquired, false otherwise
     * <strong>if the flow is empty, false is returned by default</strong>
     */
    Mono<Boolean> acquireOnce();

    /**
     * Try to acquire the lock. The lock is acquired for a preconfigured duration.
     * @return
     * <strong>if the flow is empty, an exception {@link CannotAcquireLockException} is thrown</strong>
     */
    Mono<Boolean> acquire();

    /**
     * Try to acquire the lock for a given duration.
     * @param duration duration to use
     * @return
     * <strong>the given duration must be less than the default duration; otherwise Redis expires the lock key after the default expire duration</strong>
     * <strong>if the flow is empty, an exception {@link CannotAcquireLockException} is thrown</strong>
     */
    Mono<Boolean> acquire(Duration duration);

    /**
     * Release the lock.
     * @return
     * <strong>if the lock key doesn't exist in Redis, an exception {@link IllegalStateException} is thrown</strong>
     */
    Mono<Boolean> release();
}
  • 6. Base lock interface implementation
private final class ReactiveRedisDistributedLock implements ReactiveDistributedLock {

        public String getLockKey() {
            return this.lockKey;
        }

        public Mono<Boolean> acquireOnce() {
            log.debug("Acquire Lock Once,LockKey:{}", this.lockKey);
            return this.obtainLock()
                    .doOnNext(lockResult -> log.info("Obtain Lock Once,LockKey:{},Result:{}", this.lockKey, lockResult))
                    // ...
        }

        public Mono<Boolean> acquire() {
            log.debug("Acquire Lock By Default Duration :{}", expireDuration);
            // Acquire the lock using the default configured maximum wait time
            return this.acquire(ReactiveRedisDistributedLockRegistry.this.expireDuration);
        }

        public Mono<Boolean> acquire(Duration duration) {
            // Try to obtain the lock
            return this.obtainLock()
                    // Keep only successful attempts
                    .filter(result -> result)
                    // If the flow is empty, retry
                    .repeatWhenEmpty(Repeat.onlyIf(repeatContext -> true)
                            // Retry timeout
                            // ...
                            // Retry interval
                            // ...
                            // Retry logging
                            .doOnRepeat(objectRepeatContext -> {
                                if (log.isTraceEnabled()) {
                                    log.trace("Repeat Acquire Lock Repeat Content:{}", objectRepeatContext);
                                }
                            }))
                    // `defaultIfEmpty` is required here: once the repeat times out, the flow's signal becomes empty.
                    // If empty is not handled, the whole flow is cut short or falls through to the outermost empty handler.
                    // ...
                    // Log the locking result
                    .doOnNext(lockResult -> log.info("Obtain Lock,Lock Result :{},Lock Info:{}", lockResult, this))
                    // On error, rethrow the exception
                    // ...
        }

        public Mono<Boolean> release() {
            // Check whether the lock is currently held by this process
            return this.isAcquiredInThisProcess()
                    // Keep only locks that are held
                    .filter(isThisProcess -> isThisProcess)
                    // Release the lock
                    .flatMap(isThisProcess -> this.removeLockKey()
                            // Log the result
                            .doOnNext(releaseResult -> log.info("Released Lock:{},Lock Info:{}", releaseResult, this))
                            // On an unknown exception, rethrow it
                            .onErrorResume(throwable -> Mono.fromRunnable(() -> ReflectionUtils.rethrowRuntimeException(throwable)))
                            // An empty flow means the lock no longer exists: it was released by the maximum expiry configured in Redis
                            .switchIfEmpty(Mono.error(new IllegalStateException("Lock was released in the store due to expiration. " + "The integrity of data protected by this lock may have been compromised.")))
                    // ...
        }
}
  • 7. A built-in scheduled task detects expired RedisLock instances that are no longer in use and evicts them from the in-memory cache. The task is tied to the Spring bean lifecycle (InitializingBean, DisposableBean) so that it starts and stops with the bean.
private Scheduler scheduler = Schedulers.newSingle("redis-lock-evict", true);

// Hook into the Spring lifecycle
public void afterPropertiesSet() {
    log.debug("Initialize Auto Remove Unused Lock Execution");
    // Use Flux.interval to implement the scheduled task
    Flux.interval(expireEvictIdle, scheduler)
            .flatMap(value -> {
                long now = System.currentTimeMillis();
                log.trace("Auto Remove Unused Lock ,Evict Triggered");
                return Flux.fromIterable(this.locks.entrySet())
                        // Keep only lock objects that have expired
                        .filter(entry -> now - entry.getValue().getLockedAt() > expireAfter)
                        // Remove locks that are not held
                        .flatMap(entry -> entry.getValue()
                                .isAcquiredInThisProcess()
                                .filter(inProcess -> !inProcess)
                                .doOnNext(inProcess -> {
                                    this.locks.remove(entry.getKey());
                                    log.debug("Auto Remove Unused Lock,Lock Info:{}", entry);
                                })
                                // Log errors
                                .onErrorResume(throwable -> {
                                    log.error("Auto Remove Unused Locks Occur Exception,Lock Info:" + entry, throwable);
                                    return Mono.empty();
                                }));
            })
            // The scheduled flow only runs once it is subscribed to
            .subscribe();
}

public void destroy() {
    log.debug("Shutdown Auto Remove Unused Lock Execution");
    // Hook into the Spring bean lifecycle and dispose of the Scheduler
    this.scheduler.dispose();
}
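The eviction rule itself (idle longer than the expiry and not currently held) can be isolated from the scheduling. The following is a JDK-only sketch of that rule under stated assumptions: IdleLockEvictor, CachedLock and the inUse predicate are hypothetical names, and the predicate stands in for the asynchronous "is this lock still held" check.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;

/** Hypothetical helper that applies the eviction rule to a cache of lock objects. */
final class IdleLockEvictor {

    /** Hypothetical cached lock entry that remembers when it was last locked. */
    static final class CachedLock {
        final long lockedAt;

        CachedLock(long lockedAt) {
            this.lockedAt = lockedAt;
        }
    }

    /**
     * Removes entries that have been idle for more than {@code expireAfterMillis}
     * and that {@code inUse} does not report as held. Returns the number removed.
     */
    static int evict(ConcurrentMap<String, CachedLock> locks,
                     long now,
                     long expireAfterMillis,
                     Predicate<String> inUse) {
        int removed = 0;
        for (Map.Entry<String, CachedLock> e : locks.entrySet()) {
            boolean idle = now - e.getValue().lockedAt > expireAfterMillis;
            // remove(key, value) only succeeds if the entry was not replaced in the meantime
            if (idle && !inUse.test(e.getKey()) && locks.remove(e.getKey(), e.getValue())) {
                removed++;
            }
        }
        return removed;
    }
}
```

Keeping held locks in the cache even when they look expired avoids handing out a second lock object for a key that some caller is still working under.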
  • 8. The lock interface is refined with default methods to make lock control and handling easier. The downstream logic to run under the lock is wrapped in a Supplier so that it is easy to invoke and handle.
/**
 * Acquire a lock and release it after the action is executed or fails.
 * @param <T>  type of value emitted by the action
 * @param executionSupplier action to be subscribed to once the lock is acquired
 * @return the value emitted by the action
 * @see ReactiveDistributedLock#acquire()
 */
default <T> Mono<T> acquireAndExecute(Supplier<Mono<T>> executionSupplier) {
    return acquire()
            .flatMap(acquireResult -> Mono.just(acquireResult)
                    .filter(result -> result)
                    // Works together with the locking logic: empty means the lock could not be obtained
                    .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey:" + getLockKey())))
                    .flatMap(lockResult -> executionSupplier
                            .get()
                            .flatMap(result -> this.release()
                                    .flatMap(releaseResult -> Mono.just(result)))
                            .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable)))));
}

/**
 * Acquire a lock for a given duration and release it after the action is executed.
 * @param <T>      type of value emitted by the action
 * @param duration how much time must pass for the acquired lock to expire
 * @param executionSupplier action to be subscribed to once the lock is acquired
 * @return the value emitted by the action
 * @see ReactiveDistributedLock#acquire(Duration)
 */
default <T> Mono<T> acquireAndExecute(Duration duration, Supplier<Mono<T>> executionSupplier) {
    return acquire(duration)
            .flatMap(acquireResult -> Mono.just(acquireResult)
                    .filter(result -> result)
                    .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey:" + getLockKey())))
                    .flatMap(lockResult -> executionSupplier
                            .get()
                            .flatMap(result -> this.release()
                                    .flatMap(releaseResult -> Mono.just(result)))
                            .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable)))));
}

/**
 * Acquire a lock and release it after the action is executed or fails.
 * @param <T>  type of value emitted by the action
 * @param executionSupplier action to be subscribed to once the lock is acquired
 * @return the values emitted by the action
 * @see ReactiveDistributedLock#acquire()
 */
default <T> Flux<T> acquireAndExecuteMany(Supplier<Flux<T>> executionSupplier) {
    return acquire()
            .flatMapMany(acquireResult -> Mono.just(acquireResult)
                    .filter(result -> result)
                    .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey:" + getLockKey())))
                    .flatMapMany(lockResult -> executionSupplier
                            .get()
                            .flatMap(result -> this.release()
                                    .flatMap(releaseResult -> Mono.just(result)))
                            .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable)))));
}

/**
 * Acquire a lock for a given duration and release it after the action is executed.
 * @param <T>      type of value emitted by the action
 * @param duration how much time must pass for the acquired lock to expire
 * @param executionSupplier action to be subscribed to once the lock is acquired
 * @return the values emitted by the action
 * @see ReactiveDistributedLock#acquire(Duration)
 */
default <T> Flux<T> acquireAndExecuteMany(Duration duration, Supplier<Flux<T>> executionSupplier) {
    return acquire(duration)
            .flatMapMany(acquireResult -> Mono.just(acquireResult)
                    .filter(result -> result)
                    .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey:" + getLockKey())))
                    .flatMapMany(lockResult -> executionSupplier
                            .get()
                            .flatMap(result -> this.release()
                                    .flatMap(releaseResult -> Mono.just(result)))
                            .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable)))));
}
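The acquire, execute, release pattern behind these default methods can be shown in a compact, framework-free form. The sketch below is an illustration rather than the project's implementation: it uses CompletableFuture so that it runs with the JDK alone, and LockedExecution, AsyncLock and the error message are hypothetical. It releases the lock on every outcome of the action, including an action that completes without a value, a case the flatMap-based chain above would appear to skip.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper: run an async action under a lock and always release afterwards. */
final class LockedExecution {

    /** Hypothetical minimal async lock; stands in for the reactive lock interface. */
    interface AsyncLock {
        CompletableFuture<Boolean> acquire();
        CompletableFuture<Boolean> release();
    }

    static <T> CompletableFuture<T> acquireAndExecute(AsyncLock lock,
                                                      Supplier<CompletableFuture<T>> action) {
        CompletableFuture<T> result = new CompletableFuture<>();
        lock.acquire().whenComplete((acquired, acquireError) -> {
            if (acquireError != null) {
                result.completeExceptionally(acquireError);
                return;
            }
            if (!Boolean.TRUE.equals(acquired)) {
                result.completeExceptionally(new IllegalStateException("Failed to obtain lock"));
                return;
            }
            CompletableFuture<T> work;
            try {
                work = action.get();
            } catch (RuntimeException e) {
                // A supplier that throws is treated like a failed action
                work = new CompletableFuture<>();
                work.completeExceptionally(e);
            }
            // Release on success and on failure, then surface the action's own outcome
            work.whenComplete((value, error) ->
                    lock.release().whenComplete((released, releaseError) -> {
                        if (error != null) {
                            result.completeExceptionally(error);
                        } else if (releaseError != null) {
                            result.completeExceptionally(releaseError);
                        } else {
                            result.complete(value);
                        }
                    }));
        });
        return result;
    }
}
```

In Reactor itself, Mono.usingWhen and Flux.usingWhen express the same "always clean up" shape and may be worth considering for the default methods.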


  • Configure the parameters in application.yml
      expire-after: 10s
      expire-evict-idle: 1s
  • Inject the bean
private ReactiveRedisDistributedLockRegistry reactiveRedisDistributedLockRegistry;
  • 1. Lock once, fail fast
public void testAcquireOnce() throws Exception {
    ProcessFunctions processFunctions = new ProcessFunctions();
    String key = "LOCK_ONCE";
    Flux<String> flux = Flux.range(0, 5)
            .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key)
                    .acquireOnce()
                    .filter(acquireResult -> acquireResult)
                    .flatMap(acquireResult -> processFunctions.processFunction())
            // ...
}
  • 2. Lock, waiting up to the default timeout
public void testAcquireDefaultDurationAndProcessDuringTheExpireDuration() throws Exception {
    // default lock expire is 10s
    ProcessFunctions processFunctions = new ProcessFunctions();
    String key = "LOCK_DEFAULT";
    Flux<String> flux = Flux.range(0, 3)
            .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key)
                    .acquireAndExecute(() ->
                            // ...
                    )
                    .onErrorResume(throwable -> CannotAcquireLockException.class.isAssignableFrom(throwable.getClass()), throwable -> {
                        System.out.println("Lock Error");
                        return Mono.just(FAILED);
                    })
            // ...
}
  • 3. Lock for a given duration
public void testAcquireDuration() throws Exception {
    ProcessFunctions processFunctions = new ProcessFunctions();
    String key = "LOCK_GIVEN_DURATION";
    Flux<String> flux = Flux.range(0, 3)
            .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key)
                    .acquireAndExecute(Duration.ofSeconds(3), () ->
                            // ...
                    )
                    .onErrorResume(throwable -> CannotAcquireLockException.class.isAssignableFrom(throwable.getClass()), throwable -> {
                        System.out.println("Lock Error");
                        return Mono.just(FAILED);
                    })
            // ...
}


  • 1 . RedisLockRegistry: https://docs.spring.io/spring-integration/docs/5.3.6.RELEASE/reference/html/redis.html#redis-lock-registry
  • 2 . Trigger Mono Execution After Another Mono Terminates: https://stackoverflow.com/questions/50686524/how-to-trigger-mono-execution-after-another-mono-terminates


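  • The project is maintained on GitHub as reactive-redis-distributed-lock; issues and stars are welcome.
  • It is currently written as a Spring Boot scaffold and has not been published to Maven Central. If you need it, you can package it yourself.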