关于java:seata中netty的使用源码二

7次阅读

共计 4722 个字符,预计需要花费 12 分钟才能阅读完成。

这次咱们看的是客户端局部。

1:在客户端咱们应用的是注解 @GlobalTransactional。会创立代理 GlobalTransactionScanner。在代理的初始化代码中,会进行 TM 和 RM 的初始化,代码如下:

private void initClient() {if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    }
    //init TM
    TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    if (LOGGER.isInfoEnabled()) {LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
    }
    //init RM
    RMClient.init(applicationId, txServiceGroup);
    registerSpringShutdownHook();}

2:在 TMClient 或者 RMClient 的 init 办法里,会创立 NettyClientBootstrap 实例。在 NettyClientBootstrap 结构过程中,会创立 Bootstrap 实例,也会创立 NioEventLoopGroup 的客户端事件选择器。代码如下:

public class NettyClientBootstrap implements RemotingBootstrap {private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientBootstrap.class);
    private final NettyClientConfig nettyClientConfig;
    private final Bootstrap bootstrap = new Bootstrap();
    private final EventLoopGroup eventLoopGroupWorker;
    private EventExecutorGroup defaultEventExecutorGroup;
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    
    public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup,
        NettyPoolKey.TransactionRole transactionRole) {if (nettyClientConfig == null) {nettyClientConfig = new NettyClientConfig();
        }
        this.nettyClientConfig = nettyClientConfig;
        int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
        this.transactionRole = transactionRole;
        this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize,
            new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
            selectorThreadSizeThreadSize));
        this.defaultEventExecutorGroup = eventExecutorGroup;
}

3:创立之后,会调用 NettyClientBootstrap 的 start 办法,建设 netty 的客户端代码,如下:

public void start() {this.bootstrap.group(this.eventLoopGroupWorker).channel( // 绑定事件选择器
        nettyClientConfig.getClientChannelClazz()).option( // 设置通道类型,默认是 NioSocketChannel
        ChannelOption.TCP_NODELAY, true) // TCP 不缓存间接发送
        .option(ChannelOption.SO_KEEPALIVE, true) // TCP 进行心跳检测
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) // 设置连贯超时工夫
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) // 设置发送缓存区大小
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()); // 设置承受缓冲区大小
          
    bootstrap.handler(new ChannelInitializer<SocketChannel>() { // 设置通道处理器
            @Override
       public void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), // 增加通道闲暇心跳处理器
                         nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                         nettyClientConfig.getChannelMaxAllIdleSeconds()))
                    .addLast(new ProtocolV1Decoder()) // 通道音讯解码处理器
                    .addLast(new ProtocolV1Encoder()); // 通道音讯编码处理器
                if (channelHandlers != null) {addChannelPipelineLast(ch, channelHandlers); // 增加处理器 ClientHandler
                 }
            }
        });
        if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {LOGGER.info("NettyClientBootstrap has started");
      }
}

4:在 seata 客户端,应用 netty 客户端的时候,应用了池化技术,其工厂类是 NettyPoolableFactory。在 makeObject 办法中去获取 netty 的连贯通道。获取通道的代码如下:

public Channel getNewChannel(InetSocketAddress address) {
    Channel channel;
    ChannelFuture f = this.bootstrap.connect(address); // 连贯 netty 服务器
    try {f.await(this.nettyClientConfig.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS); // 期待连贯实现
        if (f.isCancelled()) {throw new FrameworkException(f.cause(), "connect cancelled, can not connect to services-server.");
        } else if (!f.isSuccess()) {throw new FrameworkException(f.cause(), "connect failed, can not connect to services-server.");
        } else {channel = f.channel(); // 获取通道
        }
    } catch (Exception e) {throw new FrameworkException(e, "can not connect to services-server.");
    }
    return channel;
}

5:发送音讯的示例代码(这是须要获取返回值的状况,如果不须要获取返回值,间接调用 channel.writeAndFlush() 即可):

protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeoutMillis);
    futures.put(rpcMessage.getId(), messageFuture);
    channelWritableCheck(channel, rpcMessage.getBody());
    String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
    doBeforeRpcHooks(remoteAddr, rpcMessage);
    channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
            if (messageFuture1 != null) {messageFuture1.setResultMessage(future.cause());
            }
            destroyChannel(future.channel());
        }
    });
    try {Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        doAfterRpcHooks(remoteAddr, rpcMessage, result);
        return result;
    } catch (Exception exx) {if (exx instanceof TimeoutException) {throw (TimeoutException) exx;
        } else {throw new RuntimeException(exx);
        }
    }
}
正文完
 0