关于react.js:超详细Netty入门看这篇就够了

10次阅读

共计 26926 个字符,预计需要花费 68 分钟才能阅读完成。

思维导图

前言

本文次要讲述 Netty 框架的一些个性以及重要组件,心愿看完之后能对 Netty 框架有一个比拟直观的感触,心愿能帮忙读者疾速入门 Netty,缩小一些弯路。

一、Netty 概述

官网的介绍:

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty是 一个 异步事件驱动 的网络应用程序框架,用于 疾速开发可保护的高性能协定服务器和客户端

二、为什么应用 Netty

从官网上介绍,Netty 是一个网络应用程序框架,开发服务器和客户端。也就是用于网络编程的一个框架。既然是网络编程,Socket 就不谈了,为什么不必 NIO 呢?

2.1 NIO 的毛病

对于这个问题,之前我写了一篇文章《NIO 入门》对 NIO 有比拟具体的介绍,NIO 的次要问题是:

  • NIO 的类库和 API 繁冗,学习老本高,你须要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
  • 须要相熟 Java 多线程编程。这是因为 NIO 编程波及到 Reactor 模式,你必须对多线程和网络编程十分相熟,能力写出高质量的 NIO 程序。
  • 臭名远扬的 epoll bug。它会导致 Selector 空轮询,最终导致 CPU 100%。直到 JDK1.7 版本仍然没失去根本性的解决。

2.2 Netty 的长处

绝对地,Netty 的长处有很多:

  • API 应用简略,学习成本低。
  • 功能强大,内置了多种解码编码器,反对多种协定。
  • 性能高,比照其余支流的 NIO 框架,Netty 的性能最优。
  • 社区沉闷,发现 BUG 会及时修复,迭代版本周期短,一直退出新的性能。
  • Dubbo、Elasticsearch 都采纳了 Netty,品质失去验证。

三、架构图

下面这张图就是在官网首页的架构图,咱们从上到下剖析一下。

绿色的局部 Core 外围模块,包含零拷贝、API 库、可扩大的事件模型。

橙色局部 Protocol Support 协定反对,包含 Http 协定、webSocket、SSL(安全套接字协定)、谷歌 Protobuf 协定、zlib/gzip 压缩与解压缩、Large File Transfer 大文件传输等等。

红色的局部 Transport Services 传输服务,包含 Socket、Datagram、Http Tunnel 等等。

以上可看出 Netty 的性能、协定、传输方式都比拟全,比拟弱小。

四、永远的 Hello Word

首先搭建一个 HelloWord 工程,先相熟一下 API,还有为前面的学习做铺垫。以上面这张图为根据:

4.1 引入 Maven 依赖

应用的版本是 4.1.20,绝对比较稳定的一个版本。

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.20.Final</version>
</dependency>

4.2 创立服务端启动类

public class MyServer {public static void main(String[] args) throws Exception {
        // 创立两个线程组 boosGroup、workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 创立服务端的启动对象,设置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 设置两个线程组 boosGroup 和 workerGroup
            bootstrap.group(bossGroup, workerGroup)
                // 设置服务端通道实现类型    
                .channel(NioServerSocketChannel.class)
                // 设置线程队列失去连贯个数    
                .option(ChannelOption.SO_BACKLOG, 128)
                // 设置放弃流动连贯状态    
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // 应用匿名外部类的模式初始化通道对象    
                .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 给 pipeline 管道设置处理器
                            socketChannel.pipeline().addLast(new MyServerHandler());
                        }
                    });// 给 workerGroup 的 EventLoop 对应的管道设置处理器
            System.out.println("java 技术爱好者的服务端曾经准备就绪...");
            // 绑定端口号,启动服务端
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            // 对敞开通道进行监听
            channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();}
    }
}

4.3 创立服务端处理器

/**
 * 自定义的 Handler 须要继承 Netty 规定好的 HandlerAdapter
 * 能力被 Netty 框架所关联,有点相似 SpringMVC 的适配器模式
 **/
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 获取客户端发送过去的音讯
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的音讯:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 发送音讯给客户端
        ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到音讯,并给你发送一个问号?", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 产生异样,敞开通道
        ctx.close();}
}

4.4 创立客户端启动类

public class MyClient {public static void main(String[] args) throws Exception {NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            // 创立 bootstrap 对象,配置参数
            Bootstrap bootstrap = new Bootstrap();
            // 设置线程组
            bootstrap.group(eventExecutors)
                // 设置客户端的通道实现类型    
                .channel(NioSocketChannel.class)
                // 应用匿名外部类初始化通道
                .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 增加客户端通道的处理器
                            ch.pipeline().addLast(new MyClientHandler());
                        }
                    });
            System.out.println("客户端准备就绪,随时能够腾飞~");
            // 连贯服务端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
            // 对通道敞开进行监听
            channelFuture.channel().closeFuture().sync();} finally {
            // 敞开线程组
            eventExecutors.shutdownGracefully();}
    }
}

4.5 创立客户端处理器

