思维导图

前言

本文次要讲述Netty框架的一些个性以及重要组件,心愿看完之后能对Netty框架有一个比拟直观的感触,心愿能帮忙读者疾速入门Netty,缩小一些弯路。

一、Netty概述

官网的介绍:

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty是 一个异步事件驱动的网络应用程序框架,用于疾速开发可保护的高性能协定服务器和客户端

二、为什么应用Netty

从官网上介绍,Netty是一个网络应用程序框架,开发服务器和客户端。也就是用于网络编程的一个框架。既然是网络编程,Socket就不谈了,为什么不必NIO呢?

2.1 NIO的毛病

对于这个问题,之前我写了一篇文章《NIO入门》对NIO有比拟具体的介绍,NIO的次要问题是:

  • NIO的类库和API繁冗,学习老本高,你须要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
  • 须要相熟Java多线程编程。这是因为NIO编程波及到Reactor模式,你必须对多线程和网络编程十分相熟,能力写出高质量的NIO程序。
  • 臭名远扬的epoll bug。它会导致Selector空轮询,最终导致CPU 100%。直到JDK1.7版本仍然没失去根本性的解决。

2.2 Netty的长处

绝对地,Netty的长处有很多:

  • API应用简略,学习成本低。
  • 功能强大,内置了多种解码编码器,反对多种协定。
  • 性能高,比照其余支流的NIO框架,Netty的性能最优。
  • 社区沉闷,发现BUG会及时修复,迭代版本周期短,一直退出新的性能。
  • Dubbo、Elasticsearch都采纳了Netty,品质失去验证。

三、架构图

下面这张图就是在官网首页的架构图,咱们从上到下剖析一下。

绿色的局部Core外围模块,包含零拷贝、API库、可扩大的事件模型。

橙色局部Protocol Support协定反对,包含Http协定、webSocket、SSL(安全套接字协定)、谷歌Protobuf协定、zlib/gzip压缩与解压缩、Large File Transfer大文件传输等等。

红色的局部Transport Services传输服务,包含Socket、Datagram、Http Tunnel等等。

以上可看出Netty的性能、协定、传输方式都比拟全,比拟弱小。

四、永远的Hello Word

首先搭建一个HelloWord工程,先相熟一下API,还有为前面的学习做铺垫。以上面这张图为根据:

4.1 引入Maven依赖

应用的版本是4.1.20,绝对比较稳定的一个版本。

<dependency>    <groupId>io.netty</groupId>    <artifactId>netty-all</artifactId>    <version>4.1.20.Final</version></dependency>

4.2 创立服务端启动类

