为继续夯实 MobTech 袤博科技的数智技术创新能力和技术布道能力,本期极客星球邀请了企业服务研发部工程师梁立从 TCP 的粘包 / 半包、Netty 解决粘包 / 半包及源码剖析、开源我的项目对 channelHandler 最佳实际三方面对《netty 中 channelHandler 的原理与最佳实际》进行了全面的技术分享。
版本信息
本次剖析版本基于 netty 4.1.40.Final
TCP 的粘包 / 半包问题
在 TCP/IP 协定传输网络数据包时,用户发送音讯 ABCD,服务端可能收到是 ABCD. AB?CD?等。对于粘包问题,次要起因是发送方每次写入数据小于套接字缓冲区大小,以及接受方读取音讯不及时。对于半包问题,次要起因是发送方每次写入数据大于套接字缓冲区大小,以及发送数据大于协定最大传输单位,底层须要拆包。那么针对此类问题,该当如何解决呢?常见的形式解码形式有三种:固定长度,应用固定分隔符来宰割音讯,以及固网长度字段寄存内容长度信息。
解码实现思考
在剖析之前,咱们能够思考一下,如果是咱们来实现下面三种编解码会如何实现?
咱们能够整顿如下需要:
1. 咱们须要寄存咱们解码好的音讯;
2. 咱们须要提供一个解码办法来让不同子类实现,例如固定长度,分隔符,以及固定长度字段解码的形式必定有差异;
3. 咱们从套接字读取音讯后就能够让咱们解码器去解决了。
针对上述需要,咱们还须要带着三个问题,查看源码看下是否和咱们猜测的相似:
问题 1:咱们须要一个汇合寄存咱们解码的音讯;
问题 2:咱们须要不同子类对解码细节做不同实现,所以咱们须要有一个父类;ByteToMessageDecoder,能够在父类实现公共逻辑,提供给子类一个 decode(List out,ByteBuf in); 办法;
问题 3:咱们从套接字读取数据之后,发送一个读事件 (fireChannelRead) 让咱们解码器去解决。
Netty 解决粘包 / 半包及源码剖析
封帧形式
解码
固定长度
FixedLengthFrameDecoder
分隔符
DelimiterBasedFrameDecoder
固定长度字段存内容长度信息
LengthFieldBasedFrameDecoder
咱们以固定长度解码器为例:
ServerBootstrap b = new ServerBootstrap();
// ….
b..childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();
p.addLast(new FixedLengthFrameDecoder(2));
//.... 后续业务解决 handler
}
});
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
//....
}
public class ByteToMessageDecoder {
// ....
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
}
咱们查看 FixedLengthFrameDecoder,发现果然继承父类 ByteToMessageDecoder,而后父类也有一个 channelRead 办法解决音讯,并提供一个 decode 形象办法让子类实现。
channelRead
假如咱们发送端发送 ABCD 音讯,从套节字读取之后,后续会调用 channelRead 办法进行解码。
咱们看到获取一个汇合实例 CodecOutputList,该类实现 List 接口。如果是首次调用,会把以后 ByteBuf 赋值给 cumulation,并调用 callDecode(ctx, cumulation, out)。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof ByteBuf) {CodecOutputList out = CodecOutputList.newInstance();
try {ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {cumulation = data;} else {cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {throw e;} catch (Exception e) {throw new DecoderException(e);
} finally {//.....}
} else {ctx.fireChannelRead(msg);
}
}
callDecode
通过字面意思就晓得这个办法会做和解码相干操作。首先会判断 in.isReadable() 是否可读,而后咱们的 outSize 目前是空,进入到 decodeRemovalReentryProtection , 该办法会调用子类 FixedLengthFrameDecoder 的 decode 办法进行具体解码,该 decode 办法比较简单就是当从 ByteBuf 读取到指定长度就增加到 out 中。咱们读取实现后,outSize == out.size() 和 oldInputLength == in.readableBytes()都不满足,进入下一次循环,咱们 outSize 大于 0,发送 fireChannelRead。到此音讯就被解码,并发送给咱们业务 channelHandler。
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {while (in.isReadable()) {int outSize = out.size();
if (outSize > 0) {fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {break;}
outSize = 0;
}
int oldInputLength = in.readableBytes();
//decode 中时,不能执行完 handler remove 清理操作。// 那 decode 完之后须要清理数据。decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {break;}
if (outSize == out.size()) {if (oldInputLength == in.readableBytes()) {break;} else {continue;}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {break;}
}
} catch (DecoderException e) {throw e;} catch (Exception cause) {throw new DecoderException(cause);
}
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {handlerRemoved(ctx);
}
}
}
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {Object decoded = decode(ctx, in);
if (decoded != null) {out.add(decoded);
}
}
protected Object decode(@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {if (in.readableBytes() < frameLength) {return null;} else {return in.readRetainedSlice(frameLength);
}
}
}
channelHandler 的最佳实际
理解 Netty 的小伙伴都晓得 channelHandler 分为 ChannelInboundHandler 和 ChannelOutboundHandler,别离用来解决 inbound 和 outbound。
channelHandler 的最佳实际实质就是 inbound 和 outbound 的最佳实际。
上面列举了三种具备代表性的场景
• 依照职责划分 channelHandler,例如有解决编解码,有解决心跳的,有专门解决业务的;
• 因为 channel 和 eventLoop 线程绑定,而后一个 evnetLoop 可能服务多个 channel,所以咱们不要在 channelHandler 做耗时操作;
• outbound 咱们能够优化写,缩小零碎调用。
依照职责划分 channelHandler
rocketMq
咱们能够查看 rocketMq 是如何划分 channelHandler,比方具备专门解决编 / 解码的 NettyEncoder/NettyDecoder,通过 IdleStatHandler 发现不沉闷连贯,治理连贯 handlerNettyConnectManageHandler 进行解决,
业务解决 NettyServerHandler。
dubbo
解决编解码,查看不沉闷 channel,以及业务解决 handler。
不在 channelHandler 做耗时操作
之前介绍过一个 eventLoop 线程服务多个 channel,假如某个 channelHandler 解决耗时的工作,会影响其余 channel,所以咱们不要在 channelHandler 执行耗时操作。
如果的确须要执行耗时操作,咱们能够给 channelHandler 增加一个线程池解决
final DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();
// 为咱们的 serverHandler 增加独自的线程池处理事件。
pipeline.addLast(defaultEventLoopGroup,serverHandler);
outbound 优化写
writeAndFlush 存在的问题
咱们来看一下上面代码有什么问题?
public class EchoServerHandler
extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.writeAndFlush(msg);
}
}
代码的问题在于 ctx.writeAndFlush 每次调用都会触发一次零碎调用。而后 channelRead 在一次业务解决中可能被调用屡次,问题就变为一次业务申请,执行屡次零碎调用。
优化 writeAndFlush
怎么优化?
咱们能够重写 channelRead 和 channelReadComplete,在 channelRead 中调用 write 办法,
在 channelReadComplete 中调用 flush 办法。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();
}
}
下面的实现形式的确缩小零碎调用,然而在 netty 外部当有数据可读,会默认会间断 16 次,最初在调用 channelReadComplete() 办法。
默认的行为存在两个问题:
1. 写出数据到对端的工夫被提早了;
2. 默认 16 次这个数据不肯定适宜所有业务场景(不够灵便)。
咱们须要联合业务的个性,例如业务如果关注吞吐量,能够适当把读取几次后刷新设置的大一些。如果业务关注及时性,读取几次后刷新就适当设置小一点。基于上述需要,FlushConsolidationHandler 就诞生了,能够指定读取几次后刷新一次。
FlushConsolidationHandler 优化写
应用在 pipeline 中增加 FlushConsolidationHandler,读取几次刷新一次能够依据业务设置,例如这里设置 5 次,咱们是优化 EchoServerHandler 的写,就放在它的后面。
// 每 5 次就触发一次 flush
// ….
p.addLast(new FlushConsolidationHandler(5));
p.addLast(new EchoServerHandler());
// ….
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.writeAndFlush(msg);
}
}
原理剖析:
首先 FlushConsolidationHandler 继承 ChannelDuplexHandler,能同时解决入站和出站音讯,
入站咱们查看 channelRead 和 channelReadComplete 实现,出站咱们查看 flush 办法(没有对 write 办法进行重写)。
channelRead
• 设置 readInProgress 就把事件向下传递
• 咱们的 EchoServerHandler 会 channelRead 会被调用, 咱们在 channelRead 中调用 ctx.writeAndFlush。
• 触发 write 和 flush 的出站音讯,FlushConsolidationHandler 的 flush 进行解决
• 先判断 readInProgress,++flushPendingCount == explicitFlushAfterFlushes 判断是否达到冀望刷新次数,咱们设置为 5,不执行刷新。
• 接着 channelReadComplete 被调用,会重置筹备刷新次数,并执行刷新。
要害就在 channelRead 和 channelReadComplete
假如咱们 channelRead 读取了屡次,当读取次数大于等于 5 次就会刷新,小于 5 次时由 channelReadComplete 刷新。
这样就达到了缩小零碎调用并且每读取几次在刷新也能够配置
public class FlushConsolidationHandler extends ChannelDuplexHandler {
// explicitFlushAfterFlushes 示意几次 flush 后,才真正调用 flush 办法
// consolidateWhenNoReadInProgress 反对异步的状况,当 readInProgress 不为 true 也能够反对 flush
public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress){//....}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
readInProgress = true;
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// This may be the last event in the read loop, so flush now!
// 外部就是将 readInProgress = false; 当 flushPendingCount 就调用 flush
resetReadAndFlushIfNeeded(ctx);
ctx.fireChannelReadComplete();}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
// 依据业务线程是否复用 IO 线程两种状况来思考:// 复用状况
if (readInProgress) { // 正在读的时候
// If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
// we only need to flush if we reach the explicitFlushAfterFlushes limit.
// 每 explicitFlushAfterFlushes 个“批量”写(flush)一次
// 有余怎么办?channelReadComplete 会 flush 掉前面的
if (++flushPendingCount == explicitFlushAfterFlushes) {flushNow(ctx);
}
// 以下是非复用状况:异步状况
} else if (consolidateWhenNoReadInProgress) {
//(业务异步化状况下)开启 consolidateWhenNoReadInProgress 时,优化 flush
//(比方没有读申请了,然而外部还是忙的团团转,没有消化的时候,所以还是会写响应)// Flush immediately if we reach the threshold, otherwise schedule
if (++flushPendingCount == explicitFlushAfterFlushes) {flushNow(ctx);
} else {scheduleFlush(ctx);
}
} else {
//(业务异步化状况下)没有开启 consolidateWhenNoReadInProgress 时,间接 flush
// Always flush directly
flushNow(ctx);
}
}
}
附录
默认读取 16 次设置入口源码剖析
默认创立 DefaultChannelConfig,会接着调用重载的构造函数。
在 setRecvByteBufAllocator 能够看到获取 metadata.defaultMaxMessagesPerRead()。
而 ChannelMetadata 默认结构为 16 次 new ChannelMetadata(false, 16)。
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
//.....
}
// 默认抉择自适应承受缓存分配器, 而后在调用 setRecvByteBufAllocator。
// setRecvByteBufAllocator 就是指定最大读取多少次的入口,默认为 16 次
public class DefaultChannelConfig implements ChannelConfig {
public DefaultChannelConfig(Channel channel) {
// 除 UDP 外都默认抉择自适应承受缓存分配器
this(channel, new AdaptiveRecvByteBufAllocator());
}
protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
//UDP 的应用固定 SIZE 的承受缓存分配器:FixedRecvByteBufAllocator
setRecvByteBufAllocator(allocator, channel.metadata());
this.channel = channel;
}
}
private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {
if (allocator instanceof MaxMessagesRecvByteBufAllocator) {((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
} else if (allocator == null) {throw new NullPointerException("allocator");
}
setRecvByteBufAllocator(allocator);
}
public final class ChannelMetadata {
private final boolean hasDisconnect;
private final int defaultMaxMessagesPerRead;
// ....
}