关于java:翻译Reactor-Netty参考指南-3TCP服务端

47次阅读

共计 11164 个字符,预计需要花费 28 分钟才能阅读完成。

Reactor Netty 参考指南目录


原文地址

Reactor Netty提供了易于应用和配置的 TcpServer。它暗藏了创立TCP 服务器所需的大部分 Netty 的性能,并减少了 Reactive Streams 背压。

3.1. 启动和进行

如果要启动一个 TCP 服务器,您必须创立并且配置一个 TcpServer 实例对象。默认状况下,host是配置为任何本地地址,当执行 bind 操作的时候零碎会抉择一个长期端口。上面是创立并且配置一个 TcpServer 实例的例子:

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()   //<1>
                         .bindNow(); //<2>

        server.onDispose()
              .block();}
}

<1> 创立一个 TcpServer 实例用来做配置操作。

<2> 应用阻塞期待的形式启动服务器,直到初始化实现。

返回的 DisposableServer 提供了简略的服务器 API,包含disposeNow(),这个办法能够以阻塞期待的形式来敞开服务器。

3.1.1.Host 和 Port

想要设置特定 hostport,您能够用上面的形式来配置 TCP 服务器:

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .host("localhost") //<1>
                         .port(8080)        //<2>
                         .bindNow();

        server.onDispose()
              .block();}
}

<1> 配置 TCP 服务器的 host

<2> 配置 TCP 服务器的 port

3.2. 事后初始化

默认状况下,TcpServer初始化资源的操作在须要应用的时候才进行。这意味着初始化加载的时候 bind operation 会占用额定的工夫:

  • 事件循环组
  • native 传输库(当应用了 native 传输的时候)
  • 用于安全性的 native 库(应用了 OpenSsl 的时候)

当您须要预加载这些资源的时候,您能够依照以下形式来配置TcpServer

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {public static void main(String[] args) {
        TcpServer tcpServer =
                TcpServer.create()
                         .handle((inbound, outbound) -> inbound.receive().then());

        tcpServer.warmup() //<1>
                 .block();

        DisposableServer server = tcpServer.bindNow();

        server.onDispose()
              .block();}
}

<1> 初始化和加载事件循环组,native 传输库和用于安全性的 native 库

3.3. 写出数据

如果要发送数据到一个已连贯的客户端,您必须增加一个 I / O 处理器。这个 I / O 处理器能够通过 NettyOutbound 来写出数据。上面是增加一个 I / O 处理器的例子:

https://github.com/reactor/re…

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .handle((inbound, outbound) -> outbound.sendString(Mono.just("hello"))) //<1>
                         .bindNow();

        server.onDispose()
              .block();}
}

<1> 给连贯的客户端发送 hello 字符串

3.4. 生产数据

如果要接管从连贯的客户端发过来的数据,您必须增加一个 I / O 处理器。这个 I / O 处理器能够通过 NettyInbound 来读取数据。示例如下:

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .handle((inbound, outbound) -> inbound.receive().then()) //<1>
                         .bindNow();

        server.onDispose()
              .block();}
}

<1> 接管从已连贯的客户端发过来的数据

3.5. 生命周期回调

上面的生命周期回调用参数是提供给您用来扩大 TcpServer 的:

Callback Description
doOnBind 当服务器 channel 行将被绑定的时候调用。
doOnBound 当服务器 channel 曾经被绑定的时候调用。
doOnChannelInit 当 channel 初始化的时候被调用。
doOnConnection 当一个近程客户端连贯上的时候被调用。
doOnUnbound 当服务器 channel 解绑的时候被调用。

上面是应用 doOnConnectiondoOnChannelInit回调的例子:

https://github.com/reactor/re…

import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.util.concurrent.TimeUnit;

public class Application {public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .doOnConnection(conn ->
                             conn.addHandler(new ReadTimeoutHandler(10, TimeUnit.SECONDS))) //<1>
                         .doOnChannelInit((observer, channel, remoteAddress) ->
                             channel.pipeline()
                                    .addFirst(new LoggingHandler("reactor.netty.examples")))//<2>
                         .bindNow();

        server.onDispose()
              .block();}
}

<1> 当一个近程客户端连贯的时候增加了一个 ReadTimeoutHandlerNetty pipeline。

<2> 当初始化 channel 的时候增加了一个 LoggingHandlerNetty pipeline。

3.6.TCP 层的配置