public class MyServer {    public static void main(String[] args) throws Exception {        //创立两个线程组 boosGroup、workerGroup        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            //创立服务端的启动对象,设置参数            ServerBootstrap bootstrap = new ServerBootstrap();            //设置两个线程组boosGroup和workerGroup            bootstrap.group(bossGroup, workerGroup)                //设置服务端通道实现类型                    .channel(NioServerSocketChannel.class)                //设置线程队列失去连贯个数                    .option(ChannelOption.SO_BACKLOG, 128)                //设置放弃流动连贯状态                    .childOption(ChannelOption.SO_KEEPALIVE, true)                //应用匿名外部类的模式初始化通道对象                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel socketChannel) throws Exception {                            //给pipeline管道设置处理器                            socketChannel.pipeline().addLast(new MyServerHandler());                        }                    });//给workerGroup的EventLoop对应的管道设置处理器            System.out.println("java技术爱好者的服务端曾经准备就绪...");            //绑定端口号,启动服务端            ChannelFuture channelFuture = bootstrap.bind(6666).sync();            //对敞开通道进行监听            channelFuture.channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}

4.3 创立服务端处理器

/** * 自定义的Handler须要继承Netty规定好的HandlerAdapter * 能力被Netty框架所关联,有点相似SpringMVC的适配器模式 **/public class MyServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //获取客户端发送过去的音讯        ByteBuf byteBuf = (ByteBuf) msg;        System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的音讯:" + byteBuf.toString(CharsetUtil.UTF_8));    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        //发送音讯给客户端        ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到音讯,并给你发送一个问号?", CharsetUtil.UTF_8));    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        //产生异样,敞开通道        ctx.close();    }}

4.4 创立客户端启动类

public class MyClient {    public static void main(String[] args) throws Exception {        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();        try {            //创立bootstrap对象,配置参数            Bootstrap bootstrap = new Bootstrap();            //设置线程组            bootstrap.group(eventExecutors)                //设置客户端的通道实现类型                    .channel(NioSocketChannel.class)                //应用匿名外部类初始化通道                .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            //增加客户端通道的处理器                            ch.pipeline().addLast(new MyClientHandler());                        }                    });            System.out.println("客户端准备就绪,随时能够腾飞~");            //连贯服务端            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();            //对通道敞开进行监听            channelFuture.channel().closeFuture().sync();        } finally {            //敞开线程组            eventExecutors.shutdownGracefully();        }    }}

4.5 创立客户端处理器

public class MyClientHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        //发送音讯到服务端        ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴卜~茉莉~Are you good~马来西亚~", CharsetUtil.UTF_8));    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //接管服务端发送过去的音讯        ByteBuf byteBuf = (ByteBuf) msg;        System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的音讯:" + byteBuf.toString(CharsetUtil.UTF_8));    }}

4.6 测试

先启动服务端,再启动客户端,就能够看到后果:

MyServer打印后果:

MyClient打印后果:

五、Netty的个性与重要组件

5.1 taskQueue工作队列

如果Handler处理器有一些长时间的业务解决,能够交给taskQueue异步解决。怎么用呢,请看代码演示:

public class MyServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //获取到线程池eventLoop,增加线程,执行        ctx.channel().eventLoop().execute(new Runnable() {            @Override            public void run() {                try {                    //长时间操作,不至于长时间的业务操作导致Handler阻塞                    Thread.sleep(1000);                    System.out.println("长时间的业务解决");                } catch (Exception e) {                    e.printStackTrace();                }            }        });    }}

咱们打一个debug调试,是能够看到增加进去的taskQueue有一个工作。

5.2 scheduleTaskQueue延时工作队列

延时工作队列和下面介绍的工作队列十分类似,只是多了一个可提早肯定工夫再执行的设置,请看代码演示:

ctx.channel().eventLoop().schedule(new Runnable() {    @Override    public void run() {        try {            //长时间操作,不至于长时间的业务操作导致Handler阻塞            Thread.sleep(1000);            System.out.println("长时间的业务解决");        } catch (Exception e) {            e.printStackTrace();        }    }},5, TimeUnit.SECONDS);//5秒后执行

仍然关上debug进行调试查看,咱们能够有一个scheduleTaskQueue工作待执行中

5.3 Future异步机制

在搭建HelloWord工程的时候,咱们看到有一行这样的代码:

ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666);

很多操作都返回这个ChannelFuture对象,到底这个ChannelFuture对象是用来做什么的呢?

ChannelFuture提供操作实现时一种异步告诉的形式。个别在Socket编程中,期待响应后果都是同步阻塞的,而Netty则不会造成阻塞,因为ChannelFuture是采取相似观察者模式的模式进行获取后果。请看一段代码演示:

//增加监听器channelFuture.addListener(new ChannelFutureListener() {    //应用匿名外部类,ChannelFutureListener接口    //重写operationComplete办法    @Override    public void operationComplete(ChannelFuture future) throws Exception {        //判断是否操作胜利            if (future.isSuccess()) {            System.out.println("连贯胜利");        } else {            System.out.println("连贯失败");        }    }});

5.4 Bootstrap与ServerBootStrap

Bootstrap和ServerBootStrap是Netty提供的一个创立客户端和服务端启动器的工厂类,应用这个工厂类十分便当地创立启动类,依据下面的一些例子,其实也看得出来能大大地缩小了开发的难度。首先看一个类图:

能够看出都是继承于AbstractBootStrap抽象类,所以大抵上的配置办法都雷同。

一般来说,应用Bootstrap创立启动器的步骤可分为以下几步:

5.4.1 group()

在上一篇文章《Reactor模式》中,咱们就讲过服务端要应用两个线程组:

  • bossGroup 用于监听客户端连贯,专门负责与客户端创立连贯,并把连贯注册到workerGroup的Selector中。
  • workerGroup用于解决每一个连贯产生的读写事件。

个别创立线程组间接应用以下new就完事了:

EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();

有点好奇的是,既然是线程组,那线程数默认是多少呢?深刻源码:

    //应用一个常量保留    private static final int DEFAULT_EVENT_LOOP_THREADS;    static {        //NettyRuntime.availableProcessors() * 2,cpu核数的两倍赋值给常量        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));        if (logger.isDebugEnabled()) {            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);        }    }        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {        //如果不传入,则应用常量的值,也就是cpu核数的两倍        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);    }

通过源码能够看到,默认的线程数是cpu核数的两倍。假如想自定义线程数,能够应用有参结构器:

//设置bossGroup线程数为1EventLoopGroup bossGroup = new NioEventLoopGroup(1);//设置workerGroup线程数为16EventLoopGroup workerGroup = new NioEventLoopGroup(16);

5.4.2 channel()

这个办法用于设置通道类型,当建设连贯后,会依据这个设置创立对应的Channel实例。

应用debug模式能够看到

通道类型有以下:

NioSocketChannel: 异步非阻塞的客户端 TCP Socket 连贯。

NioServerSocketChannel: 异步非阻塞的服务器端 TCP Socket 连贯。

罕用的就是这两个通道类型,因为是异步非阻塞的。所以是首选。

OioSocketChannel: 同步阻塞的客户端 TCP Socket 连贯。

OioServerSocketChannel: 同步阻塞的服务器端 TCP Socket 连贯。

略微在本地调试过,用起来和Nio有一些不同,是阻塞的,所以API调用也不一样。因为是阻塞的IO,简直没什么人会抉择应用Oio,所以也很难找到例子。我略微推敲了一下,通过几次报错之后,总算调通了。代码如下:
//server端代码,跟下面简直一样,只需改三个中央//这个中央应用的是OioEventLoopGroupEventLoopGroup bossGroup = new OioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup)//只须要设置一个线程组boosGroup        .channel(OioServerSocketChannel.class)//设置服务端通道实现类型//client端代码,只需改两个中央//应用的是OioEventLoopGroupEventLoopGroup eventExecutors = new OioEventLoopGroup();//通道类型设置为OioSocketChannelbootstrap.group(eventExecutors)//设置线程组        .channel(OioSocketChannel.class)//设置客户端的通道实现类型

NioSctpChannel: 异步的客户端 Sctp(Stream Control Transmission Protocol,流控制传输协定)连贯。

NioSctpServerChannel: 异步的 Sctp 服务器端连贯。

本地没启动胜利,网上看了一些网友的评论,说是只能在linux环境下才能够启动。从报错信息看:SCTP not supported on this platform,不反对这个平台。因为我电脑是window零碎,所以网友说的有点情理。

5.4.3 option()与childOption()

首先说一下这两个的区别。

option()设置的是服务端用于接管进来的连贯,也就是boosGroup线程。

childOption()是提供给父管道接管到的连贯,也就是workerGroup线程。

搞清楚了之后,咱们看一下罕用的一些设置有哪些:

SocketChannel参数,也就是childOption()罕用的参数:

SO_RCVBUF Socket参数,TCP数据接收缓冲区大小。
TCP_NODELAY TCP参数,立刻发送数据,默认值为Ture。
SO_KEEPALIVE Socket参数,连贯保活,默认值为False。启用该性能时,TCP会被动探测闲暇连贯的有效性。

ServerSocketChannel参数,也就是option()罕用参数:

SO_BACKLOG Socket参数,服务端承受连贯的队列长度,如果队列已满,客户端连贯将被回绝。默认值,Windows为200,其余为128。

因为篇幅限度,其余就不列举了,大家能够去网上找材料看看,理解一下。

5.4.4 设置流水线(重点)

ChannelPipeline是Netty解决申请的责任链,ChannelHandler则是具体解决申请的处理器。实际上每一个channel都有一个处理器的流水线。

在Bootstrap中childHandler()办法须要初始化通道,实例化一个ChannelInitializer,这时候须要重写initChannel()初始化通道的办法,拆卸流水线就是在这个中央进行。代码演示如下:

//应用匿名外部类的模式初始化通道对象bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {    @Override    protected void initChannel(SocketChannel socketChannel) throws Exception {        //给pipeline管道设置自定义的处理器        socketChannel.pipeline().addLast(new MyServerHandler());    }});

处理器Handler次要分为两种:

ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)

入站指的是数据从底层java NIO Channel到Netty的Channel。

出站指的是通过Netty的Channel来操作底层的java NIO Channel。

ChannelInboundHandlerAdapter处理器罕用的事件有

  1. 注册事件 fireChannelRegistered。
  2. 连贯建设事件 fireChannelActive。
  3. 读事件和读实现事件 fireChannelRead、fireChannelReadComplete。
  4. 异样告诉事件 fireExceptionCaught。
  5. 用户自定义事件 fireUserEventTriggered。
  6. Channel 可写状态变动事件 fireChannelWritabilityChanged。
  7. 连贯敞开事件 fireChannelInactive。

ChannelOutboundHandler处理器罕用的事件有

  1. 端口绑定 bind。
  2. 连贯服务端 connect。
  3. 写事件 write。
  4. 刷新工夫 flush。
  5. 读事件 read。
  6. 被动断开连接 disconnect。
  7. 敞开 channel 事件 close。
还有一个相似的handler(),次要用于拆卸parent通道,也就是bossGroup线程。个别状况下,都用不上这个办法。

5.4.5 bind()

提供用于服务端或者客户端绑定服务器地址和端口号,默认是异步启动。如果加上sync()办法则是同步。

有五个同名的重载办法,作用都是用于绑定地址端口号。不一一介绍了。

5.4.6 优雅地敞开EventLoopGroup

//开释掉所有的资源,包含创立的线程bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();

会敞开所有的child Channel。敞开之后,开释掉底层的资源。

5.5 Channel

Channel是什么?无妨看一下官网文档的阐明:

A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind

翻译粗心:一种连贯到网络套接字或能进行读、写、连贯和绑定等I/O操作的组件。

如果下面这段阐明比拟形象,上面还有一段阐明:

A channel provides a user:

the current state of the channel (e.g. is it open? is it connected?),
the configuration parameters of the channel (e.g. receive buffer size),
the I/O operations that the channel supports (e.g. read, write, connect, and bind), and
the ChannelPipeline which handles all I/O events and requests associated with the channel.

翻译粗心:

channel为用户提供:

  1. 通道以后的状态(例如它是关上?还是已连贯?)
  2. channel的配置参数(例如接收缓冲区的大小)
  3. channel反对的IO操作(例如读、写、连贯和绑定),以及解决与channel相关联的所有IO事件和申请的ChannelPipeline。

5.5.1 获取channel的状态

boolean isOpen(); //如果通道关上,则返回trueboolean isRegistered();//如果通道注册到EventLoop,则返回trueboolean isActive();//如果通道处于活动状态并且已连贯,则返回trueboolean isWritable();//当且仅当I/O线程将立刻执行申请的写入操作时,返回true。

以上就是获取channel的四种状态的办法。

5.5.2 获取channel的配置参数

获取单条配置信息,应用getOption(),代码演示:

ChannelConfig config = channel.config();//获取配置参数//获取ChannelOption.SO_BACKLOG参数,Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);//因为我启动器配置的是128,所以我这里获取的soBackLogConfig=128

获取多条配置信息,应用getOptions(),代码演示:

ChannelConfig config = channel.config();Map<ChannelOption<?>, Object> options = config.getOptions();for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) {    System.out.println(entry.getKey() + " : " + entry.getValue());}/**SO_REUSEADDR : falseWRITE_BUFFER_LOW_WATER_MARK : 32768WRITE_BUFFER_WATER_MARK : WriteBufferWaterMark(low: 32768, high: 65536)SO_BACKLOG : 128以下省略...*/

5.5.3 channel反对的IO操作

写操作,这里演示从服务端写音讯发送到客户端:

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    ctx.channel().writeAndFlush(Unpooled.copiedBuffer("这波啊,这波是肉蛋葱鸡~", CharsetUtil.UTF_8));}

客户端控制台:

//收到服务端/127.0.0.1:6666的音讯:这波啊,这波是肉蛋葱鸡~

连贯操作,代码演示:

ChannelFuture connect = channelFuture.channel().connect(new InetSocketAddress("127.0.0.1", 6666));//个别应用启动器,这种形式不罕用

通过channel获取ChannelPipeline,并做相干的解决:

//获取ChannelPipeline对象ChannelPipeline pipeline = ctx.channel().pipeline();//往pipeline中增加ChannelHandler处理器,拆卸流水线pipeline.addLast(new MyServerHandler());

5.6 Selector

在NioEventLoop中,有一个成员变量selector,这是nio包的Selector,在之前《NIO入门》中,我曾经讲过Selector了。

Netty中的Selector也和NIO的Selector是一样的,就是用于监听事件,治理注册到Selector中的channel,实现多路复用器。

5.7 PiPeline与ChannelPipeline

在后面介绍Channel时,咱们晓得能够在channel中拆卸ChannelHandler流水线处理器,那一个channel不可能只有一个channelHandler处理器,必定是有很多的,既然是很多channelHandler在一个流水线工作,必定是有程序的。

于是pipeline就呈现了,pipeline相当于处理器的容器。初始化channel时,把channelHandler按程序装在pipeline中,就能够实现按序执行channelHandler了。

在一个Channel中,只有一个ChannelPipeline。该pipeline在Channel被创立的时候创立。ChannelPipeline蕴含了一个ChannelHander造成的列表,且所有ChannelHandler都会注册到ChannelPipeline中。

5.8 ChannelHandlerContext

在Netty中,Handler处理器是有咱们定义的,下面讲过通过集成入站处理器或者出站处理器实现。这时如果咱们想在Handler中获取pipeline对象,或者channel对象,怎么获取呢。

于是Netty设计了这个ChannelHandlerContext上下文对象,就能够拿到channel、pipeline等对象,就能够进行读写等操作。

通过类图,ChannelHandlerContext是一个接口,上面有三个实现类。

实际上ChannelHandlerContext在pipeline中是一个链表的模式。看一段源码就明确了:

//ChannelPipeline实现类DefaultChannelPipeline的结构器办法protected DefaultChannelPipeline(Channel channel) {    this.channel = ObjectUtil.checkNotNull(channel, "channel");    succeededFuture = new SucceededChannelFuture(channel, null);    voidPromise =  new VoidChannelPromise(channel, true);    //设置头结点head,尾结点tail    tail = new TailContext(this);    head = new HeadContext(this);        head.next = tail;    tail.prev = head;}

上面我用一张图来示意,会更加清晰一点:

5.9 EventLoopGroup

咱们先看一下EventLoopGroup的类图:

其中包含了罕用的实现类NioEventLoopGroup。OioEventLoopGroup在后面的例子中也有应用过。

从Netty的架构图中,能够晓得服务器是须要两个线程组进行配合工作的,而这个线程组的接口就是EventLoopGroup。

每个EventLoopGroup里包含一个或多个EventLoop,每个EventLoop中保护一个Selector实例。

5.9.1 轮询机制的实现原理

咱们无妨看一段DefaultEventExecutorChooserFactory的源码:

private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;@Overridepublic EventExecutor next() {    //idx.getAndIncrement()相当于idx++,而后对工作长度取模    return executors[idx.getAndIncrement() & executors.length - 1];}

这段代码能够确定执行的形式是轮询机制,接下来debug调试一下:

它这里还有一个判断,如果线程数不是2的N次方,则采纳取模算法实现。

@Overridepublic EventExecutor next() {    return executors[Math.abs(idx.getAndIncrement() % executors.length)];}

原文链接

本文为阿里云原创内容,未经容许不得转载。思维导图

前言

本文次要讲述Netty框架的一些个性以及重要组件,心愿看完之后能对Netty框架有一个比拟直观的感触,心愿能帮忙读者疾速入门Netty,缩小一些弯路。

一、Netty概述

官网的介绍:

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty是 一个异步事件驱动的网络应用程序框架,用于疾速开发可保护的高性能协定服务器和客户端

二、为什么应用Netty

从官网上介绍,Netty是一个网络应用程序框架,开发服务器和客户端。也就是用于网络编程的一个框架。既然是网络编程,Socket就不谈了,为什么不必NIO呢?

2.1 NIO的毛病

对于这个问题,之前我写了一篇文章《NIO入门》对NIO有比拟具体的介绍,NIO的次要问题是:

  • NIO的类库和API繁冗,学习老本高,你须要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
  • 须要相熟Java多线程编程。这是因为NIO编程波及到Reactor模式,你必须对多线程和网络编程十分相熟,能力写出高质量的NIO程序。
  • 臭名远扬的epoll bug。它会导致Selector空轮询,最终导致CPU 100%。直到JDK1.7版本仍然没失去根本性的解决。

2.2 Netty的长处

绝对地,Netty的长处有很多:

  • API应用简略,学习成本低。
  • 功能强大,内置了多种解码编码器,反对多种协定。
  • 性能高,比照其余支流的NIO框架,Netty的性能最优。
  • 社区沉闷,发现BUG会及时修复,迭代版本周期短,一直退出新的性能。
  • Dubbo、Elasticsearch都采纳了Netty,品质失去验证。

三、架构图

下面这张图就是在官网首页的架构图,咱们从上到下剖析一下。

绿色的局部Core外围模块,包含零拷贝、API库、可扩大的事件模型。

橙色局部Protocol Support协定反对,包含Http协定、webSocket、SSL(安全套接字协定)、谷歌Protobuf协定、zlib/gzip压缩与解压缩、Large File Transfer大文件传输等等。

红色的局部Transport Services传输服务,包含Socket、Datagram、Http Tunnel等等。

以上可看出Netty的性能、协定、传输方式都比拟全,比拟弱小。

四、永远的Hello Word

首先搭建一个HelloWord工程,先相熟一下API,还有为前面的学习做铺垫。以上面这张图为根据:

4.1 引入Maven依赖

应用的版本是4.1.20,绝对比较稳定的一个版本。

<dependency>    <groupId>io.netty</groupId>    <artifactId>netty-all</artifactId>    <version>4.1.20.Final</version></dependency>

4.2 创立服务端启动类

public class MyServer {    public static void main(String[] args) throws Exception {        //创立两个线程组 boosGroup、workerGroup        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try {            //创立服务端的启动对象,设置参数            ServerBootstrap bootstrap = new ServerBootstrap();            //设置两个线程组boosGroup和workerGroup            bootstrap.group(bossGroup, workerGroup)                //设置服务端通道实现类型                    .channel(NioServerSocketChannel.class)                //设置线程队列失去连贯个数                    .option(ChannelOption.SO_BACKLOG, 128)                //设置放弃流动连贯状态                    .childOption(ChannelOption.SO_KEEPALIVE, true)                //应用匿名外部类的模式初始化通道对象                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel socketChannel) throws Exception {                            //给pipeline管道设置处理器                            socketChannel.pipeline().addLast(new MyServerHandler());                        }                    });//给workerGroup的EventLoop对应的管道设置处理器            System.out.println("java技术爱好者的服务端曾经准备就绪...");            //绑定端口号,启动服务端            ChannelFuture channelFuture = bootstrap.bind(6666).sync();            //对敞开通道进行监听            channelFuture.channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}

4.3 创立服务端处理器

/** * 自定义的Handler须要继承Netty规定好的HandlerAdapter * 能力被Netty框架所关联,有点相似SpringMVC的适配器模式 **/public class MyServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //获取客户端发送过去的音讯        ByteBuf byteBuf = (ByteBuf) msg;        System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的音讯:" + byteBuf.toString(CharsetUtil.UTF_8));    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        //发送音讯给客户端        ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到音讯,并给你发送一个问号?", CharsetUtil.UTF_8));    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        //产生异样,敞开通道        ctx.close();    }}

4.4 创立客户端启动类

public class MyClient {    public static void main(String[] args) throws Exception {        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();        try {            //创立bootstrap对象,配置参数            Bootstrap bootstrap = new Bootstrap();            //设置线程组            bootstrap.group(eventExecutors)                //设置客户端的通道实现类型                    .channel(NioSocketChannel.class)                //应用匿名外部类初始化通道                .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            //增加客户端通道的处理器                            ch.pipeline().addLast(new MyClientHandler());                        }                    });            System.out.println("客户端准备就绪,随时能够腾飞~");            //连贯服务端            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();            //对通道敞开进行监听            channelFuture.channel().closeFuture().sync();        } finally {            //敞开线程组            eventExecutors.shutdownGracefully();        }    }}

4.5 创立客户端处理器

public class MyClientHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        //发送音讯到服务端        ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴卜~茉莉~Are you good~马来西亚~", CharsetUtil.UTF_8));    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //接管服务端发送过去的音讯        ByteBuf byteBuf = (ByteBuf) msg;        System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的音讯:" + byteBuf.toString(CharsetUtil.UTF_8));    }}

4.6 测试

先启动服务端,再启动客户端,就能够看到后果:

MyServer打印后果:

MyClient打印后果:

五、Netty的个性与重要组件

5.1 taskQueue工作队列

如果Handler处理器有一些长时间的业务解决,能够交给taskQueue异步解决。怎么用呢,请看代码演示:

public class MyServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        //获取到线程池eventLoop,增加线程,执行        ctx.channel().eventLoop().execute(new Runnable() {            @Override            public void run() {                try {                    //长时间操作,不至于长时间的业务操作导致Handler阻塞                    Thread.sleep(1000);                    System.out.println("长时间的业务解决");                } catch (Exception e) {                    e.printStackTrace();                }            }        });    }}

咱们打一个debug调试,是能够看到增加进去的taskQueue有一个工作。

5.2 scheduleTaskQueue延时工作队列

延时工作队列和下面介绍的工作队列十分类似,只是多了一个可提早肯定工夫再执行的设置,请看代码演示:

ctx.channel().eventLoop().schedule(new Runnable() {    @Override    public void run() {        try {            //长时间操作,不至于长时间的业务操作导致Handler阻塞            Thread.sleep(1000);            System.out.println("长时间的业务解决");        } catch (Exception e) {            e.printStackTrace();        }    }},5, TimeUnit.SECONDS);//5秒后执行

仍然关上debug进行调试查看,咱们能够有一个scheduleTaskQueue工作待执行中

5.3 Future异步机制

在搭建HelloWord工程的时候,咱们看到有一行这样的代码:

ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666);

很多操作都返回这个ChannelFuture对象,到底这个ChannelFuture对象是用来做什么的呢?

ChannelFuture提供操作实现时一种异步告诉的形式。个别在Socket编程中,期待响应后果都是同步阻塞的,而Netty则不会造成阻塞,因为ChannelFuture是采取相似观察者模式的模式进行获取后果。请看一段代码演示:

//增加监听器channelFuture.addListener(new ChannelFutureListener() {    //应用匿名外部类,ChannelFutureListener接口    //重写operationComplete办法    @Override    public void operationComplete(ChannelFuture future) throws Exception {        //判断是否操作胜利            if (future.isSuccess()) {            System.out.println("连贯胜利");        } else {            System.out.println("连贯失败");        }    }});

5.4 Bootstrap与ServerBootStrap

Bootstrap和ServerBootStrap是Netty提供的一个创立客户端和服务端启动器的工厂类,应用这个工厂类十分便当地创立启动类,依据下面的一些例子,其实也看得出来能大大地缩小了开发的难度。首先看一个类图:

能够看出都是继承于AbstractBootStrap抽象类,所以大抵上的配置办法都雷同。

一般来说,应用Bootstrap创立启动器的步骤可分为以下几步:

5.4.1 group()

在上一篇文章《Reactor模式》中,咱们就讲过服务端要应用两个线程组:

  • bossGroup 用于监听客户端连贯,专门负责与客户端创立连贯,并把连贯注册到workerGroup的Selector中。
  • workerGroup用于解决每一个连贯产生的读写事件。

个别创立线程组间接应用以下new就完事了:

EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();

有点好奇的是,既然是线程组,那线程数默认是多少呢?深刻源码:

    //应用一个常量保留    private static final int DEFAULT_EVENT_LOOP_THREADS;    static {        //NettyRuntime.availableProcessors() * 2,cpu核数的两倍赋值给常量        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));        if (logger.isDebugEnabled()) {            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);        }    }        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {        //如果不传入,则应用常量的值,也就是cpu核数的两倍        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);    }

通过源码能够看到,默认的线程数是cpu核数的两倍。假如想自定义线程数,能够应用有参结构器:

//设置bossGroup线程数为1EventLoopGroup bossGroup = new NioEventLoopGroup(1);//设置workerGroup线程数为16EventLoopGroup workerGroup = new NioEventLoopGroup(16);

5.4.2 channel()

这个办法用于设置通道类型,当建设连贯后,会依据这个设置创立对应的Channel实例。

应用debug模式能够看到

通道类型有以下:

NioSocketChannel: 异步非阻塞的客户端 TCP Socket 连贯。

NioServerSocketChannel: 异步非阻塞的服务器端 TCP Socket 连贯。

罕用的就是这两个通道类型,因为是异步非阻塞的。所以是首选。

OioSocketChannel: 同步阻塞的客户端 TCP Socket 连贯。

OioServerSocketChannel: 同步阻塞的服务器端 TCP Socket 连贯。

略微在本地调试过,用起来和Nio有一些不同,是阻塞的,所以API调用也不一样。因为是阻塞的IO,简直没什么人会抉择应用Oio,所以也很难找到例子。我略微推敲了一下,通过几次报错之后,总算调通了。代码如下:
//server端代码,跟下面简直一样,只需改三个中央//这个中央应用的是OioEventLoopGroupEventLoopGroup bossGroup = new OioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup)//只须要设置一个线程组boosGroup        .channel(OioServerSocketChannel.class)//设置服务端通道实现类型//client端代码,只需改两个中央//应用的是OioEventLoopGroupEventLoopGroup eventExecutors = new OioEventLoopGroup();//通道类型设置为OioSocketChannelbootstrap.group(eventExecutors)//设置线程组        .channel(OioSocketChannel.class)//设置客户端的通道实现类型

NioSctpChannel: 异步的客户端 Sctp(Stream Control Transmission Protocol,流控制传输协定)连贯。

NioSctpServerChannel: 异步的 Sctp 服务器端连贯。

本地没启动胜利,网上看了一些网友的评论,说是只能在linux环境下才能够启动。从报错信息看:SCTP not supported on this platform,不反对这个平台。因为我电脑是window零碎,所以网友说的有点情理。

5.4.3 option()与childOption()

首先说一下这两个的区别。

option()设置的是服务端用于接管进来的连贯,也就是boosGroup线程。

childOption()是提供给父管道接管到的连贯,也就是workerGroup线程。

搞清楚了之后,咱们看一下罕用的一些设置有哪些:

SocketChannel参数,也就是childOption()罕用的参数:

SO_RCVBUF Socket参数,TCP数据接收缓冲区大小。
TCP_NODELAY TCP参数,立刻发送数据,默认值为Ture。
SO_KEEPALIVE Socket参数,连贯保活,默认值为False。启用该性能时,TCP会被动探测闲暇连贯的有效性。

ServerSocketChannel参数,也就是option()罕用参数:

SO_BACKLOG Socket参数,服务端承受连贯的队列长度,如果队列已满,客户端连贯将被回绝。默认值,Windows为200,其余为128。

因为篇幅限度,其余就不列举了,大家能够去网上找材料看看,理解一下。

5.4.4 设置流水线(重点)

ChannelPipeline是Netty解决申请的责任链,ChannelHandler则是具体解决申请的处理器。实际上每一个channel都有一个处理器的流水线。

在Bootstrap中childHandler()办法须要初始化通道,实例化一个ChannelInitializer,这时候须要重写initChannel()初始化通道的办法,拆卸流水线就是在这个中央进行。代码演示如下:

//应用匿名外部类的模式初始化通道对象bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {    @Override    protected void initChannel(SocketChannel socketChannel) throws Exception {        //给pipeline管道设置自定义的处理器        socketChannel.pipeline().addLast(new MyServerHandler());    }});

处理器Handler次要分为两种:

ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)

入站指的是数据从底层java NIO Channel到Netty的Channel。

出站指的是通过Netty的Channel来操作底层的java NIO Channel。

ChannelInboundHandlerAdapter处理器罕用的事件有

  1. 注册事件 fireChannelRegistered。
  2. 连贯建设事件 fireChannelActive。
  3. 读事件和读实现事件 fireChannelRead、fireChannelReadComplete。
  4. 异样告诉事件 fireExceptionCaught。
  5. 用户自定义事件 fireUserEventTriggered。
  6. Channel 可写状态变动事件 fireChannelWritabilityChanged。
  7. 连贯敞开事件 fireChannelInactive。

ChannelOutboundHandler处理器罕用的事件有

  1. 端口绑定 bind。
  2. 连贯服务端 connect。
  3. 写事件 write。
  4. 刷新工夫 flush。
  5. 读事件 read。
  6. 被动断开连接 disconnect。
  7. 敞开 channel 事件 close。
还有一个相似的handler(),次要用于拆卸parent通道,也就是bossGroup线程。个别状况下,都用不上这个办法。

5.4.5 bind()

提供用于服务端或者客户端绑定服务器地址和端口号,默认是异步启动。如果加上sync()办法则是同步。

有五个同名的重载办法,作用都是用于绑定地址端口号。不一一介绍了。

5.4.6 优雅地敞开EventLoopGroup

//开释掉所有的资源,包含创立的线程bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();

会敞开所有的child Channel。敞开之后,开释掉底层的资源。

5.5 Channel

Channel是什么?无妨看一下官网文档的阐明:

A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind

翻译粗心:一种连贯到网络套接字或能进行读、写、连贯和绑定等I/O操作的组件。

如果下面这段阐明比拟形象,上面还有一段阐明:

A channel provides a user:

the current state of the channel (e.g. is it open? is it connected?),
the configuration parameters of the channel (e.g. receive buffer size),
the I/O operations that the channel supports (e.g. read, write, connect, and bind), and
the ChannelPipeline which handles all I/O events and requests associated with the channel.

翻译粗心:

channel为用户提供:

  1. 通道以后的状态(例如它是关上?还是已连贯?)
  2. channel的配置参数(例如接收缓冲区的大小)
  3. channel反对的IO操作(例如读、写、连贯和绑定),以及解决与channel相关联的所有IO事件和申请的ChannelPipeline。

5.5.1 获取channel的状态

boolean isOpen(); //如果通道关上,则返回trueboolean isRegistered();//如果通道注册到EventLoop,则返回trueboolean isActive();//如果通道处于活动状态并且已连贯,则返回trueboolean isWritable();//当且仅当I/O线程将立刻执行申请的写入操作时,返回true。

以上就是获取channel的四种状态的办法。

5.5.2 获取channel的配置参数

获取单条配置信息,应用getOption(),代码演示:

ChannelConfig config = channel.config();//获取配置参数//获取ChannelOption.SO_BACKLOG参数,Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);//因为我启动器配置的是128,所以我这里获取的soBackLogConfig=128

获取多条配置信息,应用getOptions(),代码演示:

ChannelConfig config = channel.config();Map<ChannelOption<?>, Object> options = config.getOptions();for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) {    System.out.println(entry.getKey() + " : " + entry.getValue());}/**SO_REUSEADDR : falseWRITE_BUFFER_LOW_WATER_MARK : 32768WRITE_BUFFER_WATER_MARK : WriteBufferWaterMark(low: 32768, high: 65536)SO_BACKLOG : 128以下省略...*/

5.5.3 channel反对的IO操作

写操作,这里演示从服务端写音讯发送到客户端:

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    ctx.channel().writeAndFlush(Unpooled.copiedBuffer("这波啊,这波是肉蛋葱鸡~", CharsetUtil.UTF_8));}

客户端控制台:

//收到服务端/127.0.0.1:6666的音讯:这波啊,这波是肉蛋葱鸡~

连贯操作,代码演示:

ChannelFuture connect = channelFuture.channel().connect(new InetSocketAddress("127.0.0.1", 6666));//个别应用启动器,这种形式不罕用

通过channel获取ChannelPipeline,并做相干的解决:

//获取ChannelPipeline对象ChannelPipeline pipeline = ctx.channel().pipeline();//往pipeline中增加ChannelHandler处理器,拆卸流水线pipeline.addLast(new MyServerHandler());

5.6 Selector

在NioEventLoop中,有一个成员变量selector,这是nio包的Selector,在之前《NIO入门》中,我曾经讲过Selector了。

Netty中的Selector也和NIO的Selector是一样的,就是用于监听事件,治理注册到Selector中的channel,实现多路复用器。

5.7 PiPeline与ChannelPipeline

在后面介绍Channel时,咱们晓得能够在channel中拆卸ChannelHandler流水线处理器,那一个channel不可能只有一个channelHandler处理器,必定是有很多的,既然是很多channelHandler在一个流水线工作,必定是有程序的。

于是pipeline就呈现了,pipeline相当于处理器的容器。初始化channel时,把channelHandler按程序装在pipeline中,就能够实现按序执行channelHandler了。

在一个Channel中,只有一个ChannelPipeline。该pipeline在Channel被创立的时候创立。ChannelPipeline蕴含了一个ChannelHander造成的列表,且所有ChannelHandler都会注册到ChannelPipeline中。

5.8 ChannelHandlerContext

在Netty中,Handler处理器是有咱们定义的,下面讲过通过集成入站处理器或者出站处理器实现。这时如果咱们想在Handler中获取pipeline对象,或者channel对象,怎么获取呢。

于是Netty设计了这个ChannelHandlerContext上下文对象,就能够拿到channel、pipeline等对象,就能够进行读写等操作。

通过类图,ChannelHandlerContext是一个接口,上面有三个实现类。

实际上ChannelHandlerContext在pipeline中是一个链表的模式。看一段源码就明确了:

//ChannelPipeline实现类DefaultChannelPipeline的结构器办法protected DefaultChannelPipeline(Channel channel) {    this.channel = ObjectUtil.checkNotNull(channel, "channel");    succeededFuture = new SucceededChannelFuture(channel, null);    voidPromise =  new VoidChannelPromise(channel, true);    //设置头结点head,尾结点tail    tail = new TailContext(this);    head = new HeadContext(this);        head.next = tail;    tail.prev = head;}

上面我用一张图来示意,会更加清晰一点:

5.9 EventLoopGroup

咱们先看一下EventLoopGroup的类图:

其中包含了罕用的实现类NioEventLoopGroup。OioEventLoopGroup在后面的例子中也有应用过。

从Netty的架构图中,能够晓得服务器是须要两个线程组进行配合工作的,而这个线程组的接口就是EventLoopGroup。

每个EventLoopGroup里包含一个或多个EventLoop,每个EventLoop中保护一个Selector实例。

5.9.1 轮询机制的实现原理

咱们无妨看一段DefaultEventExecutorChooserFactory的源码:

private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;@Overridepublic EventExecutor next() {    //idx.getAndIncrement()相当于idx++,而后对工作长度取模    return executors[idx.getAndIncrement() & executors.length - 1];}

这段代码能够确定执行的形式是轮询机制,接下来debug调试一下:

它这里还有一个判断,如果线程数不是2的N次方,则采纳取模算法实现。

@Overridepublic EventExecutor next() {    return executors[Math.abs(idx.getAndIncrement() % executors.length)];}

原文链接
本文为阿里云原创内容,未经容许不得转载。