public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 发送音讯到服务端
        ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴卜~ 茉莉~Are you good~ 马来西亚~", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 接管服务端发送过去的音讯
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的音讯:" + byteBuf.toString(CharsetUtil.UTF_8));
    }
}

4.6 测试

先启动服务端,再启动客户端,就能够看到后果:

MyServer 打印后果:

MyClient 打印后果:

五、Netty 的个性与重要组件

5.1 taskQueue 工作队列

如果 Handler 处理器有一些长时间的业务解决,能够交给taskQueue 异步解决。怎么用呢,请看代码演示:

public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 获取到线程池 eventLoop,增加线程,执行
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // 长时间操作,不至于长时间的业务操作导致 Handler 阻塞
                    Thread.sleep(1000);
                    System.out.println("长时间的业务解决");
                } catch (Exception e) {e.printStackTrace();
                }
            }
        });
    }
}

咱们打一个 debug 调试,是能够看到增加进去的 taskQueue 有一个工作。

5.2 scheduleTaskQueue 延时工作队列

延时工作队列和下面介绍的工作队列十分类似,只是多了一个可提早肯定工夫再执行的设置,请看代码演示:

ctx.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
        try {
            // 长时间操作,不至于长时间的业务操作导致 Handler 阻塞
            Thread.sleep(1000);
            System.out.println("长时间的业务解决");
        } catch (Exception e) {e.printStackTrace();
        }
    }
},5, TimeUnit.SECONDS);// 5 秒后执行

仍然关上 debug 进行调试查看,咱们能够有一个 scheduleTaskQueue 工作待执行中

5.3 Future 异步机制

在搭建 HelloWord 工程的时候,咱们看到有一行这样的代码:

ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666);

很多操作都返回这个 ChannelFuture 对象,到底这个 ChannelFuture 对象是用来做什么的呢?

ChannelFuture 提供操作实现时一种异步告诉的形式。个别在 Socket 编程中,期待响应后果都是同步阻塞的,而 Netty 则不会造成阻塞,因为 ChannelFuture 是采取相似观察者模式的模式进行获取后果。请看一段代码演示:

// 增加监听器
channelFuture.addListener(new ChannelFutureListener() {
    // 应用匿名外部类,ChannelFutureListener 接口
    // 重写 operationComplete 办法
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        // 判断是否操作胜利    
        if (future.isSuccess()) {System.out.println("连贯胜利");
        } else {System.out.println("连贯失败");
        }
    }
});

5.4 Bootstrap 与 ServerBootStrap

Bootstrap 和 ServerBootStrap 是 Netty 提供的一个创立客户端和服务端启动器的工厂类,应用这个工厂类十分便当地创立启动类,依据下面的一些例子,其实也看得出来能大大地缩小了开发的难度。首先看一个类图:

能够看出都是继承于 AbstractBootStrap 抽象类,所以大抵上的配置办法都雷同。

一般来说,应用 Bootstrap 创立启动器的步骤可分为以下几步:

5.4.1 group()

在上一篇文章《Reactor 模式》中,咱们就讲过服务端要应用两个线程组:

  • bossGroup 用于监听客户端连贯,专门负责与客户端创立连贯,并把连贯注册到 workerGroup 的 Selector 中。
  • workerGroup 用于解决每一个连贯产生的读写事件。

个别创立线程组间接应用以下 new 就完事了:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

有点好奇的是,既然是线程组,那线程数默认是多少呢?深刻源码:

    // 应用一个常量保留
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {//NettyRuntime.availableProcessors() * 2,cpu 核数的两倍赋值给常量
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        // 如果不传入,则应用常量的值,也就是 cpu 核数的两倍
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

通过源码能够看到,默认的线程数是 cpu 核数的两倍。假如想自定义线程数,能够应用有参结构器:

// 设置 bossGroup 线程数为 1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 设置 workerGroup 线程数为 16
EventLoopGroup workerGroup = new NioEventLoopGroup(16);

5.4.2 channel()

这个办法用于设置通道类型,当建设连贯后,会依据这个设置创立对应的 Channel 实例。

应用 debug 模式能够看到

通道类型有以下:

NioSocketChannel:异步非阻塞的客户端 TCP Socket 连贯。

NioServerSocketChannel:异步非阻塞的服务器端 TCP Socket 连贯。

罕用的就是这两个通道类型,因为是异步非阻塞的。所以是首选。

OioSocketChannel:同步阻塞的客户端 TCP Socket 连贯。

OioServerSocketChannel:同步阻塞的服务器端 TCP Socket 连贯。

略微在本地调试过,用起来和 Nio 有一些不同,是阻塞的,所以 API 调用也不一样。因为是阻塞的 IO,简直没什么人会抉择应用 Oio,所以也很难找到例子。我略微推敲了一下,通过几次报错之后,总算调通了。代码如下:

//server 端代码,跟下面简直一样,只需改三个中央
// 这个中央应用的是 OioEventLoopGroup
EventLoopGroup bossGroup = new OioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup)// 只须要设置一个线程组 boosGroup
        .channel(OioServerSocketChannel.class)// 设置服务端通道实现类型

