Part 1: The seata-server side
1: In seata-server, the main method of the Server class creates a NettyRemotingServer object and sets its listening port. When this object is constructed, it creates three key Netty objects: the ServerBootstrap, the boss EventLoopGroup, and the worker EventLoopGroup. The code is as follows:

public static void main(String[] args) throws IOException {
    int port = PortHelper.getPort(args);
    System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));
    // unimportant code omitted
    NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(WORKING_THREADS);
    // server port
    nettyRemotingServer.setListenPort(parameterParser.getPort());
    UUIDGenerator.init(parameterParser.getServerNode());
    // remaining code omitted
}

public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {
    super(messageExecutor);
    serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
    serverBootstrap.setChannelHandlers(new ServerHandler());
}

public class NettyServerBootstrap implements RemotingBootstrap {
    private final ServerBootstrap serverBootstrap = new ServerBootstrap();
    private final EventLoopGroup eventLoopGroupWorker;
    private final EventLoopGroup eventLoopGroupBoss;
    private final NettyServerConfig nettyServerConfig;
    private ChannelHandler[] channelHandlers;
    private int listenPort;
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    public NettyServerBootstrap(NettyServerConfig nettyServerConfig) {
        this.nettyServerConfig = nettyServerConfig;
        if (NettyServerConfig.enableEpoll()) { // use epoll when running on Linux and enabled
            this.eventLoopGroupBoss = new EpollEventLoopGroup(nettyServerConfig.getBossThreadSize(),
                new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));
            this.eventLoopGroupWorker = new EpollEventLoopGroup(nettyServerConfig.getServerWorkerThreads(),
                new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(),
                nettyServerConfig.getServerWorkerThreads()));
        } else {
            this.eventLoopGroupBoss = new NioEventLoopGroup(nettyServerConfig.getBossThreadSize(),
                new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));
            this.eventLoopGroupWorker = new NioEventLoopGroup(nettyServerConfig.getServerWorkerThreads(),
                new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(),
                nettyServerConfig.getServerWorkerThreads()));
        }
    }
    // ... rest of the class omitted
}

2: The code above creates the ServerBootstrap. When Linux epoll is not available, both the boss and the worker event loop groups are NioEventLoopGroup instances. The boss group defaults to a single thread, and the worker group defaults to CPU cores * 2 threads. The call serverBootstrap.setChannelHandlers(new ServerHandler()) registers the channel handler ServerHandler, which is added to the channel pipeline later on.
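The concrete values come from NettyServerConfig; a rough illustration of the sizing described above (hypothetical code, not a quote of the Seata source):

// Hypothetical illustration of the default thread sizing described above.
int bossThreads = 1;                                                 // one acceptor thread is enough for a single listen port
int workerThreads = Runtime.getRuntime().availableProcessors() * 2; // I/O worker threads: CPU cores * 2
EventLoopGroup boss = new NioEventLoopGroup(bossThreads);
EventLoopGroup worker = new NioEventLoopGroup(workerThreads);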

3: The main method then calls nettyRemotingServer.init() to start the Netty server. That call ends up in NettyServerBootstrap#start, which builds and starts the server as follows:

public void start() {
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker) // register boss and worker event loop groups
        .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ) // channel type, NioServerSocketChannel by default
        .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize()) // max length of the pending-connection queue
        .option(ChannelOption.SO_REUSEADDR, true)
        .childOption(ChannelOption.SO_KEEPALIVE, true)  // enable TCP keepalive to detect broken connections
        .childOption(ChannelOption.TCP_NODELAY, true)   // send data immediately, no Nagle buffering
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize()) // send buffer size
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize()) // receive buffer size
        .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, // write buffer watermarks
            new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
            nettyServerConfig.getWriteBufferHighWaterMark()))
        .localAddress(new InetSocketAddress(listenPort)) // bind IP and port
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) // idle-connection detection
                    .addLast(new ProtocolV1Decoder()) // message decoding
                    .addLast(new ProtocolV1Encoder()); // message encoding
                if (channelHandlers != null) {
                    addChannelPipelineLast(ch, channelHandlers); // business handlers, i.e. the ServerHandler mentioned above
                }
            }
        });

    try {
        ChannelFuture future = this.serverBootstrap.bind(listenPort).sync(); // bind the port
        LOGGER.info("Server started, listen port: {}", listenPort);
        RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
        initialized.set(true);
        future.channel().closeFuture().sync();
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}
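To experiment with the bootstrap pattern outside Seata, here is a minimal, self-contained Netty server that follows the same skeleton as NettyServerBootstrap#start, with toy handlers in place of Seata's codec and ServerHandler (port 8091 is used only as an example):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

public class MiniServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup(1);  // acceptor
        EventLoopGroup worker = new NioEventLoopGroup(); // defaults to CPU cores * 2
        try {
            ChannelFuture future = new ServerBootstrap()
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline()
                            .addLast(new IdleStateHandler(60, 0, 0)) // same idle-detection idea as above
                            .addLast(new LoggingHandler());          // stand-in for decoder/encoder/ServerHandler
                    }
                })
                .bind(8091)
                .sync();
            future.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}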

4: IdleStateHandler extends ChannelDuplexHandler, which makes it both an inbound and an outbound handler (ChannelDuplexHandler extends ChannelInboundHandlerAdapter and implements ChannelOutboundHandler).

Constructor parameters:
readerIdleTimeSeconds: read-idle time; if nothing is read within this period, a READER_IDLE event is fired.
writerIdleTimeSeconds: write-idle time; if nothing is written within this period, a WRITER_IDLE event is fired.
allIdleTimeSeconds: read/write-idle time; if nothing is read or written within this period, an ALL_IDLE event is fired.
Any of these parameters that is less than or equal to 0 disables the corresponding check.
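When one of these timeouts expires, IdleStateHandler fires an IdleStateEvent down the pipeline, and a downstream handler can pick it up in userEventTriggered. A minimal sketch of that reaction (simplified; Seata's ServerHandler reacts in a similar way by closing read-idle channels, but this is not its exact code):

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

public class IdleAwareHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            if (((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
                // nothing was read within readerIdleTimeSeconds: drop the connection
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}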

5: In the handler's channelActive() method, when the channel becomes active, initialize() is invoked and schedules the idle-detection timer tasks:

private void initialize(ChannelHandlerContext ctx) {
    switch (state) {
    case 1:
    case 2:
        return;
    }

    state = 1;
    initOutputChanged(ctx);

    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
            readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
            writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
            allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}
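Each scheduled task follows the same pattern when it runs: measure how long the channel has actually been idle, then either fire the corresponding IdleStateEvent or re-arm itself for the remaining time. A simplified sketch of the reader-idle task's run() logic, reusing the fields from the snippet above (not Netty's exact code, which also tracks an in-progress read):

// Simplified sketch of ReaderIdleTimeoutTask#run
long nextDelay = readerIdleTimeNanos - (ticksInNanos() - lastReadTime);
if (nextDelay <= 0) {
    // read-idle for the full interval: re-arm for a fresh interval and fire READER_IDLE
    readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    ctx.fireUserEventTriggered(IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT);
} else {
    // a read happened in the meantime: check again after the remaining time
    readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}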

6: The inbound handler ProtocolV1Decoder extends LengthFieldBasedFrameDecoder; its main job is to split and parse the incoming byte stream (the inheritance chain is ProtocolV1Decoder -> LengthFieldBasedFrameDecoder -> ByteToMessageDecoder -> ChannelInboundHandlerAdapter).

Its constructor configures the length-field parameters of LengthFieldBasedFrameDecoder.
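The constructor is not reproduced here, but its job is to tell LengthFieldBasedFrameDecoder where the fullLength field sits in the V1 header used by decodeFrame below. A sketch of what it passes to super() (the default max frame size is an assumption):

public ProtocolV1Decoder() {
    // assumed default: cap frames at 8 MB
    this(8 * 1024 * 1024);
}

public ProtocolV1Decoder(int maxFrameLength) {
    // lengthFieldOffset   = 3  : magic code (2B) + version (1B) come before fullLength
    // lengthFieldLength   = 4  : fullLength is an int
    // lengthAdjustment    = -7 : fullLength already counts the 7 bytes read so far
    // initialBytesToStrip = 0  : decodeFrame re-checks magic code and version itself
    super(maxFrameLength, 3, 4, -7, 0);
}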

Message parsing happens in ProtocolV1Decoder#decodeFrame, which turns the byte stream into an RpcMessage object. The code is as follows:

public Object decodeFrame(ByteBuf frame) {
    byte b0 = frame.readByte();
    byte b1 = frame.readByte();
    if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
            || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
        throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1);
    }

    byte version = frame.readByte();
    // TODO  check version compatible here

    int fullLength = frame.readInt();
    short headLength = frame.readShort();
    byte messageType = frame.readByte();
    byte codecType = frame.readByte();
    byte compressorType = frame.readByte();
    int requestId = frame.readInt();

    RpcMessage rpcMessage = new RpcMessage();
    rpcMessage.setCodec(codecType);
    rpcMessage.setId(requestId);
    rpcMessage.setCompressor(compressorType);
    rpcMessage.setMessageType(messageType);

    // direct read head with zero-copy
    int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH;
    if (headMapLength > 0) {
        Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength);
        rpcMessage.getHeadMap().putAll(map);
    }

    // read body
    if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) {
        rpcMessage.setBody(HeartbeatMessage.PING);
    } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
        rpcMessage.setBody(HeartbeatMessage.PONG);
    } else {
        int bodyLength = fullLength - headLength;
        if (bodyLength > 0) {
            byte[] bs = new byte[bodyLength];
            frame.readBytes(bs);
            Compressor compressor = CompressorFactory.getCompressor(compressorType);
            bs = compressor.decompress(bs);
            Serializer serializer = EnhancedServiceLoader.load(Serializer.class,
                SerializerType.getByCode(rpcMessage.getCodec()).name());
            rpcMessage.setBody(serializer.deserialize(bs));
        }
    }
    return rpcMessage;
}
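For reference, the read order above implies the following V1 frame layout (the 16-byte fixed header is ProtocolConstants.V1_HEAD_LENGTH; the head map and body are optional):

// V1 frame layout as consumed by decodeFrame:
// magic code      2B
// version         1B
// fullLength      4B  total frame length, including this header
// headLength      2B  fixed header (16B) + head-map bytes
// messageType     1B
// codecType       1B  which Serializer to use
// compressorType  1B  which Compressor to use
// requestId       4B
//                ---- 16B fixed header
// head map        (headLength - 16) bytes, optional
// body            (fullLength - headLength) bytes, compressed + serialized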

7: The outbound handler ProtocolV1Encoder extends MessageToByteEncoder and does the reverse: it encodes an RpcMessage object into a byte stream (the inheritance chain is ProtocolV1Encoder -> MessageToByteEncoder -> ChannelOutboundHandlerAdapter).

The source of ProtocolV1Encoder#encode is as follows:

public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
    try {
        if (msg instanceof RpcMessage) {
            RpcMessage rpcMessage = (RpcMessage) msg;
            int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
            int headLength = ProtocolConstants.V1_HEAD_LENGTH;
            byte messageType = rpcMessage.getMessageType();
            out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
            out.writeByte(ProtocolConstants.VERSION);
            // full Length(4B) and head length(2B) will fix in the end.
            out.writerIndex(out.writerIndex() + 6);
            out.writeByte(messageType);
            out.writeByte(rpcMessage.getCodec());
            out.writeByte(rpcMessage.getCompressor());
            out.writeInt(rpcMessage.getId());

            // direct write head with zero-copy
            Map<String, String> headMap = rpcMessage.getHeadMap();
            if (headMap != null && !headMap.isEmpty()) {
                int headMapBytesLength = HeadMapSerializer.getInstance().encode(headMap, out);
                headLength += headMapBytesLength;
                fullLength += headMapBytesLength;
            }

            byte[] bodyBytes = null;
            if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
                    && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
                // heartbeat has no body
                Serializer serializer = EnhancedServiceLoader.load(Serializer.class,
                    SerializerType.getByCode(rpcMessage.getCodec()).name());
                bodyBytes = serializer.serialize(rpcMessage.getBody());
                Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
                bodyBytes = compressor.compress(bodyBytes);
                fullLength += bodyBytes.length;
            }

            if (bodyBytes != null) {
                out.writeBytes(bodyBytes);
            }

            // fix fullLength and headLength
            int writeIndex = out.writerIndex();
            // skip magic code(2B) + version(1B)
            out.writerIndex(writeIndex - fullLength + 3);
            out.writeInt(fullLength);
            out.writeShort(headLength);
            out.writerIndex(writeIndex);
        } else {
            throw new UnsupportedOperationException("Not support this class:" + msg.getClass());
        }
    } catch (Throwable e) {
        LOGGER.error("Encode request error!", e);
    }
}
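The interesting part is the "reserve, then backfill" trick: six bytes are skipped after the version byte, and only after the head map and body have been written does the encoder jump back to fill in fullLength and headLength. A self-contained toy demonstration of that ByteBuf technique (toy values, not the real protocol):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;

public class ReserveAndBackfillDemo {
    public static void main(String[] args) {
        ByteBuf out = Unpooled.buffer();      // fresh heap buffer, plenty of capacity
        out.writeByte(0xAB);                  // toy "magic" byte 1
        out.writeByte(0xBA);                  // toy "magic" byte 2
        out.writeByte(1);                     // toy "version"
        int lengthIndex = out.writerIndex();  // the reserved 6 bytes start here
        out.writerIndex(lengthIndex + 6);     // skip fullLength(4B) + headLength(2B) for now
        byte[] body = "payload".getBytes(StandardCharsets.UTF_8);
        out.writeBytes(body);                 // write the variable-length part first
        int end = out.writerIndex();
        out.writerIndex(lengthIndex);         // jump back to the reserved slot
        out.writeInt(end);                    // fullLength = everything written so far
        out.writeShort(3 + 6);                // headLength = toy fixed header only
        out.writerIndex(end);                 // restore the write position
        System.out.println("fullLength=" + end + ", readableBytes=" + out.readableBytes());
    }
}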

8: The handler ServerHandler processes the RpcMessage messages sent by TM and RM clients. It is an inner class of AbstractNettyRemotingServer and extends ChannelDuplexHandler.

RpcMessage handling happens in AbstractNettyRemotingServer.ServerHandler#channelRead:

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    if (!(msg instanceof RpcMessage)) {
        return;
    }
    processMessage(ctx, (RpcMessage) msg);
}
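channelRead simply hands every decoded RpcMessage to processMessage. Conceptually, processMessage dispatches the message to a processor registered for its message type; the sketch below shows only that idea (hypothetical names, not Seata's actual processMessage, which keeps a processor table plus optional per-processor thread pools in AbstractNettyRemoting):

// Illustrative sketch of type-based dispatch (names are hypothetical).
interface MessageProcessor {
    void process(ChannelHandlerContext ctx, RpcMessage msg) throws Exception;
}

private final Map<Integer, MessageProcessor> processorTable = new ConcurrentHashMap<>();

protected void processMessage(ChannelHandlerContext ctx, RpcMessage msg) throws Exception {
    MessageProcessor processor = processorTable.get((int) msg.getMessageType());
    if (processor != null) {
        // e.g. global-begin, branch-register or heartbeat processing
        processor.process(ctx, msg);
    }
}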