关于java:翻译Reactor-Netty参考指南-3TCP服务端

Reactor Netty参考指南目录


原文地址

Reactor Netty提供了易于应用和配置的TcpServer 。它暗藏了创立TCP服务器所需的大部分Netty的性能,并减少了Reactive Streams背压。

3.1.启动和进行

如果要启动一个TCP服务器,您必须创立并且配置一个TcpServer实例对象。默认状况下,host是配置为任何本地地址,当执行bind操作的时候零碎会抉择一个长期端口。上面是创立并且配置一个TcpServer实例的例子:

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()   //<1>
                         .bindNow(); //<2>

        server.onDispose()
              .block();
    }
}

<1> 创立一个TcpServer实例用来做配置操作。

<2> 应用阻塞期待的形式启动服务器,直到初始化实现。

返回的DisposableServer提供了简略的服务器API,包含disposeNow(),这个办法能够以阻塞期待的形式来敞开服务器。

3.1.1.Host和Port

想要设置特定hostport,您能够用上面的形式来配置TCP服务器:

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .host("localhost") //<1>
                         .port(8080)        //<2>
                         .bindNow();

        server.onDispose()
              .block();
    }
}

<1> 配置TCP服务器的host

<2> 配置TCP服务器的port

3.2.事后初始化

默认状况下,TcpServer初始化资源的操作在须要应用的时候才进行。这意味着初始化加载的时候bind operation会占用额定的工夫:

  • 事件循环组
  • native传输库(当应用了native传输的时候)
  • 用于安全性的native库(应用了OpenSsl的时候)

当您须要预加载这些资源的时候,您能够依照以下形式来配置TcpServer

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        TcpServer tcpServer =
                TcpServer.create()
                         .handle((inbound, outbound) -> inbound.receive().then());

        tcpServer.warmup() //<1>
                 .block();

        DisposableServer server = tcpServer.bindNow();

        server.onDispose()
              .block();
    }
}

<1> 初始化和加载事件循环组,native传输库和用于安全性的native库

3.3.写出数据

如果要发送数据到一个已连贯的客户端,您必须增加一个I/O处理器。这个I/O处理器能够通过NettyOutbound来写出数据。上面是增加一个I/O处理器的例子:

https://github.com/reactor/re…

import reactor.core.publisher.Mono;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .handle((inbound, outbound) -> outbound.sendString(Mono.just("hello"))) //<1>
                         .bindNow();

        server.onDispose()
              .block();
    }
}

<1> 给连贯的客户端发送hello字符串

3.4.生产数据

如果要接管从连贯的客户端发过来的数据,您必须增加一个I/O处理器。这个I/O处理器能够通过NettyInbound来读取数据。示例如下:

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .handle((inbound, outbound) -> inbound.receive().then()) //<1>
                         .bindNow();

        server.onDispose()
              .block();
    }
}

<1> 接管从已连贯的客户端发过来的数据

3.5.生命周期回调

上面的生命周期回调用参数是提供给您用来扩大TcpServer的:

Callback Description
doOnBind 当服务器channel行将被绑定的时候调用。
doOnBound 当服务器channel曾经被绑定的时候调用。
doOnChannelInit 当channel初始化的时候被调用。
doOnConnection 当一个近程客户端连贯上的时候被调用。
doOnUnbound 当服务器channel解绑的时候被调用。

上面是应用doOnConnectiondoOnChannelInit回调的例子:

https://github.com/reactor/re…

import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.util.concurrent.TimeUnit;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .doOnConnection(conn ->
                             conn.addHandler(new ReadTimeoutHandler(10, TimeUnit.SECONDS))) //<1>
                         .doOnChannelInit((observer, channel, remoteAddress) ->
                             channel.pipeline()
                                    .addFirst(new LoggingHandler("reactor.netty.examples")))//<2>
                         .bindNow();

        server.onDispose()
              .block();
    }
}

<1> 当一个近程客户端连贯的时候增加了一个ReadTimeoutHandlerNetty pipeline。

<2> 当初始化channel的时候增加了一个LoggingHandlerNetty pipeline。

3.6.TCP层的配置

这一章节形容了三种TCP层的配置形式:

  • 设置Channel Options
  • 应用Wire Logger
  • 应用Event Loop Group