//client 端代码,只需改两个中央
// 应用的是 OioEventLoopGroup
EventLoopGroup eventExecutors = new OioEventLoopGroup();
// 通道类型设置为 OioSocketChannel
bootstrap.group(eventExecutors)// 设置线程组
        .channel(OioSocketChannel.class)// 设置客户端的通道实现类型

NioSctpChannel:异步的客户端 Sctp(Stream Control Transmission Protocol,流控制传输协定)连贯。

NioSctpServerChannel:异步的 Sctp 服务器端连贯。

本地没启动胜利,网上看了一些网友的评论,说是只能在 linux 环境下才能够启动。从报错信息看:SCTP not supported on this platform,不反对这个平台。因为我电脑是 window 零碎,所以网友说的有点情理。

5.4.3 option()与 childOption()

首先说一下这两个的区别。

option()设置的是服务端用于接管进来的连贯,也就是 boosGroup 线程。

childOption()是提供给父管道接管到的连贯,也就是 workerGroup 线程。

搞清楚了之后,咱们看一下罕用的一些设置有哪些:

SocketChannel 参数,也就是 childOption()罕用的参数:

SO_RCVBUF Socket 参数,TCP 数据接收缓冲区大小。
TCP_NODELAY TCP 参数,立刻发送数据,默认值为 Ture。
SO_KEEPALIVE Socket 参数,连贯保活,默认值为 False。启用该性能时,TCP 会被动探测闲暇连贯的有效性。

ServerSocketChannel 参数,也就是 option()罕用参数:

SO_BACKLOG Socket 参数,服务端承受连贯的队列长度,如果队列已满,客户端连贯将被回绝。默认值,Windows 为 200,其余为 128。

因为篇幅限度,其余就不列举了,大家能够去网上找材料看看,理解一下。

5.4.4 设置流水线(重点)

ChannelPipeline 是 Netty 解决申请的责任链,ChannelHandler 则是具体解决申请的处理器。实际上每一个 channel 都有一个处理器的流水线。

在 Bootstrap 中 childHandler()办法须要初始化通道,实例化一个 ChannelInitializer,这时候须要重写 initChannel()初始化通道的办法,拆卸流水线就是在这个中央进行。代码演示如下:

// 应用匿名外部类的模式初始化通道对象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 给 pipeline 管道设置自定义的处理器
        socketChannel.pipeline().addLast(new MyServerHandler());
    }
});

处理器 Handler 次要分为两种:

ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)

入站指的是数据从底层 java NIO Channel 到 Netty 的 Channel。

出站指的是通过 Netty 的 Channel 来操作底层的 java NIO Channel。

ChannelInboundHandlerAdapter 处理器罕用的事件有

  1. 注册事件 fireChannelRegistered。
  2. 连贯建设事件 fireChannelActive。
  3. 读事件和读实现事件 fireChannelRead、fireChannelReadComplete。
  4. 异样告诉事件 fireExceptionCaught。
  5. 用户自定义事件 fireUserEventTriggered。
  6. Channel 可写状态变动事件 fireChannelWritabilityChanged。
  7. 连贯敞开事件 fireChannelInactive。

ChannelOutboundHandler 处理器罕用的事件有

  1. 端口绑定 bind。
  2. 连贯服务端 connect。
  3. 写事件 write。
  4. 刷新工夫 flush。
  5. 读事件 read。
  6. 被动断开连接 disconnect。
  7. 敞开 channel 事件 close。

还有一个相似的 handler(),次要用于拆卸 parent 通道,也就是 bossGroup 线程。个别状况下,都用不上这个办法。

5.4.5 bind()

提供用于服务端或者客户端绑定服务器地址和端口号,默认是异步启动。如果加上 sync()办法则是同步。

有五个同名的重载办法,作用都是用于绑定地址端口号。不一一介绍了。

5.4.6 优雅地敞开 EventLoopGroup

// 开释掉所有的资源,包含创立的线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();

会敞开所有的 child Channel。敞开之后,开释掉底层的资源。

5.5 Channel

Channel 是什么?无妨看一下官网文档的阐明:

A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind

翻译粗心:一种连贯到网络套接字或能进行读、写、连贯和绑定等 I / O 操作的组件。

如果下面这段阐明比拟形象,上面还有一段阐明:

A channel provides a user:

the current state of the channel (e.g. is it open? is it connected?),
the configuration parameters of the channel (e.g. receive buffer size),
the I/O operations that the channel supports (e.g. read, write, connect, and bind), and
the ChannelPipeline which handles all I/O events and requests associated with the channel.

翻译粗心:

channel 为用户提供:

  1. 通道以后的状态(例如它是关上?还是已连贯?)
  2. channel 的配置参数(例如接收缓冲区的大小)
  3. channel 反对的 IO 操作(例如读、写、连贯和绑定),以及解决与 channel 相关联的所有 IO 事件和申请的 ChannelPipeline。

5.5.1 获取 channel 的状态

