关于响应式:响应式关系数据库处理R2DBC

10次阅读

共计 5146 个字符,预计需要花费 13 分钟才能阅读完成。

简介

之前咱们提到过,对于底层的数据源来说,MongoDB, Redis, 和 Cassandra 能够间接以 reactive 的形式反对 Spring Data。而其余很多关系型数据库比方 Postgres, Microsoft SQL Server, MySQL, H2 和 Google Spanner 则能够通过应用 R2DBC 来实现对 reactive 的反对。

明天咱们就来具体解说一下 R2DBC 的应用。

R2DBC 介绍

之前咱们介绍了 Reactor 还有基于其之上的 Spring WebFlux 框架。包含 vert.x,rxjava 等等 reactive 技术。咱们实际上在应用层曾经有很多优良的响应式解决框架。

然而有一个问题就是所有的框架都须要获取底层的数据,而基本上关系型数据库的底层读写都还是同步的。

为了解决这个问题,呈现了两个规范,一个是 oracle 提出的 ADBC (Asynchronous Database Access API),另一个就是 Pivotal 提出的 R2DBC (Reactive Relational Database Connectivity)。

R2DBC 是基于 Reactive Streams 规范来设计的。通过应用 R2DBC,你能够应用 reactive API 来操作数据。

同时 R2DBC 只是一个凋谢的规范,而各个具体的数据库连贯实现,须要实现这个规范。

明天咱们以 r2dbc-h2 为例,解说一下 r2dbc 在 Spring webFlux 中的应用。

我的项目依赖

咱们须要引入 r2dbc-spi 和 r2dbc-h2 两个库,其中 r2dbc-spi 是接口,而 r2dbc-h2 是具体的实现。

同时咱们应用了 Spring webflux, 所以还须要引入 spring-boot-starter-webflux。