3.6.1.设置Channel Options

默认状况下,TCP服务器配置了以下options:

./../../reactor-netty-core/src/main/java/reactor/netty/tcp/TcpServerBind.java

TcpServerBind() {
    Map<ChannelOption<?>, Boolean> childOptions = new HashMap<>(2);
    childOptions.put(ChannelOption.AUTO_READ, false);
    childOptions.put(ChannelOption.TCP_NODELAY, true);
    this.config = new TcpServerConfig(
            Collections.singletonMap(ChannelOption.SO_REUSEADDR, true),
            childOptions,
            () -> new InetSocketAddress(DEFAULT_PORT));
}

如果须要增加新的option或者批改已有的option,您能够应用如下的形式:

https://github.com/reactor/re…

import io.netty.channel.ChannelOption;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
                         .bindNow();

        server.onDispose()
              .block();
    }
}

您能够通过以下的链接找到更多对于Nettychannel options的信息:

  • ChannelOption
  • Socket Options

3.6.2.应用Wire Logger

Reactor Netty提供了线路记录(wire logging)用来查看点对点的流量。默认状况下,线路记录是敞开的。如果想要开启它,您必须将日志reactor.netty.tcp.TcpServer的设置为DEBUG等级并且按如下形式进行配置:

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .wiretap(true) //<1>
                         .bindNow();

        server.onDispose()
              .block();
    }
}

<1> 开启线路记录

默认状况下,线路记录在输入内容的时候会应用AdvancedByteBufFormat#HEX_DUMP。您也能够通过配置TcpServer改为AdvancedByteBufFormat#SIMPLE或者AdvancedByteBufFormat#TEXTUAL:

https://github.com/reactor/re…

import io.netty.handler.logging.LogLevel;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import reactor.netty.transport.logging.AdvancedByteBufFormat;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .wiretap("logger-name", LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL) //<1>
                         .bindNow();

        server.onDispose()
              .block();
    }
}

<1> 开启线路记录并应用AdvancedByteBufFormat#TEXTUAL来输入内容。

3.6.3.应用Event Loop Group

默认状况下,TCP服务器应用一个”Event Loop Group”,工作线程数等于初始化的时候能够用的处理器数量(但最小是4)。您也能够应用LoopResource#create其中的一个办法来批改配置。

默认的Event Loop Group配置如下:

./../../reactor-netty-core/src/main/java/reactor/netty/ReactorNetty.java

/**
 * Default worker thread count, fallback to available processor
 * (but with a minimum value of 4)
 */
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
/**
 * Default selector thread count, fallback to -1 (no selector thread)
 */
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
/**
 * Default worker thread count for UDP, fallback to available processor
 * (but with a minimum value of 4)
 */
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
/**
 * Default quiet period that guarantees that the disposal of the underlying LoopResources
 * will not happen, fallback to 2 seconds.
 */
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
/**
 * Default maximum amount of time to wait until the disposal of the underlying LoopResources
 * regardless if a task was submitted during the quiet period, fallback to 15 seconds.
 */
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";

/**
 * Default value whether the native transport (epoll, kqueue) will be preferred,
 * fallback it will be preferred when available
 */
public static final String NATIVE = "reactor.netty.native";

如果须要批改这些设置,您也能够通过如下形式进行配置:

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        LoopResources loop = LoopResources.create("event-loop", 1, 4, true);

        DisposableServer server =
                TcpServer.create()
                         .runOn(loop)
                         .bindNow();

        server.onDispose()
              .block();
    }
}

3.7.SSL和TLS

当您须要应用SSL或者TLS的时候,能够应用上面列出来形式进行配置。默认状况,如果OpenSSL可用的话,则应用SslProvider.OPENSSL。否则应用SslProvider.JDK。能够通过SslContextBuilder或者设置-Dio.netty.handler.ssl.noOpenSsl=true来进行切换。

上面的是应用SslContextBuilder的例子:

https://github.com/reactor/re…

import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;
import java.io.File;

public class Application {

    public static void main(String[] args) {
        File cert = new File("certificate.crt");
        File key = new File("private.key");

        SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(cert, key);

        DisposableServer server =
                TcpServer.create()
                         .secure(spec -> spec.sslContext(sslContextBuilder))
                         .bindNow();

        server.onDispose()
              .block();
    }
}

