Reactor 是一个齐全非阻塞的 JVM 响应式编程根底,有着高效的需要治理(背压的模式)。它间接整合 Java8 的函数式 API,尤其是 CompletableFutureStream,还有 Duration 。提供了可组合的异步化序列 API — Flux (对于 [N] 个元素) and Mono (对于 [0|1] 元素) — 并宽泛实现 响应式Stream 标准。

这次带大家从零开始,应用 Spring Boot 框架建设一个 Reactor 响应式我的项目。

1 创立我的项目

应用 https://start.spring.io/ 创立我的项目。增加依赖项:H2、Lombok、Spring Web、JPA、JDBC

而后导入 Reactor

<dependency>    <groupId>io.projectreactor</groupId>    <artifactId>reactor-core</artifactId></dependency><dependency>    <groupId>io.projectreactor</groupId>    <artifactId>reactor-test</artifactId>    <scope>test</scope></dependency>

2 集成 H2 数据库

application.properties 文件中增加 H2 数据连贯信息。此外,端口应用 8081(随便,本地未被应用的端口即可)。

server.port=8081################ H2 数据库 根底配置 ##############spring.datasource.driverClassName=org.h2.Driverspring.datasource.url=jdbc:h2:~/userspring.datasource.username=saspring.datasource.password=spring.jpa.database=h2spring.jpa.hibernate.ddl-auto=updatespring.h2.console.path=/h2-consolespring.h2.console.enable=true

3 创立测试类

3.1 user 实体

建设简略数据操作实体 User。

import lombok.Data;import lombok.NoArgsConstructor;import javax.persistence.*;/** * @Author: prepared * @Date: 2022/8/29 21:40 */@Data@NoArgsConstructor@Table(name = "t_user")@Entitypublic class User {    @Id    @GeneratedValue(strategy = GenerationType.AUTO)    private Long id;    private String userName;    private int age;    private String sex;    public User(String userName, int age, String sex) {        this.userName = userName;        this.age = age;        this.sex = sex;    }}

3.2 UserRepository

数据模型层应用 JPA 框架。

import com.prepared.user.domain.User;import org.springframework.data.jpa.repository.JpaRepository;import org.springframework.stereotype.Repository;/** * @Author: prepared * @Date: 2022/8/29 21:45 */@Repositorypublic interface UserRepository extends JpaRepository<User, Long> {}

3.3 UserService

service 减少两个办法,add 办法,用来增加数据;list 办法,用来查问所有数据。所有接口返回 Mono/Flux 对象。

最佳实际:所有的第三方接口、IO 耗时比拟长的操作都能够放在 Mono 对象中。

doOnError 监控异常情况;

doFinally 监控整体执行状况,如:耗时、调用量监控等。

import com.prepared.user.dao.UserRepository;import com.prepared.user.domain.User;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Service;import reactor.core.publisher.Mono;import javax.annotation.Resource;import java.util.List;/** * @Author: prepared * @Date: 2022/8/29 21:45 */@Servicepublic class UserService {    private Logger logger = LoggerFactory.getLogger(UserService.class);    @Resource    private UserRepository userRepository;    public Mono<Boolean> save(User user) {        long startTime = System.currentTimeMillis();        return Mono.fromSupplier(() -> {                    return userRepository.save(user) != null;                })                .doOnError(e -> {                    // 打印异样日志&减少监控(自行处理)                    logger.error("save.user.error, user={}, e", user, e);                })                .doFinally(e -> {                    // 耗时 & 整体健康                    logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime);                });    }    public Mono<User> findById(Long id) {        long startTime = System.currentTimeMillis();        return Mono.fromSupplier(() -> {                    return userRepository.getReferenceById(id);                }).doOnError(e -> {                    // 打印异样日志&减少监控(自行处理)                    logger.error("findById.user.error, id={}, e", id, e);                })                .doFinally(e -> {                    // 耗时 & 整体健康                    logger.info("findById.user.time={}, id={}", id, System.currentTimeMillis() - startTime);                });    }    public Mono<List<User>> list() {        long startTime = System.currentTimeMillis();        return Mono.fromSupplier(() -> {                    return userRepository.findAll();                }).doOnError(e -> {                    // 打印异样日志&减少监控(自行处理)                    logger.error("list.user.error, e", e);                })                .doFinally(e -> {                    // 耗时 & 整体健康                    logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime);                });    }    public Flux<User> listFlux() {        long startTime = System.currentTimeMillis();        return Flux.fromIterable(userRepository.findAll())                .doOnError(e -> {                    // 打印异样日志&减少监控(自行处理)                    logger.error("list.user.error, e", e);                })                .doFinally(e -> {                    // 耗时 & 整体健康                    logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime);                });    }}

3.4 UserController

controller 减少两个办法,add 办法,用来增加数据;list 办法,用来查问所有数据。

list 办法还有另外一种写法,这就波及到 Mono 和 Flux 的不同了。

返回List能够应用Mono<List<User>> ,也能够应用 Flux<User>

  • Mono<T> 是一个特定的 Publisher<T>,最多能够收回一个元素
  • Flux<T> 是一个规范的 Publisher<T>,示意为收回 0 到 N 个元素的异步序列
import com.prepared.user.domain.User;import com.prepared.user.service.UserService;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import reactor.core.publisher.Mono;import javax.annotation.Resource;import java.util.ArrayList;import java.util.List;/** * @Author: prepared * @Date: 2022/8/29 21:47 */@RestControllerpublic class UserController {    @Resource    private UserService userService;    @RequestMapping("/add")    public Mono<Boolean> add() {        User user = new User("xiaoming", 10, "F");        return userService.save(user) ;    }    @RequestMapping("/list")    public Mono<List<User>> list() {        return userService.list();    }}    @RequestMapping("/listFlux")    public Flux<User> listFlux() {        return userService.listFlux();    }

3.5 SpringReactorApplication 增加注解反对

Application 启动类增加注解 @EnableJpaRepositories

import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.data.jpa.repository.config.EnableJpaRepositories;/** * Hello world! */@SpringBootApplication@EnableJpaRepositoriespublic class SpringReactorApplication {    public static void main(String[] args) {        SpringApplication.run(SpringReactorApplication.class, args);    }}

测试

启动我的项目,拜访 localhost:8081/add,失常返回 true。

查问所有数据,拜访localhost:8081/list,能够看到插入的数据,曾经查问进去了。PS:我这里执行了屡次 add,所以有多条记录。

后盾日志:

2022-09-05 20:13:17.385  INFO 15696 --- [nio-8082-exec-2] com.prepared.user.service.UserService    : list.user.time=181, 

执行了 UserService list() 办法的 doFinnally 代码块,打印耗时日志。

总结

响应式编程的劣势是不会阻塞。那么失常咱们的代码中有哪些阻塞的操作呢?

  1. Futureget() 办法;
  2. Reactor 中的 block() 办法,subcribe() 办法,所以在应用 Reactor 的时候,除非编写测试代码,否则不要间接调用以上两个办法;
  3. 同步办法调用,所以高并发状况下,会应用异步调用(如Future)来晋升响应速度。

下一篇,解说如何将熔断、限流框架 resilience4j 整合到我的项目中,敬请期待。