关于java:seata中netty的使用源码一

33次阅读

共计 8687 个字符,预计需要花费 22 分钟才能阅读完成。

一:seata-server 端
1:在 seata-server 中,在 Server 类的 main 办法中,创立 NettyRemotingServer 对象,并增加端口。该对象创立的时候,会创立 netty 中的三个重要对象:
ServerBootstrap,BOSS 线程选择器 EventLoopGroup,工作线程选择器 EventLoopGroup。代码如下:

public static void main(String[] args) throws IOException {int port = PortHelper.getPort(args);
    System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));
    // 省略不重要的代码
    NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(WORKING_THREADS);
    //server port
    nettyRemotingServer.setListenPort(parameterParser.getPort());
    UUIDGenerator.init(parameterParser.getServerNode());
    // 省略其余代码
}

public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {super(messageExecutor);
 serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
 serverBootstrap.setChannelHandlers(new ServerHandler());
}

public class NettyServerBootstrap implements RemotingBootstrap {private final ServerBootstrap serverBootstrap = new ServerBootstrap();
    private final EventLoopGroup eventLoopGroupWorker;
    private final EventLoopGroup eventLoopGroupBoss;
    private final NettyServerConfig nettyServerConfig;
    private ChannelHandler[] channelHandlers;
    private int listenPort;
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    public NettyServerBootstrap(NettyServerConfig nettyServerConfig) {
        this.nettyServerConfig = nettyServerConfig;
        if (NettyServerConfig.enableEpoll()) { // 判断是否是 linux 的 epoll
            this.eventLoopGroupBoss = new EpollEventLoopGroup(nettyServerConfig.getBossThreadSize(),
                new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));
            this.eventLoopGroupWorker = new EpollEventLoopGroup(nettyServerConfig.getServerWorkerThreads(),
                new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(),
                nettyServerConfig.getServerWorkerThreads()));
        } else {this.eventLoopGroupBoss = new NioEventLoopGroup(nettyServerConfig.getBossThreadSize(),
                new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));
            this.eventLoopGroupWorker = new NioEventLoopGroup(nettyServerConfig.getServerWorkerThreads(),
                new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(),
                nettyServerConfig.getServerWorkerThreads()));
       }
    }

2:在下面的代码中,创立了 ServerBootstrap。如果不是 linux 的 epoll,则创立的 boss 线程选择器和工作线程选择器都是 NioEventLoopGroup。
其中,boss 线程选择器的线程数默认是 1,工作线程选择器的默认线程数量是 CPU 核数 * 2。
在代码 serverBootstrap.setChannelHandlers(new ServerHandler()) 中,增加了通道 handle:ServerHandler,这个前面会增加到事件处理流水线中。

3: main 办法前面会调用 nettyRemotingServer.init() 办法进行启动 netty 服务。该办法调用的是 NettyServerBootstrap#start 建设 netty 服务,代码如下:

public void start() {this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker) // 增加 BOOT 事件选择器和工作事件选择器
        .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ) // 设置通道类型,默认的是 NioServerSocketChannel
        .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize()) // 设置反对连贯的数量
        .option(ChannelOption.SO_REUSEADDR, true) //
        .childOption(ChannelOption.SO_KEEPALIVE, true)  // 关上 TCP 的心态,检测是否放弃连贯
        .childOption(ChannelOption.TCP_NODELAY, true)   // 立刻发送数据,不进行 TCP 音讯缓存
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize()) // 发送缓存区大小
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize()) // 承受音讯缓存区大小
        .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, // 设置缓存水位线
            new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
            nettyServerConfig.getWriteBufferHighWaterMark()))
        .localAddress(new InetSocketAddress(listenPort)) // 设置 IP 和端口
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) // 设置通道闲暇检测工作
                    .addLast(new ProtocolV1Decoder()) // 音讯解码
                    .addLast(new ProtocolV1Encoder()); // 音讯编码
                if (channelHandlers != null) {addChannelPipelineLast(ch, channelHandlers); // 设置音讯处理器,值是下面提到的 ServerHandler
                }
            }
        });
        try {ChannelFuture future = this.serverBootstrap.bind(listenPort).sync(); // 绑定端口
            LOGGER.info("Server started, listen port: {}", listenPort);
            RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
            initialized.set(true);
            future.channel().closeFuture().sync();} catch (Exception exx) {throw new RuntimeException(exx);
        }
   }

4:IdleStateHandler 继承了 ChannelDuplexHandler,它是一个入站处理器,也是一个出站处理器。其继承关系如下:

参数含意:readerIdleTimeSeconds:读闲暇工夫,当指定工夫内没有读取音讯,会发送事件 READER_IDLE

     writerIdleTimeSeconds:写闲暇工夫,当指定工夫内没有写入音讯,会发送事件 WRITER_IDLE
     allIdleTimeSeconds:读写闲暇工夫,当指定工夫既没读取音讯,也没有写入音讯,发送事件 ALL_IDLE
     下面三个参数如果是小于等于 0,则示意禁用对应事件。

