为继续夯实MobTech袤博科技的数智技术创新能力和技术布道能力,本期极客星球邀请了企业服务研发部工程师梁立从 TCP 的粘包/半包、 Netty 解决粘包/半包及源码剖析、 开源我的项目对 channelHandler最佳实际三方面对《netty 中channelHandler的原理与最佳实际》进行了全面的技术分享。
版本信息
本次剖析版本基于netty 4.1.40.Final
TCP 的粘包/半包问题
在TCP/IP 协定传输网络数据包时,用户发送音讯ABCD,服务端可能收到是ABCD. AB?CD?等。对于粘包问题,次要起因是发送方每次写入数据小于套接字缓冲区大小, 以及接受方读取音讯不及时。对于半包问题, 次要起因是发送方每次写入数据大于套接字缓冲区大小,以及发送数据大于协定最大传输单位,底层须要拆包。那么针对此类问题,该当如何解决呢 ?常见的形式解码形式有三种:固定长度,应用固定分隔符来宰割音讯,以及固网长度字段寄存内容长度信息。
解码实现思考
在剖析之前,咱们能够思考一下,如果是咱们来实现下面三种编解码会如何实现 ?
咱们能够整顿如下需要:
1.咱们须要寄存咱们解码好的音讯;
2.咱们须要提供一个解码办法来让不同子类实现, 例如固定长度,分隔符,以及固定长度字段解码的形式必定有差异;
3.咱们从套接字读取音讯后就能够让咱们解码器去解决了。
针对上述需要,咱们还须要带着三个问题,查看源码看下是否和咱们猜测的相似:
问题1:咱们须要一个汇合寄存咱们解码的音讯;
问题2:咱们须要不同子类对解码细节做不同实现,所以咱们须要有一个父类;ByteToMessageDecoder, 能够在父类实现公共逻辑,提供给子类一个decode(List out,ByteBuf in); 办法;
问题3 :咱们从套接字读取数据之后,发送一个读事件(fireChannelRead)让咱们解码器去解决。
Netty 解决粘包/半包及源码剖析
封帧形式
解码
固定长度
FixedLengthFrameDecoder
分隔符
DelimiterBasedFrameDecoder
固定长度字段存内容长度信息
LengthFieldBasedFrameDecoder
咱们以固定长度解码器为例:
ServerBootstrap b = new ServerBootstrap();
// ....
b..childHandler(new ChannelInitializer<SocketChannel>() {
@Overridepublic void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new FixedLengthFrameDecoder(2)); //.... 后续业务解决handler }
});
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
//....
}
public class ByteToMessageDecoder {
// ....protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
}
咱们查看 FixedLengthFrameDecoder ,发现果然继承父类ByteToMessageDecoder,而后父类也有一个channelRead办法解决音讯,并提供一个decode形象办法让子类实现。
channelRead
假如咱们发送端发送ABCD音讯,从套节字读取之后,后续会调用channelRead 办法进行解码。
咱们看到获取一个汇合实例CodecOutputList, 该类实现List接口。如果是首次调用,会把以后ByteBuf 赋值给cumulation,并调用callDecode(ctx, cumulation, out)。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { cumulation = data; } else { cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { //..... } } else { ctx.fireChannelRead(msg); }}
callDecode
通过字面意思就晓得这个办法会做和解码相干操作。首先会判断in.isReadable() 是否可读,而后咱们的outSize 目前是空, 进入到 decodeRemovalReentryProtection , 该办法会调用子类FixedLengthFrameDecoder的decode办法进行具体解码,该decode 办法比较简单就是当从ByteBuf 读取到指定长度就增加到out 中。咱们读取实现后, outSize == out.size() 和 oldInputLength == in.readableBytes()都不满足,进入下一次循环, 咱们outSize 大于0, 发送fireChannelRead。到此音讯就被解码,并发送给咱们业务channelHandler 。
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try { while (in.isReadable()) { int outSize = out.size(); if (outSize > 0) { fireChannelRead(ctx, out, outSize); out.clear(); // Check if this handler was removed before continuing with decoding. // If it was removed, it is not safe to continue to operate on the buffer. // // See: // - https://github.com/netty/netty/issues/4635 if (ctx.isRemoved()) { break; } outSize = 0; } int oldInputLength = in.readableBytes(); //decode中时,不能执行完handler remove清理操作。 //那decode完之后须要清理数据。 decodeRemovalReentryProtection(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break; } else { continue; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } }} catch (DecoderException e) { throw e;} catch (Exception cause) { throw new DecoderException(cause);}
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {decodeState = STATE_CALLING_CHILD_DECODE;try { decode(ctx, in, out);} finally { boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING; decodeState = STATE_INIT; if (removePending) { handlerRemoved(ctx); }}
}
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); }}protected Object decode( @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (in.readableBytes() < frameLength) { return null; } else { return in.readRetainedSlice(frameLength); }}
}
channelHandler 的最佳实际
理解Netty 的小伙伴都晓得channelHandler 分为ChannelInboundHandler 和 ChannelOutboundHandler, 别离用来解决inbound 和 outbound。
channelHandler 的最佳实际实质就是inbound 和outbound 的最佳实际。
上面列举了三种具备代表性的场景
• 依照职责划分channelHandler,例如有解决编解码,有解决心跳的,有专门解决业务的;
• 因为channel和eventLoop 线程绑定,而后一个evnetLoop 可能服务多个channel,所以咱们不要在channelHandler 做耗时操作;
• outbound 咱们能够优化写,缩小零碎调用。
依照职责划分channelHandler
rocketMq
咱们能够查看rocketMq 是如何划分channelHandler , 比方具备专门解决编/解码的NettyEncoder/NettyDecoder,通过IdleStatHandler 发现不沉闷连贯,治理连贯handlerNettyConnectManageHandler 进行解决,
业务解决 NettyServerHandler 。
dubbo
解决编解码,查看不沉闷channel,以及业务解决handler。
不在channelHandler 做耗时操作
之前介绍过一个eventLoop 线程服务多个channel,假如某个channelHandler解决耗时的工作,会影响其余channel,所以咱们不要在channelHandler 执行耗时操作。
如果的确须要执行耗时操作,咱们能够给channelHandler 增加一个线程池解决
final DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();
// 为咱们的serverHandler 增加独自的线程池处理事件。
pipeline.addLast(defaultEventLoopGroup,serverHandler);
outbound 优化写
writeAndFlush存在的问题
咱们来看一下上面代码有什么问题?
public class EchoServerHandler
extends ChannelInboundHandlerAdapter {
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.writeAndFlush(msg);}
}
代码的问题在于ctx.writeAndFlush 每次调用都会触发一次零碎调用。而后channelRead 在一次业务解决中可能被调用屡次,问题就变为一次业务申请,执行屡次零碎调用。
优化writeAndFlush
怎么优化?
咱们能够重写channelRead 和 channelReadComplete,在channelRead 中调用write 办法,
在channelReadComplete中调用flush 办法 。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush();}
}
下面的实现形式的确缩小零碎调用,然而在netty 外部当有数据可读,会默认会间断16次,最初在调用channelReadComplete() 办法。
默认的行为存在两个问题:
1.写出数据到对端的工夫被提早了;
2.默认16 次这个数据不肯定适宜所有业务场景(不够灵便)。
咱们须要联合业务的个性,例如业务如果关注吞吐量,能够适当把读取几次后刷新设置的大一些。如果业务关注及时性,读取几次后刷新就适当设置小一点。基于上述需要,FlushConsolidationHandler 就诞生了, 能够指定读取几次后刷新一次。
FlushConsolidationHandler 优化写
应用在pipeline中增加FlushConsolidationHandler,读取几次刷新一次能够依据业务设置,例如这里设置5次,咱们是优化 EchoServerHandler的写,就放在它的后面。
// 每5次就触发一次flush
// ....
p.addLast(new FlushConsolidationHandler(5));
p.addLast(new EchoServerHandler());
// ....
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.writeAndFlush(msg);}
}
原理剖析:
首先FlushConsolidationHandler 继承 ChannelDuplexHandler,能同时解决入站和出站音讯,
入站咱们查看 channelRead 和 channelReadComplete 实现,出站咱们查看 flush 办法 (没有对write办法进行重写)。
channelRead
• 设置readInProgress 就把事件向下传递
• 咱们的EchoServerHandler 会channelRead 会被调用,咱们在channelRead 中调用ctx.writeAndFlush。
• 触发write 和 flush 的出站音讯, FlushConsolidationHandler的flush进行解决
• 先判断readInProgress, ++flushPendingCount == explicitFlushAfterFlushes 判断是否达到冀望刷新次数,咱们设置为5 ,不执行刷新。
• 接着channelReadComplete 被调用,会重置筹备刷新次数,并执行刷新。
要害就在channelRead 和 channelReadComplete
假如咱们channelRead 读取了屡次, 当读取次数大于等于5次就会刷新,小于5次时由channelReadComplete 刷新。
这样就达到了缩小零碎调用并且每读取几次在刷新也能够配置
public class FlushConsolidationHandler extends ChannelDuplexHandler {
// explicitFlushAfterFlushes 示意几次flush后,才真正调用flush 办法// consolidateWhenNoReadInProgress 反对异步的状况,当readInProgress不为true 也能够反对flushpublic FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress){ //....}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { readInProgress = true; ctx.fireChannelRead(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // This may be the last event in the read loop, so flush now! // 外部就是将 readInProgress = false; 当flushPendingCount 就调用flush resetReadAndFlushIfNeeded(ctx); ctx.fireChannelReadComplete();}@Overridepublic void flush(ChannelHandlerContext ctx) throws Exception { //依据业务线程是否复用IO线程两种状况来思考: //复用状况 if (readInProgress) { //正在读的时候 // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus // we only need to flush if we reach the explicitFlushAfterFlushes limit. //每explicitFlushAfterFlushes个“批量”写(flush)一次 //有余怎么办?channelReadComplete会flush掉前面的 if (++flushPendingCount == explicitFlushAfterFlushes) { flushNow(ctx); } //以下是非复用状况:异步状况 } else if (consolidateWhenNoReadInProgress) { //(业务异步化状况下)开启consolidateWhenNoReadInProgress时,优化flush //(比方没有读申请了,然而外部还是忙的团团转,没有消化的时候,所以还是会写响应) // Flush immediately if we reach the threshold, otherwise schedule if (++flushPendingCount == explicitFlushAfterFlushes) { flushNow(ctx); } else { scheduleFlush(ctx); } } else { //(业务异步化状况下)没有开启consolidateWhenNoReadInProgress时,间接flush // Always flush directly flushNow(ctx); }}
}
附录
默认读取16次设置入口源码剖析
默认创立DefaultChannelConfig ,会接着调用重载的构造函数。
在setRecvByteBufAllocator能够看到获取metadata.defaultMaxMessagesPerRead()。
而ChannelMetadata 默认结构为 16次 new ChannelMetadata(false, 16)。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);//.....
}
// 默认抉择自适应承受缓存分配器,而后在调用setRecvByteBufAllocator。
// setRecvByteBufAllocator就是指定最大读取多少次的入口 ,默认为16次
public class DefaultChannelConfig implements ChannelConfig {
public DefaultChannelConfig(Channel channel) { //除UDP外都默认抉择自适应承受缓存分配器 this(channel, new AdaptiveRecvByteBufAllocator());}protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) { //UDP的应用固定SIZE的承受缓存分配器:FixedRecvByteBufAllocator setRecvByteBufAllocator(allocator, channel.metadata()); this.channel = channel;}
}
private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
if (allocator instanceof MaxMessagesRecvByteBufAllocator) { ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());} else if (allocator == null) { throw new NullPointerException("allocator");}setRecvByteBufAllocator(allocator);
}
public final class ChannelMetadata {
private final boolean hasDisconnect;private final int defaultMaxMessagesPerRead;// ....
}