1.原生NIO存在的问题

2.Netty介绍

3.Netty的工作模型

4.用Netty编写TCP服务

5.工作队列的三种经典应用场景

1.原生NIO存在的问题

后面咱们通过NIO的原生API实现了服务端与客户端的交互:Netty网络编程——NIO编程介绍。咱们在编程的过程中,发现了如下的问题:

1)咱们会发现NIO的类库和API品种繁多,须要把握Selector,ServerSocketChannel,SocketChannel,ByteBuffer等能力顺利编程。

2)咱们还须要对多线程编程,网络编程等十分相熟,咱们能力编写高质量的NIO程序。

3)开发的难度十分大,如果说有:断线重连,网络闪断,半包读写,加密解密等等

4)NIO还会有epoll bug,导致selector空轮训,使cpu空轮训。

2.Netty介绍

为了针对下面这些问题,Netty对JDK自带的NIO进行了封装,解决了上述一系列问题。

1)操作简略,对于各种类型传输有对立的api,而且扩大不便,清晰地把变动的代码和不变的代码拆散开来。

2)使用方便,有具体的文档,而且没有其它依赖项。

3)高性能,吞吐量更高,提早升高。

4)平安

5)社区沉闷,被发现的bug能够被及时修复。

3.Netty的工作模型

后面咱们介绍了Netty网络编程——Reactor模式高性能架构设计原理,Netty也次要基于Reactor的多线程模型做了肯定的批改。

1)netty线程模型形象出了两组线程池BossGroup专门负责解决客户端连贯WorkerGroup专门负责网络的读写

2)两组线程池的类型都是NioEventLoopGroup示意一个一直循环解决工作的线程组

3)每个NioEventLoop都有一个selector,用于监听绑定在其上的socket网络通讯

4)每个BossNioEventLoop循环分3步
4.1)轮询accept事件

4.2)解决accept事件,与client建设连贯,生成NioSocketChannel,并将其注册到某个workerNioEventLoop上。

4.3)解决工作队列的工作(可能会有很耗时的操作,放在工作队列中异步执行)

5)workerNioEventLoop的工作也分三步:
5.1)轮询read,write事件
5.2)解决IO事件
5.3)解决工作队列的工作可能会有很耗时的操作,放在工作队列中异步执行)

6)每个worker解决工作的时候,会应用pipeline(管道),pipeline中蕴含了channel,通过pipeline能够取得对应的通道。

4.用Netty编写TCP服务

咱们初步编写一个程序:

Netty服务在6668端口进行监听,客户端能发送音讯给服务器,服务器也能发送音讯给客户端

NettyServer:

package com.example.demo.netty.nettyDemo;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import lombok.extern.slf4j.Slf4j;/** * @author sulingfeng * @title: NettyServer * @projectName netty-learn * @description: TODO * @date 2022/7/7 17:45 */@Slf4jpublic class NettyServer  {    public static void main(String[] args) throws Exception {        //创立两个线程组        //bossGroup只解决连贯申请        //workerGroup负责解决业务逻辑        //bossGroup和workerGroup含有的子线程个数默认理论为cpu核数*2        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();        try{            //服务器启动对象            ServerBootstrap bootstrap = new ServerBootstrap();            bootstrap.group(bossGroup,workerGroup)//设置两个线程组                    .channel(NioServerSocketChannel.class)//应用NioSocketChannel作为通道的实现                    .option(ChannelOption.SO_BACKLOG,128)//设置线程队列的连贯数量下限                    .childOption(ChannelOption.SO_KEEPALIVE,true)//放弃流动连贯状态                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            //设置处理器,解决的业务逻辑在这里                            log.info("客户端socketChannel hashCode = " + ch.hashCode());                            ch.pipeline().addLast(new NettyServerHandler());                        }                    });            log.info("服务器筹备好了");            //监听6668端口            ChannelFuture cf = bootstrap.bind(6668).sync();            cf.addListener(new ChannelFutureListener() {                @Override                public void operationComplete(ChannelFuture channelFuture) throws Exception {                    if(cf.isSuccess()){                        log.info("监听端口 6668 胜利");                    }else{                        log.info("监听端口 6668 失败");                    }                }            });                        //对敞开通道事件  进行监听,如果敞开了就返回执行finally            cf.channel().closeFuture().sync();        }finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}

NettyServerHandler:

package com.example.demo.netty.nettyDemo;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelPipeline;import io.netty.util.CharsetUtil;import lombok.extern.slf4j.Slf4j;/** * @author sulingfeng * @title: NettyServerHandler * @projectName netty-learn * @description: TODO * @date 2022/7/8 13:35 *///咱们自定义一个Handler,把业务逻辑全都放在这里@Slf4jpublic class NettyServerHandler extends ChannelInboundHandlerAdapter {    //服务器端读取客户端的数据    /**     * @param ctx 上下文对象,蕴含连贯,管道     * @param msg 客户端发送的数据     * @throws Exception     */    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        log.info("服务器读取线程"+Thread.currentThread().getName()+" channel="+ctx.channel());        Channel channel = ctx.channel();        ChannelPipeline pipeline = ctx.pipeline(); //实质是一个双向链表        ByteBuf buf = (ByteBuf) msg;        System.out.println("客户端发送音讯是:" + buf.toString(CharsetUtil.UTF_8));        System.out.println("客户端地址:" + channel.remoteAddress());    }    //当服务器端实现    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", CharsetUtil.UTF_8));    }    //当产生异样的时候    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        ctx.close();    }}

NettyClient:

package com.example.demo.netty.nettyDemo;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;/** * @author sulingfeng * @title: NettyClient * @projectName netty-learn * @description: TODO * @date 2022/7/8 13:46 */public class NettyClient {    public static void main(String[] args) throws Exception {        //客户端须要一个事件循环组        NioEventLoopGroup group = new NioEventLoopGroup();        try{            //创立客户端启动对象            Bootstrap bootstrap = new Bootstrap();            bootstrap.group(group)                    .channel(NioSocketChannel.class)                    .handler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel socketChannel) throws Exception {                            //客户端解决逻辑类                            socketChannel.pipeline().addLast(new NettyClientHandler());                        }                    });            //启动客户端去连贯服务器端            //对于 ChannelFuture 要剖析,波及到netty的异步模型            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();            //对敞开通道事件  进行监听            channelFuture.channel().closeFuture().sync();        }finally {            group.shutdownGracefully();        }    }}

NettyClientHandlerr:

package com.example.demo.netty.nettyDemo;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelPipeline;import io.netty.util.CharsetUtil;import lombok.extern.slf4j.Slf4j;/** * @author sulingfeng * @title: NettyServerHandler * @projectName netty-learn * @description: TODO * @date 2022/7/8 13:35 */@Slf4jpublic class NettyClientHandler extends ChannelInboundHandlerAdapter {    //当通道就绪就会触发该办法    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        System.out.println("client " + ctx);        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server", CharsetUtil.UTF_8));    }    //当通道有读取事件时,会触发    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        ByteBuf buf = (ByteBuf) msg;        System.out.println("服务器回复的音讯:" + buf.toString(CharsetUtil.UTF_8));        System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();    }}

5.工作队列的三种经典应用场景

咱们在察看Netty架构图,发现有一个TaskQueue的货色,咱们就来解说一下它:

咱们发现,在handler里,如果工作解决工夫十分地长,势必会影响到其它线程的执行,这个时候咱们能够放在TaskQueue里进行进行异步执行。

TaskQueue有两种:
1)用户自定义的一般工作,放入队列中,轮到这个工作的时候,会立即执行。

2)用户自定义的定时工作,放入队列中,轮到这个工作的时候,会立即执行,相比一般工作,多一个提早的工夫属性,到点主动执行。

package com.example.demo.netty.nettyDemo;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelPipeline;import io.netty.util.CharsetUtil;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList;import java.util.List;import java.util.concurrent.TimeUnit;/** * @author sulingfeng * @title: NettyServerHandler * @projectName netty-learn * @description: TODO * @date 2022/7/8 13:35 *///咱们自定义一个Handler,把业务逻辑全都放在这里@Slf4jpublic class NettyServerHandler extends ChannelInboundHandlerAdapter {    public static List<Channel> channels = new ArrayList<>();    //服务器端读取客户端的数据    /**     * @param ctx 上下文对象,蕴含连贯,管道     * @param msg 客户端发送的数据     * @throws Exception     */    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        log.info("服务器读取线程"+Thread.currentThread().getName()+" channel="+ctx.channel());        Channel channel = ctx.channel();        channels.add(channel);        ByteBuf buf = (ByteBuf) msg;        //解决方案1 : 用户定义的一般自定义工作        ctx.channel().eventLoop().execute(()->{            try {                Thread.sleep(5000);                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, sleep 5000", CharsetUtil.UTF_8));            } catch (InterruptedException e) {                e.printStackTrace();            }        });        //用户自定义定时工作,能够领有延时(delay)的成果        ctx.channel().eventLoop().schedule(()->{            try {                Thread.sleep(5000);                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, sechdule sleep 5000", CharsetUtil.UTF_8));            } catch (InterruptedException e) {                e.printStackTrace();            }        },5, TimeUnit.SECONDS);        //把所有channel放在一个汇合中,进行转发        channels.stream().forEach(record->{            record.writeAndFlush(Unpooled.copiedBuffer("群发音讯", CharsetUtil.UTF_8));        });        System.out.println("客户端发送音讯是:" + buf.toString(CharsetUtil.UTF_8));        System.out.println("客户端地址:" + channel.remoteAddress());    }    //当服务器端实现    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", CharsetUtil.UTF_8));    }    //当产生异样的时候    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        ctx.close();    }}