boolean isOpen(); // 如果通道关上,则返回 true
boolean isRegistered();// 如果通道注册到 EventLoop,则返回 true
boolean isActive();// 如果通道处于活动状态并且已连贯,则返回 true
boolean isWritable();// 当且仅当 I / O 线程将立刻执行申请的写入操作时,返回 true。

以上就是获取 channel 的四种状态的办法。

5.5.2 获取 channel 的配置参数

获取单条配置信息,应用 getOption(),代码演示:

ChannelConfig config = channel.config();// 获取配置参数
// 获取 ChannelOption.SO_BACKLOG 参数,
Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);
// 因为我启动器配置的是 128,所以我这里获取的 soBackLogConfig=128

获取多条配置信息,应用 getOptions(),代码演示:

ChannelConfig config = channel.config();
Map<ChannelOption<?>, Object> options = config.getOptions();
for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) {System.out.println(entry.getKey() + ":" + entry.getValue());
}
/**
SO_REUSEADDR : false
WRITE_BUFFER_LOW_WATER_MARK : 32768
WRITE_BUFFER_WATER_MARK : WriteBufferWaterMark(low: 32768, high: 65536)
SO_BACKLOG : 128
以下省略...
*/

5.5.3 channel 反对的 IO 操作

写操作,这里演示从服务端写音讯发送到客户端:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.channel().writeAndFlush(Unpooled.copiedBuffer("这波啊,这波是肉蛋葱鸡~", CharsetUtil.UTF_8));
}

客户端控制台:

// 收到服务端 /127.0.0.1:6666 的音讯:这波啊,这波是肉蛋葱鸡~

连贯 操作,代码演示:

ChannelFuture connect = channelFuture.channel().connect(new InetSocketAddress("127.0.0.1", 6666));// 个别应用启动器,这种形式不罕用

通过 channel 获取 ChannelPipeline,并做相干的解决:

// 获取 ChannelPipeline 对象
ChannelPipeline pipeline = ctx.channel().pipeline();
// 往 pipeline 中增加 ChannelHandler 处理器,拆卸流水线
pipeline.addLast(new MyServerHandler());

5.6 Selector

在 NioEventLoop 中,有一个成员变量 selector,这是 nio 包的 Selector,在之前《NIO 入门》中,我曾经讲过 Selector 了。

Netty 中的 Selector 也和 NIO 的 Selector 是一样的,就是用于监听事件,治理注册到 Selector 中的 channel,实现多路复用器。

5.7 PiPeline 与 ChannelPipeline

在后面介绍 Channel 时,咱们晓得能够在 channel 中拆卸 ChannelHandler 流水线处理器,那一个 channel 不可能只有一个 channelHandler 处理器,必定是有很多的,既然是很多 channelHandler 在一个流水线工作,必定是有程序的。

于是 pipeline 就呈现了,pipeline 相当于处理器的容器。初始化 channel 时,把 channelHandler 按程序装在 pipeline 中,就能够实现按序执行 channelHandler 了。

在一个 Channel 中,只有一个 ChannelPipeline。该 pipeline 在 Channel 被创立的时候创立。ChannelPipeline 蕴含了一个 ChannelHander 造成的列表,且所有 ChannelHandler 都会注册到 ChannelPipeline 中。

5.8 ChannelHandlerContext

在 Netty 中,Handler 处理器是有咱们定义的,下面讲过通过集成入站处理器或者出站处理器实现。这时如果咱们想在 Handler 中获取 pipeline 对象,或者 channel 对象,怎么获取呢。

于是 Netty 设计了这个 ChannelHandlerContext 上下文对象,就能够拿到 channel、pipeline 等对象,就能够进行读写等操作。

通过类图,ChannelHandlerContext 是一个接口,上面有三个实现类。

实际上 ChannelHandlerContext 在 pipeline 中是一个链表的模式。看一段源码就明确了:

//ChannelPipeline 实现类 DefaultChannelPipeline 的结构器办法
protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    // 设置头结点 head,尾结点 tail
    tail = new TailContext(this);
    head = new HeadContext(this);
    
    head.next = tail;
    tail.prev = head;
}

上面我用一张图来示意,会更加清晰一点:

5.9 EventLoopGroup

咱们先看一下 EventLoopGroup 的类图:

其中包含了罕用的实现类 NioEventLoopGroup。OioEventLoopGroup 在后面的例子中也有应用过。

从 Netty 的架构图中,能够晓得服务器是须要两个线程组进行配合工作的,而这个线程组的接口就是 EventLoopGroup。

每个 EventLoopGroup 里包含一个或多个 EventLoop,每个 EventLoop 中保护一个 Selector 实例。

5.9.1 轮询机制的实现原理

咱们无妨看一段 DefaultEventExecutorChooserFactory 的源码:

private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

@Override
public EventExecutor next() {//idx.getAndIncrement()相当于 idx++,而后对工作长度取模
    return executors[idx.getAndIncrement() & executors.length - 1];
}

这段代码能够确定执行的形式是轮询机制,接下来 debug 调试一下:

它这里还有一个判断,如果线程数不是 2 的 N 次方,则采纳取模算法实现。

@Override
public EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}

原文链接