这一章节形容了三种 TCP 层的配置形式:

  • 设置 Channel Options
  • 应用 Wire Logger
  • 应用 Event Loop Group

3.6.1. 设置 Channel Options

默认状况下,TCP服务器配置了以下 options:

./../../reactor-netty-core/src/main/java/reactor/netty/tcp/TcpServerBind.java

TcpServerBind() {Map<ChannelOption<?>, Boolean> childOptions = new HashMap<>(2);
    childOptions.put(ChannelOption.AUTO_READ, false);
    childOptions.put(ChannelOption.TCP_NODELAY, true);
    this.config = new TcpServerConfig(Collections.singletonMap(ChannelOption.SO_REUSEADDR, true),
            childOptions,
            () -> new InetSocketAddress(DEFAULT_PORT));
}

如果须要增加新的 option 或者批改已有的 option,您能够应用如下的形式:

https://github.com/reactor/re…

import io.netty.channel.ChannelOption;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                         .bindNow();

        server.onDispose()
              .block();}
}

您能够通过以下的链接找到更多对于Nettychannel options 的信息:

  • ChannelOption
  • Socket Options

3.6.2. 应用 Wire Logger

Reactor Netty 提供了线路记录(wire logging)用来查看点对点的流量。默认状况下,线路记录是敞开的。如果想要开启它,您必须将日志 reactor.netty.tcp.TcpServer 的设置为 DEBUG 等级并且按如下形式进行配置:

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .wiretap(true) //<1>
                         .bindNow();

        server.onDispose()
              .block();}
}

<1> 开启线路记录

默认状况下,线路记录在输入内容的时候会应用 AdvancedByteBufFormat#HEX_DUMP。您也能够通过配置 TcpServer 改为 AdvancedByteBufFormat#SIMPLE 或者 AdvancedByteBufFormat#TEXTUAL:

https://github.com/reactor/re…

import io.netty.handler.logging.LogLevel;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import reactor.netty.transport.logging.AdvancedByteBufFormat;

public class Application {public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) //<1>
                         .bindNow();

        server.onDispose()
              .block();}
}

<1> 开启线路记录并应用 AdvancedByteBufFormat#TEXTUAL 来输入内容。

3.6.3. 应用 Event Loop Group

默认状况下,TCP服务器应用一个 ”Event Loop Group”,工作线程数等于初始化的时候能够用的处理器数量(但最小是 4)。您也能够应用 LoopResource#create其中的一个办法来批改配置。

默认的 Event Loop Group 配置如下:

./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java

/**
 * Default worker thread count, fallback to available processor
 * (but with a minimum value of 4)
 */
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
 * Default selector thread count, fallback to -1 (no selector thread)
 */
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
 * Default worker thread count for UDP, fallback to available processor
 * (but with a minimum value of 4)
 */
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
 * Default quiet period that guarantees that the disposal of the underlying LoopResources
 * will not happen, fallback to 2 seconds.
 */
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
 * Default maximum amount of time to wait until the disposal of the underlying LoopResources
 * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
 */
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

/**
 * Default value whether the native transport (epoll, kqueue) will be preferred,
 * fallback it will be preferred when available
 */
public static final String NATIVE = "reactor.netty.native";

如果须要批改这些设置,您也能够通过如下形式进行配置:

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpServer;

public class Application {public static void main(String[] args) {LoopResources loop = LoopResources.create("event-loop", 1, 4, true);

        DisposableServer server =
                TcpServer.create()
                         .runOn(loop)
                         .bindNow();

        server.onDispose()
              .block();}
}

3.7.SSL 和 TLS

当您须要应用 SSL 或者 TLS 的时候,能够应用上面列出来形式进行配置。默认状况,如果 OpenSSL 可用的话,则应用 SslProvider.OPENSSL。否则应用SslProvider.JDK。能够通过SslContextBuilder 或者设置 -Dio.netty.handler.ssl.noOpenSsl=true 来进行切换。

上面的是应用 SslContextBuilder 的例子:

https://github.com/reactor/re…

import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.io.File;

public class Application {public static void main(String[] args) {File cert = new File("certificate.crt");
        File key = new File("private.key");

        SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(cert, key);

        DisposableServer server =
                TcpServer.create()
                         .secure(spec -> spec.sslContext(sslContextBuilder))
                         .bindNow();

        server.onDispose()
              .block();}
}

