关于reactive-programming:Reactive分布式锁Redis实现

背景起因1 . 原有受权我的项目集成了Spring中的RedisLockRegistry以实现分布式锁,在迁徙受权服务为Reactive编程的时候,须要实现Reactive形式的分布式锁实现(Reference[1])。2 . 原有RedisLockRegistry是基于Lua-Script和ThreadId来进行解决的。3 . 次要目标是放弃迁徙后的我的项目中原有业务逻辑不变,并可保障并发问题。技术计划及难点1 . 因为Reactive的编程模式绝对于传统编程模式的变动,在Reactor-Netty的Event-Loop环境下,无奈再应用线程ID进行逻辑辨别.但依然能够应用Redis的Lua-Script来实现并发管制2 . 在并发时,无奈再应用传统的while(true) {... break}和Thread.sleep的形式,来期待获取锁和查看锁状态。须要转换思路,应用Reactive的形式进行解决。3 . 最终实现和RedisLockRegistry根本保持一致的锁解决计划,并适配Reactive环境外围依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> <optional>true</optional></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> <optional>true</optional></dependency><dependency> <groupId>io.projectreactor.addons</groupId> <artifactId>reactor-extra</artifactId></dependency>实现逻辑1 . 锁解决Lua-Scriptprivate static final String OBTAIN_LOCK_SCRIPT = "local lockSet = redis.call('SETNX', KEYS[1], ARGV[1])\n" + "if lockSet == 1 then\n" + " redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" + " return true\n" + "else\n" + " return false\n" + "end";2 . 外围获取锁代码片段/** * execute redis-script to obtain lock * @return if obtain success then return true otherwise return false */private Mono<Boolean> obtainLock() { return Mono.from(ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate .execute(ReactiveRedisDistributedLockRegistry.this.obtainLockScript, Collections.singletonList(this.lockKey), List.of(this.lockId,String.valueOf(ReactiveRedisDistributedLockRegistry.this.expireAfter))) ) .map(success -> { boolean result = Boolean.TRUE.equals(success); if (result) { this.lockedAt = System.currentTimeMillis(); } return result; });}3 . 外围开释锁代码片段/** * remove redis lock key * @return */private Mono<Boolean> removeLockKey() { return Mono.just(ReactiveRedisDistributedLockRegistry.this.unlinkAvailable) .filter(unlink -> unlink) .flatMap(unlink -> ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate .unlink(this.lockKey) .doOnError(throwable -> { ReactiveRedisDistributedLockRegistry.this.unlinkAvailable = false; if (log.isDebugEnabled()) { log.debug("The UNLINK command has failed (not supported on the Redis server?); " + "falling back to the regular DELETE command", throwable); } else { log.warn("The UNLINK command has failed (not supported on the Redis server?); " + "falling back to the regular DELETE command: " + throwable.getMessage()); } }) .onErrorResume(throwable -> ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate.delete(this.lockKey)) ) .switchIfEmpty(ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate.delete(this.lockKey)) .then(Mono.just(true));}5 . 查看锁是否被占用代码片段/** * check is the acquired is in this process * @return */Mono<Boolean> isAcquiredInThisProcess() { return ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate.opsForValue() .get(this.lockKey) .map(this.lockId::equals);}4 . 根底锁接口定义public interface ReactiveDistributedLock { /** * get lock Key * @return */ String getLockKey(); /** * Try to acquire the lock once. Lock is acquired for a pre configured duration. * @return if lock succeeded then return true otherwise return false * <strong>if flow is empty default return false</strong> */ Mono<Boolean> acquireOnce(); /** * Try to acquire the lock. Lock is acquired for a pre configured duration. * @return * <strong>if flow is empty then throw an excpetion {@link CannotAcquireLockException}</strong> */ Mono<Boolean> acquire(); /** * Try to acquire the lock for a given duration. * @param duration duration in used * @return * <strong>the given duration must less than the default duration.Otherwise the lockKey well be expire by redis with default expire duration</strong> * <strong>if flow is empty then throw an excpetion {@link CannotAcquireLockException}</strong> */ Mono<Boolean> acquire(Duration duration); /** * Release the lock. * @return * <strong>if lock key doesn't exist in the redis,then throw an exception {@link IllegalStateException}</strong> */ Mono<Boolean> release(); } 5 . 根底锁接口实现private final class ReactiveRedisDistributedLock implements ReactiveDistributedLock { @Override public String getLockKey() { return this.lockKey; } @Override public Mono<Boolean> acquireOnce() { log.debug("Acquire Lock Once,LockKey:{}",this.lockKey); return this.obtainLock() .doOnNext(lockResult -> log.info("Obtain Lock Once,LockKey:{},Result:{}",this.lockKey,lockResult)) .doOnError(this::rethrowAsLockException); } @Override public Mono<Boolean> acquire() { log.debug("Acquire Lock By Default Duration :{}" ,expireDuration); // 这里应用默认配置的最大等待时间获取锁 return this.acquire(ReactiveRedisDistributedLockRegistry.this.expireDuration); } @Override public Mono<Boolean> acquire(Duration duration) { //尝试获取锁 return this.obtainLock() //过滤获取锁胜利 .filter(result -> result) //如果是Empty,则重试 .repeatWhenEmpty(Repeat.onlyIf(repeatContext -> true) //重试超时工夫 .timeout(duration) //重试距离 .fixedBackoff(Duration.ofMillis(100)) .//重试日志记录 .doOnRepeat(objectRepeatContext -> { if (log.isTraceEnabled()) { log.trace("Repeat Acquire Lock Repeat Content:{}",objectRepeatContext); } }) ) //这里必须应用 `defaultIfEmpty`,在repeat超时后,整个流的信号会变为empty,如果不解决empty则整个留就中断了或者由最外层的empty解决办法解决 .defaultIfEmpty(false) //记录上锁后果日志 .doOnNext(lockResult -> log.info("Obtain Lock,Lock Result :{},Lock Info:{}",lockResult,this)) //如果出错,则抛出异样信息 .doOnError(this::rethrowAsLockException); } @Override public Mono<Boolean> release() { //查看以后锁是否是本人占用 return this.isAcquiredInThisProcess() //占用的锁 .filter(isThisProcess -> isThisProcess) //开释锁 .flatMap(isThisProcess -> this.removeLockKey() //记录日志 .doOnNext(releaseResult -> log.info("Released Lock:{},Lock Info:{}",releaseResult,this)) //呈现未知异样,则从新抛出 .onErrorResume(throwable -> Mono.fromRunnable(() -> ReflectionUtils.rethrowRuntimeException(throwable))) //如果流是empty,则示意,锁曾经不存在了,被Redis配置的最大过期工夫开释 .switchIfEmpty(Mono.error(new IllegalStateException("Lock was released in the store due to expiration. " + "The integrity of data protected by this lock may have been compromised."))) ); }}6 . 内置定时工作,用于检测过期没在应用的RedisLock,并开释内存缓存。定时工作挂载SpringBean的申明周期,已实现定时工作启动和敞开。(InitializingBean,DisposableBean)private Scheduler scheduler = Schedulers.newSingle("redis-lock-evict",true);//挂载Spring 申明周期@Overridepublic void afterPropertiesSet() { log.debug("Initialize Auto Remove Unused Lock Execution"); //应用Flux的个性来实现定时工作 Flux.interval(expireEvictIdle, scheduler) .flatMap(value -> { long now = System.currentTimeMillis(); log.trace("Auto Remove Unused Lock ,Evict Triggered"); return Flux.fromIterable(this.locks.entrySet()) //过滤曾经过期的锁对象 .filter(entry -> now - entry.getValue().getLockedAt() > expireAfter) //将没有被占用的锁删除 .flatMap(entry -> entry.getValue() .isAcquiredInThisProcess() .filter(inProcess -> !inProcess) .doOnNext(inProcess -> { this.locks.remove(entry.getKey()); log.debug("Auto Remove Unused Lock,Lock Info:{}", entry); }) //谬误记录日志 .onErrorResume(throwable -> { log.error("Auto Remove Unused Locks Occur Exception,Lock Info: " + entry, throwable); return Mono.empty(); }) ); }) //Scheduler 须要订阅能力执行 .subscribe();}@Overridepublic void destroy() { log.debug("Shutdown Auto Remove Unused Lock Execution"); //挂载SpringBean申明周期,销毁Scheduler this.scheduler.dispose();}7 .优化锁接口解决逻辑,减少接口默认办法,便于锁管制和解决。将锁上游执行逻辑包装成Supplier便于调用和解决/** * Acquire a lock and release it after action is executed or fails. * * @param <T> type od value emitted by the action * @param executionSupplier to be executed subscribed to when lock is acquired * @return true if lock is acquired. * @see ReactiveDistributedLock#acquire() */default <T> Mono<T> acquireAndExecute(Supplier<Mono<T>> executionSupplier) { return acquire() .flatMap(acquireResult -> Mono.just(acquireResult) .filter(result -> result) //这里配合上锁逻辑,如果是空,则示意无奈获取锁 .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey()))) .flatMap(lockResult -> executionSupplier .get() .flatMap(result -> this.release() .flatMap(releaseResult -> Mono.just(result)) ) .switchIfEmpty(this.release().then(Mono.empty())) .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable))) ) );}/** * Acquire a lock for a given duration and release it after action is executed. * * @param <T> type od value emitted by the action * @param duration how much time must pass for the acquired lock to expire * @param executionSupplier to be executed subscribed to when lock is acquired * @return true, if lock is acquired * @see ReactiveDistributedLock#acquire(Duration) */default <T> Mono<T> acquireAndExecute(Duration duration, Supplier<Mono<T>> executionSupplier) { return acquire(duration) .flatMap(acquireResult -> Mono.just(acquireResult) .filter(result -> result) .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey()))) .flatMap(lockResult -> executionSupplier .get() .flatMap(result -> this.release() .flatMap(releaseResult -> Mono.just(result)) ) .switchIfEmpty(this.release().then(Mono.empty())) .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable))) ) );}/** * Acquire a lock and release it after action is executed or fails. * * @param <T> type od value emitted by the action * @param executionSupplier to be executed subscribed to when lock is acquired * @return true if lock is acquired. * @see ReactiveDistributedLock#acquire() */default <T> Flux<T> acquireAndExecuteMany(Supplier<Flux<T>> executionSupplier) { return acquire() .flatMapMany(acquireResult -> Mono.just(acquireResult) .filter(result -> result) .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey()))) .flatMapMany(lockResult -> executionSupplier .get() .flatMap(result -> this.release() .flatMap(releaseResult -> Mono.just(result)) ) .switchIfEmpty(this.release().thenMany(Flux.empty())) .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable))) ) );}/** * Acquire a lock for a given duration and release it after action is executed. * * @param <T> type od value emitted by the action * @param duration how much time must pass for the acquired lock to expire * @param executionSupplier to be executed subscribed to when lock is acquired * @return true, if lock is acquired * @see ReactiveDistributedLock#acquire(Duration) */default <T> Flux<T> acquireAndExecuteMany(Duration duration, Supplier<Flux<T>> executionSupplier) { return acquire(duration) .flatMapMany(acquireResult -> Mono.just(acquireResult) .filter(result -> result) .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey()))) .flatMapMany(lockResult -> executionSupplier .get() .flatMap(result -> this.release() .flatMap(releaseResult -> Mono.just(result)) ) .switchIfEmpty(this.release().thenMany(Flux.empty())) .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable))) ) );}应用办法在application.yml中进行参数配置lock: redis: reactive: expire-after: 10s expire-evict-idle: 1s注入Bean@Autowiredprivate ReactiveRedisDistributedLockRegistry reactiveRedisDistributedLockRegistry;1 . 上锁一次,疾速失败@Testpublic void testAcquireOnce() throws Exception { ProcessFunctions processFunctions = new ProcessFunctions(); String key = "LOCK_ONCE"; Flux<String> flux = Flux.range(0, 5) .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key) .acquireOnce() .filter(acquireResult -> acquireResult) .flatMap(acquireResult -> processFunctions.processFunction()) .switchIfEmpty(Mono.just(FAILED)) ) .doOnNext(System.out::println); StepVerifier.create(flux) .expectNext(OK) .expectNext(FAILED) .expectNext(FAILED) .expectNext(FAILED) .expectNext(FAILED) .verifyComplete();}2 . 上锁期待默认超时工夫@Testpublic void testAcquireDefaultDurationAndProcessDuringTheExpireDuration() throws Exception { //default lock expire is 10S ProcessFunctions processFunctions = new ProcessFunctions(); String key = "LOCK_DEFAULT"; Flux<String> flux = Flux.range(0, 3) .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key) .acquireAndExecute(() -> processFunctions.processDelayFunction(Duration.ofSeconds(2)) ) .doOnNext(System.out::println) .onErrorResume(throwable -> CannotAcquireLockException.class.isAssignableFrom(throwable.getClass()),throwable -> { System.out.println("Lock Error"); return Mono.just(FAILED); }) ); StepVerifier.create(flux) .expectNext(OK) .expectNext(OK) .expectNext(OK) .verifyComplete();}3 . 上锁指定工夫@Testpublic void testAcquireDuration() throws Exception { ProcessFunctions processFunctions = new ProcessFunctions(); String key = "LOCK_GIVEN_DURATION"; Flux<String> flux = Flux.range(0, 3) .subscribeOn(Schedulers.parallel()) .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key) .acquireAndExecute(Duration.ofSeconds(3), () -> processFunctions.processDelayFunction(Duration.ofSeconds(2)) ) .doOnNext(System.out::println) .onErrorResume(throwable -> CannotAcquireLockException.class.isAssignableFrom(throwable.getClass()), throwable -> { System.out.println("Lock Error"); return Mono.just(FAILED); }) ); StepVerifier.create(flux) .expectNext(OK) .expectNext(FAILED) .expectNext(OK) .verifyComplete();}参考文档1 . RedisLockRegistry: https://docs.spring.io/spring-integration/docs/5.3.6.RELEASE/reference/html/redis.html#redis-lock-registry2 . Trigger Mono Execution After Another Mono Terminates: https://stackoverflow.com/questions/50686524/how-to-trigger-mono-execution-after-another-mono-terminates源码相干保护在GitHub,欢送Issue和Star reactive-redis-distributed-lock目前以SpringBoot脚手架的模式编写,并没有公布到Maven地方仓库,若有须要能够自行打包。

May 20, 2021 · 6 min · jiezi