本文为阿里云原创内容,未经容许不得转载。思维导图

前言

本文次要讲述 Netty 框架的一些个性以及重要组件,心愿看完之后能对 Netty 框架有一个比拟直观的感触,心愿能帮忙读者疾速入门 Netty,缩小一些弯路。

一、Netty 概述

官网的介绍:

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty是 一个 异步事件驱动 的网络应用程序框架,用于 疾速开发可保护的高性能协定服务器和客户端

二、为什么应用 Netty

从官网上介绍,Netty 是一个网络应用程序框架,开发服务器和客户端。也就是用于网络编程的一个框架。既然是网络编程,Socket 就不谈了,为什么不必 NIO 呢?

2.1 NIO 的毛病

对于这个问题,之前我写了一篇文章《NIO 入门》对 NIO 有比拟具体的介绍,NIO 的次要问题是:

  • NIO 的类库和 API 繁冗,学习老本高,你须要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
  • 须要相熟 Java 多线程编程。这是因为 NIO 编程波及到 Reactor 模式,你必须对多线程和网络编程十分相熟,能力写出高质量的 NIO 程序。
  • 臭名远扬的 epoll bug。它会导致 Selector 空轮询,最终导致 CPU 100%。直到 JDK1.7 版本仍然没失去根本性的解决。

2.2 Netty 的长处

绝对地,Netty 的长处有很多:

  • API 应用简略,学习成本低。
  • 功能强大,内置了多种解码编码器,反对多种协定。
  • 性能高,比照其余支流的 NIO 框架,Netty 的性能最优。
  • 社区沉闷,发现 BUG 会及时修复,迭代版本周期短,一直退出新的性能。
  • Dubbo、Elasticsearch 都采纳了 Netty,品质失去验证。

三、架构图

下面这张图就是在官网首页的架构图,咱们从上到下剖析一下。

绿色的局部 Core 外围模块,包含零拷贝、API 库、可扩大的事件模型。

橙色局部 Protocol Support 协定反对,包含 Http 协定、webSocket、SSL(安全套接字协定)、谷歌 Protobuf 协定、zlib/gzip 压缩与解压缩、Large File Transfer 大文件传输等等。

红色的局部 Transport Services 传输服务,包含 Socket、Datagram、Http Tunnel 等等。

以上可看出 Netty 的性能、协定、传输方式都比拟全,比拟弱小。

四、永远的 Hello Word

首先搭建一个 HelloWord 工程,先相熟一下 API,还有为前面的学习做铺垫。以上面这张图为根据:

4.1 引入 Maven 依赖

应用的版本是 4.1.20,绝对比较稳定的一个版本。

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.20.Final</version>
</dependency>

4.2 创立服务端启动类

public class MyServer {public static void main(String[] args) throws Exception {
        // 创立两个线程组 boosGroup、workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 创立服务端的启动对象,设置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 设置两个线程组 boosGroup 和 workerGroup
            bootstrap.group(bossGroup, workerGroup)
                // 设置服务端通道实现类型    
                .channel(NioServerSocketChannel.class)
                // 设置线程队列失去连贯个数    
                .option(ChannelOption.SO_BACKLOG, 128)
                // 设置放弃流动连贯状态    
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // 应用匿名外部类的模式初始化通道对象    
                .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 给 pipeline 管道设置处理器
                            socketChannel.pipeline().addLast(new MyServerHandler());
                        }
                    });// 给 workerGroup 的 EventLoop 对应的管道设置处理器
            System.out.println("java 技术爱好者的服务端曾经准备就绪...");
            // 绑定端口号,启动服务端
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            // 对敞开通道进行监听
            channelFuture.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();}
    }
}

4.3 创立服务端处理器

/**
 * 自定义的 Handler 须要继承 Netty 规定好的 HandlerAdapter
 * 能力被 Netty 框架所关联,有点相似 SpringMVC 的适配器模式
 **/
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 获取客户端发送过去的音讯
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的音讯:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // 发送音讯给客户端
        ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到音讯,并给你发送一个问号?", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // 产生异样,敞开通道
        ctx.close();}
}

4.4 创立客户端启动类

public class MyClient {public static void main(String[] args) throws Exception {NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            // 创立 bootstrap 对象,配置参数
            Bootstrap bootstrap = new Bootstrap();
            // 设置线程组
            bootstrap.group(eventExecutors)
                // 设置客户端的通道实现类型    
                .channel(NioSocketChannel.class)
                // 应用匿名外部类初始化通道
                .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 增加客户端通道的处理器
                            ch.pipeline().addLast(new MyClientHandler());
                        }
                    });
            System.out.println("客户端准备就绪,随时能够腾飞~");
            // 连贯服务端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
            // 对通道敞开进行监听
            channelFuture.channel().closeFuture().sync();} finally {
            // 敞开线程组
            eventExecutors.shutdownGracefully();}
    }
}

4.5 创立客户端处理器

public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 发送音讯到服务端
        ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴卜~ 茉莉~Are you good~ 马来西亚~", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 接管服务端发送过去的音讯
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的音讯:" + byteBuf.toString(CharsetUtil.UTF_8));
    }
}

