关于reactive-programming:Reactive分布式锁Redis实现

背景起因1 . 原有受权我的项目集成了Spring中的RedisLockRegistry以实现分布式锁,在迁徙受权服务为Reactive编程的时候,须要实现Reactive形式的分布式锁实现(Reference[1])。2 . 原有RedisLockRegistry是基于Lua-Script和ThreadId来进行解决的。3 . 次要目标是放弃迁徙后的我的项目中原有业务逻辑不变,并可保障并发问题。技术计划及难点1 . 因为Reactive的编程模式绝对于传统编程模式的变动,在Reactor-Netty的Event-Loop环境下,无奈再应用线程ID进行逻辑辨别.但依然能够应用Redis的Lua-Script来实现并发管制2 . 在并发时,无奈再应用传统的while(true) {... break}和Thread.sleep的形式,来期待获取锁和查看锁状态。须要转换思路,应用Reactive的形式进行解决。3 . 最终实现和RedisLockRegistry根本保持一致的锁解决计划,并适配Reactive环境外围依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> <optional>true</optional></dependency><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> <optional>true</optional></dependency><dependency> <groupId>io.projectreactor.addons</groupId> <artifactId>reactor-extra</artifactId></dependency>实现逻辑1 . 锁解决Lua-Scriptprivate static final String OBTAIN_LOCK_SCRIPT = "local lockSet = redis.call('SETNX', KEYS[1], ARGV[1])\n" + "if lockSet == 1 then\n" + " redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" + " return true\n" + "else\n" + " return false\n" + "end";2 . 外围获取锁代码片段/** * execute redis-script to obtain lock * @return if obtain success then return true otherwise return false */private Mono<Boolean> obtainLock() { return Mono.from(ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate .execute(ReactiveRedisDistributedLockRegistry.this.obtainLockScript, Collections.singletonList(this.lockKey), List.of(this.lockId,String.valueOf(ReactiveRedisDistributedLockRegistry.this.expireAfter))) ) .map(success -> { boolean result = Boolean.TRUE.equals(success); if (result) { this.lockedAt = System.currentTimeMillis(); } return result; });}3 . 外围开释锁代码片段/** * remove redis lock key * @return */private Mono<Boolean> removeLockKey() { return Mono.just(ReactiveRedisDistributedLockRegistry.this.unlinkAvailable) .filter(unlink -> unlink) .flatMap(unlink -> ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate .unlink(this.lockKey) .doOnError(throwable -> { ReactiveRedisDistributedLockRegistry.this.unlinkAvailable = false; if (log.isDebugEnabled()) { log.debug("The UNLINK command has failed (not supported on the Redis server?); " + "falling back to the regular DELETE command", throwable); } else { log.warn("The UNLINK command has failed (not supported on the Redis server?); " + "falling back to the regular DELETE command: " + throwable.getMessage()); } }) .onErrorResume(throwable -> ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate.delete(this.lockKey)) ) .switchIfEmpty(ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate.delete(this.lockKey)) .then(Mono.just(true));}5 . 查看锁是否被占用代码片段/** * check is the acquired is in this process * @return */Mono<Boolean> isAcquiredInThisProcess() { return ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate.opsForValue() .get(this.lockKey) .map(this.lockId::equals);}4 . 根底锁接口定义public interface ReactiveDistributedLock { /** * get lock Key * @return */ String getLockKey(); /** * Try to acquire the lock once. Lock is acquired for a pre configured duration. * @return if lock succeeded then return true otherwise return false * <strong>if flow is empty default return false</strong> */ Mono<Boolean> acquireOnce(); /** * Try to acquire the lock. Lock is acquired for a pre configured duration. * @return * <strong>if flow is empty then throw an excpetion {@link CannotAcquireLockException}</strong> */ Mono<Boolean> acquire(); /** * Try to acquire the lock for a given duration. * @param duration duration in used * @return * <strong>the given duration must less than the default duration.Otherwise the lockKey well be expire by redis with default expire duration</strong> * <strong>if flow is empty then throw an excpetion {@link CannotAcquireLockException}</strong> */ Mono<Boolean> acquire(Duration duration); /** * Release the lock. * @return * <strong>if lock key doesn't exist in the redis,then throw an exception {@link IllegalStateException}</strong> */ Mono<Boolean> release(); } 5 . 根底锁接口实现private final class ReactiveRedisDistributedLock implements ReactiveDistributedLock { @Override public String getLockKey() { return this.lockKey; } @Override public Mono<Boolean> acquireOnce() { log.debug("Acquire Lock Once,LockKey:{}",this.lockKey); return this.obtainLock() .doOnNext(lockResult -> log.info("Obtain Lock Once,LockKey:{},Result:{}",this.lockKey,lockResult)) .doOnError(this::rethrowAsLockException); } @Override public Mono<Boolean> acquire() { log.debug("Acquire Lock By Default Duration :{}" ,expireDuration); // 这里应用默认配置的最大等待时间获取锁 return this.acquire(ReactiveRedisDistributedLockRegistry.this.expireDuration); } @Override public Mono<Boolean> acquire(Duration duration) { //尝试获取锁 return this.obtainLock() //过滤获取锁胜利 .filter(result -> result) //如果是Empty,则重试 .repeatWhenEmpty(Repeat.onlyIf(repeatContext -> true) //重试超时工夫 .timeout(duration) //重试距离 .fixedBackoff(Duration.ofMillis(100)) .//重试日志记录 .doOnRepeat(objectRepeatContext -> { if (log.isTraceEnabled()) { log.trace("Repeat Acquire Lock Repeat Content:{}",objectRepeatContext); } }) ) //这里必须应用 `defaultIfEmpty`,在repeat超时后,整个流的信号会变为empty,如果不解决empty则整个留就中断了或者由最外层的empty解决办法解决 .defaultIfEmpty(false) //记录上锁后果日志 .doOnNext(lockResult -> log.info("Obtain Lock,Lock Result :{},Lock Info:{}",lockResult,this)) //如果出错,则抛出异样信息 .doOnError(this::rethrowAsLockException); } @Override public Mono<Boolean> release() { //查看以后锁是否是本人占用 return this.isAcquiredInThisProcess() //占用的锁 .filter(isThisProcess -> isThisProcess) //开释锁 .flatMap(isThisProcess -> this.removeLockKey() //记录日志 .doOnNext(releaseResult -> log.info("Released Lock:{},Lock Info:{}",releaseResult,this)) //呈现未知异样,则从新抛出 .onErrorResume(throwable -> Mono.fromRunnable(() -> ReflectionUtils.rethrowRuntimeException(throwable))) //如果流是empty,则示意,锁曾经不存在了,被Redis配置的最大过期工夫开释 .switchIfEmpty(Mono.error(new IllegalStateException("Lock was released in the store due to expiration. " + "The integrity of data protected by this lock may have been compromised."))) ); }}6 . 内置定时工作,用于检测过期没在应用的RedisLock,并开释内存缓存。定时工作挂载SpringBean的申明周期,已实现定时工作启动和敞开。(InitializingBean,DisposableBean)private Scheduler scheduler = Schedulers.newSingle("redis-lock-evict",true);//挂载Spring 申明周期@Overridepublic void afterPropertiesSet() { log.debug("Initialize Auto Remove Unused Lock Execution"); //应用Flux的个性来实现定时工作 Flux.interval(expireEvictIdle, scheduler) .flatMap(value -> { long now = System.currentTimeMillis(); log.trace("Auto Remove Unused Lock ,Evict Triggered"); return Flux.fromIterable(this.locks.entrySet()) //过滤曾经过期的锁对象 .filter(entry -> now - entry.getValue().getLockedAt() > expireAfter) //将没有被占用的锁删除 .flatMap(entry -> entry.getValue() .isAcquiredInThisProcess() .filter(inProcess -> !inProcess) .doOnNext(inProcess -> { this.locks.remove(entry.getKey()); log.debug("Auto Remove Unused Lock,Lock Info:{}", entry); }) //谬误记录日志 .onErrorResume(throwable -> { log.error("Auto Remove Unused Locks Occur Exception,Lock Info: " + entry, throwable); return Mono.empty(); }) ); }) //Scheduler 须要订阅能力执行 .subscribe();}@Overridepublic void destroy() { log.debug("Shutdown Auto Remove Unused Lock Execution"); //挂载SpringBean申明周期,销毁Scheduler this.scheduler.dispose();}7 .优化锁接口解决逻辑,减少接口默认办法,便于锁管制和解决。将锁上游执行逻辑包装成Supplier便于调用和解决/** * Acquire a lock and release it after action is executed or fails. * * @param <T> type od value emitted by the action * @param executionSupplier to be executed subscribed to when lock is acquired * @return true if lock is acquired. * @see ReactiveDistributedLock#acquire() */default <T> Mono<T> acquireAndExecute(Supplier<Mono<T>> executionSupplier) { return acquire() .flatMap(acquireResult -> Mono.just(acquireResult) .filter(result -> result) //这里配合上锁逻辑,如果是空,则示意无奈获取锁 .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey()))) .flatMap(lockResult -> executionSupplier .get() .flatMap(result -> this.release() .flatMap(releaseResult -> Mono.just(result)) ) .switchIfEmpty(this.release().then(Mono.empty())) .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable))) ) );}/** * Acquire a lock for a given duration and release it after action is executed. * * @param <T> type od value emitted by the action * @param duration how much time must pass for the acquired lock to expire * @param executionSupplier to be executed subscribed to when lock is acquired * @return true, if lock is acquired * @see ReactiveDistributedLock#acquire(Duration) */default <T> Mono<T> acquireAndExecute(Duration duration, Supplier<Mono<T>> executionSupplier) { return acquire(duration) .flatMap(acquireResult -> Mono.just(acquireResult) .filter(result -> result) .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey()))) .flatMap(lockResult -> executionSupplier .get() .flatMap(result -> this.release() .flatMap(releaseResult -> Mono.just(result)) ) .switchIfEmpty(this.release().then(Mono.empty())) .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable))) ) );}/** * Acquire a lock and release it after action is executed or fails. * * @param <T> type od value emitted by the action * @param executionSupplier to be executed subscribed to when lock is acquired * @return true if lock is acquired. * @see ReactiveDistributedLock#acquire() */default <T> Flux<T> acquireAndExecuteMany(Supplier<Flux<T>> executionSupplier) { return acquire() .flatMapMany(acquireResult -> Mono.just(acquireResult) .filter(result -> result) .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey()))) .flatMapMany(lockResult -> executionSupplier .get() .flatMap(result -> this.release() .flatMap(releaseResult -> Mono.just(result)) ) .switchIfEmpty(this.release().thenMany(Flux.empty())) .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable))) ) );}/** * Acquire a lock for a given duration and release it after action is executed. * * @param <T> type od value emitted by the action * @param duration how much time must pass for the acquired lock to expire * @param executionSupplier to be executed subscribed to when lock is acquired * @return true, if lock is acquired * @see ReactiveDistributedLock#acquire(Duration) */default <T> Flux<T> acquireAndExecuteMany(Duration duration, Supplier<Flux<T>> executionSupplier) { return acquire(duration) .flatMapMany(acquireResult -> Mono.just(acquireResult) .filter(result -> result) .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey()))) .flatMapMany(lockResult -> executionSupplier .get() .flatMap(result -> this.release() .flatMap(releaseResult -> Mono.just(result)) ) .switchIfEmpty(this.release().thenMany(Flux.empty())) .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable))) ) );}应用办法在application.yml中进行参数配置lock: redis: reactive: expire-after: 10s expire-evict-idle: 1s注入Bean@Autowiredprivate ReactiveRedisDistributedLockRegistry reactiveRedisDistributedLockRegistry;1 . 上锁一次,疾速失败@Testpublic void testAcquireOnce() throws Exception { ProcessFunctions processFunctions = new ProcessFunctions(); String key = "LOCK_ONCE"; Flux<String> flux = Flux.range(0, 5) .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key) .acquireOnce() .filter(acquireResult -> acquireResult) .flatMap(acquireResult -> processFunctions.processFunction()) .switchIfEmpty(Mono.just(FAILED)) ) .doOnNext(System.out::println); StepVerifier.create(flux) .expectNext(OK) .expectNext(FAILED) .expectNext(FAILED) .expectNext(FAILED) .expectNext(FAILED) .verifyComplete();}2 . 上锁期待默认超时工夫@Testpublic void testAcquireDefaultDurationAndProcessDuringTheExpireDuration() throws Exception { //default lock expire is 10S ProcessFunctions processFunctions = new ProcessFunctions(); String key = "LOCK_DEFAULT"; Flux<String> flux = Flux.range(0, 3) .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key) .acquireAndExecute(() -> processFunctions.processDelayFunction(Duration.ofSeconds(2)) ) .doOnNext(System.out::println) .onErrorResume(throwable -> CannotAcquireLockException.class.isAssignableFrom(throwable.getClass()),throwable -> { System.out.println("Lock Error"); return Mono.just(FAILED); }) ); StepVerifier.create(flux) .expectNext(OK) .expectNext(OK) .expectNext(OK) .verifyComplete();}3 . 上锁指定工夫@Testpublic void testAcquireDuration() throws Exception { ProcessFunctions processFunctions = new ProcessFunctions(); String key = "LOCK_GIVEN_DURATION"; Flux<String> flux = Flux.range(0, 3) .subscribeOn(Schedulers.parallel()) .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key) .acquireAndExecute(Duration.ofSeconds(3), () -> processFunctions.processDelayFunction(Duration.ofSeconds(2)) ) .doOnNext(System.out::println) .onErrorResume(throwable -> CannotAcquireLockException.class.isAssignableFrom(throwable.getClass()), throwable -> { System.out.println("Lock Error"); return Mono.just(FAILED); }) ); StepVerifier.create(flux) .expectNext(OK) .expectNext(FAILED) .expectNext(OK) .verifyComplete();}参考文档1 . RedisLockRegistry: https://docs.spring.io/spring-integration/docs/5.3.6.RELEASE/reference/html/redis.html#redis-lock-registry2 . Trigger Mono Execution After Another Mono Terminates: https://stackoverflow.com/questions/50686524/how-to-trigger-mono-execution-after-another-mono-terminates源码相干保护在GitHub,欢送Issue和Star reactive-redis-distributed-lock目前以SpringBoot脚手架的模式编写,并没有公布到Maven地方仓库,若有须要能够自行打包。

May 20, 2021 · 6 min · jiezi

Java编程方法论响应式RxJava与代码设计实战序

原文链接:《Java编程方法论:响应式RxJava与代码设计实战》序,来自于微信公众号:次灵均阁正文内容在《2019 一月的InfoQ 架构和设计趋势报告》1中,响应式编程(Reactive Programming)和函数式(Functional Programing)仍旧编列在第一季度(Q1)的 Early Adopters(早期采纳者) 中。尽管这仅是一家之言,然而不少的开发人员逐渐意识到 Reactive 之风俨然吹起。也许您的生产系统尚未出现 Reactive 的身影,不过您可能听说过 Spring WebFlux 或 Netflix Hystrix 等开源框架。笔者曾请教过 Pivotal(Spring 母公司)布道师 Josh Long2:”Spring 技术栈未来的重心是否要布局在 Reactive 之上?“。对方的答复是:”没错,Reactive 是未来趋势。“。同时,越来越多的开源项目开始签署 Reactive 宣言(The Reactive Manifesto)3,并喊出 ”Web Are Reactive“ 的口号。 或许开源界的种种举动无法说服您向 Reactive 的”港湾“中停靠,不过 Java 9 Flow API4 的引入,又给业界注入了一剂强心针。不难看出,无论是 Java API,还是 Java 框架均走向了 Reactive 编程模型的道路,这并非是一种巧合。 通常,人们谈到的 Reactive 可与 Reactive 编程划上等号,以”非阻塞(Non-Blocking)“和”异步(Asynchronous)“的特性并述,数据结构与操作相辅相成。Reactive 涉及函数式和并发两种编程模型,前者关注语法特性,后者强调执行效率。简言之,Reactive 编程的意图在于 ”Less Code,More Efficient“。除此之外,个人认为 Reactive 更大的价值在于统一 Java 并发编程模型,使得同步和异步的实现代码无异,同时做到 Java 编程风格与其他编程语言更好地融合,或许您也发现 Java 与 JS 在 Reactive 方面并不存在本质区别。纵观 Java 在 Reactive 编程上的发展而看,其特性更新可谓是步步为营,如履薄冰。尽管 Java 线程 API Thread 与 Runnable 就已具备异步以及非阻塞的能力,然而同步和异步编程的模式并不统一,并且理解 Thread API 的细节和管理线程生命周期的成本均由开发人员概括承受。虽然 Java 5 引入 J.U.C 框架(Java 并发框架)之后, ExecutorService 实现减轻了以上负担。不过开发人员仍需关注 ExecutorService 实现细节,比如怎样合理地设置线程池空间以及阻塞队列又成为新的挑战。为此,Java 7 又引入 ForkJoinPool API,不过此时的J.U.C 框架与 Reactive 理念仍存在距离,即使是线程安全的数据结构,也并不具备并行计算的能力,如:集合并行排序,同时操作集合的手段也相当的贫瘠,缺少类似 Map/Reduce 等操作。不过这些困难只是暂时的,终究被 Java 8 ”救赎“。Stream API 的出现不但具备数据操作在串行和并行间自由切换的能力,如 sequential() 以及 parallel() 方法,而且淡化了并发的特性,如 sorted() 方法即可能是传统排序,亦或是并行排序。相同的设计哲学也体现在 Java Reactive 实现框架中,如同书中提及的 RxJava5 API io.reactivex.Observable 。统一编程模型只是 Stream 其中设计目标之一,它结合 Lambda 语法特性,虽然提供了数量可观的操作方法,如 flatMap() 等,然而无论对比 RxJava,还是 Reactor6 ,Stream 操作方法却又相形见绌。值得一提的是,这些操作方法在 Reactive 的术语中称之为操作符(Operators)。当然框架内建的操作符的多与寡,并非判断其是否为 Reactive 实现的依据。其中决定性因素在于数据必须来源于发布方(生产者)的”推送(Push)“,而非消费端的”拉取(Pull)“。显然,Stream 属于消费端已就绪(Ready)的数据集合,并不存在其他数据推送源。不过 JVM 语言早期的 Reactive 定义处于模糊地带,如 RxJava API 属于观察者模式(Observer Pattern)7的扩展,而非迭代器(Iterator Pattern)模式8的实现。而 Reactor 的实现则拥抱 Reactive Streams 规范9 ,该规范消费端对于数据的操作是被动的处理,而非主动的索。换言之,数据的到达存在着不确定性10。当推送的数据无法得到消费端及时效应时,Reactive 框架必须提供背压(Backpressure)11实现,确保消费端拥有”拒绝的权利(cancel)”。在此理论基础上,Reactive Streams 规范定义了一套抽象的 API,作为 Java 9 java.util.concurrent.Flow API 的顶层设计。不过关于操作符的部分,该规范似乎不太关心,这也是为什么 RxJava 和 Reactor 均称自身为 Reactive 扩展框架的原因,同时两者在 API 级别提供多种调度器(Schedulers)12实现,适配不同并发场景提供。尽管 Reactive 定义在不同的阵营之间存在差异,援引本人在《Reactive-Programming-一种技术-各自表述》13文中的总结: ...

June 20, 2019 · 3 min · jiezi

Java-Reactive-Web设计与实现

注: 本文是由读者观看小马哥公开课视频过程中的笔记整理而成。更多Spring Framework文章可参看笔者个人github: spring-framework-lesson 。0. 编程模型与并发模型Spring 5实现了一部分Reactive Spring WebFlux: Reactive Web(non-blocking servers in general) Spring Web MVC:传统Servlet Web(servlet applications in general) 0.1 编程模型编程模型:阻塞、非阻塞 NIO:同步+非阻塞,基于事件非阻塞 基本上采用Callback方式当时不阻塞,后续再输出(再回调)0.2 并发模型并发模型: 同步(Sync)异步(Async)0.3 比较同步+非阻塞:线程不会改变,不会切换[线程:main] Observable 添加观察者! [线程:main] 通知所有观察者! [线程:main] 3. 收到数据更新:Hello World [线程:main] 2. 收到数据更新:Hello World [线程:main] 1. 收到数据更新:Hello World 异步+非阻塞:线程会被切换[线程:main] 启动一个JFrame窗口! [线程:AWT-EventQueue-0] 销毁当前窗口! [线程:AWT-EventQueue-0] 窗口被关闭,退出程序!使用Jconsole查看改异步非阻塞程序 等待总数一直在增加,说明异步程序一直在等待。NIO就是无限地在处理,无限地在等待。 1. Reactive概念Reactive Programming:响应式编程,异步非阻塞就是响应式编程(Reactive Programming),与之相对应的是命令式编程。 Reactive并不是一种新的技术,不用Reactive照样可以实现非阻塞(同步、异步均可,推拉模式的结合),比如利用观察者模式实现(比如Java Swing GUI技术)。 Reactive的另外一种实现方式就是消息队列。 1.1 标准概念1.1.1 维基百科讲法https://en.wikipedia.org/wiki... In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change. With this paradigm it is possible to express static (e.g., arrays) or dynamic (e.g., event emitters) data streams with ease, and also communicate that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the changed data flow关键点: ...

May 25, 2019 · 8 min · jiezi