5:在通道的 channelActive() 办法中,当通道激活时,会创立定时工作。

private void initialize(ChannelHandlerContext ctx) {switch (state) {
    case 1:
    case 2:
        return;
    }
    state = 1;
    initOutputChanged(ctx);
    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
        readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
        writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
        allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}

5:入站处理器 ProtocolV1Decoder 继承了 LengthFieldBasedFrameDecoder,次要目标是解析读取的信息。其继承关系如下:

其构造函数办法:

在 ProtocolV1Decoder#decodeFrame 办法中进行音讯解析,把字节流转换为 RpcMessage 对象。代码如下:

public Object decodeFrame(ByteBuf frame) {byte b0 = frame.readByte();
    byte b1 = frame.readByte();
    if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
            || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {throw new IllegalArgumentException("Unknown magic code:" + b0 + "," + b1);
    }
    byte version = frame.readByte();
    // TODO  check version compatible here
    int fullLength = frame.readInt();
    short headLength = frame.readShort();
    byte messageType = frame.readByte();
    byte codecType = frame.readByte();
    byte compressorType = frame.readByte();
    int requestId = frame.readInt();
    RpcMessage rpcMessage = new RpcMessage();
    rpcMessage.setCodec(codecType);
    rpcMessage.setId(requestId);
    rpcMessage.setCompressor(compressorType);
    rpcMessage.setMessageType(messageType);
    // direct read head with zero-copy
    int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH;
    if (headMapLength > 0) {Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength);
    rpcMessage.getHeadMap().putAll(map);
    }
    // read body
    if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) {rpcMessage.setBody(HeartbeatMessage.PING);
    } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {rpcMessage.setBody(HeartbeatMessage.PONG);
    } else {
        int bodyLength = fullLength - headLength;
        if (bodyLength > 0) {byte[] bs = new byte[bodyLength];
            frame.readBytes(bs);
            Compressor compressor = CompressorFactory.getCompressor(compressorType);
            bs = compressor.decompress(bs);
            Serializer serializer = EnhancedServiceLoader.load(Serializer.class, SerializerType.getByCode(rpcMessage.getCodec()).name());
            rpcMessage.setBody(serializer.deserialize(bs));
        }
    }
    return rpcMessage;
}

6:出站处理器 ProtocolV1Encoder 继承了 MessageToByteEncoder,次要是对音讯进行编码解决,把 RpcMessage 对象转换为字节流。继承关系如下:

在 ProtocolV1Encoder#encode 的源码如下:

public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
    try {if (msg instanceof RpcMessage) {RpcMessage rpcMessage = (RpcMessage) msg;
            int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
            int headLength = ProtocolConstants.V1_HEAD_LENGTH;
            byte messageType = rpcMessage.getMessageType();
            out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
            out.writeByte(ProtocolConstants.VERSION);
            // full Length(4B) and head length(2B) will fix in the end. 
            out.writerIndex(out.writerIndex() + 6);
            out.writeByte(messageType);
            out.writeByte(rpcMessage.getCodec());
            out.writeByte(rpcMessage.getCompressor());
            out.writeInt(rpcMessage.getId());
            // direct write head with zero-copy
            Map<String, String> headMap = rpcMessage.getHeadMap();
            if (headMap != null && !headMap.isEmpty()) {int headMapBytesLength = HeadMapSerializer.getInstance().encode(headMap, out);
                headLength += headMapBytesLength;
                fullLength += headMapBytesLength;
            }
            byte[] bodyBytes = null;
            if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
                && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
                // heartbeat has no body
                Serializer serializer = EnhancedServiceLoader.load(Serializer.class, SerializerType.getByCode(rpcMessage.getCodec()).name());
                bodyBytes = serializer.serialize(rpcMessage.getBody());
                Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
                bodyBytes = compressor.compress(bodyBytes);
                fullLength += bodyBytes.length;
            }
            if (bodyBytes != null) {out.writeBytes(bodyBytes);
            }
            // fix fullLength and headLength
            int writeIndex = out.writerIndex();
            // skip magic code(2B) + version(1B)
            out.writerIndex(writeIndex - fullLength + 3);
            out.writeInt(fullLength);
            out.writeShort(headLength);
            out.writerIndex(writeIndex);
            } else {throw new UnsupportedOperationException("Not support this class:" + msg.getClass());
            }
        } catch (Throwable e) {LOGGER.error("Encode request error!", e);
       }
}

7:处理器 ServerHandler 是解决 TM,RM 发送的 RpcMessage 音讯。其继承关系如下:

在 AbstractNettyRemotingServer.ServerHandler#channelRead 办法里解决 RpcMessage 音讯。

public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}
    processMessage(ctx, (RpcMessage) msg);
}

正文完
 0