Netty是一个异步基于事件驱动的高性能网络通信框架,能够看做是对NIO和BIO的封装,并提供了简略易用的API、Handler和工具类等,用以疾速开发高性能、高可靠性的网络服务端和客户端程序。
- 创立服务端
服务端启动须要创立 ServerBootstrap 对象,并实现初始化线程模型,配置IO模型和增加业务解决逻辑(Handler)。在增加业务解决逻辑时,调用的是 childHandler() 办法增加了一个 ChannelInitializer,代码示例如下
// 负责服务端的启动
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 以下两个对象能够看做是两个线程组
// boss线程组负责监听端口,承受新的连贯
NioEventLoopGroup boss = new NioEventLoopGroup();
// worker线程组负责读取数据
NioEventLoopGroup worker = new NioEventLoopGroup();
// 配置线程组并指定NIO模型
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
// 定义后续每个 新连贯 的读写业务逻辑 .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline() // 增加业务解决逻辑 .addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println(msg); } }); } });
// 绑定端口号
serverBootstrap.bind(2002);
复制代码
通过调用 .channel(NioServerSocketChannel.class) 办法指定 Channel 类型为NIO类型,如果要指定为BIO类型,参数改成 OioServerSocketChannel.class 即可。
其中 nioSocketChannel.pipeline() 用来获取 PipeLine 对象,调用办法 addLast() 增加必要的业务解决逻辑,这里采纳的是责任链模式,会将每个Handler作为一个节点进行解决。
1.1 创立客户端
客户端与服务端启动相似,不同的是,客户端须要创立 Bootstrap 对象来启动,并指定一个客户端线程组,雷同的是都须要实现初始化线程模型,配置IO模型和增加业务解决逻辑(Handler), 代码示例如下
// 负责客户端的启动
Bootstrap bootstrap = new Bootstrap();
// 客户端的线程模型
NioEventLoopGroup group = new NioEventLoopGroup();
// 指定线程组和NIO模型
bootstrap.group(group).channel(NioSocketChannel.class)
// handler() 办法封装业务解决逻辑 .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { channel.pipeline() // 增加业务解决逻辑 .addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println(msg); } }); } });
// 连贯服务端IP和端口
bootstrap.connect("127.0.0.1", 2002);
复制代码
(留神:下文中内容均以服务端代码示例为准)
编码和解码
客户端与服务端进行通信,通信的音讯是以二进制字节流的模式通过 Channel 进行传递的,所以当咱们在客户端封装好Java业务对象后,
须要将其依照协定转换成字节数组,并且当服务端承受到该二进制字节流时,须要将其依据协定再次解码成Java业务对象进行逻辑解决,
这就是编码和解码的过程。Netty 为咱们提供了 MessageToByteEncoder 用于编码,ByteToMessageDecoder 用于解码。
2.1 MessageToByteEncoder
用于将Java对象编码成字节数组并写入 ByteBuf,代码示例如下
public class TcpEncoder extends MessageToByteEncoder<Message> {/**
- 序列化器
*/
private final Serializer serializer;
public TcpEncoder(Serializer serializer) {
this.serializer = serializer;
}
/**
- 编码的执行逻辑
* - @param message 须要被编码的音讯对象
- @param byteBuf 将字节数组写入ByteBuf
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {// 通过自定义的序列化器将对象转换成字节数组 byte[] bytes = serializer.serialize(message); // 将字节数组写入 ByteBuf 便实现了对象的编码流程 byteBuf.writeBytes(bytes);
}
}
复制代码
2.2 ByteToMessageDecoder
它用于将接管到的二进制数据流解码成Java对象,与上述代码相似,只不过是将该过程反过来了而已,代码示例如下
public class TcpDecoder extends ByteToMessageDecoder {
/**- 序列化器
*/
private final Serializer serializer;
public TcpDecoder(Serializer serializer) {
this.serializer = serializer;
}
/**
- 解码的执行逻辑
* - @param byteBuf 接管到的ByteBuf对象
- @param list 任何实现解码的Java对象增加到该List中即可
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {// 依据协定自定义的解码逻辑将其解码成Java对象 Message message = serializer.deSerialize(byteBuf); // 解码实现后增加到List中即可 list.add(message);
}
}
复制代码
2.3 留神要点
ByteBuf默认状况下应用的是堆外内存,不进行内存开释会产生内存溢出。不过ByteToMessageDecoder 和 MessageToByteEncoder 这两个解码和编码Handler 会主动帮咱们实现内存开释的操作,无需再次手动开释。因为咱们实现的 encode() 和 decode() 办法只是这两个 Handler 源码中执行的一个环节,最终会在 finally 代码块中实现对内存的开释,具体内容可浏览 MessageToByteEncoder 中第99行 write() 办法源码。
2.4 在服务端中增加编码解码Handler
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline() // 接管到申请时进行解码 .addLast(new TcpDecoder(serializer)) // 发送申请时进行编码 .addLast(new TcpEncoder(serializer)); } });
复制代码
- 序列化器
增加业务解决Handler
在Netty框架中,客户端与服务端的每个连贯都对应着一个 Channel,而这个 Channel 的所有解决逻辑都封装在一个叫作 ChannelPipeline 的对象里。ChannelPipeline 是一个双向链表,它应用的是责任链模式,每个链表节点都是一个 Handler,能通它能获取 Channel 相干的上下文信息(ChannelHandlerContext)。Netty为咱们提供了多种读取 Channel 中数据的 Handler,其中比拟罕用的是 ChannelInboundHandlerAdapter 和 SimpleChannelInboundHandler,下文中咱们以读取心跳音讯为例。
3.1 ChannelInboundHandlerAdapter
如下为解决心跳业务逻辑的 Handler,具体执行逻辑参考代码和正文即可
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {/**
- channel中有数据可读时,会回调该办法
* - @param msg 如果在该Handler前没有解码Handler节点解决,该对象类型为ByteBuf;否则为解码后的Java对象
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Message message = (Message) msg; // 解决心跳音讯 processHeartBeatMessage(message); // 初始化Ack音讯 Message ackMessage = initialAckMessage(); // 回写给客户端 ctx.channel().writeAndFlush(ackMessage);
}
}
复制代码
3.2 SimpleChannelInboundHandler
SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter 的实现类,SimpleChannelInboundHandler 可能指定泛型,这样在解决业务逻辑时,便无需再增加上文代码中对象强转的逻辑,这部分代码实现是在 SimpleChannelInboundHandler 的 channelRead() 办法中实现的,它是一个模版办法,咱们仅仅须要实现 channelRead0() 办法即可,代码示例如下
public class HeartBeatHandler extends SimpleChannelInboundHandler<Message> {/**
- @param msg 留神这里的对象类型即为 Message
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {// 解决心跳音讯 processHeartBeatMessage(message); // 初始化Ack音讯 Message ackMessage = initialAckMessage(); // 回写给客户端 ctx.channel().writeAndFlush(ackMessage);
}
}
复制代码
3.3 在服务端中增加心跳解决Handler
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline() // 接管到进行解码 .addLast(new TcpDecoder(serializer)) // 心跳业务解决Handler .addLast(new HeartBeatHandler()) // 发送申请时进行编码 .addLast(new TcpEncoder(serializer)); } });
复制代码
- channel中有数据可读时,会回调该办法
ChannelHandler的生命周期
在 ChannelInboundHandlerAdapter 能够通过实现不同的办法来实现指定机会的办法回调,具体可参考如下代码
public class LifeCycleHandler extends ChannelInboundHandlerAdapter {
/**- 当检测到新连贯之后,调用 ch.pipeline().addLast(...); 之后的回调
- 示意以后channel中胜利增加了 Handler
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {System.out.println("逻辑处理器被增加时回调:handlerAdded()"); super.handlerAdded(ctx);
}
/**
- 示意以后channel的所有逻辑解决曾经和某个NIO线程建设了绑定关系
- 这里的NIO线程通常指的是 NioEventLoop
*/
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("channel 绑定到线程(NioEventLoop)时回调:channelRegistered()"); super.channelRegistered(ctx);
}
/**
- 当Channel的所有业务逻辑链筹备结束,连贯被激活时
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("channel 准备就绪时回调:channelActive()"); super.channelActive(ctx);
}
/**
- 客户端向服务端发送数据,示意有数据可读时,就会回调该办法
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("channel 有数据可读时回调:channelRead()"); super.channelRead(ctx, msg);
}
/**
- 服务端每残缺的读完一次数据,都会回调该办法
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("channel 某次数据读完时回调:channelReadComplete()"); super.channelReadComplete(ctx);
}
// ---断开连接时---
/**
- 该客户端与服务端的连贯被敞开时回调
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("channel 被敞开时回调:channelInactive()"); super.channelInactive(ctx);
}
/**
- 对应的NIO线程移除了对这个连贯的解决
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {System.out.println("channel 勾销线程(NioEventLoop) 的绑定时回调: channelUnregistered()"); super.channelUnregistered(ctx);
}
/**
- 为该连贯增加的所有业务逻辑Handler被移除时
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("逻辑处理器被移除时回调:handlerRemoved()"); super.handlerRemoved(ctx);
}
}
复制代码- 解决粘包和半包问题
即便咱们发送音讯的时候是以 ByteBuf 的模式发送的,然而到了底层操作系统,依然是以字节流的模式对数据进行发送的,而且服务端也以字节流的模式读取,因而在服务端对字节流进行拼接时,可能就会造成发送时 ByteBuf 与读取时的 ByteBuf 不对等的状况,这就是所谓的粘包或半包景象。
以如下状况为例,当客户端频繁的向服务端发送心跳音讯时,读取到的ByteBuf信息如下,其中一个心跳申请是用红框圈出的局部
能够发现多个心跳申请"粘"在了一起,那么咱们须要对它进行拆包解决,否则只会读取第一条心跳申请,之后的申请会全副生效
Netty 为咱们提供了基于长度的拆包器 LengthFieldBasedFrameDecoder 来进行拆包工作,它能对超过所需数据量的包进行拆分,也能在数据有余的时候期待读取,直到数据足够时,形成一个残缺的数据包并进行业务解决。
5.1 LengthFieldBasedFrameDecoder
以标准接口文档中的协定(图示)为准,代码示例如下,其中的四个参数比拟重要,详细信息可见正文形容
public class SplitHandler extends LengthFieldBasedFrameDecoder {
/** * 在协定中示意数据长度的字段在字节流首尾中的偏移量 */private static final Integer LENGTH_FIELD_OFFSET = 10;/** * 示意数据长度的字节长度 */private static final Integer LENGTH_FIELD_LENGTH = 4;/** * 数据长度后边的头信息中的字节偏移量 */private static final Integer LENGTH_ADJUSTMENT = 10;/** * 示意从第一个字节开始须要舍去的字节数,在咱们的协定中,不须要进行舍去 */private static final Integer INITIAL_BYTES_TO_STRIP = 0;public SplitHandler() { super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);}
}
复制代码
之后将其增加到Handler中即可,如果遇到其余协定,更改其中参数或查看LengthFieldBasedFrameDecoder 的JavaDoc中详细描述。
5.2 在服务端中增加拆包Handler
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline() // 拆包Handler .addLast(new SplitHandler()) // 接管到进行解码 .addLast(new TcpDecoder(serializer)) // 心跳业务解决Handler .addLast(new HeartBeatHandler()) // 发送申请时进行编码 .addLast(new TcpEncoder(serializer)); } });
复制代码
Netty性能优化
6.1 Handler对单例模式的利用
Netty 在每次有新连贯到来的时候,都会调用 ChannelInitializer 的 initChannel() 办法,会将其中相干的 Handler 都创立一次,如果其中的 Handler 是无状态且可能通用的,能够将其改成单例,这样就可能在每次连贯建设时,防止屡次创立雷同的对象。
以如下服务端代码为例,蕴含如下Handler,能够将编码解码、以及业务解决Handler都定义成Spring单例bean的模式注入进来,这样就可能实现对象的复用而无需每次建设连贯都创立雷同的对象了
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 拆包Handler .addLast(new SplitHandler()) // 日志Handler .addLast(new LoggingHandler(LogLevel.INFO)) // 解码Handler .addLast(new TcpDecoder(serializer)) // 心跳、格口状态、设施状态、RFID上报、扫码上报和分拣后果上报Handler .addLast(new HeartBeatHandler(), new ChuteStatusHandler()) .addLast(new DeviceStatusReceiveHandler(), new RfidBindReceiveHandler()) .addLast(new ScanReceiveHandler(), new SortResultHandler()) // 编码Handler .addLast(new TcpEncoder(serializer)); } });
复制代码
革新实现后如下
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 拆包Handler .addLast(new SplitHandler()) // 日志Handler .addLast(new LoggingHandler(LogLevel.INFO)) // 解码Handler .addLast(tcpDecoder) // 心跳、格口状态、设施状态、RFID上报、扫码上报和分拣后果上报Handler .addLast(heartBeatHandler, chuteStatusHandler) .addLast(deviceStatusReceiveHandler, rfidBindReceiveHandler) .addLast(scanReceiveHandler, sortResultHandler) // 编码Handler .addLast(tcpEncoder); } });
复制代码
不过须要留神在每个单例Handler的类上标注 @ChannelHandler.Sharable 注解,否则会抛出如下异样
io.netty.channel.ChannelPipelineException: netty.book.practice.handler.server.LoginHandler is not a @Sharable handler, so can't be added or removed multiple times
另外,SplitHanlder 不能进行单例解决,因为它的外部实现与每个 Channel 都无关,每个 SplitHandler 都须要维持每个 Channel 读到的数据,即它是有状态的。
6.2 缩短责任链调用
对服务端来说,每次解码进去的Java对象在多个业务解决 Handler 中只会通过一个其中 Handler 实现业务解决,那么咱们将所有业务相干的 Handler封装起来到一个Map中,每次只让它通过必要的Handler而不是通过整个责任链,那么便能够进步Netty解决申请的性能。
定义如下 ServerHandlers 单例bean,并应用 策略模式 将对应的 Handler 治理起来,每次解决时依据音讯类型获取对应的 Handler 来实现业务逻辑
@ChannelHandler.Sharable
public class ServerHandlers extends SimpleChannelInboundHandler<Message> {@Resourse
private HeartBeatHandler heartBeatHandler;/**
- 策略模式封装Handler,这样就能在回调 ServerHandler 的 channelRead0 办法时
- 找到具体的Handler,而不须要通过责任链的每个 Handler 节点,以此来进步性能
*/
private final Map<Command, SimpleChannelInboundHandler<Message>> map;
public ServerHandler() {
map = new HashMap<>(); // key: 音讯类型枚举 value: 对应的Handler map.put(MessageType.HEART_BEAT, heartBeatHandler); // ...
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {// 调用 channelRead() 办法实现业务逻辑解决 map.get(msg.getMessageType()).channelRead(ctx, msg);
}
}
复制代码
革新实现后,服务端代码如下,因为咱们封装了平行的业务解决 Handler,所以代码很清新
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 拆包Handler .addLast(new SplitHandler()) // 日志Handler .addLast(new LoggingHandler(LogLevel.INFO)) // 解码Handler .addLast(tcpDecoder) // serverHandlers 封装了 心跳、格口状态、设施状态、RFID上报、扫码上报和分拣后果上报Handler .addLast(serverHandlers) // 编码Handler .addLast(tcpEncoder); } });
复制代码
6.3 合并编码、解码Handler
Netty 对编码解码提供了对立解决Handler是MessageToMessageCodec,这样咱们就能将编码和解码的Handler合并成一个增加接口,代码示例如下
@ChannelHandler.Sharable
public class MessageCodecHandler extends MessageToMessageCodec<ByteBuf, Message> {/**
- 序列化器
*/
@Resourse
private Serializer serializer;@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {// 将字节数组写入 ByteBuf ByteBuf byteBuf = ctx.alloc().ioBuffer(); serializer.serialize(byteBuf, msg); // 这个编码也须要增加到List中 out.add(byteBuf);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {// 依据协定自定义的解码逻辑将其解码成Java对象,并增加到List中 out.add(serializer.deSerialize(msg));
}
}
复制代码
革新实现后,服务端代码如下,将其放在业务解决Handler前即可,调用完业务Handler逻辑,会执行编码逻辑
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 拆包Handler .addLast(new SplitHandler()) // 日志Handler .addLast(new LoggingHandler(LogLevel.INFO)) // 解码、编码Handler .addLast(messageCodecHandler) // serverHandlers 封装了 心跳、格口状态、设施状态、RFID上报、扫码上报和分拣后果上报Handler .addLast(serverHandlers); } });
复制代码
6.4 缩小NIO线程阻塞
对于耗时的业务操作,须要将它们都丢到业务线程池中去解决,因为单个NIO线程会治理很多 Channel ,只有有一个 Channel 中的 Handler 的 channelRead() 办法被业务逻辑阻塞,那么它就会拖慢绑定在该NIO线程上的其余所有 Channel。
为了防止上述情况,能够在蕴含长时间业务解决逻辑的Handler中创立一个线程池,并将其丢入线程池中进行执行,伪代码如下
protected void channelRead(ChannelHandlerContext ctx, Object message) {
threadPool.submit(new Runnable() {// 耗时的业务解决逻辑 doSomethingSependTooMuchTime(); writeAndFlush();
});
}
复制代码
6.5 闲暇"假死"检测Handler
如果底层的TCP连贯曾经断开,然而另一端服务并没有捕捉到,在某一端(客户端或服务端)看来会认为这条连贯依然存在,这就是连贯"假死"景象。这造成的问题就是,对于服务端来说,每个连贯连贯都会消耗CPU和内存资源,过多的假死连贯会造成性能降落和服务解体;对客户端来说,连贯假死会使得发往服务端的申请都会超时,所以须要尽可能防止假死景象的产生。
造成假死的起因可能是公网丢包、客户端或服务端网络故障等,Netty为咱们提供了IdleStateHandler 来解决超时假死问题,示例代码如下
public class MyIdleStateHandler extends IdleStateHandler {private static final int READER_IDLE_TIME = 15;
public MyIdleStateHandler() {
// 读超时工夫、写超时工夫、读写超时工夫 指定0值不判断超时 super(READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) {System.out.println(READER_IDLE_TIME + "秒内没有读到数据,敞开连贯"); ctx.channel().close();
}
}
复制代码
其构造方法中有三个参数来别离指定读、写和读写超时工夫,当指定0时不判断超时,除此之外Netty也有专门用来解决读和写超时的Handler,别离为 ReadTimeoutHandler, WriteTimeoutHandler。
将其增加到服务端 Handler 的首位即可
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() // 超时判断Handler .addLast(new MyIdleStateHandler()) // 拆包Handler .addLast(new SplitHandler()) // 日志Handler .addLast(new LoggingHandler(LogLevel.INFO)) // 解码、编码Handler .addLast(messageCodecHandler) // serverHandlers 封装了 心跳、格口状态、设施状态、RFID上报、扫码上报和分拣后果上报Handler .addLast(serverHandlers); } });