共计 4722 个字符,预计需要花费 12 分钟才能阅读完成。
这次咱们看的是客户端局部。
1:在客户端咱们应用的是注解 @GlobalTransactional。会创立代理 GlobalTransactionScanner。在代理的初始化代码中,会进行 TM 和 RM 的初始化,代码如下:
private void initClient() {if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
}
//init TM
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
if (LOGGER.isInfoEnabled()) {LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
}
//init RM
RMClient.init(applicationId, txServiceGroup);
registerSpringShutdownHook();}
2:在 TMClient 或者 RMClient 的 init 办法里,会创立 NettyClientBootstrap 实例。在 NettyClientBootstrap 结构过程中,会创立 Bootstrap 实例,也会创立 NioEventLoopGroup 的客户端事件选择器。代码如下:
public class NettyClientBootstrap implements RemotingBootstrap {private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);
private final NettyClientConfig nettyClientConfig;
private final Bootstrap bootstrap = new Bootstrap();
private final EventLoopGroup eventLoopGroupWorker;
private EventExecutorGroup defaultEventExecutorGroup;
private final AtomicBoolean initialized = new AtomicBoolean(false);
public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,
NettyPoolKey.TransactionRole transactionRole) {if (nettyClientConfig == null) {nettyClientConfig = new NettyClientConfig();
}
this.nettyClientConfig = nettyClientConfig;
int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
this.transactionRole = transactionRole;
this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize,
new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
selectorThreadSizeThreadSize));
this.defaultEventExecutorGroup = eventExecutorGroup;
}
3:创立之后,会调用 NettyClientBootstrap 的 start 办法,建设 netty 的客户端代码,如下:
public void start() {this.bootstrap.group(this.eventLoopGroupWorker).channel( // 绑定事件选择器
nettyClientConfig.getClientChannelClazz()).option( // 设置通道类型,默认是 NioSocketChannel
ChannelOption.TCP_NODELAY, true) // TCP 不缓存间接发送
.option(ChannelOption.SO_KEEPALIVE, true) // TCP 进行心跳检测
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) // 设置连贯超时工夫
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) // 设置发送缓存区大小
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()); // 设置承受缓冲区大小
bootstrap.handler(new ChannelInitializer<SocketChannel>() { // 设置通道处理器
@Override
public void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), // 增加通道闲暇心跳处理器
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
nettyClientConfig.getChannelMaxAllIdleSeconds()))
.addLast(new ProtocolV1Decoder()) // 通道音讯解码处理器
.addLast(new ProtocolV1Encoder()); // 通道音讯编码处理器
if (channelHandlers != null) {addChannelPipelineLast(ch, channelHandlers); // 增加处理器 ClientHandler
}
}
});
if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {LOGGER.info("NettyClientBootstrap has started");
}
}
4:在 seata 客户端,应用 netty 客户端的时候,应用了池化技术,其工厂类是 NettyPoolableFactory。在 makeObject 办法中去获取 netty 的连贯通道。获取通道的代码如下:
public Channel getNewChannel(InetSocketAddress address) {
Channel channel;
ChannelFuture f = this.bootstrap.connect(address); // 连贯 netty 服务器
try {f.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS); // 期待连贯实现
if (f.isCancelled()) {throw new FrameworkException(f.cause(), "connect cancelled, can not connect to services-server.");
} else if (!f.isSuccess()) {throw new FrameworkException(f.cause(), "connect failed, can not connect to services-server.");
} else {channel = f.channel(); // 获取通道
}
} catch (Exception e) {throw new FrameworkException(e, "can not connect to services-server.");
}
return channel;
}
5:发送音讯的示例代码(这是须要获取返回值的状况,如果不须要获取返回值,间接调用 channel.writeAndFlush() 即可):
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);
channelWritableCheck(channel, rpcMessage.getBody());
String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
doBeforeRpcHooks(remoteAddr, rpcMessage);
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
if (messageFuture1 != null) {messageFuture1.setResultMessage(future.cause());
}
destroyChannel(future.channel());
}
});
try {Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
doAfterRpcHooks(remoteAddr, rpcMessage, result);
return result;
} catch (Exception exx) {if (exx instanceof TimeoutException) {throw (TimeoutException) exx;
} else {throw new RuntimeException(exx);
}
}
}
正文完