共计 5359 个字符,预计需要花费 14 分钟才能阅读完成。
Reactor 是一个齐全 非阻塞 的 JVM 响应式 编程根底,有着高效的需要治理(背压的模式)。它间接整合 Java8 的函数式 API,尤其是
CompletableFuture
,Stream
,还有Duration
。提供了可组合的异步化序列 API — Flux (对于 [N] 个元素) and Mono (对于 [0|1] 元素) — 并宽泛实现 响应式 Stream 标准。
这次带大家从零开始,应用 Spring Boot 框架建设一个 Reactor 响应式我的项目。
1 创立我的项目
应用 https://start.spring.io/ 创立我的项目。增加依赖项:H2、Lombok、Spring Web、JPA、JDBC
而后导入 Reactor
包
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
2 集成 H2 数据库
application.properties
文件中增加 H2
数据连贯信息。此外,端口应用 8081(随便,本地未被应用的端口即可)。
server.port=8081
################ H2 数据库 根底配置 ##############
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.url=jdbc:h2:~/user
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database=h2
spring.jpa.hibernate.ddl-auto=update
spring.h2.console.path=/h2-console
spring.h2.console.enable=true
3 创立测试类
3.1 user 实体
建设简略数据操作实体 User。
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
/**
* @Author: prepared
* @Date: 2022/8/29 21:40
*/
@Data
@NoArgsConstructor
@Table(name = "t_user")
@Entity
public class User {
@Id
@GeneratedValue(strategy = GenerationType.AUTO)
private Long id;
private String userName;
private int age;
private String sex;
public User(String userName, int age, String sex) {
this.userName = userName;
this.age = age;
this.sex = sex;
}
}
3.2 UserRepository
数据模型层应用 JPA
框架。
import com.prepared.user.domain.User;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
/**
* @Author: prepared
* @Date: 2022/8/29 21:45
*/
@Repository
public interface UserRepository extends JpaRepository<User, Long> {}
3.3 UserService
service 减少两个办法,add 办法,用来增加数据;list 办法,用来查问所有数据。所有接口返回 Mono/Flux 对象。
最佳实际:所有的第三方接口、IO 耗时比拟长的操作都能够放在 Mono 对象中。
doOnError
监控异常情况;
doFinally
监控整体执行状况,如:耗时、调用量监控等。
import com.prepared.user.dao.UserRepository;
import com.prepared.user.domain.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.util.List;
/**
* @Author: prepared
* @Date: 2022/8/29 21:45
*/
@Service
public class UserService {private Logger logger = LoggerFactory.getLogger(UserService.class);
@Resource
private UserRepository userRepository;
public Mono<Boolean> save(User user) {long startTime = System.currentTimeMillis();
return Mono.fromSupplier(() -> {return userRepository.save(user) != null;
})
.doOnError(e -> {
// 打印异样日志 & 减少监控(自行处理)logger.error("save.user.error, user={}, e", user, e);
})
.doFinally(e -> {
// 耗时 & 整体健康
logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime);
});
}
public Mono<User> findById(Long id) {long startTime = System.currentTimeMillis();
return Mono.fromSupplier(() -> {return userRepository.getReferenceById(id);
}).doOnError(e -> {
// 打印异样日志 & 减少监控(自行处理)logger.error("findById.user.error, id={}, e", id, e);
})
.doFinally(e -> {
// 耗时 & 整体健康
logger.info("findById.user.time={}, id={}", id, System.currentTimeMillis() - startTime);
});
}
public Mono<List<User>> list() {long startTime = System.currentTimeMillis();
return Mono.fromSupplier(() -> {return userRepository.findAll();
}).doOnError(e -> {
// 打印异样日志 & 减少监控(自行处理)logger.error("list.user.error, e", e);
})
.doFinally(e -> {
// 耗时 & 整体健康
logger.info("list.user.time={},", System.currentTimeMillis() - startTime);
});
}
public Flux<User> listFlux() {long startTime = System.currentTimeMillis();
return Flux.fromIterable(userRepository.findAll())
.doOnError(e -> {
// 打印异样日志 & 减少监控(自行处理)logger.error("list.user.error, e", e);
})
.doFinally(e -> {
// 耗时 & 整体健康
logger.info("list.user.time={},", System.currentTimeMillis() - startTime);
});
}
}
3.4 UserController
controller
减少两个办法,add 办法,用来增加数据;list 办法,用来查问所有数据。
list 办法还有另外一种写法,这就波及到 Mono 和 Flux 的不同了。
返回 List
能够应用Mono<List<User>>
,也能够应用 Flux<User>
。
Mono<T>
是一个特定的Publisher<T>
,最多能够收回一个元素Flux<T>
是一个规范的Publisher<T>
,示意为收回 0 到 N 个元素的异步序列
import com.prepared.user.domain.User;
import com.prepared.user.service.UserService;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
* @Author: prepared
* @Date: 2022/8/29 21:47
*/
@RestController
public class UserController {
@Resource
private UserService userService;
@RequestMapping("/add")
public Mono<Boolean> add() {User user = new User("xiaoming", 10, "F");
return userService.save(user) ;
}
@RequestMapping("/list")
public Mono<List<User>> list() {return userService.list();
}
}
@RequestMapping("/listFlux")
public Flux<User> listFlux() {return userService.listFlux();
}
3.5 SpringReactorApplication 增加注解反对
Application 启动类增加注解 @EnableJpaRepositories
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
/**
* Hello world!
*/
@SpringBootApplication
@EnableJpaRepositories
public class SpringReactorApplication {public static void main(String[] args) {SpringApplication.run(SpringReactorApplication.class, args);
}
}
测试
启动我的项目,拜访 localhost:8081/add
,失常返回 true。
查问所有数据,拜访localhost:8081/list
,能够看到插入的数据,曾经查问进去了。PS:我这里执行了屡次 add,所以有多条记录。
后盾日志:
2022-09-05 20:13:17.385 INFO 15696 --- [nio-8082-exec-2] com.prepared.user.service.UserService : list.user.time=181,
执行了 UserService list()
办法的 doFinnally
代码块,打印耗时日志。
总结
响应式编程的劣势是不会阻塞。那么失常咱们的代码中有哪些阻塞的操作呢?
Future
的get()
办法;Reactor
中的block()
办法,subcribe()
办法,所以在应用 Reactor 的时候,除非编写测试代码,否则不要间接调用以上两个办法;- 同步办法调用,所以高并发状况下,会应用异步调用(如 Future)来晋升响应速度。
下一篇,解说如何将熔断、限流框架 resilience4j
整合到我的项目中,敬请期待。