4.6 测试

先启动服务端,再启动客户端,就能够看到后果:

MyServer 打印后果:

MyClient 打印后果:

五、Netty 的个性与重要组件

5.1 taskQueue 工作队列

如果 Handler 处理器有一些长时间的业务解决,能够交给taskQueue 异步解决。怎么用呢,请看代码演示:

public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 获取到线程池 eventLoop,增加线程,执行
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // 长时间操作,不至于长时间的业务操作导致 Handler 阻塞
                    Thread.sleep(1000);
                    System.out.println("长时间的业务解决");
                } catch (Exception e) {e.printStackTrace();
                }
            }
        });
    }
}

咱们打一个 debug 调试,是能够看到增加进去的 taskQueue 有一个工作。

5.2 scheduleTaskQueue 延时工作队列

延时工作队列和下面介绍的工作队列十分类似,只是多了一个可提早肯定工夫再执行的设置,请看代码演示:

ctx.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
        try {
            // 长时间操作,不至于长时间的业务操作导致 Handler 阻塞
            Thread.sleep(1000);
            System.out.println("长时间的业务解决");
        } catch (Exception e) {e.printStackTrace();
        }
    }
},5, TimeUnit.SECONDS);// 5 秒后执行

仍然关上 debug 进行调试查看,咱们能够有一个 scheduleTaskQueue 工作待执行中

5.3 Future 异步机制

在搭建 HelloWord 工程的时候,咱们看到有一行这样的代码:

ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666);

很多操作都返回这个 ChannelFuture 对象,到底这个 ChannelFuture 对象是用来做什么的呢?

ChannelFuture 提供操作实现时一种异步告诉的形式。个别在 Socket 编程中,期待响应后果都是同步阻塞的,而 Netty 则不会造成阻塞,因为 ChannelFuture 是采取相似观察者模式的模式进行获取后果。请看一段代码演示:

// 增加监听器
channelFuture.addListener(new ChannelFutureListener() {
    // 应用匿名外部类,ChannelFutureListener 接口
    // 重写 operationComplete 办法
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        // 判断是否操作胜利    
        if (future.isSuccess()) {System.out.println("连贯胜利");
        } else {System.out.println("连贯失败");
        }
    }
});

5.4 Bootstrap 与 ServerBootStrap

Bootstrap 和 ServerBootStrap 是 Netty 提供的一个创立客户端和服务端启动器的工厂类,应用这个工厂类十分便当地创立启动类,依据下面的一些例子,其实也看得出来能大大地缩小了开发的难度。首先看一个类图:

能够看出都是继承于 AbstractBootStrap 抽象类,所以大抵上的配置办法都雷同。

一般来说,应用 Bootstrap 创立启动器的步骤可分为以下几步:

5.4.1 group()

在上一篇文章《Reactor 模式》中,咱们就讲过服务端要应用两个线程组:

  • bossGroup 用于监听客户端连贯,专门负责与客户端创立连贯,并把连贯注册到 workerGroup 的 Selector 中。
  • workerGroup 用于解决每一个连贯产生的读写事件。

个别创立线程组间接应用以下 new 就完事了:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

有点好奇的是,既然是线程组,那线程数默认是多少呢?深刻源码:

    // 应用一个常量保留
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {//NettyRuntime.availableProcessors() * 2,cpu 核数的两倍赋值给常量
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        // 如果不传入,则应用常量的值,也就是 cpu 核数的两倍
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

通过源码能够看到,默认的线程数是 cpu 核数的两倍。假如想自定义线程数,能够应用有参结构器:

// 设置 bossGroup 线程数为 1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 设置 workerGroup 线程数为 16
EventLoopGroup workerGroup = new NioEventLoopGroup(16);

5.4.2 channel()

这个办法用于设置通道类型,当建设连贯后,会依据这个设置创立对应的 Channel 实例。

应用 debug 模式能够看到

通道类型有以下:

NioSocketChannel:异步非阻塞的客户端 TCP Socket 连贯。

NioServerSocketChannel:异步非阻塞的服务器端 TCP Socket 连贯。

罕用的就是这两个通道类型,因为是异步非阻塞的。所以是首选。

OioSocketChannel:同步阻塞的客户端 TCP Socket 连贯。

OioServerSocketChannel:同步阻塞的服务器端 TCP Socket 连贯。

略微在本地调试过,用起来和 Nio 有一些不同,是阻塞的,所以 API 调用也不一样。因为是阻塞的 IO,简直没什么人会抉择应用 Oio,所以也很难找到例子。我略微推敲了一下,通过几次报错之后,总算调通了。代码如下:

//server 端代码,跟下面简直一样,只需改三个中央
// 这个中央应用的是 OioEventLoopGroup
EventLoopGroup bossGroup = new OioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup)// 只须要设置一个线程组 boosGroup
        .channel(OioServerSocketChannel.class)// 设置服务端通道实现类型

