阿里专家杜万:Java响应式编程,一文全面解读

7次阅读

共计 6261 个字符,预计需要花费 16 分钟才能阅读完成。

本篇文章来自于 2018 年 12 月 22 日举办的《阿里云栖开发者沙龙—Java 技术专场》,杜万专家是该专场第四位演讲的嘉宾,本篇文章是根据杜万专家在《阿里云栖开发者沙龙—Java 技术专场》的演讲视频以及 PPT 整理而成。

摘要:响应式宣言如何解读,Java 中如何进行响应式编程,Reactor Streams 又该如何使用?热衷于整合框架与开发工具的阿里云技术专家杜万,为大家全面解读响应式编程,分享 Spring Webflux 的实践。从响应式理解,到 Reactor 项目示例,再到 Spring Webflux 框架解读,本文带你进入 Java 响应式编程。
演讲嘉宾简介:杜万(倚贤),阿里云技术专家,全栈工程师,从事了 12 年 Java 语言为主的软件开发工作,热衷于整合框架与开发工具,Linux 拥趸,问题终结者。合作翻译《Elixir 程序设计》。目前负责阿里云函数计算的工具链开发,正在实践 WebFlux 和 Reactor 开发新的 Web 应用。
本次直播视频精彩回顾,戳这里!https://yq.aliyun.com/live/721PPT 下载地址:https://yq.aliyun.com/download/3187 以下内容根据演讲嘉宾视频分享以及 PPT 整理而成。
本文围绕以下三部分进行介绍:1.Reactive2.Project Reactor3.Spring Webflux
一.Reactive
1.Reactive Manifesto 下图是 Reactive Manifesto 官方网站上的介绍,这篇文章非常短但也非常精悍,非常值得大家去认真阅读。

响应式宣言是一份构建现代云扩展架构的处方。这个框架主要使用消息驱动的方法来构建系统,在形式上可以达到弹性和韧性,最后可以产生响应性的价值。所谓弹性和韧性,通俗来说就像是橡皮筋,弹性是指橡皮筋可以拉长,而韧性指在拉长后可以缩回原样。这里为大家一一解读其中的关键词:
1)响应性:快速 / 一致的响应时间。假设在有 500 个并发操作时,响应时间为 1s,那么并发操作增长至 5 万时,响应时间也应控制在 1s 左右。快速一致的响应时间才能给予用户信心,是系统设计的追求。
2)韧性:复制 / 遏制 / 隔绝 / 委托。当某个模块出现问题时,需要将这个问题控制在一定范围内,这便需要使用隔绝的技术,避免连锁性问题的发生。或是将出现故障部分的任务委托给其他模块。韧性主要是系统对错误的容忍。
3)弹性:无竞争点或中心瓶颈 / 分片 / 扩展。如果没有状态的话,就进行水平扩展,如果存在状态,就使用分片技术,将数据分至不同的机器上。
4)消息驱动:异步 / 松耦合 / 隔绝 / 地址透明 / 错误作为消息 / 背压 / 无阻塞。消息驱动是实现上述三项的技术支撑。其中,地址透明有很多方法。例如 DNS 提供的一串人类能读懂的地址,而不是 IP,这是一种不依赖于实现,而依赖于声明的设计。再例如 k8s 每个 service 后会有多个 Pod,依赖一个虚拟的服务而不是某一个真实的实例,从何实现调用 1 个或调用 n 个服务实例对于对调用方无感知,这是为分片或扩展做了准备。错误作为消息,这在 Java 中是不太常见的,Java 中通常将错误直接作为异常抛出,而在响应式中,错误也是一种消息,和普通消息地位一致,这和 JavaScript 中的 Promise 类似。背压是指当上游向下游推送数据时,可能下游承受能力不足导致问题,一个经典的比喻是就像用消防水龙头解渴。因此下游需要向上游声明每次只能接受大约多少量的数据,当接受完毕再次向上游申请数据传输。这便转换成是下游向上游申请数据,而不是上游向下游推送数据。无阻塞是通过 no-blocking IO 提供更高的多线程切换效率。

2.Reactive Programming 响应式编程是一种声明式编程范型。下图中左侧显示了一个命令式编程,相信大家都比较熟悉。先声明两个变量,然后进行赋值,让两个变量相加,得到相加的结果。但接着当修改了最早声明的两个变量的值后,sum 的值不会因此产生变化。而在 Java 9 Flow 中,按相同的思路实现上述处理流程,当初始变量的值变化,最后结果的值也同步发生变化,这就是响应式编程。这相当于声明了一个公式,输出值会随着输入值而同步变化。

响应式编程也是一种非阻塞的异步编程。下图是用 reactor.ipc.netty 实现的 TCP 通信。常见的 server 中会用循环发数据后,在循环外取出,但在下图的实现中没有,因为这不是使用阻塞模型实现,是基于非阻塞的异步编程实现。