3.7.1.服务器名称标识

您能够配置TCP服务器的多个SslContext映射到一个特定的域。配置SNI映射时,能够应用确切的域名或蕴含通配符的域名。

上面是应用蕴含通配符的域名的例子:

https://github.com/reactor/re…

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

import java.io.File;

public class Application {

    public static void main(String[] args) throws Exception {
        File defaultCert = new File("default_certificate.crt");
        File defaultKey = new File("default_private.key");

        File testDomainCert = new File("default_certificate.crt");
        File testDomainKey = new File("default_private.key");

        SslContext defaultSslContext = SslContextBuilder.forServer(defaultCert, defaultKey).build();
        SslContext testDomainSslContext = SslContextBuilder.forServer(testDomainCert, testDomainKey).build();

        DisposableServer server =
                TcpServer.create()
                         .secure(spec -> spec.sslContext(defaultSslContext)
                                             .addSniMapping("*.test.com",
                                                     testDomainSpec -> testDomainSpec.sslContext(testDomainSslContext)))
                         .bindNow();

        server.onDispose()
              .block();
    }
}

3.8.度量

TCP服务器反对与Micrometer的内置集成。它裸露了所有前缀为reactor.netty.tcp.server的度量。

上面的表格提供了TCP服务器度量的相干信息:

度量名称 类型 形容
reactor.netty.tcp.server.data.received DistributionSummary 收到的数据量,以字节为单位
reactor.netty.tcp.server.data.sent DistributionSummary 发送的数据量,以字节为单位
reactor.netty.tcp.server.errors Counter 产生的谬误数量
reactor.netty.tcp.server.tls.handshake.time Timer TLS握手所破费的工夫

上面额定的度量也是可用的:

ByteBufAllocator度量

度量名称 类型 形容
reactor.netty.bytebuf.allocator.used.heap.memory Gauge 堆内存的字节数
reactor.netty.bytebuf.allocator.used.direct.memory Gauge 堆外内存的字节数
reactor.netty.bytebuf.allocator.used.heap.arenas Gauge 堆内存的个数(当应用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.direct.arenas Gauge 堆外内存的个数(当应用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.threadlocal.caches Gauge threadlocal的缓存数量(当应用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.tiny.cache.size Gauge 渺小缓存的大小(当应用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.small.cache.size Gauge 小缓存的大小(当应用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.normal.cache.size Gauge 个别缓存的大小(当应用PooledByteBufAllocator的时候)
reactor.netty.bytebuf.allocator.used.chunk.size Gauge 一个区域的块大小(当应用PooledByteBufAllocator的时候)

上面是开启集成的度量的例子:

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .metrics(true) //<1>
                         .bindNow();

        server.onDispose()
              .block();
    }
}

<1> 开启内建集成的Micrometer

如果您想让TCP服务端度量与除了Micrometer之外的系统集成或者想提供本人与Micrometer的集成来增加本人的度量记录器,您能够按如下形式实现:

https://github.com/reactor/re…

import reactor.netty.DisposableServer;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.tcp.TcpServer;

import java.net.SocketAddress;
import java.time.Duration;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .metrics(true, CustomChannelMetricsRecorder::new) //<1>
                         .bindNow();

        server.onDispose()
              .block();
    }
}

<1> 开启TCP服务端度量并且提供ChannelMetricsRecorder的实现。

3.9.Unix域套接字

当应用本地传输时,TCP服务器反对Unix域套接字(UDS)。

上面是应用UDS的例子:

https://github.com/reactor/re…

import io.netty.channel.unix.DomainSocketAddress;
import reactor.netty.DisposableServer;
import reactor.netty.tcp.TcpServer;

public class Application {

    public static void main(String[] args) {
        DisposableServer server =
                TcpServer.create()
                         .bindAddress(() -> new DomainSocketAddress("/tmp/test.sock")) //<1>
                         .bindNow();

        server.onDispose()
              .block();
    }
}

<1> 指定将应用的DomainSocketAddress

Suggest Edit to “TCP Server”


Reactor Netty参考指南目录


版权申明:如需转载,请带上本文链接、注明起源和本申明。否则将查究法律责任。https://www.immuthex.com/posts/reactor-netty-reference-guide/tcp-server

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理