3.7.1. 服务器名称标识

您能够配置 TCP 服务器的多个 SslContext 映射到一个特定的域。配置 SNI 映射时,能够应用确切的域名或蕴含通配符的域名。

上面是应用蕴含通配符的域名的例子:

https://github.com/reactor/re…

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

import java.io.File;

public class Application {public static void main(String[] args) throws Exception {File defaultCert = new File("default_certificate.crt");
        File defaultKey = new File("default_private.key");

        File testDomainCert = new File("default_certificate.crt");
        File testDomainKey = new File("default_private.key");

        SslContext defaultSslContext = SslContextBuilder.forServer(defaultCert, defaultKey).build();
        SslContext testDomainSslContext = SslContextBuilder.forServer(testDomainCert, testDomainKey).build();

        DisposableServer server =
                TcpServer.create()
                         .secure(spec -> spec.sslContext(defaultSslContext)
                                             .addSniMapping("*.test.com",
                                                     testDomainSpec -> testDomainSpec.sslContext(testDomainSslContext)))
                         .bindNow();

        server.onDispose()
              .block();}
}

3.8. 度量

TCP 服务器反对与 Micrometer 的内置集成。它裸露了所有前缀为 reactor.netty.tcp.server 的度量。

上面的表格提供了 TCP 服务器度量的相干信息:

度量名称 类型 形容
reactor.netty.tcp.server.data.received DistributionSummary 收到的数据量,以字节为单位
reactor.netty.tcp.server.data.sent DistributionSummary 发送的数据量,以字节为单位
reactor.netty.tcp.server.errors Counter 产生的谬误数量
reactor.netty.tcp.server.tls.handshake.time Timer TLS 握手所破费的工夫

上面额定的度量也是可用的:

ByteBufAllocator度量

度量名称 类型 形容
reactor.netty.bytebuf.allocator.used.heap.memory Gauge 堆内存的字节数
reactor.netty.bytebuf.allocator.used.direct.memory Gauge 堆外内存的字节数
reactor.netty.bytebuf.allocator.used.heap.arenas Gauge 堆内存的个数(当应用 PooledByteBufAllocator 的时候)
reactor.netty.bytebuf.allocator.used.direct.arenas Gauge 堆外内存的个数(当应用 PooledByteBufAllocator 的时候)
reactor.netty.bytebuf.allocator.used.threadlocal.caches Gauge threadlocal 的缓存数量(当应用 PooledByteBufAllocator 的时候)
reactor.netty.bytebuf.allocator.used.tiny.cache.size Gauge 渺小缓存的大小(当应用 PooledByteBufAllocator 的时候)
reactor.netty.bytebuf.allocator.used.small.cache.size Gauge 小缓存的大小(当应用 PooledByteBufAllocator 的时候)
reactor.netty.bytebuf.allocator.used.normal.cache.size Gauge 个别缓存的大小(当应用 PooledByteBufAllocator 的时候)
reactor.netty.bytebuf.allocator.used.chunk.size Gauge 一个区域的块大小(当应用 PooledByteBufAllocator 的时候)

上面是开启集成的度量的例子:

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .metrics(true) //<1>
                         .bindNow();

        server.onDispose()
              .block();}
}

<1> 开启内建集成的 Micrometer

如果您想让 TCP 服务端度量与除了 Micrometer 之外的系统集成或者想提供本人与 Micrometer 的集成来增加本人的度量记录器,您能够按如下形式实现:

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.tcp.TcpServer;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .metrics(true, CustomChannelMetricsRecorder::new) //<1>
                         .bindNow();

        server.onDispose()
              .block();}
}

<1> 开启 TCP 服务端度量并且提供 ChannelMetricsRecorder 的实现。

3.9.Unix 域套接字

当应用本地传输时,TCP服务器反对 Unix 域套接字(UDS)。

上面是应用 UDS 的例子:

https://github.com/reactor/re…

import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .bindAddress(() -> new DomainSocketAddress("/tmp/test.sock")) //<1>
                         .bindNow();

        server.onDispose()
              .block();}
}

<1> 指定将应用的DomainSocketAddress

Suggest Edit to “TCP Server”


Reactor Netty 参考指南目录


版权申明:如需转载,请带上本文链接、注明起源和本申明。否则将查究法律责任。https://www.immuthex.com/posts/reactor-netty-reference-guide/tcp-server

正文完
 0