为继续夯实MobTech袤博科技的数智技术创新能力和技术布道能力,本期极客星球邀请了企业服务研发部工程师梁立从 TCP 的粘包/半包、 Netty 解决粘包/半包及源码剖析、 开源我的项目对 channelHandler最佳实际三方面对《netty 中channelHandler的原理与最佳实际》进行了全面的技术分享。

版本信息

本次剖析版本基于netty 4.1.40.Final

TCP 的粘包/半包问题

在TCP/IP 协定传输网络数据包时,用户发送音讯ABCD,服务端可能收到是ABCD. AB?CD?等。对于粘包问题,次要起因是发送方每次写入数据小于套接字缓冲区大小, 以及接受方读取音讯不及时。对于半包问题, 次要起因是发送方每次写入数据大于套接字缓冲区大小,以及发送数据大于协定最大传输单位,底层须要拆包。那么针对此类问题,该当如何解决呢 ?常见的形式解码形式有三种:固定长度,应用固定分隔符来宰割音讯,以及固网长度字段寄存内容长度信息。

解码实现思考

在剖析之前,咱们能够思考一下,如果是咱们来实现下面三种编解码会如何实现 ?

咱们能够整顿如下需要:

1.咱们须要寄存咱们解码好的音讯;

2.咱们须要提供一个解码办法来让不同子类实现, 例如固定长度,分隔符,以及固定长度字段解码的形式必定有差异;

3.咱们从套接字读取音讯后就能够让咱们解码器去解决了。

针对上述需要,咱们还须要带着三个问题,查看源码看下是否和咱们猜测的相似:

问题1:咱们须要一个汇合寄存咱们解码的音讯;

问题2:咱们须要不同子类对解码细节做不同实现,所以咱们须要有一个父类;ByteToMessageDecoder, 能够在父类实现公共逻辑,提供给子类一个decode(List out,ByteBuf in); 办法;

问题3 :咱们从套接字读取数据之后,发送一个读事件(fireChannelRead)让咱们解码器去解决。

Netty 解决粘包/半包及源码剖析

封帧形式

解码

固定长度

FixedLengthFrameDecoder

分隔符

DelimiterBasedFrameDecoder

固定长度字段存内容长度信息

LengthFieldBasedFrameDecoder

咱们以固定长度解码器为例:

