一、Reactor 模式
reactor
模式是一种事件驱动的应用层 I/O 解决模式,基于分而治之和事件驱动的思维,致力于构建一个高性能的可伸缩的 I/O 解决模式。维基百科对 Reactor pattern 的解释:
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers
大抵意思是说,reactor
设计模式是一种事件处理模式,用于同时有一个或多个申请发送到事件处理器(service handler),这个事件处理器会采纳多路拆散(demultiplexes )的形式,同步的将这些申请散发到申请处理器(request handlers)。
不难看出,上边介绍的 reactor
模式是一种形象;从实现角度说,reactor
模式有许多变种,不同编程语言中的实现也有差别。就 java 而言,巨匠 Doug Lea 在其【Scalable IO in Java】中就讲述了几个reactor
模式的演进,如单线程版本
、多线程版
,浏览此文后,笔者对巨匠所讲reactor
模式演进的了解与网络中一些形容稍有差别。
在reactor
单线程版中,只有一个reactor
线程,线程中通过 select
(I/O 多路复用接口) 监听所有 I/O 事件,收到 I/O 事件后通过 dispatch
进行分发给 Handlers
解决,此版本容易实现,也容易了解,但性能不高。为了适配多处理器,充分利用多核并行处理的劣势,实现高性能的网络服务,能够采纳分治策略,关键环节采纳多线程模式,于是就呈现了reactor
多线程版本,而多线程的利用体现为worder
线程和reactor
线程,多线程应该被池化治理,这样才容易被调整和管制。线程池中的线程数会比客户端的数量少很多,理论数量能够依据程序自身是 CPU 密集型还是 I/O 密集型操作来进行正当的调配。
多个 worder 线程(池化治理)
- 属于网络 I/O 操作与业务解决的拆分,因为
reactors
监听到 I/O 事件后应该疾速分发给handlers
来处理程序;但如果handler
中的非 I/O 操作慢了就会减慢reactor
中的 I/O 事件响应速度,所以把非 I/O 操作从reactors
的 I/O 线程转移到其余线程中,即由worker
线程来分担非 I/O 逻辑的操作解决。
- 属于网络 I/O 操作与业务解决的拆分,因为
多个 reactor 线程(池化治理)
- 属于网络建连操作与网络 I/O 读写操作的拆分,因为由一个
reactor
在一个线程中实现所有 I/O 操作也会遇到性能瓶颈,可采取拆分并减少reactor
策略,将 I/O 负载调配给多个reactor
(每个reactor
都有本人的线程、选择器和调度循环)以达到负载平衡。这看起来挺不错,但谁来执行调配以达到负载平衡呢?或者是因为这个问题,将reactor
拆分为两类角色,mainReactor
负责接管连贯,之后采纳肯定的负载平衡策略将新连贯调配给其余subReactor
来解决 I/O 读写,这样的拆分天然晦涩。
- 属于网络建连操作与网络 I/O 读写操作的拆分,因为由一个
如此就演进出如上图中的主从reactor
多线程模型。请留神,联合【Scalable IO in Java】原文中的用词和形容看,上图中的mainReactor
和subReactor
能够有多个并做池化治理,所有也有一些文章中会看到如主ReactorGroup
、mainReactorGroup
、从ReactorGroup
、subReactorGroup
等这类名词用 Group 后缀来强调 Reactor 是池化治理。 或者是不好布局,也或者是为了凸显主从reactor
角色的协作关系,上图中都只展现了一个,另外服务端利用通常只裸露一个服务端口时,只需用一个 mainReactor
来监听端口上的连贯事件并解决。
二、Netty 主从 reactor
多线程模型
Netty
中reactor
所对应的实现类是NioEventLoop
,其外围逻辑如下:
- 不同类型的 channel 向 Selector 注册所感兴趣的事件
- 扫描是否有感兴趣的事件产生
- 事件产生后做相应的解决
客户端和服务端别离会有不同类型的channel
,客户端创立SocketChannel
向服务端发动连贯申请,服务端创立ServerSocketChannel
监听客户端连贯,建连后创立SocketChannel
与客户端的SocketChannel
相互收发数据,这些channel
分工不同,向 Selector 注册所感兴趣的事件状况也不同:
客户端/服务端 | channel | OP_ACCEPT | OP_CONNECT | OP_WRITE | OP_READ |
---|---|---|---|---|---|
客户端 | SocketChannel | YES | |||
服务端 | ServerSocketChannel | YES | |||
服务端 | SocketChannel | YES | YES |
Netty
中 Nio 形式实现几种 reactor
模型如下:
mainReactor
对应 Netty
中配置的 bossGroup
线程组(下图中的主ReactorGroup
),次要负责承受客户端连贯的建设。每 bind
一个端口就用掉一个bossGroup
中的线程。
subReactor
对应 Netty
中配置的 workerGroup
线程组(下图中的 reactorGroup
),bossGroup
线程组承受完客户端的连贯后,将 channel
转交给 workerGroup
线程组,在 workerGroup
线程组内抉择一个线程,执行 I/O 读写的解决,workerGroup
线程组默认是 2 * CPU 核数个线程。
主从 reactor
模式的外围流程:
- 如果只监听一个端口,那么只需一个主
reactor
干活儿,所以通常看到boosGroup
只配置一个线程。主reactor
运行在独立的线程中 ,该线程中只负责与客户端的连贯申请 - 从
reactor
在服务器端能够不止一个, 通常运行多个从reactor
, 每个从reactor
也运行在一个独立的线程中 ,负责与客户端的读写操作 - 主
reactor
检测到客户端的链接后,创立NioSocketChannel
,依照肯定的算法循环选取(负载平衡)一个从reactor
,并把刚创立的NioSocketChannel
注册到这个从reactor
中,这样建连和读写事件互不影响。 - 一个
reactor
中可被注册多个NioSocketChannel
,这个reactor
监听所有的被调配的NioSocketChannel
的读写事件 , 如果监听到客户端的数据发送事件 , 将对应的业务逻辑转发给NioSocketChannel
中的pipeline
里的handler
链进行解决 handler
最好只负责响应 I/O 事件,不解决具体的与客户端交互的业务逻辑 , 这样不会长时间阻塞 , 其read
办法读取客户端数据后 , 将音讯数据交给业务线程池去解决相干业务逻辑- 业务线程池实现相干业务逻辑的解决后,将后果返回,通过
NioSocketChannel
的的pipeline
里的handler
链将后果音讯写回给客户端 - 当
buffer
不满足将后果音讯写回给客户端时的条件时,注册写事件,期待可写时再写
三、Seata Server 端 的 reactor 模式利用
Seata Server 采纳了 主从 reactor
多线程模型,对应这个模型的话是有四个线程池,其中自定义业务线程池是两个。
性能 | 线程池对象 | 备注 |
---|---|---|
接管客户端连贯 | NettyServerBootstrap#eventLoopGroupBoss | |
解决 IO 事件 | NettyServerBootstrap#eventLoopGroupWorker | 局部 RPC 音讯在这里解决 |
解决客户端的 request 音讯 | AbstractNettyRemoting#messageExecutor | 客户端被动发给的音讯 |
解决客户端的 response 音讯 | NettyRemotingServer#branchResultMessageExecutor | 服务端被动发给客户端音讯,客户端解决后给服务端响应 |
3.1、NettyServerBootstrap#eventLoopGroupBoss
笔者的环境未启用 epoll
,要害信息如下:
- 线程数:1,只监听一个端口
- 线程名前缀:“NettyBoss”
this.eventLoopGroupBoss = new NioEventLoopGroup( //CONFIG.getInt("transport.threadFactory.bossThreadSize", 1); nettyServerConfig.getBossThreadSize(), new NamedThreadFactory( // CONFIG.getConfig("transport.threadFactory.bossThreadPrefix", "NettyBoss"); nettyServerConfig.getBossThreadPrefix(), //CONFIG.getConfig("transport.threadFactory.bossThreadSize", 1); nettyServerConfig.getBossThreadSize()));
3.2、NettyServerBootstrap#eventLoopGroupWorker
笔者的环境未启用 epoll
,要害信息如下:
- 线程数:默认值是 cpu 核数 * 2
- 线程名前缀:“NettyServerNIOWorker”
this.eventLoopGroupWorker = new NioEventLoopGroup( // System.getProperty("transport.serverWorkerThreads", String.valueOf(WORKER_THREAD_SIZE)));//默认值cpu核数*2 nettyServerConfig.getServerWorkerThreads(), new NamedThreadFactory( // CONFIG.getConfig("transport.threadFactory.workerThreadPrefix", // enableEpoll() ? EPOLL_WORKER_THREAD_PREFIX : DEFAULT_NIO_WORKER_THREAD_PREFIX); // 默认值 NettyServerNIOWorker ,没有启用 epoll nettyServerConfig.getWorkerThreadPrefix(), //System.getProperty("transport.serverWorkerThreads", String.valueOf(WORKER_THREAD_SIZE)));//默认值 cpu核数*2 nettyServerConfig.getServerWorkerThreads()));
3.3、AbstractNettyRemoting#messageExecutor
此线程池解决客户端的 request 音讯,要害参数信息如下:
- 线程数:50 ~ 500
- keepAlive:500 秒
- 线程名字前缀: "ServerHandlerThread"
- 队列长度: 500
- 回绝策略:CallerRunsPolicy(),饱和的状况下,调用者来执行该工作,即 Netty 的 I/O 线程
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor( //Integer.parseInt(System.getProperty("transport.minServerPoolSize", "50")); NettyServerConfig.getMinServerPoolSize(), //Integer.parseInt(System.getProperty("transport.maxServerPoolSize", "500")); NettyServerConfig.getMaxServerPoolSize(), //Integer.parseInt(System.getProperty("transport.keepAliveTime", "500")); NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS, //Integer.parseInt(System.getProperty("transport.maxTaskQueueSize", "500")); new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()), new NamedThreadFactory( "ServerHandlerThread", //Integer.parseInt(System.getProperty("transport.maxServerPoolSize", "500")); NettyServerConfig.getMaxServerPoolSize()), //饱和的状况下,调用者来执行该工作,即Netty的IO线程 new ThreadPoolExecutor.CallerRunsPolicy());
3.4、NettyRemotingServer#branchResultMessageExecutor
此线程池解决客户端的 response 音讯,要害参数信息如下:
- 线程数:cpu 核数2 ~ cpu 核数2
- keepAlive:500 秒
- 线程名字前缀: "BranchResultHandlerThread"
- 队列长度: 20000
- 回绝策略:
CallerRunsPolicy()
,饱和的状况下,调用者来执行该工作,即 Netty 的 IO 线程
private ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor( //System.getProperty("transport.minBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默认值 cpu核数*2 NettyServerConfig.getMinBranchResultPoolSize(), //System.getProperty("transport.maxBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默认值 cpu核数*2 NettyServerConfig.getMaxBranchResultPoolSize(), // System.getProperty("transport.keepAliveTime", "500"),默认值500 NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>( //System.getProperty("transport.maxTaskQueueSize", "20000"),默认值 20000 NettyServerConfig.getMaxTaskQueueSize()), new NamedThreadFactory( // 分支响应音讯的解决线程的名字前缀 BranchResultHandlerThread "BranchResultHandlerThread", // System.getProperty("transport.maxBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默认值 cpu核数*2 NettyServerConfig.getMaxBranchResultPoolSize() ), //饱和的状况下,调用者来执行该工作,即Netty的IO线程 new ThreadPoolExecutor.CallerRunsPolicy());
3.5、业务线程池如何解决音讯
3.5.1、注销音讯处理器
Seata 音讯解决的外围逻辑是:定义好什么类型的音讯,应用哪个音讯处理器,这个音讯处理器的音讯解决逻辑在哪个线程池中执行。这个映射关系通过AbstractNettyRemoting#processorTable
来存储。
/** * 能够接管什么类型的音讯,以及应用哪个音讯处理器和线程池来解决音讯 * HashMap<音讯类型, Pair<音讯处理器, 线程池>> * processor type {@link MessageType} */protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
各模块 Netty 组件启动前,通过AbstractNettyRemotingServer#registerProcessor
办法注销到这个构造中。
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) { Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor); this.processorTable.put(messageType, pair);}
拿 Seata Server 来说,如在ServerBootStrap
启动前,通过NettyRemotingServer#registerProcessor
注册好消息处理器。不同音讯对应的处理器的线程池也不同,也有一些音讯没有指定业务线程池(没必要),状况如下:
private void registerProcessor() { // 1\. registry on request message processor ServerOnRequestProcessor onRequestProcessor = new ServerOnRequestProcessor(this, getHandler()); ShutdownHook.getInstance().addDisposable(onRequestProcessor); super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor); super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor); // 2\. registry on response message processor ServerOnResponseProcessor onResponseProcessor = new ServerOnResponseProcessor(getHandler(), getFutures()); super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor); super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor); // 3\. registry rm message processor RegRmProcessor regRmProcessor = new RegRmProcessor(this); super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor); // 4\. registry tm message processor RegTmProcessor regTmProcessor = new RegTmProcessor(this); super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null); // 5\. registry heartbeat message processor ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this); super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);}
3.5.2、解决音讯
当 Seata Server 收到客户端发送的 RPC 音讯后,会进入AbstractNettyRemotingServer.ServerHandler#channelRead
中,在这里对音讯类型简略判断后,委托给processMessage
解决。
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof RpcMessage)) { return; } // 收到音讯后,委托给 processMessage 解决 processMessage(ctx, (RpcMessage) msg);}
processMessage
中通过音讯类型找到音讯处理器进行业务层解决:
- 如果音讯处理器有指定的业务线程池,在指定的业务线程池中解决音讯
- 若音讯处理器没有指定的业务线程池,则在 I/O 线程中间接解决。
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { ... Object body = rpcMessage.getBody(); if (body instanceof MessageTypeAware) { MessageTypeAware messageTypeAware = (MessageTypeAware) body; // 通过音讯类型找到音讯处理器 final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode()); if (pair != null) { // 如果音讯处理器有指定的业务线程池 if (pair.getSecond() != null) { try { // 在指定的业务线程池中解决音讯 pair.getSecond().execute(() -> { ... pair.getFirst().process(ctx, rpcMessage); ... }); } catch (RejectedExecutionException e) { ... } } else { try { //若音讯处理器没有指定的业务线程池,则在I/O现成中间接解决。 pair.getFirst().process(ctx, rpcMessage); } catch (Throwable th) { ... } } } else { ... } } else { ... }}
四、Seata client 端的 reactor 模式利用
Seata client 端也采纳了 reactor
多线程模型,在初始化的时候有RmNettyRemotingClient
和TmNettyRemotingClient
两个对象,别离会创立各自的 Bootstrap
,RM 和 TM 各有本人的 I/O 线程池和业务线程池。
性能 | 线程池对象 | 备注 |
---|---|---|
解决 IO 事件 | NettyClientBootstrap#eventLoopGroupWorker | |
解决业务音讯 | AbstractNettyRemoting#messageExecutor |
源码里还有个NettyClientBootstrap#defaultEventExecutorGroup
,没看进去哪里有用。TmNettyRemotingClient#getInstance()
中构建了 TM 的业务线程池,赋值给NettyClientBootstrap#messageExecutor
,同样RmNettyRemotingClient#getInstance()
中构建了 RM 的业务线程池
4.1、NettyClientBootstrap#eventLoopGroupWorker
客户端此线程池要害信息如下:
- 线程数:1
线程名字前缀:
- TM:"NettyClientSelector_TMROLE"
- RM:"NettyClientSelector_RMROLE"
// 单I/O线程this.eventLoopGroupWorker = new NioEventLoopGroup(//CONFIG.getInt("transport.threadFactory.clientSelectorThreadSize", 1)selectorThreadSizeThreadSize,new NamedThreadFactory( // CONFIG.getConfig("transport.threadFactory.clientSelectorThreadPrefix", "NettyClientSelector"); // 再拼上角色后默认值为:"NettyClientSelector_TMROLE" getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), //CONFIG.getInt("transport.threadFactory.clientSelectorThreadSize", 1) selectorThreadSizeThreadSize));
4.2、AbstractNettyRemoting#messageExecutor
TmNettyRemotingClient#getInstance()
和RmNettyRemotingClient#getInstance()
创立各自的线程池,配置并不相同。
1)TmNettyRemotingClient#getInstance()
中所创立线程池的要害信息如下:
- 线程数:默认值是 cpu 核数 2 ~ cpu 核数 2
- keepAlive:Integer.MAX_VALUE 秒
- 线程名字前缀:rpcDispatch_TMROLE
- 队列长度: 2000
- 回绝策略:
runsOldestTaskPolicy()
,饱和的状况下,增加新工作并由投递工作的线程运行最早的工作。
public static TmNettyRemotingClient getInstance() { if (instance == null) { synchronized (TmNettyRemotingClient.class) { if (instance == null) { NettyClientConfig nettyClientConfig = new NettyClientConfig(); // 自定义TM业务线程池 final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor( nettyClientConfig.getClientWorkerThreads(), // 默认是cpu核数 * 2 nettyClientConfig.getClientWorkerThreads(), // 默认是cpu核数 * 2 KEEP_ALIVE_TIME, TimeUnit.SECONDS,//Integer.MAX_VALUE; new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),//2000 new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),// TM的线程名是:rpcDispatch_TMROLE nettyClientConfig.getClientWorkerThreads()),// 默认是cpu核数 * 2 RejectedPolicies.runsOldestTaskPolicy());//增加新工作并由主线程运行最早的工作。 instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor); } } } return instance;}
2)RmNettyRemotingClient#getInstance()
中所创立线程池的要害信息如下:
- 线程数:默认是 cpu 核数 2 ~ cpu 核数 2
- keepAlive:Integer.MAX_VALUE 秒
- 线程名字前缀:rpcDispatch_RMROLE
- 队列长度: 20000
- 回绝策略:
CallerRunsPolicy()
,饱和的状况下,调用者来执行该工作,即 Netty 的 IO 线程。
public static RmNettyRemotingClient getInstance() { if (instance == null) { synchronized (RmNettyRemotingClient.class) { if (instance == null) { NettyClientConfig nettyClientConfig = new NettyClientConfig(); // 自定义RM业务线程池 final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor( nettyClientConfig.getClientWorkerThreads(), // 默认是cpu核数 * 2 nettyClientConfig.getClientWorkerThreads(), // 默认是cpu核数 * 2 KEEP_ALIVE_TIME, TimeUnit.SECONDS,//Integer.MAX_VALUE; new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),//20000 new NamedThreadFactory( nettyClientConfig.getRmDispatchThreadPrefix(),// RM的线程名是:rpcDispatch_RMROLE, nettyClientConfig.getClientWorkerThreads()),// 默认是cpu核数 * 2 new ThreadPoolExecutor.CallerRunsPolicy());////饱和的状况下,调用者来执行该工作,即Netty的IO线程 instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor); } } } return instance;}
4.3、音讯解决
TmNettyRemotingClient
和RmNettyRemotingClient
在init()
办法中会调用registerProcessor()
办法注册各自的 RPC 音讯处理器。收到 RPC 音讯后就由这些处理器+对应的线程池做后续解决,音讯的相干业务属性在后续的事务流程中介绍。
五、撑持非凡能力的业务线程池
1)AbstractNettyRemotingClient#mergeSendExecutorService
用于批量发送申请,多个音讯合并,缩小通信次数。实现逻辑比拟清晰,当容许发送批量音讯时,音讯首先分桶保留到 basketMap,在一个周期性的无线循环中,把 basketMap 中的音讯队列取出来,把每个队列的音讯都放到 mergeMessage 中,最初把 mergeMessage 发送进来。
- 线程数:1
- 线程名前缀:”rpcMergeMessageSend“
AbstractNettyRemotingClient
中性能相干的属性介绍:Object mergeLock
:发送申请的锁对象。Map<Integer, MergeMessage> mergeMsgMap
:当发送音讯的类型是 MergeMessage,那么就将音讯保留到 mergeMsgMap。ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap
:当容许发送批量音讯时,音讯首先分桶保留到 basketMap,而后通过定时工作将保留 basketMap 的音讯发送进来。basketMap 的是服务器的地址,value 是保留的发送个服务器的音讯。依照地址分桶是将要发给同一个服务器的多个音讯合并到一个MergedWarpMessage
后发送。
- 有配置开关,默认值如下:
transport.enableTmClientBatchSendRequest=falsetransport.enableRmClientBatchSendRequest=truetransport.enableTcServerBatchSendResponse=false
对应的要害代码逻辑如下:
- 在
AbstractNettyRemotingClient#sendSyncRequest
中,同步发送时将音讯缓存起来,默认配置看只有 RM 开启了音讯合并发送,另外同步发送超时设定,默认 TM 30 秒,RM 15 秒。依照 IP 地址分桶,同一个指标实例的音讯才能够合并发送
public Object sendSyncRequest(Object msg) throws TimeoutException { String serverAddress = loadBalance(getTransactionServiceGroup(), msg); // 同步发送超时设定,默认 TM 30秒,RM 15秒 long timeoutMillis = this.getRpcRequestTimeout(); RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); // send batch message // put message into basketMap, @see MergedSendRunnable // 默认只有RM开启了音讯合并发送,TM 并未开启零售送 if (this.isEnableClientBatchSendRequest()) { // send batch message is sync request, needs to create messageFuture and put it in futures. MessageFuture messageFuture = new MessageFuture(); messageFuture.setRequestMessage(rpcMessage); messageFuture.setTimeout(timeoutMillis); futures.put(rpcMessage.getId(), messageFuture); // put message into basketMap // 依照指标地址分桶,同一个TC实例的音讯才能够合并发送 BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress, key -> new LinkedBlockingQueue<>()); if (!basket.offer(rpcMessage)) { LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}", serverAddress, rpcMessage); return null; } if (LOGGER.isDebugEnabled()) { LOGGER.debug("offer message: {}", rpcMessage.getBody()); } // 告诉合并发送线程 有音讯要发送,醒来干活儿 if (!isSending) { synchronized (mergeLock) { mergeLock.notifyAll(); } } try { // 阻塞期待音讯的响应。 return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS); } catch (Exception exx) { LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), serverAddress, rpcMessage.getBody()); if (exx instanceof TimeoutException) { throw (TimeoutException) exx; } else { throw new RuntimeException(exx); } } } else { // 不合并发送的话,就获取指定IP的channel,并立刻发送。 Channel channel = clientChannelManager.acquireChannel(serverAddress); return super.sendSync(channel, rpcMessage, timeoutMillis); }}
- 在
AbstractNettyRemotingClient#init
中构建线程池mergeSendExecutorService
,在这个线程池中执行音讯的批处理(音讯合并、音讯发送)。
public void init() { ... // 通过线程池有1个线程,执行音讯合并发送 if (this.isEnableClientBatchSendRequest()) { mergeSendExecutorService = new ThreadPoolExecutor( MAX_MERGE_SEND_THREAD,//1 MAX_MERGE_SEND_THREAD,//1 KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory( //TM : rpcMergeMessageSend_TMROLE //RM : rpcMergeMessageSend_RMROLE //SERVER : rpcMergeMessageSend_SERVERROLE getThreadPrefix(), MAX_MERGE_SEND_THREAD)//1 ); mergeSendExecutorService.submit(new MergedSendRunnable()); } super.init(); clientBootstrap.start();}
- 批处理工作
MergedSendRunnable
中,实现了音讯合并和音讯发送
private class MergedSendRunnable implements Runnable { @Override public void run() { while (true) { //mergeLock 用于生产-生产的合作 synchronized (mergeLock) { try { // MAX_MERGE_SEND_MILLS = 1,还有线程休眠的成果 mergeLock.wait(MAX_MERGE_SEND_MILLS); } catch (InterruptedException e) { } } isSending = true; // 发送音讯,音讯是依照IP地址分组 basketMap.forEach((address, basket) -> { if (basket.isEmpty()) { return; } MergedWarpMessage mergeMessage = new MergedWarpMessage(); //如果basket队列不为空,将其中的音讯全取出来,增加到mergeMessage中 while (!basket.isEmpty()) { RpcMessage msg = basket.poll(); mergeMessage.msgs.add((AbstractMessage) msg.getBody()); mergeMessage.msgIds.add(msg.getId()); } // debug 打印本次发送的音讯个数和每个音讯的Id,以及此时在futures中做超时管控的所有音讯的Id, // 两个音讯Id比对,可晓得音讯积压状况9666 if (mergeMessage.msgIds.size() > 1) { printMergeMessageLog(mergeMessage); } Channel sendChannel = null; try { // 获取指定地址的channel对象,异步发送音讯 // 发送批量音讯是同步的申请,然而这里不须要失去返回的值,在音讯保留到basketMap之前,曾经创立了messageFuture了, // 返回值将会从ClientOnResponseProcessor中失去 sendChannel = clientChannelManager.acquireChannel(address); // 因为原始音讯的发送曾经退出过超时管控,所以批量发送环节不再须要退出额定的超时管制 AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage); } catch (FrameworkException e) { if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) { destroyChannel(address, sendChannel); } // fast fail // 产生异样,疾速将保留在mergeMessage的音讯清理掉 for (Integer msgId : mergeMessage.msgIds) { MessageFuture messageFuture = futures.remove(msgId); if (messageFuture != null) { messageFuture.setResultMessage( new RuntimeException(String.format("%s is unreachable", address), e)); } } LOGGER.error("client merge call failed: {}", e.getMessage(), e); } }); isSending = false; } }
2)AbstractNettyRemoting#timerExecutor
Netty 的 I/O 操作异步的,RPC 音讯的发送操作会对应一个 Future
对象,在 Seata 中这个 Futrue
对象被封装为 MessageFuture
,需同步发送的音讯,其对应的 MessageFuture
被放入 map 缓存起来,当收到音讯的 response 后,将音讯从 map 中移除。AbstractNettyRemoting#timerExecutor
里的这个线程定时巡检 map 中的音讯,若超时未收到 response 则认定为发送超时。
- 线程数:1
- 线程名前缀:”timeoutChecker“
- scheduleAtFixedRate :提早 3 秒,频率 3 秒
AbstractNettyRemoting
中的性能相干的属性介绍:ScheduledExecutorService timerExecutor
:执行定时工作,音讯发送当前,到了过期工夫还没有返回,则会对音讯进行清理。ConcurrentHashMap<Integer, MessageFuture> futures
:保留着不同音讯,timerExecutor 会清理 futures 中过期的音讯。
对应的要害代码逻辑如下:
- 构建定时工作的线程池
AbstractNettyRemoting#timerExecutor
,只用 1 个线程
/** * 定时器,用于巡检音讯的发送是否超时 */protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("timeoutChecker", 1, true));复制代码
- 通过
AbstractNettyRemoting#sendSync
同步发送音讯,构建MessageFuture
并放入futures
这个 map 中,发送过程配置监听器 用于解决channel
异样,指定失败起因并从futures
中移除,还要销毁channel
。
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException { ... // 构建 MessageFuture MessageFuture messageFuture = new MessageFuture(); messageFuture.setRequestMessage(rpcMessage); messageFuture.setTimeout(timeoutMillis); // 放入 futures 这个map中 futures.put(rpcMessage.getId(), messageFuture); //查看通道是否能够写 channelWritableCheck(channel, rpcMessage.getBody()); String remoteAddr = ChannelUtil.getAddressFromChannel(channel); //在申请发送之前执行钩子 doBeforeRpcHooks(remoteAddr, rpcMessage); // 发送申请,并配置监听器 用于解决channel异样 channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> { // 这里响应不胜利,根本是channel不失常了 if (!future.isSuccess()) { //移除音讯 MessageFuture messageFuture1 = futures.remove(rpcMessage.getId()); if (messageFuture1 != null) { messageFuture1.setResultMessage(future.cause()); } //响应不胜利,则销毁channel destroyChannel(future.channel()); } }); ... //获取响应后果 Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS); //响应之后执行钩子 doAfterRpcHooks(remoteAddr, rpcMessage, result); ...}
- 失常收到 response 后,给
MessageFuture
对象赋值,从futures
中移除,如ClientOnResponseProcessor#process
中的实现
@Overridepublic void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { ... // 从futures中移除 MessageFuture messageFuture = futures.remove(rpcMessage.getId()); if (messageFuture != null) { // 赋值后果 messageFuture.setResultMessage(rpcMessage.getBody()); }}
- 在
AbstractNettyRemoting#init
中开启定时工作,巡检出futures
这个 map 中的超时对象后从futures
中移除,不再查看,并指定后果为TimeoutException
public void init() { // 检测音讯同步发送(sendSync(xxx))是否超时, // 定时工作默认是提早3秒,距离3秒 timerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) { MessageFuture future = entry.getValue(); if (future.isTimeout()) { // 如果过期了则将发送后果设置为TimeoutException // 从futures中移除,不再查看 futures.remove(entry.getKey()); RpcMessage rpcMessage = future.getRequestMessage(); future.setResultMessage(new TimeoutException(String .format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString()))); if (LOGGER.isDebugEnabled()) { LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody()); } } } nowMills = System.currentTimeMillis(); } }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);}
还有线程池跟服务注册发现和建连相干,会后边篇章再介绍。