共计 8687 个字符,预计需要花费 22 分钟才能阅读完成。
一:seata-server 端
1:在 seata-server 中,在 Server 类的 main 办法中,创立 NettyRemotingServer 对象,并增加端口。该对象创立的时候,会创立 netty 中的三个重要对象:
ServerBootstrap,BOSS 线程选择器 EventLoopGroup,工作线程选择器 EventLoopGroup。代码如下:
public static void main(String[] args) throws IOException {int port = PortHelper.getPort(args);
System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));
// 省略不重要的代码
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(WORKING_THREADS);
//server port
nettyRemotingServer.setListenPort(parameterParser.getPort());
UUIDGenerator.init(parameterParser.getServerNode());
// 省略其余代码
}
public AbstractNettyRemotingServer(ThreadPoolExecutor messageExecutor, NettyServerConfig nettyServerConfig) {super(messageExecutor);
serverBootstrap = new NettyServerBootstrap(nettyServerConfig);
serverBootstrap.setChannelHandlers(new ServerHandler());
}
public class NettyServerBootstrap implements RemotingBootstrap {private final ServerBootstrap serverBootstrap = new ServerBootstrap();
private final EventLoopGroup eventLoopGroupWorker;
private final EventLoopGroup eventLoopGroupBoss;
private final NettyServerConfig nettyServerConfig;
private ChannelHandler[] channelHandlers;
private int listenPort;
private final AtomicBoolean initialized = new AtomicBoolean(false);
public NettyServerBootstrap(NettyServerConfig nettyServerConfig) {
this.nettyServerConfig = nettyServerConfig;
if (NettyServerConfig.enableEpoll()) { // 判断是否是 linux 的 epoll
this.eventLoopGroupBoss = new EpollEventLoopGroup(nettyServerConfig.getBossThreadSize(),
new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));
this.eventLoopGroupWorker = new EpollEventLoopGroup(nettyServerConfig.getServerWorkerThreads(),
new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(),
nettyServerConfig.getServerWorkerThreads()));
} else {this.eventLoopGroupBoss = new NioEventLoopGroup(nettyServerConfig.getBossThreadSize(),
new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));
this.eventLoopGroupWorker = new NioEventLoopGroup(nettyServerConfig.getServerWorkerThreads(),
new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(),
nettyServerConfig.getServerWorkerThreads()));
}
}
2:在下面的代码中,创立了 ServerBootstrap。如果不是 linux 的 epoll,则创立的 boss 线程选择器和工作线程选择器都是 NioEventLoopGroup。
其中,boss 线程选择器的线程数默认是 1,工作线程选择器的默认线程数量是 CPU 核数 * 2。
在代码 serverBootstrap.setChannelHandlers(new ServerHandler()) 中,增加了通道 handle:ServerHandler,这个前面会增加到事件处理流水线中。
3: main 办法前面会调用 nettyRemotingServer.init() 办法进行启动 netty 服务。该办法调用的是 NettyServerBootstrap#start 建设 netty 服务,代码如下:
public void start() {this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker) // 增加 BOOT 事件选择器和工作事件选择器
.channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ) // 设置通道类型,默认的是 NioServerSocketChannel
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize()) // 设置反对连贯的数量
.option(ChannelOption.SO_REUSEADDR, true) //
.childOption(ChannelOption.SO_KEEPALIVE, true) // 关上 TCP 的心态,检测是否放弃连贯
.childOption(ChannelOption.TCP_NODELAY, true) // 立刻发送数据,不进行 TCP 音讯缓存
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize()) // 发送缓存区大小
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize()) // 承受音讯缓存区大小
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, // 设置缓存水位线
new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
nettyServerConfig.getWriteBufferHighWaterMark()))
.localAddress(new InetSocketAddress(listenPort)) // 设置 IP 和端口
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) // 设置通道闲暇检测工作
.addLast(new ProtocolV1Decoder()) // 音讯解码
.addLast(new ProtocolV1Encoder()); // 音讯编码
if (channelHandlers != null) {addChannelPipelineLast(ch, channelHandlers); // 设置音讯处理器,值是下面提到的 ServerHandler
}
}
});
try {ChannelFuture future = this.serverBootstrap.bind(listenPort).sync(); // 绑定端口
LOGGER.info("Server started, listen port: {}", listenPort);
RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
initialized.set(true);
future.channel().closeFuture().sync();} catch (Exception exx) {throw new RuntimeException(exx);
}
}
4:IdleStateHandler 继承了 ChannelDuplexHandler,它是一个入站处理器,也是一个出站处理器。其继承关系如下:
参数含意:readerIdleTimeSeconds:读闲暇工夫,当指定工夫内没有读取音讯,会发送事件 READER_IDLE
writerIdleTimeSeconds:写闲暇工夫,当指定工夫内没有写入音讯,会发送事件 WRITER_IDLE
allIdleTimeSeconds:读写闲暇工夫,当指定工夫既没读取音讯,也没有写入音讯,发送事件 ALL_IDLE
下面三个参数如果是小于等于 0,则示意禁用对应事件。
5:在通道的 channelActive() 办法中,当通道激活时,会创立定时工作。
private void initialize(ChannelHandlerContext ctx) {switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
5:入站处理器 ProtocolV1Decoder 继承了 LengthFieldBasedFrameDecoder,次要目标是解析读取的信息。其继承关系如下:
其构造函数办法:
在 ProtocolV1Decoder#decodeFrame 办法中进行音讯解析,把字节流转换为 RpcMessage 对象。代码如下:
public Object decodeFrame(ByteBuf frame) {byte b0 = frame.readByte();
byte b1 = frame.readByte();
if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
|| ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {throw new IllegalArgumentException("Unknown magic code:" + b0 + "," + b1);
}
byte version = frame.readByte();
// TODO check version compatible here
int fullLength = frame.readInt();
short headLength = frame.readShort();
byte messageType = frame.readByte();
byte codecType = frame.readByte();
byte compressorType = frame.readByte();
int requestId = frame.readInt();
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(codecType);
rpcMessage.setId(requestId);
rpcMessage.setCompressor(compressorType);
rpcMessage.setMessageType(messageType);
// direct read head with zero-copy
int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH;
if (headMapLength > 0) {Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength);
rpcMessage.getHeadMap().putAll(map);
}
// read body
if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) {rpcMessage.setBody(HeartbeatMessage.PING);
} else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {rpcMessage.setBody(HeartbeatMessage.PONG);
} else {
int bodyLength = fullLength - headLength;
if (bodyLength > 0) {byte[] bs = new byte[bodyLength];
frame.readBytes(bs);
Compressor compressor = CompressorFactory.getCompressor(compressorType);
bs = compressor.decompress(bs);
Serializer serializer = EnhancedServiceLoader.load(Serializer.class, SerializerType.getByCode(rpcMessage.getCodec()).name());
rpcMessage.setBody(serializer.deserialize(bs));
}
}
return rpcMessage;
}
6:出站处理器 ProtocolV1Encoder 继承了 MessageToByteEncoder,次要是对音讯进行编码解决,把 RpcMessage 对象转换为字节流。继承关系如下:
在 ProtocolV1Encoder#encode 的源码如下:
public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
try {if (msg instanceof RpcMessage) {RpcMessage rpcMessage = (RpcMessage) msg;
int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
int headLength = ProtocolConstants.V1_HEAD_LENGTH;
byte messageType = rpcMessage.getMessageType();
out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
out.writeByte(ProtocolConstants.VERSION);
// full Length(4B) and head length(2B) will fix in the end.
out.writerIndex(out.writerIndex() + 6);
out.writeByte(messageType);
out.writeByte(rpcMessage.getCodec());
out.writeByte(rpcMessage.getCompressor());
out.writeInt(rpcMessage.getId());
// direct write head with zero-copy
Map<String, String> headMap = rpcMessage.getHeadMap();
if (headMap != null && !headMap.isEmpty()) {int headMapBytesLength = HeadMapSerializer.getInstance().encode(headMap, out);
headLength += headMapBytesLength;
fullLength += headMapBytesLength;
}
byte[] bodyBytes = null;
if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
&& messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
// heartbeat has no body
Serializer serializer = EnhancedServiceLoader.load(Serializer.class, SerializerType.getByCode(rpcMessage.getCodec()).name());
bodyBytes = serializer.serialize(rpcMessage.getBody());
Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
bodyBytes = compressor.compress(bodyBytes);
fullLength += bodyBytes.length;
}
if (bodyBytes != null) {out.writeBytes(bodyBytes);
}
// fix fullLength and headLength
int writeIndex = out.writerIndex();
// skip magic code(2B) + version(1B)
out.writerIndex(writeIndex - fullLength + 3);
out.writeInt(fullLength);
out.writeShort(headLength);
out.writerIndex(writeIndex);
} else {throw new UnsupportedOperationException("Not support this class:" + msg.getClass());
}
} catch (Throwable e) {LOGGER.error("Encode request error!", e);
}
}
7:处理器 ServerHandler 是解决 TM,RM 发送的 RpcMessage 音讯。其继承关系如下:
在 AbstractNettyRemotingServer.ServerHandler#channelRead 办法里解决 RpcMessage 音讯。
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}
processMessage(ctx, (RpcMessage) msg);
}