响应式编程是一种数据流编程,关注于数据流而不是控制流。下图中,首先当页面出现点击操作时产生一个 click stream,然后页面会将 250ms 内的 clickStream 缓存,如此实现了一个归组过程。然后再进行 map 操作,得到每个 list 的长度,筛选出长度大于 2 的,这便可以得出多次点击操作的流。这种方法应用非常广泛,例如可以筛选出双击操作。由此可见,这种编程方式是一种数据流编程,而不是 if else 的控制流编程。

之前有提及消息驱动,那么消息驱动(Message-driven)和事件驱动(Event-driven)有什么区别呢。
1)消息驱动有确定的目标,一定会有消息的接受者,而事件驱动是一件事情希望被观察到,观察者是谁无关紧要。消息驱动系统关注消息的接受者,事件驱动系统关注事件源。
2)在一个使用响应式编程实现的响应式系统中,消息擅长于通讯,事件擅长于反应事实。
3.Reactive StreamsReactive Streams 提供了一套非阻塞背压的异步流处理标准,主要应用在 JVM、JavaScript 和网络协议工作中。通俗来说,它定义了一套响应式编程的标准。在 Java 中,有 4 个 Reactive Streams API,如下图所示:

这个 API 中定义了 Publisher,即事件的发生源,它只有一个 subscribe 方法。其中的 Subscriber 就是订阅消息的对象。

作为订阅者,有四个方法。onSubscribe 会在每次接收消息时调用,得到的数据都会经过 onNext 方法。onError 方法会在出现问题时调用,Throwable 即是出现的错误消息。在结束时调用 onComplete 方法。

Subscription 接口用来描述每个订阅的消息。request 方法用来向上游索要指定个数的消息,cancel 方法用于取消上游的数据推送,不再接受消息。

Processor 接口继承了 Subscriber 和 Publisher,它既是消息的发生者也是消息的订阅者。这是发生者和订阅者间的过渡桥梁,负责一些中间转换的处理。Reactor Library 从开始到现在已经历经多代。第 0 代就是 java 包 Observable 接口,也就是观察者模式。具体的发展见下图:

第四代虽然仍然是 RxJava2,但是相比第三代的 RxJava2,其中的小版本有了不一样的改进,出现了新特性。Reactor Library 主要有两点特性。一是基于回调(callback-based),在事件源附加回调函数,并在事件通过数据流链时被调用;二是声明式编程(Declarative),很多函数处理业务类似,例如 map/filter/fold 等,这些操作被类库固化后便可以使用声明式方法,以在程序中快速便捷使用。在生产者、订阅者都定义后,声明式方法便可以用来实现中间处理者。
二.Project Reactor
Project Reactor,实现了完全非阻塞,并且基于网络 HTTP/TCP/UDP 等的背压,即数据传输上游为网络层协议时,通过远程调用也可以实现背压。同时,它还实现了 Reactive Streams API 和 Reactive Extensions,以及支持 Java 8 functional API/Completable Future/Stream /Duration 等各新特性。下图所示为 Reactor 的一个示例:

首先定义了一个 words 的数组,然后使用 flatMap 做映射,再将每个词和 s 做连接,得出的结果和另一个等长的序列进行一个 zipWith 操作,最后打印结果。这和 Java 8 Stream 非常类似,但仍存在一些区别:1)Stream 是 pull-based,下游从上游拉数据的过程,它会有中间操作例如 map 和 reduce,和终止操作例如 collect 等,只有在终止操作时才会真正的拉取数据。Reactive 是 push-based,可以先将整个处理数据量构造完成,然后向其中填充数据,在出口处可以取出转换结果。
2)Stream 只能使用一次,因为它是 pull-based 操作,拉取一次之后源头不能更改。但 Reactive 可以使用多次,因为 push-based 操作像是一个数据加工厂,只要填充数据就可以一直产出。
3)Stream#parallel() 使用 fork-join 并发,就是将每一个大任务一直拆分至指定大小颗粒的小任务,每个小任务可以在不同的线程中执行,这种多线程模型符合了它的多核特性。Reactive 使用 Event loop,用一个单线程不停的做循环,每个循环处理有限的数据直至处理完成。
在上例中,大家可以看到很多 Reactive 的操作符,例如 flatMap/concatWith/zipWith 等,这样的操作符有 300 多个,这可能是学习这个框架最大的压力。如何理解如此繁多的操作符,可能一个归类会有所帮助:

1)新序列创建,例如创建数组类序列等;2)现有序列转换,将其转换为新的序列,例如常见的 map 操作;3)从现有的序列取出某些元素;4)序列过滤;5)序列异常处理。6)与时间相关的操作,例如某个序列是由时间触发器定期发起事件;7)序列分割;8)序列拉至同步世界,不是所有的框架都支持异步,再需要和同步操作进行交互时就需要这种处理。上述 300+ 操作符都有如下所示的弹珠图(Marble Diagrams),用表意的方式解释其作用。例如下图的操作符是指,随着时间推移,逐个产生了 6 个元素的序列,黑色竖线表示新元素产生终止。在这个操作符的作用下,下方只取了前三个元素,到第四个元素就不取了。这些弹珠图大家可以自行了解。

