乐趣区

关于netty入门:Netty网络编程Netty入门

1. 原生 NIO 存在的问题

2.Netty 介绍

3.Netty 的工作模型

4. 用 Netty 编写 TCP 服务

5. 工作队列的三种经典应用场景

1. 原生 NIO 存在的问题

后面咱们通过 NIO 的原生 API 实现了服务端与客户端的交互:Netty 网络编程——NIO 编程介绍。咱们在编程的过程中,发现了如下的问题:

1)咱们会发现NIO 的类库和 API 品种繁多,须要把握 Selector,ServerSocketChannel,SocketChannel,ByteBuffer 等能力顺利编程。

2)咱们还须要 对多线程编程,网络编程等十分相熟,咱们能力编写高质量的 NIO 程序。

3)开发的难度十分大,如果说有:断线重连,网络闪断,半包读写,加密解密等等

4)NIO 还会有 epoll bug,导致 selector 空轮训,使 cpu 空轮训。

2.Netty 介绍

为了针对下面这些问题,Netty 对 JDK 自带的 NIO 进行了封装,解决了上述一系列问题。

1)操作简略,对于各种类型传输有对立的 api,而且扩大不便,清晰地把变动的代码和不变的代码拆散开来。

2)使用方便,有具体的文档,而且没有其它依赖项。

3)高性能,吞吐量更高,提早升高。

4)平安

5)社区沉闷,被发现的 bug 能够被及时修复。

3.Netty 的工作模型

后面咱们介绍了 Netty 网络编程——Reactor 模式高性能架构设计原理,Netty 也次要基于 Reactor 的多线程模型做了肯定的批改。

1)netty 线程模型 形象出了两组线程池 BossGroup 专门 负责解决客户端连贯 WorkerGroup 专门负责 网络的读写

2)两组线程池的类型都是 NioEventLoopGroup 示意一个一直循环解决工作的线程组

3)每个 NioEventLoop 都有一个 selector,用于 监听绑定在其上的 socket 网络通讯

4)每个 BossNioEventLoop循环分 3 步
4.1) 轮询 accept 事件

4.2)解决 accept 事件,与 client 建设连贯,生成 NioSocketChannel,并将其注册到某个 workerNioEventLoop 上。

4.3)解决工作队列的工作(可能会有很耗时的操作,放在工作队列中异步执行)

5)workerNioEventLoop的工作也分三步:
5.1)轮询 read,write 事件
5.2) 解决 IO 事件
5.3) 解决工作队列的工作 可能会有很耗时的操作,放在工作队列中异步执行)

6)每个 worker 解决工作的时候,会应用pipeline(管道),pipeline 中蕴含了 channel,通过 pipeline 能够取得对应的通道。

4. 用 Netty 编写 TCP 服务

咱们初步编写一个程序:

Netty 服务在 6668 端口进行监听,客户端能发送音讯给服务器,服务器也能发送音讯给客户端

NettyServer:

package com.example.demo.netty.nettyDemo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * @author sulingfeng
 * @title: NettyServer
 * @projectName netty-learn
 * @description: TODO
 * @date 2022/7/7 17:45
 */
@Slf4j
public class NettyServer  {public static void main(String[] args) throws Exception {

        // 创立两个线程组
        //bossGroup 只解决连贯申请
        //workerGroup 负责解决业务逻辑
        //bossGroup 和 workerGroup 含有的子线程个数默认理论为 cpu 核数 *2
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();


        try{
            // 服务器启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();

            bootstrap.group(bossGroup,workerGroup)// 设置两个线程组
                    .channel(NioServerSocketChannel.class)// 应用 NioSocketChannel 作为通道的实现
                    .option(ChannelOption.SO_BACKLOG,128)// 设置线程队列的连贯数量下限
                    .childOption(ChannelOption.SO_KEEPALIVE,true)// 放弃流动连贯状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 设置处理器,解决的业务逻辑在这里
                            log.info("客户端 socketChannel hashCode =" + ch.hashCode());
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
            log.info("服务器筹备好了");
            // 监听 6668 端口
            ChannelFuture cf = bootstrap.bind(6668).sync();

            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {if(cf.isSuccess()){log.info("监听端口 6668 胜利");
                    }else{log.info("监听端口 6668 失败");
                    }
                }
            });
            
            // 对敞开通道事件  进行监听,如果敞开了就返回执行 finally
            cf.channel().closeFuture().sync();}finally {bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();}
    }
}

NettyServerHandler:

package com.example.demo.netty.nettyDemo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * @author sulingfeng
 * @title: NettyServerHandler
 * @projectName netty-learn
 * @description: TODO
 * @date 2022/7/8 13:35
 */
// 咱们自定义一个 Handler,把业务逻辑全都放在这里
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    // 服务器端读取客户端的数据

    /**
     * @param ctx 上下文对象,蕴含连贯,管道
     * @param msg 客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("服务器读取线程"+Thread.currentThread().getName()+"channel="+ctx.channel());
        Channel channel = ctx.channel();
        ChannelPipeline pipeline = ctx.pipeline(); // 实质是一个双向链表
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送音讯是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:" + channel.remoteAddress());
    }

    // 当服务器端实现
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", CharsetUtil.UTF_8));
    }

    // 当产生异样的时候
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();
    }
}

NettyClient:

package com.example.demo.netty.nettyDemo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @author sulingfeng
 * @title: NettyClient
 * @projectName netty-learn
 * @description: TODO
 * @date 2022/7/8 13:46
 */
public class NettyClient {public static void main(String[] args) throws Exception {
        // 客户端须要一个事件循环组
        NioEventLoopGroup group = new NioEventLoopGroup();

        try{
            // 创立客户端启动对象
            Bootstrap bootstrap = new Bootstrap();

            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 客户端解决逻辑类
                            socketChannel.pipeline().addLast(new NettyClientHandler());
                        }
                    });
            // 启动客户端去连贯服务器端
            // 对于 ChannelFuture 要剖析,波及到 netty 的异步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            // 对敞开通道事件  进行监听
            channelFuture.channel().closeFuture().sync();}finally {group.shutdownGracefully();
        }
    }
}

NettyClientHandlerr:

package com.example.demo.netty.nettyDemo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * @author sulingfeng
 * @title: NettyServerHandler
 * @projectName netty-learn
 * @description: TODO
 * @date 2022/7/8 13:35
 */
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    // 当通道就绪就会触发该办法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("client" + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server", CharsetUtil.UTF_8));
    }

    // 当通道有读取事件时,会触发
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回复的音讯:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器的地址:"+ ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();
        ctx.close();}
}

5. 工作队列的三种经典应用场景

咱们在察看 Netty 架构图,发现有一个 TaskQueue 的货色,咱们就来解说一下它:

咱们发现,在 handler 里,如果工作解决工夫十分地长,势必会影响到其它线程的执行,这个时候咱们能够放在 TaskQueue 里进行进行异步执行。

TaskQueue 有两种:
1)用户自定义的一般工作,放入队列中,轮到这个工作的时候,会立即执行。

2)用户自定义的定时工作,放入队列中,轮到这个工作的时候,会立即执行,相比一般工作,多一个提早的工夫属性,到点主动执行。

package com.example.demo.netty.nettyDemo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * @author sulingfeng
 * @title: NettyServerHandler
 * @projectName netty-learn
 * @description: TODO
 * @date 2022/7/8 13:35
 */
// 咱们自定义一个 Handler,把业务逻辑全都放在这里
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {public static List<Channel> channels = new ArrayList<>();

    // 服务器端读取客户端的数据
    /**
     * @param ctx 上下文对象,蕴含连贯,管道
     * @param msg 客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("服务器读取线程"+Thread.currentThread().getName()+"channel="+ctx.channel());
        Channel channel = ctx.channel();
        channels.add(channel);
        ByteBuf buf = (ByteBuf) msg;

        // 解决方案 1:用户定义的一般自定义工作
        ctx.channel().eventLoop().execute(()->{
            try {Thread.sleep(5000);
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, sleep 5000", CharsetUtil.UTF_8));
            } catch (InterruptedException e) {e.printStackTrace();
            }
        });

        // 用户自定义定时工作,能够领有延时(delay)的成果
        ctx.channel().eventLoop().schedule(()->{
            try {Thread.sleep(5000);
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, sechdule sleep 5000", CharsetUtil.UTF_8));
            } catch (InterruptedException e) {e.printStackTrace();
            }
        },5, TimeUnit.SECONDS);

        // 把所有 channel 放在一个汇合中,进行转发
        channels.stream().forEach(record->{record.writeAndFlush(Unpooled.copiedBuffer("群发音讯", CharsetUtil.UTF_8));
        });

        System.out.println("客户端发送音讯是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:" + channel.remoteAddress());


    }

    // 当服务器端实现
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~", CharsetUtil.UTF_8));
    }

    // 当产生异样的时候
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();
    }
}
退出移动版