一:seata-server端
1:在seata-server 中,在 Server 类的main办法中,创立 NettyRemotingServer 对象,并增加端口。该对象创立的时候,会创立netty中的三个重要对象:
ServerBootstrap,BOSS线程选择器EventLoopGroup,工作线程选择器EventLoopGroup。代码如下:
public static void main(String[] args) throws IOException { int port = PortHelper.getPort(args); System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port)); // 省略不重要的代码 NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(WORKING_THREADS); //server port nettyRemotingServer.setListenPort(parameterParser.getPort()); UUIDGenerator.init(parameterParser.getServerNode()); // 省略其余代码}public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) { super(messageExecutor); serverBootstrap = new NettyServerBootstrap(nettyServerConfig); serverBootstrap.setChannelHandlers(new ServerHandler());}public class NettyServerBootstrap implements RemotingBootstrap { private final ServerBootstrap serverBootstrap = new ServerBootstrap(); private final EventLoopGroup eventLoopGroupWorker; private final EventLoopGroup eventLoopGroupBoss; private final NettyServerConfig nettyServerConfig; private ChannelHandler[] channelHandlers; private int listenPort; private final AtomicBoolean initialized = new AtomicBoolean(false); public NettyServerBootstrap(NettyServerConfig nettyServerConfig) { this.nettyServerConfig = nettyServerConfig; if (NettyServerConfig.enableEpoll()) { //判断是否是linux的epoll this.eventLoopGroupBoss = new EpollEventLoopGroup(nettyServerConfig.getBossThreadSize(), new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize())); this.eventLoopGroupWorker = new EpollEventLoopGroup(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads())); } else { this.eventLoopGroupBoss = new NioEventLoopGroup(nettyServerConfig.getBossThreadSize(), new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize())); this.eventLoopGroupWorker = new NioEventLoopGroup(nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(), nettyServerConfig.getServerWorkerThreads())); } }
2:在下面的代码中,创立了 ServerBootstrap。如果不是linux的epoll,则创立的boss线程选择器和工作线程选择器都是 NioEventLoopGroup。
其中,boss线程选择器的线程数默认是1,工作线程选择器的默认线程数量是 CPU核数 * 2。
在代码 serverBootstrap.setChannelHandlers(new ServerHandler()) 中,增加了通道handle:ServerHandler,这个前面会增加到事件处理流水线中。
3: main 办法前面会调用 nettyRemotingServer.init() 办法进行启动netty服务。该办法调用的是 NettyServerBootstrap#start 建设netty服务,代码如下:
public void start() { this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker) //增加BOOT事件选择器和工作事件选择器 .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ) //设置通道类型,默认的是 NioServerSocketChannel .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize()) //设置反对连贯的数量 .option(ChannelOption.SO_REUSEADDR, true) // .childOption(ChannelOption.SO_KEEPALIVE, true) //关上TCP的心态,检测是否放弃连贯 .childOption(ChannelOption.TCP_NODELAY, true) // 立刻发送数据,不进行TCP音讯缓存 .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize()) // 发送缓存区大小 .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize()) // 承受音讯缓存区大小 .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, // 设置缓存水位线 new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark())) .localAddress(new InetSocketAddress(listenPort)) //设置IP和端口 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) //设置通道闲暇检测工作 .addLast(new ProtocolV1Decoder()) // 音讯解码 .addLast(new ProtocolV1Encoder()); // 音讯编码 if (channelHandlers != null) { addChannelPipelineLast(ch, channelHandlers); // 设置音讯处理器,值是下面提到的ServerHandler } } }); try { ChannelFuture future = this.serverBootstrap.bind(listenPort).sync(); //绑定端口 LOGGER.info("Server started, listen port: {}", listenPort); RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort())); initialized.set(true); future.channel().closeFuture().sync(); } catch (Exception exx) { throw new RuntimeException(exx); } }
4:IdleStateHandler 继承了ChannelDuplexHandler,它是一个入站处理器,也是一个出站处理器。其继承关系如下:
参数含意:readerIdleTimeSeconds:读闲暇工夫,当指定工夫内没有读取音讯,会发送事件READER_IDLE
writerIdleTimeSeconds:写闲暇工夫,当指定工夫内没有写入音讯,会发送事件WRITER_IDLE allIdleTimeSeconds:读写闲暇工夫,当指定工夫既没读取音讯,也没有写入音讯,发送事件ALL_IDLE 下面三个参数如果是小于等于0,则示意禁用对应事件。
5:在通道的 channelActive() 办法中,当通道激活时,会创立定时工作。
private void initialize(ChannelHandlerContext ctx) { switch (state) { case 1: case 2: return; } state = 1; initOutputChanged(ctx); lastReadTime = lastWriteTime = ticksInNanos(); if (readerIdleTimeNanos > 0) { readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (writerIdleTimeNanos > 0) { writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } if (allIdleTimeNanos > 0) { allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); }}
5:入站处理器 ProtocolV1Decoder 继承了LengthFieldBasedFrameDecoder,次要目标是解析读取的信息。其继承关系如下:
其构造函数办法:
在 ProtocolV1Decoder#decodeFrame 办法中进行音讯解析,把字节流转换为 RpcMessage 对象。代码如下:
public Object decodeFrame(ByteBuf frame) { byte b0 = frame.readByte(); byte b1 = frame.readByte(); if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0 || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) { throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1); } byte version = frame.readByte(); // TODO check version compatible here int fullLength = frame.readInt(); short headLength = frame.readShort(); byte messageType = frame.readByte(); byte codecType = frame.readByte(); byte compressorType = frame.readByte(); int requestId = frame.readInt(); RpcMessage rpcMessage = new RpcMessage(); rpcMessage.setCodec(codecType); rpcMessage.setId(requestId); rpcMessage.setCompressor(compressorType); rpcMessage.setMessageType(messageType); // direct read head with zero-copy int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH; if (headMapLength > 0) { Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength); rpcMessage.getHeadMap().putAll(map); } // read body if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) { rpcMessage.setBody(HeartbeatMessage.PING); } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { rpcMessage.setBody(HeartbeatMessage.PONG); } else { int bodyLength = fullLength - headLength; if (bodyLength > 0) { byte[] bs = new byte[bodyLength]; frame.readBytes(bs); Compressor compressor = CompressorFactory.getCompressor(compressorType); bs = compressor.decompress(bs); Serializer serializer = EnhancedServiceLoader.load(Serializer.class, SerializerType.getByCode(rpcMessage.getCodec()).name()); rpcMessage.setBody(serializer.deserialize(bs)); } } return rpcMessage;}
6:出站处理器 ProtocolV1Encoder 继承了MessageToByteEncoder,次要是对音讯进行编码解决,把RpcMessage对象转换为字节流。继承关系如下:
在 ProtocolV1Encoder#encode 的源码如下:
public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) { try { if (msg instanceof RpcMessage) { RpcMessage rpcMessage = (RpcMessage) msg; int fullLength = ProtocolConstants.V1_HEAD_LENGTH; int headLength = ProtocolConstants.V1_HEAD_LENGTH; byte messageType = rpcMessage.getMessageType(); out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES); out.writeByte(ProtocolConstants.VERSION); // full Length(4B) and head length(2B) will fix in the end. out.writerIndex(out.writerIndex() + 6); out.writeByte(messageType); out.writeByte(rpcMessage.getCodec()); out.writeByte(rpcMessage.getCompressor()); out.writeInt(rpcMessage.getId()); // direct write head with zero-copy Map<String, String> headMap = rpcMessage.getHeadMap(); if (headMap != null && !headMap.isEmpty()) { int headMapBytesLength = HeadMapSerializer.getInstance().encode(headMap, out); headLength += headMapBytesLength; fullLength += headMapBytesLength; } byte[] bodyBytes = null; if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { // heartbeat has no body Serializer serializer = EnhancedServiceLoader.load(Serializer.class, SerializerType.getByCode(rpcMessage.getCodec()).name()); bodyBytes = serializer.serialize(rpcMessage.getBody()); Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor()); bodyBytes = compressor.compress(bodyBytes); fullLength += bodyBytes.length; } if (bodyBytes != null) { out.writeBytes(bodyBytes); } // fix fullLength and headLength int writeIndex = out.writerIndex(); // skip magic code(2B) + version(1B) out.writerIndex(writeIndex - fullLength + 3); out.writeInt(fullLength); out.writeShort(headLength); out.writerIndex(writeIndex); } else { throw new UnsupportedOperationException("Not support this class:" + msg.getClass()); } } catch (Throwable e) { LOGGER.error("Encode request error!", e); }}
7:处理器 ServerHandler 是解决TM,RM发送的RpcMessage音讯。其继承关系如下:
在 AbstractNettyRemotingServer.ServerHandler#channelRead 办法里解决RpcMessage音讯。
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof RpcMessage)) { return; } processMessage(ctx, (RpcMessage) msg);}