三.Spring Webflux
1.Spring Webflux 框架 Spring Boot 2.0 相较之前的版本,在基于 Spring Framework 5 的构建添加了新模块 Webflux,将默认的 web 服务器改为 Netty,支持 Reactive 应用,并且 Webflux 默认运行在 Netty 上。而 Spring Framework 5 也有了一些变化。Java 版本最低依赖 Java 8,支持 Java 9 和 Java 10,提供许多支持 Reactive 的基础设施,提供面向 Netty 等运行时环境的适配器,新增 Webflux 模块(集成的是 Reactor 3.x)。下图所示为 Webflux 的框架:

左侧是通常使用的框架,通过 Servlet API 的规范和 Container 进行交互,上一层是 Spring-Webmvc,再上一层则是经常使用的一些注解。右侧为对应的 Webflux 层级,只要是支持 NIO 的 Container,例如 Tomcat,Jetty,Netty 或 Undertow 都可以实现。在协议层的是 HTTP/Reactive Streams。再上一层是 Spring-Webflux,为了保持兼容性,它支持这些常用的注解,同时也有一套新的语法规则 Router Functions。下图显示了一个调用的实例:

在 Client 端,首先创建一个 WebClient,调用其 get 方法,写入 URL,接收格式为 APPLICATION_STREAM_JSON 的数据,retrieve 获得数据,取得数据后用 bodyToFlux 将数据转换为 Car 类型的对象,在 doOnNext 中打印构造好的 Car 对象,block 方法意思是直到回调函数被执行才可以结束。在 Server 端,在指定的 path 中进行 get 操作,produces 和以前不同,这里是 application/stream+json,然后返回 Flux 范型的 Car 对象。传统意义上,如果数据中有一万条数据,那么便直接返回一万条数据,但在这个示例返回的 Flux 范型中,是不包含数据的,但在数据库也支持 Reactive 的情况下,request 可以一直往下传递,响应式的批量返回。传统方式这样的查询很有可能是一个全表遍历,这会需要较多资源和时间,甚至影响其他任务的执行。而响应式的方法除了可以避免这种情况,还可以让用户在第一时间看到数据而不是等待数据采集完毕,这在架构体验的完整性上有了很大的提升。application/stream+json 也是可以让前端识别出,这些数据是分批响应式传递,而不会等待传完才显示。
现在的 Java web 应用可以使用 Servlet 栈或 Reactive 栈。Servlet 栈已经有很久的使用历史了,而现在又增加了更有优势的 Reactive 栈,大家可以尝试实现更好的用户体验。

2.Reactive 编程模型下图中是 Spring 实现的一个向后兼容模型,可以使用 annotation 来标注 Container。这是一个非常清晰、支持非常细节化的模型,也非常利于同事间的交流沟通。

下图是一个 Functional 编程模型,通过写函数的方式构造。例如下图中传入一个 Request,返回 Response,通过函数的方法重点关注输入输出,不需要区分状态。然后将这些函数注册至 Route。这个模型和 Node.js 非常接近,也利于使用。

3.Spring Data 框架 Spring Data 框架支持多种数据库,如下图所示,最常用的是 JPA 和 JDBC。在实践中,不同的语言访问不同的数据库时,访问接口是不一样的,这对编程人员来说是个很大的工作量。
Spring Data 便是做了另一层抽象,使你无论使用哪种数据库,都可以使用同一个接口。具体特性这里不做详谈。

下图展示了一个 Spring Data 的使用示例。只需要写一个方法签名,然后注解为 Query,这个方法不需要实现,因为框架后台已经采用一些技术,直接根据 findByFirstnameAndLastname 就可以查询到。这种一致的调用方式无疑提供了巨大的方便。

现在 Reactive 对 Spring Data 的支持还是不完整的,只支持了 MongoDB/Redis/Cassandra 和 Couchbase,对 JPA/LDAP/Elasticsearch/Neo4j/Solr 等还不兼容。但也不是不能使用,例如对 JDBC 数据库,将其转为同步即可使用,重点在于 findAll 和 async 两个函数,这里不再展开详述,具体代码如下图所示:

Reactive 不支持 JDBC 最根本的原因是,JDBC 不是 non-blocking 设计。但是现在 JavaOne 已经在 2016 年 9 月宣布了 Non-blocking JDBC API 的草案,虽然还未得到 Java 10 的支持,但可见这已经成为一种趋势。
四.总结
Spring MVC 框架是一个命令式逻辑,方便编写和调试。Spring WebFlux 也具有众多优势,但调试却不太容易,因为它经常需要切换线程执行,出现错误的栈可能已经销毁。当然这也是现今 Java 的编译工具对 WebFlux 不太友好,相信以后会改善。下图中列出了 Spring MVC 和 Spring WebFlux 各自的特性及交叉的部分。最后也附上一些参考资料。

本文作者:李博 bluemind 阅读原文
本文为云栖社区原创内容,未经允许不得转载。

正文完
 0