ServerBootstrap b = new ServerBootstrap();
// ....
b..childHandler(new ChannelInitializer<SocketChannel>() {

@Overridepublic void initChannel(SocketChannel ch) throws Exception {    ChannelPipeline p = ch.pipeline();    p.addLast(new FixedLengthFrameDecoder(2));    //.... 后续业务解决handler   }

});
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {

//....

}
public class ByteToMessageDecoder {

// ....protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

}
咱们查看 FixedLengthFrameDecoder ,发现果然继承父类ByteToMessageDecoder,而后父类也有一个channelRead办法解决音讯,并提供一个decode形象办法让子类实现。

channelRead

假如咱们发送端发送ABCD音讯,从套节字读取之后,后续会调用channelRead 办法进行解码。

咱们看到获取一个汇合实例CodecOutputList, 该类实现List接口。如果是首次调用,会把以后ByteBuf 赋值给cumulation,并调用callDecode(ctx, cumulation, out)。

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    if (msg instanceof ByteBuf) {        CodecOutputList out = CodecOutputList.newInstance();        try {            ByteBuf data = (ByteBuf) msg;            first = cumulation == null;            if (first) {                cumulation = data;            } else {                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);            }            callDecode(ctx, cumulation, out);        } catch (DecoderException e) {            throw e;        } catch (Exception e) {            throw new DecoderException(e);        } finally {          //.....        }    } else {        ctx.fireChannelRead(msg);    }}

callDecode

通过字面意思就晓得这个办法会做和解码相干操作。首先会判断in.isReadable() 是否可读,而后咱们的outSize 目前是空, 进入到 decodeRemovalReentryProtection , 该办法会调用子类FixedLengthFrameDecoder的decode办法进行具体解码,该decode 办法比较简单就是当从ByteBuf 读取到指定长度就增加到out 中。咱们读取实现后, outSize == out.size() 和 oldInputLength == in.readableBytes()都不满足,进入下一次循环, 咱们outSize 大于0, 发送fireChannelRead。到此音讯就被解码,并发送给咱们业务channelHandler 。

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {

try {    while (in.isReadable()) {        int outSize = out.size();        if (outSize > 0) {            fireChannelRead(ctx, out, outSize);            out.clear();            // Check if this handler was removed before continuing with decoding.            // If it was removed, it is not safe to continue to operate on the buffer.            //            // See:            // - https://github.com/netty/netty/issues/4635            if (ctx.isRemoved()) {                break;            }            outSize = 0;        }        int oldInputLength = in.readableBytes();        //decode中时,不能执行完handler remove清理操作。        //那decode完之后须要清理数据。        decodeRemovalReentryProtection(ctx, in, out);        // Check if this handler was removed before continuing the loop.        // If it was removed, it is not safe to continue to operate on the buffer.        //        // See https://github.com/netty/netty/issues/1664        if (ctx.isRemoved()) {            break;        }        if (outSize == out.size()) {            if (oldInputLength == in.readableBytes()) {                break;            } else {                continue;            }        }        if (oldInputLength == in.readableBytes()) {            throw new DecoderException(                StringUtil.simpleClassName(getClass()) +                ".decode() did not read anything but decoded a message.");        }        if (isSingleDecode()) {            break;        }    }} catch (DecoderException e) {    throw e;} catch (Exception cause) {    throw new DecoderException(cause);}

}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)

throws Exception {decodeState = STATE_CALLING_CHILD_DECODE;try {    decode(ctx, in, out);} finally {    boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;    decodeState = STATE_INIT;    if (removePending) {        handlerRemoved(ctx);    }}

}
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
@Override

protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {    Object decoded = decode(ctx, in);    if (decoded != null) {        out.add(decoded);    }}protected Object decode(        @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {    if (in.readableBytes() < frameLength) {        return null;    } else {        return in.readRetainedSlice(frameLength);    }}

}

channelHandler 的最佳实际

理解Netty 的小伙伴都晓得channelHandler 分为ChannelInboundHandler 和 ChannelOutboundHandler, 别离用来解决inbound 和 outbound。

channelHandler 的最佳实际实质就是inbound 和outbound 的最佳实际。

上面列举了三种具备代表性的场景

• 依照职责划分channelHandler,例如有解决编解码,有解决心跳的,有专门解决业务的;

• 因为channel和eventLoop 线程绑定,而后一个evnetLoop 可能服务多个channel,所以咱们不要在channelHandler 做耗时操作;

• outbound 咱们能够优化写,缩小零碎调用。

依照职责划分channelHandler

rocketMq
咱们能够查看rocketMq 是如何划分channelHandler , 比方具备专门解决编/解码的NettyEncoder/NettyDecoder,通过IdleStatHandler 发现不沉闷连贯,治理连贯handlerNettyConnectManageHandler 进行解决,

业务解决 NettyServerHandler 。

dubbo

解决编解码,查看不沉闷channel,以及业务解决handler。

不在channelHandler 做耗时操作

之前介绍过一个eventLoop 线程服务多个channel,假如某个channelHandler解决耗时的工作,会影响其余channel,所以咱们不要在channelHandler 执行耗时操作。
如果的确须要执行耗时操作,咱们能够给channelHandler 增加一个线程池解决
final DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();
// 为咱们的serverHandler 增加独自的线程池处理事件。
pipeline.addLast(defaultEventLoopGroup,serverHandler);

outbound 优化写

writeAndFlush存在的问题
咱们来看一下上面代码有什么问题?

public class EchoServerHandler
extends ChannelInboundHandlerAdapter {

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {    ctx.writeAndFlush(msg);}

}
代码的问题在于ctx.writeAndFlush 每次调用都会触发一次零碎调用。而后channelRead 在一次业务解决中可能被调用屡次,问题就变为一次业务申请,执行屡次零碎调用。

优化writeAndFlush
怎么优化?
咱们能够重写channelRead 和 channelReadComplete,在channelRead 中调用write 办法,
在channelReadComplete中调用flush 办法 。
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {    ctx.write(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {    ctx.flush();}

}
下面的实现形式的确缩小零碎调用,然而在netty 外部当有数据可读,会默认会间断16次,最初在调用channelReadComplete() 办法。
默认的行为存在两个问题:
1.写出数据到对端的工夫被提早了;
2.默认16 次这个数据不肯定适宜所有业务场景(不够灵便)。
咱们须要联合业务的个性,例如业务如果关注吞吐量,能够适当把读取几次后刷新设置的大一些。如果业务关注及时性,读取几次后刷新就适当设置小一点。基于上述需要,FlushConsolidationHandler 就诞生了, 能够指定读取几次后刷新一次。

FlushConsolidationHandler 优化写

应用在pipeline中增加FlushConsolidationHandler,读取几次刷新一次能够依据业务设置,例如这里设置5次,咱们是优化 EchoServerHandler的写,就放在它的后面。

// 每5次就触发一次flush
// ....
p.addLast(new FlushConsolidationHandler(5));
p.addLast(new EchoServerHandler());
// ....
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {    ctx.writeAndFlush(msg);}

}
原理剖析:

首先FlushConsolidationHandler 继承 ChannelDuplexHandler,能同时解决入站和出站音讯,

入站咱们查看 channelRead 和 channelReadComplete 实现,出站咱们查看 flush 办法 (没有对write办法进行重写)。

channelRead

• 设置readInProgress 就把事件向下传递

• 咱们的EchoServerHandler 会channelRead 会被调用,咱们在channelRead 中调用ctx.writeAndFlush。

• 触发write 和 flush 的出站音讯, FlushConsolidationHandler的flush进行解决

• 先判断readInProgress, ++flushPendingCount == explicitFlushAfterFlushes 判断是否达到冀望刷新次数,咱们设置为5 ,不执行刷新。

• 接着channelReadComplete 被调用,会重置筹备刷新次数,并执行刷新。

要害就在channelRead 和 channelReadComplete

假如咱们channelRead 读取了屡次, 当读取次数大于等于5次就会刷新,小于5次时由channelReadComplete 刷新。

这样就达到了缩小零碎调用并且每读取几次在刷新也能够配置

public class FlushConsolidationHandler extends ChannelDuplexHandler {

// explicitFlushAfterFlushes 示意几次flush后,才真正调用flush 办法// consolidateWhenNoReadInProgress 反对异步的状况,当readInProgress不为true 也能够反对flushpublic FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress){    //....}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    readInProgress = true;    ctx.fireChannelRead(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {    // This may be the last event in the read loop, so flush now!    // 外部就是将 readInProgress = false; 当flushPendingCount 就调用flush    resetReadAndFlushIfNeeded(ctx);    ctx.fireChannelReadComplete();}@Overridepublic void flush(ChannelHandlerContext ctx) throws Exception {    //依据业务线程是否复用IO线程两种状况来思考:    //复用状况    if (readInProgress) { //正在读的时候        // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus        // we only need to flush if we reach the explicitFlushAfterFlushes limit.        //每explicitFlushAfterFlushes个“批量”写(flush)一次        //有余怎么办?channelReadComplete会flush掉前面的        if (++flushPendingCount == explicitFlushAfterFlushes) {            flushNow(ctx);        }        //以下是非复用状况:异步状况    } else if (consolidateWhenNoReadInProgress) {        //(业务异步化状况下)开启consolidateWhenNoReadInProgress时,优化flush        //(比方没有读申请了,然而外部还是忙的团团转,没有消化的时候,所以还是会写响应)        // Flush immediately if we reach the threshold, otherwise schedule        if (++flushPendingCount == explicitFlushAfterFlushes) {            flushNow(ctx);        } else {            scheduleFlush(ctx);        }    } else {        //(业务异步化状况下)没有开启consolidateWhenNoReadInProgress时,间接flush        // Always flush directly        flushNow(ctx);    }}

}

附录

默认读取16次设置入口源码剖析

默认创立DefaultChannelConfig ,会接着调用重载的构造函数。

在setRecvByteBufAllocator能够看到获取metadata.defaultMaxMessagesPerRead()。

而ChannelMetadata 默认结构为 16次 new ChannelMetadata(false, 16)。

public abstract class AbstractNioByteChannel extends AbstractNioChannel {

private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);//.....

}

// 默认抉择自适应承受缓存分配器,而后在调用setRecvByteBufAllocator。
// setRecvByteBufAllocator就是指定最大读取多少次的入口 ,默认为16次
public class DefaultChannelConfig implements ChannelConfig {

public DefaultChannelConfig(Channel channel) {    //除UDP外都默认抉择自适应承受缓存分配器    this(channel, new AdaptiveRecvByteBufAllocator());}protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) {    //UDP的应用固定SIZE的承受缓存分配器:FixedRecvByteBufAllocator    setRecvByteBufAllocator(allocator, channel.metadata());    this.channel = channel;}

}

private void setRecvByteBufAllocator(RecvByteBufAllocator allocator, ChannelMetadata metadata) {

if (allocator instanceof MaxMessagesRecvByteBufAllocator) {    ((MaxMessagesRecvByteBufAllocator) allocator).maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());} else if (allocator == null) {    throw new NullPointerException("allocator");}setRecvByteBufAllocator(allocator);

}

public final class ChannelMetadata {

private final boolean hasDisconnect;private final int defaultMaxMessagesPerRead;// .... 

}