mysql 应该是咱们在日常工作中应用到的一个十分广泛的数据库,尽管 mysql 当初是 oracle 公司的,然而它是开源的,市场占有率还是十分高的。

明天咱们将会介绍 r2dbc 在 mysql 中的应用。

r2dbc-mysql 的 maven 依赖

要想应用 r2dbc-mysql,咱们须要增加如下的 maven 依赖:


当然,如果你想应用 snapshot 版本的话,能够这样:


    <name>SonaType Snapshots</name>

创立 connectionFactory

创立 connectionFactory 的代码实际上应用的 r2dbc 的标准接口,所以和之前讲到的 h2 的创立代码基本上是一样的:

// Notice: the query string must be URL encoded
ConnectionFactory connectionFactory = ConnectionFactories.get(
    "r2dbcs:mysql://root:database-password-in-here@" +
    "zeroDate=use_round&" +
    "sslMode=verify_identity&" +
    "useServerPrepareStatement=true&" +
    "tlsVersion=TLSv1.3%2CTLSv1.2%2CTLSv1.1&" +
    "sslCa=%2Fpath%2Fto%2Fmysql%2Fca.pem&" +
    "sslKey=%2Fpath%2Fto%2Fmysql%2Fclient-key.pem&" +
    "sslCert=%2Fpath%2Fto%2Fmysql%2Fclient-cert.pem&" +

// Creating a Mono using Project Reactor
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

不同的是 ConnectionFactories 传入的参数不同。

咱们也反对 unix domain socket 的格局:

// Minimum configuration for unix domain socket
ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:mysql://root@unix?unixSocket=%2Fpath%2Fto%2Fmysql.sock")

Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

同样的,咱们也反对从 ConnectionFactoryOptions 中创立 ConnectionFactory:

ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
    .option(DRIVER, "mysql")
    .option(HOST, "")
    .option(USER, "root")
    .option(PORT, 3306)  // optional, default 3306
    .option(PASSWORD, "database-password-in-here") // optional, default null, null means has no password
    .option(DATABASE, "r2dbc") // optional, default null, null means not specifying the database
    .option(CONNECT_TIMEOUT, Duration.ofSeconds(3)) // optional, default null, null means no timeout
    .option(SSL, true) // optional, default sslMode is "preferred", it will be ignore if sslMode is set
    .option(Option.valueOf("sslMode"), "verify_identity") // optional, default "preferred"
    .option(Option.valueOf("sslCa"), "/path/to/mysql/ca.pem") // required when sslMode is verify_ca or verify_identity, default null, null means has no server CA cert
    .option(Option.valueOf("sslCert"), "/path/to/mysql/client-cert.pem") // optional, default null, null means has no client cert
    .option(Option.valueOf("sslKey"), "/path/to/mysql/client-key.pem") // optional, default null, null means has no client key
    .option(Option.valueOf("sslKeyPassword"), "key-pem-password-in-here") // optional, default null, null means has no password for client key (i.e. "sslKey")
    .option(Option.valueOf("tlsVersion"), "TLSv1.3,TLSv1.2,TLSv1.1") // optional, default is auto-selected by the server
    .option(Option.valueOf("sslHostnameVerifier"), "com.example.demo.MyVerifier") // optional, default is null, null means use standard verifier
    .option(Option.valueOf("sslContextBuilderCustomizer"), "com.example.demo.MyCustomizer") // optional, default is no-op customizer
    .option(Option.valueOf("zeroDate"), "use_null") // optional, default "use_null"
    .option(Option.valueOf("useServerPrepareStatement"), true) // optional, default false
    .option(Option.valueOf("tcpKeepAlive"), true) // optional, default false
    .option(Option.valueOf("tcpNoDelay"), true) // optional, default false
    .option(Option.valueOf("autodetectExtensions"), false) // optional, default false
ConnectionFactory connectionFactory = ConnectionFactories.get(options);

// Creating a Mono using Project Reactor
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

或者上面的 unix domain socket 格局:

// Minimum configuration for unix domain socket
ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
    .option(DRIVER, "mysql")
    .option(Option.valueOf("unixSocket"), "/path/to/mysql.sock")
    .option(USER, "root")
ConnectionFactory connectionFactory = ConnectionFactories.get(options);

Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

应用 MySqlConnectionFactory 创立 connection

下面的例子中,咱们应用的是通用的 r2dbc api 来创立 connection,同样的,咱们也能够应用特有的 MySqlConnectionFactory 来创立 connection:

MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builder()
    .port(3306) // optional, default 3306
    .password("database-password-in-here") // optional, default null, null means has no password
    .database("r2dbc") // optional, default null, null means not specifying the database
    .serverZoneId(ZoneId.of("Continent/City")) // optional, default null, null means query server time zone when connection init
    .connectTimeout(Duration.ofSeconds(3)) // optional, default null, null means no timeout
    .sslMode(SslMode.VERIFY_IDENTITY) // optional, default SslMode.PREFERRED
    .sslCa("/path/to/mysql/ca.pem") // required when sslMode is VERIFY_CA or VERIFY_IDENTITY, default null, null means has no server CA cert
    .sslCert("/path/to/mysql/client-cert.pem") // optional, default has no client SSL certificate
    .sslKey("/path/to/mysql/client-key.pem") // optional, default has no client SSL key
    .sslKeyPassword("key-pem-password-in-here") // optional, default has no client SSL key password
    .tlsVersion(TlsVersions.TLS1_3, TlsVersions.TLS1_2, TlsVersions.TLS1_1) // optional, default is auto-selected by the server
    .sslHostnameVerifier(MyVerifier.INSTANCE) // optional, default is null, null means use standard verifier
    .sslContextBuilderCustomizer(MyCustomizer.INSTANCE) // optional, default is no-op customizer
    .zeroDateOption(ZeroDateOption.USE_NULL) // optional, default ZeroDateOption.USE_NULL
    .useServerPrepareStatement() // Use server-preparing statements, default use client-preparing statements
    .tcpKeepAlive(true) // optional, controls TCP Keep Alive, default is false
    .tcpNoDelay(true) // optional, controls TCP No Delay, default is false
    .autodetectExtensions(false) // optional, controls extension auto-detect, default is true
    .extendWith(MyExtension.INSTANCE) // optional, manual extend an extension into extensions, default using auto-detect
ConnectionFactory connectionFactory = MySqlConnectionFactory.from(configuration);

// Creating a Mono using Project Reactor
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

或者上面的 unix domain socket 形式:

// Minimum configuration for unix domain socket
MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builder()
ConnectionFactory connectionFactory = MySqlConnectionFactory.from(configuration);

Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

执行 statement

首先看一个简略的不带参数的 statement:

connection.createStatement("INSERT INTO `person` (`first_name`, `last_name`) VALUES ('who','how')")
    .execute(); // return a Publisher include one Result

而后看一个带参数的 statement:

connection.createStatement("INSERT INTO `person` (`birth`, `nickname`, `show_name`) VALUES (?, ?name, ?name)")
    .bind(0, LocalDateTime.of(2019, 6, 25, 12, 12, 12))
    .bind("name", "Some one") // Not one-to-one binding, call twice of native index-bindings, or call once of name-bindings.
    .bind(0, LocalDateTime.of(2009, 6, 25, 12, 12, 12))
    .bind(1, "My Nickname")
    .bind(2, "Naming show")
    .execute(); // return a Publisher include two Results.

留神,如果参数是 null 的话,能够应用 bindNull 来进行 null 值的绑定。


    .add("INSERT INTO `person` (`first_name`, `last_name`) VALUES ('who','how')")
    .add("UPDATE `earth` SET `count` = `count` + 1 WHERE `id` ='human'")
    .execute(); // return a Publisher include two Results.



    .then(Mono.from(connection.createStatement("INSERT INTO `person` (`first_name`, `last_name`) VALUES ('who','how')").execute()))
    .thenMany(connection.createStatement("INSERT INTO `person` (`birth`, `nickname`, `show_name`) VALUES (?, ?name, ?name)")
        .bind(0, LocalDateTime.of(2019, 6, 25, 12, 12, 12))
        .bind("name", "Some one")
        .bind(0, LocalDateTime.of(2009, 6, 25, 12, 12, 12))
        .bind(1, "My Nickname")
        .bind(2, "Naming show")


为了晋升数据库的执行效率,缩小建设连贯的开销,个别数据库连贯都会有连接池的概念,同样的 r2dbc 也有一个叫做 r2dbc-pool 的连接池。

r2dbc-pool 的依赖:


如果你想应用 snapshot 版本,也能够这样指定:


  <name>Spring Snapshot Repository</name>


ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:pool:<my-driver>://<host>:<port>/<database>[?maxIdleTime=PT60S[&…]");

Publisher<? extends Connection> connectionPublisher = connectionFactory.create();

能够看到,咱们只须要在连贯 URL 下面增加 pool 这个 driver 即可。

同样的,咱们也能够通过 ConnectionFactoryOptions 来创立:

ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
   .option(DRIVER, "pool")
   .option(PROTOCOL, "postgresql") // driver identifier, PROTOCOL is delegated as DRIVER by the pool.
   .option(HOST, "…")
   .option(PORT, "…") 
   .option(USER, "…")
   .option(PASSWORD, "…")
   .option(DATABASE, "…")

Publisher<? extends Connection> connectionPublisher = connectionFactory.create();

// Alternative: Creating a Mono using Project Reactor
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

最初,你也能够间接通过创立 ConnectionPoolConfiguration 来应用线程池:

ConnectionFactory connectionFactory = …;

ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)

ConnectionPool pool = new ConnectionPool(configuration);

Mono<Connection> connectionMono = pool.create();

// later

Connection connection = …;
Mono<Void> release = connection.close(); // released the connection back to the pool

// application shutdown