具体的依赖如下:

        <!-- R2DBC H2 Driver -->
        <dependency>
            <groupId>io.r2dbc</groupId>
            <artifactId>r2dbc-h2</artifactId>
            <version>${r2dbc-h2.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

创立 ConnectionFactory

ConnectionFactory 是数据库连贯的一个具体实现,通过 ConnectionFactory 咱们能够创立到数据库的连贯。

先看一下数据库的配置文件,为了不便起见,这里咱们应用的是内存数据库 H2:

r2dbc.url=r2dbc:h2:mem://./r2dbc
r2dbc.user=sa
r2dbc.password=password

第一个 url 指定的是数据库的连贯形式,上面两个是数据库的用户名和明码。

接下来咱们看一下,怎么通过这些属性来创立 ConnectionFactory:

    @Bean
    public ConnectionFactory connectionFactory() {ConnectionFactoryOptions baseOptions = ConnectionFactoryOptions.parse(url);
        ConnectionFactoryOptions.Builder ob = ConnectionFactoryOptions.builder().from(baseOptions);
        if (!StringUtil.isNullOrEmpty(user)) {ob = ob.option(USER, user);
        }
        if (!StringUtil.isNullOrEmpty(password)) {ob = ob.option(PASSWORD, password);
        }
        return ConnectionFactories.get(ob.build());
    }

通过 url 能够 parse 失去 ConnectionFactoryOptions。而后通过 ConnectionFactories 的 get 办法创立 ConnectionFactory。

如果咱们设置了 USER 或者 PASSWORD,还能够加上这两个配置。

创立 Entity Bean

这里,咱们创立一个简略的 User 对象:

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Users {

    private Long id;
    private String firstname;
    private String lastname;
}

初始化数据库

尽管 H5 有很多更加简略的形式来初始化数据库,比方间接读取 SQL 文件,这里为了阐明 R2DBC 的应用,咱们应用手动的形式来创立:

    @Bean
    public CommandLineRunner initDatabase(ConnectionFactory cf) {return (args) ->
                Flux.from(cf.create())
                        .flatMap(c ->
                                Flux.from(c.createBatch()
                                        .add("drop table if exists Users")
                                        .add("create table Users(" +
                                                "id IDENTITY(1,1)," +
                                                "firstname varchar(80) not null," +
                                                "lastname varchar(80) not null)")
                                        .add("insert into Users(firstname,lastname)" +
                                                "values('flydean','ma')")
                                        .add("insert into Users(firstname,lastname)" +
                                                "values('jacken','yu')")
                                        .execute())
                                        .doFinally((st) -> c.close())
                        )
                        .log()
                        .blockLast();}

下面的代码中,咱们应用 c.createBatch() 来向数据库插入一些数据。

除了 createBatch, 还能够应用 create 来创立单个的执行语句。

获取所有的用户

在 Dao 中,咱们提供了一个 findAll 的办法:

    public Flux<Users> findAll() {return Mono.from(connectionFactory.create())
                .flatMap((c) -> Mono.from(c.createStatement("select id,firstname,lastname from users")
                        .execute())
                        .doFinally((st) -> close(c)))
                .flatMapMany(result -> Flux.from(result.map((row, meta) -> {Users acc = new Users();
                    acc.setId(row.get("id", Long.class));
                    acc.setFirstname(row.get("firstname", String.class));
                    acc.setLastname(row.get("lastname", String.class));
                    return acc;
                })));
    }

简略解释一下下面的应用。

因为是一个 findAll 办法,咱们须要找出所有的用户信息。所以咱们返回的是一个 Flux 而不是一个 Mono。

怎么从 Mono 转换成为一个 Flux 呢?

这里咱们应用的是 flatMapMany,将 select 进去的后果,分成一行一行的,最初转换成为 Flux。

Prepare Statement

为了避免 SQL 注入,咱们须要在 SQL 中应用 Prepare statement:

    public Mono<Users> findById(long id) {return Mono.from(connectionFactory.create())
                .flatMap(c -> Mono.from(c.createStatement("select id,firstname,lastname from Users where id = $1")
                        .bind("$1", id)
                        .execute())
                        .doFinally((st) -> close(c)))
                .map(result -> result.map((row, meta) ->
                        new Users(row.get("id", Long.class),
                                row.get("firstname", String.class),
                                row.get("lastname", String.class))))
                .flatMap(p -> Mono.from(p));
    }

看下咱们是怎么在 R2DBC 中应用 prepare statement 的。

事务处理

接下来咱们看一下怎么在 R2DBC 中应用事务:

    public Mono<Users> createAccount(Users account) {return Mono.from(connectionFactory.create())
                .flatMap(c -> Mono.from(c.beginTransaction())
                        .then(Mono.from(c.createStatement("insert into Users(firstname,lastname) values($1,$2)")
                                .bind("$1", account.getFirstname())
                                .bind("$2", account.getLastname())
                                .returnGeneratedValues("id")
                                .execute()))
                        .map(result -> result.map((row, meta) ->
                                new Users(row.get("id", Long.class),
                                        account.getFirstname(),
                                        account.getLastname())))
                        .flatMap(pub -> Mono.from(pub))
                        .delayUntil(r -> c.commitTransaction())
                        .doFinally((st) -> c.close()));

    }

下面的代码中,咱们应用了事务,具体的代码有两局部:

c -> Mono.from(c.beginTransaction())
.delayUntil(r -> c.commitTransaction())

开启是的时候须要应用 beginTransaction,前面提交就须要调用 commitTransaction。

WebFlux 应用

最初,咱们须要创立 WebFlux 利用来对外提供服务:

    @GetMapping("/users/{id}")
    public Mono<ResponseEntity<Users>> getUsers(@PathVariable("id") Long id) {return usersDao.findById(id)
                .map(acc -> new ResponseEntity<>(acc, HttpStatus.OK))
                .switchIfEmpty(Mono.just(new ResponseEntity<>(null, HttpStatus.NOT_FOUND)));
    }

    @GetMapping("/users")
    public Flux<Users> getAllAccounts() {return usersDao.findAll();
    }

    @PostMapping("/createUser")
    public Mono<ResponseEntity<Users>> createUser(@RequestBody Users user) {return usersDao.createAccount(user)
                .map(acc -> new ResponseEntity<>(acc, HttpStatus.CREATED))
                .log();}

执行成果

最初,咱们运行一下代码,执行下 users:

 curl "localhost:8080/users"       
[{"id":1,"firstname":"flydean","lastname":"ma"},{"id":2,"firstname":"jacken","lastname":"yu"}]%    

完满,试验胜利。

本文的代码:webflux-with-r2dbc

本文作者:flydean 程序那些事

本文链接:http://www.flydean.com/r2dbc-introduce/

本文起源:flydean 的博客

欢送关注我的公众号:「程序那些事」最艰深的解读,最粗浅的干货,最简洁的教程,泛滥你不晓得的小技巧等你来发现!

正文完
 0