这次咱们看的是客户端局部。

1:在客户端咱们应用的是注解 @GlobalTransactional。会创立代理 GlobalTransactionScanner。在代理的初始化代码中,会进行TM和RM的初始化,代码如下:

private void initClient() {    if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {        throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));    }    //init TM    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);    if (LOGGER.isInfoEnabled()) {        LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);    }    //init RM    RMClient.init(applicationId, txServiceGroup);    registerSpringShutdownHook();}

2:在 TMClient 或者 RMClient 的init 办法里,会创立 NettyClientBootstrap 实例。在 NettyClientBootstrap 结构过程中,会创立 Bootstrap 实例,也会创立 NioEventLoopGroup 的客户端事件选择器。代码如下:

public class NettyClientBootstrap implements RemotingBootstrap {    private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);    private final NettyClientConfig nettyClientConfig;    private final Bootstrap bootstrap = new Bootstrap();    private final EventLoopGroup eventLoopGroupWorker;    private EventExecutorGroup defaultEventExecutorGroup;    private final AtomicBoolean initialized = new AtomicBoolean(false);        public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,        NettyPoolKey.TransactionRole transactionRole) {        if (nettyClientConfig == null) {            nettyClientConfig = new NettyClientConfig();        }        this.nettyClientConfig = nettyClientConfig;        int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();        this.transactionRole = transactionRole;        this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize,            new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),            selectorThreadSizeThreadSize));        this.defaultEventExecutorGroup = eventExecutorGroup;}

3:创立之后,会调用 NettyClientBootstrap 的 start 办法,建设netty的客户端代码,如下:

public void start() {    this.bootstrap.group(this.eventLoopGroupWorker).channel( //绑定事件选择器        nettyClientConfig.getClientChannelClazz()).option( //设置通道类型,默认是NioSocketChannel        ChannelOption.TCP_NODELAY, true) // TCP不缓存间接发送        .option(ChannelOption.SO_KEEPALIVE, true) // TCP进行心跳检测        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) // 设置连贯超时工夫        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) //设置发送缓存区大小        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()); //设置承受缓冲区大小              bootstrap.handler(new ChannelInitializer<SocketChannel>() { //设置通道处理器            @Override       public void initChannel(SocketChannel ch) {                ChannelPipeline pipeline = ch.pipeline();                pipeline.addLast(                    new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), //增加通道闲暇心跳处理器                         nettyClientConfig.getChannelMaxWriteIdleSeconds(),                         nettyClientConfig.getChannelMaxAllIdleSeconds()))                    .addLast(new ProtocolV1Decoder()) //通道音讯解码处理器                    .addLast(new ProtocolV1Encoder()); //通道音讯编码处理器                if (channelHandlers != null) {                    addChannelPipelineLast(ch, channelHandlers); //增加处理器 ClientHandler                 }            }        });        if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {            LOGGER.info("NettyClientBootstrap has started");      }}

4:在seata客户端,应用netty客户端的时候,应用了池化技术,其工厂类是 NettyPoolableFactory。在 makeObject 办法中去获取netty的连贯通道。获取通道的代码如下:

public Channel getNewChannel(InetSocketAddress address) {    Channel channel;    ChannelFuture f = this.bootstrap.connect(address); // 连贯netty服务器    try {        f.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS); // 期待连贯实现        if (f.isCancelled()) {            throw new FrameworkException(f.cause(), "connect cancelled, can not connect to services-server.");        } else if (!f.isSuccess()) {            throw new FrameworkException(f.cause(), "connect failed, can not connect to services-server.");        } else {            channel = f.channel(); //获取通道        }    } catch (Exception e) {        throw new FrameworkException(e, "can not connect to services-server.");    }    return channel;}

5:发送音讯的示例代码(这是须要获取返回值的状况,如果不须要获取返回值,间接调用 channel.writeAndFlush()即可):

protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {    MessageFuture messageFuture = new MessageFuture();    messageFuture.setRequestMessage(rpcMessage);    messageFuture.setTimeout(timeoutMillis);    futures.put(rpcMessage.getId(), messageFuture);    channelWritableCheck(channel, rpcMessage.getBody());    String remoteAddr = ChannelUtil.getAddressFromChannel(channel);    doBeforeRpcHooks(remoteAddr, rpcMessage);    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {        if (!future.isSuccess()) {            MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());            if (messageFuture1 != null) {                messageFuture1.setResultMessage(future.cause());            }            destroyChannel(future.channel());        }    });    try {        Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);        doAfterRpcHooks(remoteAddr, rpcMessage, result);        return result;    } catch (Exception exx) {        if (exx instanceof TimeoutException) {            throw (TimeoutException) exx;        } else {            throw new RuntimeException(exx);        }    }}