//client 端代码,只需改两个中央
// 应用的是 OioEventLoopGroup
EventLoopGroup eventExecutors = new OioEventLoopGroup();
// 通道类型设置为 OioSocketChannel
bootstrap.group(eventExecutors)// 设置线程组
        .channel(OioSocketChannel.class)// 设置客户端的通道实现类型

NioSctpChannel:异步的客户端 Sctp(Stream Control Transmission Protocol,流控制传输协定)连贯。

NioSctpServerChannel:异步的 Sctp 服务器端连贯。

本地没启动胜利,网上看了一些网友的评论,说是只能在 linux 环境下才能够启动。从报错信息看:SCTP not supported on this platform,不反对这个平台。因为我电脑是 window 零碎,所以网友说的有点情理。

5.4.3 option()与 childOption()

首先说一下这两个的区别。

option()设置的是服务端用于接管进来的连贯,也就是 boosGroup 线程。

childOption()是提供给父管道接管到的连贯,也就是 workerGroup 线程。

搞清楚了之后,咱们看一下罕用的一些设置有哪些:

SocketChannel 参数,也就是 childOption()罕用的参数:

SO_RCVBUF Socket 参数,TCP 数据接收缓冲区大小。
TCP_NODELAY TCP 参数,立刻发送数据,默认值为 Ture。
SO_KEEPALIVE Socket 参数,连贯保活,默认值为 False。启用该性能时,TCP 会被动探测闲暇连贯的有效性。

ServerSocketChannel 参数,也就是 option()罕用参数:

SO_BACKLOG Socket 参数,服务端承受连贯的队列长度,如果队列已满,客户端连贯将被回绝。默认值,Windows 为 200,其余为 128。

因为篇幅限度,其余就不列举了,大家能够去网上找材料看看,理解一下。

5.4.4 设置流水线(重点)

ChannelPipeline 是 Netty 解决申请的责任链,ChannelHandler 则是具体解决申请的处理器。实际上每一个 channel 都有一个处理器的流水线。

在 Bootstrap 中 childHandler()办法须要初始化通道,实例化一个 ChannelInitializer,这时候须要重写 initChannel()初始化通道的办法,拆卸流水线就是在这个中央进行。代码演示如下:

// 应用匿名外部类的模式初始化通道对象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // 给 pipeline 管道设置自定义的处理器
        socketChannel.pipeline().addLast(new MyServerHandler());
    }
});

处理器 Handler 次要分为两种:

ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)

入站指的是数据从底层 java NIO Channel 到 Netty 的 Channel。

出站指的是通过 Netty 的 Channel 来操作底层的 java NIO Channel。

ChannelInboundHandlerAdapter 处理器罕用的事件有

  1. 注册事件 fireChannelRegistered。
  2. 连贯建设事件 fireChannelActive。
  3. 读事件和读实现事件 fireChannelRead、fireChannelReadComplete。
  4. 异样告诉事件 fireExceptionCaught。
  5. 用户自定义事件 fireUserEventTriggered。
  6. Channel 可写状态变动事件 fireChannelWritabilityChanged。
  7. 连贯敞开事件 fireChannelInactive。

ChannelOutboundHandler 处理器罕用的事件有

  1. 端口绑定 bind。
  2. 连贯服务端 connect。
  3. 写事件 write。
  4. 刷新工夫 flush。
  5. 读事件 read。
  6. 被动断开连接 disconnect。
  7. 敞开 channel 事件 close。

还有一个相似的 handler(),次要用于拆卸 parent 通道,也就是 bossGroup 线程。个别状况下,都用不上这个办法。

5.4.5 bind()

提供用于服务端或者客户端绑定服务器地址和端口号,默认是异步启动。如果加上 sync()办法则是同步。

有五个同名的重载办法,作用都是用于绑定地址端口号。不一一介绍了。

5.4.6 优雅地敞开 EventLoopGroup

// 开释掉所有的资源,包含创立的线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();

会敞开所有的 child Channel。敞开之后,开释掉底层的资源。

5.5 Channel

Channel 是什么?无妨看一下官网文档的阐明:

A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind

翻译粗心:一种连贯到网络套接字或能进行读、写、连贯和绑定等 I / O 操作的组件。

如果下面这段阐明比拟形象,上面还有一段阐明:

A channel provides a user:

the current state of the channel (e.g. is it open? is it connected?),
the configuration parameters of the channel (e.g. receive buffer size),
the I/O operations that the channel supports (e.g. read, write, connect, and bind), and
the ChannelPipeline which handles all I/O events and requests associated with the channel.

翻译粗心:

channel 为用户提供:

  1. 通道以后的状态(例如它是关上?还是已连贯?)
  2. channel 的配置参数(例如接收缓冲区的大小)
  3. channel 反对的 IO 操作(例如读、写、连贯和绑定),以及解决与 channel 相关联的所有 IO 事件和申请的 ChannelPipeline。

5.5.1 获取 channel 的状态

boolean isOpen(); // 如果通道关上,则返回 true
boolean isRegistered();// 如果通道注册到 EventLoop,则返回 true
boolean isActive();// 如果通道处于活动状态并且已连贯,则返回 true
boolean isWritable();// 当且仅当 I / O 线程将立刻执行申请的写入操作时,返回 true。

以上就是获取 channel 的四种状态的办法。

5.5.2 获取 channel 的配置参数

获取单条配置信息,应用 getOption(),代码演示:

ChannelConfig config = channel.config();// 获取配置参数
// 获取 ChannelOption.SO_BACKLOG 参数,
Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);
// 因为我启动器配置的是 128,所以我这里获取的 soBackLogConfig=128

获取多条配置信息,应用 getOptions(),代码演示:

ChannelConfig config = channel.config();
Map<ChannelOption<?>, Object> options = config.getOptions();
for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) {System.out.println(entry.getKey() + ":" + entry.getValue());
}
/**
SO_REUSEADDR : false
WRITE_BUFFER_LOW_WATER_MARK : 32768
WRITE_BUFFER_WATER_MARK : WriteBufferWaterMark(low: 32768, high: 65536)
SO_BACKLOG : 128
以下省略...
*/

5.5.3 channel 反对的 IO 操作

写操作,这里演示从服务端写音讯发送到客户端:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.channel().writeAndFlush(Unpooled.copiedBuffer("这波啊,这波是肉蛋葱鸡~", CharsetUtil.UTF_8));
}

客户端控制台:

// 收到服务端 /127.0.0.1:6666 的音讯:这波啊,这波是肉蛋葱鸡~

连贯 操作,代码演示:

ChannelFuture connect = channelFuture.channel().connect(new InetSocketAddress("127.0.0.1", 6666));// 个别应用启动器,这种形式不罕用

通过 channel 获取 ChannelPipeline,并做相干的解决:

// 获取 ChannelPipeline 对象
ChannelPipeline pipeline = ctx.channel().pipeline();
// 往 pipeline 中增加 ChannelHandler 处理器,拆卸流水线
pipeline.addLast(new MyServerHandler());

5.6 Selector

在 NioEventLoop 中,有一个成员变量 selector,这是 nio 包的 Selector,在之前《NIO 入门》中,我曾经讲过 Selector 了。

Netty 中的 Selector 也和 NIO 的 Selector 是一样的,就是用于监听事件,治理注册到 Selector 中的 channel,实现多路复用器。

5.7 PiPeline 与 ChannelPipeline

在后面介绍 Channel 时,咱们晓得能够在 channel 中拆卸 ChannelHandler 流水线处理器,那一个 channel 不可能只有一个 channelHandler 处理器,必定是有很多的,既然是很多 channelHandler 在一个流水线工作,必定是有程序的。

于是 pipeline 就呈现了,pipeline 相当于处理器的容器。初始化 channel 时,把 channelHandler 按程序装在 pipeline 中,就能够实现按序执行 channelHandler 了。

在一个 Channel 中,只有一个 ChannelPipeline。该 pipeline 在 Channel 被创立的时候创立。ChannelPipeline 蕴含了一个 ChannelHander 造成的列表,且所有 ChannelHandler 都会注册到 ChannelPipeline 中。

5.8 ChannelHandlerContext

在 Netty 中,Handler 处理器是有咱们定义的,下面讲过通过集成入站处理器或者出站处理器实现。这时如果咱们想在 Handler 中获取 pipeline 对象,或者 channel 对象,怎么获取呢。

于是 Netty 设计了这个 ChannelHandlerContext 上下文对象,就能够拿到 channel、pipeline 等对象,就能够进行读写等操作。

通过类图,ChannelHandlerContext 是一个接口,上面有三个实现类。

实际上 ChannelHandlerContext 在 pipeline 中是一个链表的模式。看一段源码就明确了:

//ChannelPipeline 实现类 DefaultChannelPipeline 的结构器办法
protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    // 设置头结点 head,尾结点 tail
    tail = new TailContext(this);
    head = new HeadContext(this);
    
    head.next = tail;
    tail.prev = head;
}

上面我用一张图来示意,会更加清晰一点:

5.9 EventLoopGroup

咱们先看一下 EventLoopGroup 的类图:

其中包含了罕用的实现类 NioEventLoopGroup。OioEventLoopGroup 在后面的例子中也有应用过。

从 Netty 的架构图中,能够晓得服务器是须要两个线程组进行配合工作的,而这个线程组的接口就是 EventLoopGroup。

每个 EventLoopGroup 里包含一个或多个 EventLoop,每个 EventLoop 中保护一个 Selector 实例。

5.9.1 轮询机制的实现原理

咱们无妨看一段 DefaultEventExecutorChooserFactory 的源码:

private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

@Override
public EventExecutor next() {//idx.getAndIncrement()相当于 idx++,而后对工作长度取模
    return executors[idx.getAndIncrement() & executors.length - 1];
}

这段代码能够确定执行的形式是轮询机制,接下来 debug 调试一下:

它这里还有一个判断,如果线程数不是 2 的 N 次方,则采纳取模算法实现。

@Override
public EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}

原文链接
本文为阿里云原创内容,未经容许不得转载。

正文完
 0