一、Reactor 模式
reactor
模式是一种事件驱动的应用层 I/O 解决模式,基于分而治之和事件驱动的思维,致力于构建一个高性能的可伸缩的 I/O 解决模式。维基百科对 Reactor pattern 的解释:
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers
大抵意思是说,reactor
设计模式是一种事件处理模式,用于同时有一个或多个申请发送到事件处理器(service handler),这个事件处理器会采纳多路拆散(demultiplexes)的形式,同步的将这些申请散发到申请处理器(request handlers)。
不难看出,上边介绍的 reactor
模式是一种形象;从实现角度说,reactor
模式有许多变种,不同编程语言中的实现也有差别。就 java 而言,巨匠 Doug Lea 在其【Scalable IO in Java】中就讲述了几个 reactor
模式的演进,如 单线程版本
、 多线程版
,浏览此文后,笔者对巨匠所讲reactor
模式演进的了解与网络中一些形容稍有差别。
在 reactor
单线程版中,只有一个reactor
线程,线程中通过 select
(I/O 多路复用接口)监听所有 I/O 事件,收到 I/O 事件后通过 dispatch
进行分发给 Handlers
解决,此版本容易实现,也容易了解,但性能不高。为了适配多处理器,充分利用多核并行处理的劣势,实现高性能的网络服务,能够采纳分治策略,关键环节采纳多线程模式,于是就呈现了 reactor
多线程版本,而多线程的利用体现为 worder
线程和 reactor
线程,多线程应该被 池化治理,这样才容易被调整和管制。线程池中的线程数会比客户端的数量少很多,理论数量能够依据程序自身是 CPU 密集型还是 I/O 密集型操作来进行正当的调配。
-
多个 worder 线程(池化治理)
- 属于网络 I/O 操作与业务解决的拆分,因为
reactors
监听到 I/O 事件后应该疾速分发给handlers
来处理程序;但如果handler
中的非 I/O 操作慢了就会减慢reactor
中的 I/O 事件响应速度,所以把非 I/O 操作从reactors
的 I/O 线程转移到其余线程中,即由worker
线程来分担非 I/O 逻辑的操作解决。
- 属于网络 I/O 操作与业务解决的拆分,因为
-
多个 reactor 线程(池化治理)
- 属于网络建连操作与网络 I/O 读写操作的拆分,因为由一个
reactor
在一个线程中实现所有 I/O 操作也会遇到性能瓶颈,可采取拆分并减少reactor
策略,将 I/O 负载调配给多个reactor
(每个reactor
都有本人的线程、选择器和调度循环)以达到负载平衡。这看起来挺不错,但谁来执行调配以达到负载平衡呢?或者是因为这个问题,将reactor
拆分为两类角色,mainReactor
负责接管连贯,之后采纳肯定的负载平衡策略将新连贯调配给其余subReactor
来解决 I/O 读写,这样的拆分天然晦涩。
- 属于网络建连操作与网络 I/O 读写操作的拆分,因为由一个
如此就演进出如上图中的主从 reactor
多线程模型。请留神,联合【Scalable IO in Java】原文中的用词和形容看,上图中的 mainReactor
和subReactor
能够有多个并做池化治理,所有也有一些文章中会看到如 主 ReactorGroup
、mainReactorGroup
、从 ReactorGroup
、subReactorGroup
等这类名词用 Group 后缀来强调 Reactor 是池化治理。或者是不好布局,也或者是为了凸显主从 reactor
角色的协作关系,上图中都只展现了一个,另外服务端利用通常只裸露一个服务端口时,只需用一个 mainReactor
来监听端口上的连贯事件并解决。
二、Netty 主从 reactor
多线程模型
Netty
中 reactor
所对应的实现类是NioEventLoop
,其外围逻辑如下:
- 不同类型的 channel 向 Selector 注册所感兴趣的事件
- 扫描是否有感兴趣的事件产生
- 事件产生后做相应的解决
客户端和服务端别离会有不同类型的 channel
,客户端创立SocketChannel
向服务端发动连贯申请,服务端创立 ServerSocketChannel
监听客户端连贯,建连后创立 SocketChannel
与客户端的 SocketChannel
相互收发数据,这些 channel
分工不同,向 Selector 注册所感兴趣的事件状况也不同:
客户端 / 服务端 | channel | OP_ACCEPT | OP_CONNECT | OP_WRITE | OP_READ |
---|---|---|---|---|---|
客户端 | SocketChannel | YES | |||
服务端 | ServerSocketChannel | YES | |||
服务端 | SocketChannel | YES | YES |
Netty
中 Nio 形式实现几种 reactor
模型如下:
mainReactor
对应 Netty
中配置的 bossGroup
线程组(下图中的 主 ReactorGroup
),次要负责承受客户端连贯的建设。每 bind
一个端口就用掉一个 bossGroup
中的线程。
subReactor
对应 Netty
中配置的 workerGroup
线程组(下图中的 reactorGroup
),bossGroup
线程组承受完客户端的连贯后,将 channel
转交给 workerGroup
线程组,在 workerGroup
线程组内抉择一个线程,执行 I/O 读写的解决,workerGroup
线程组默认是 2 * CPU 核数个线程。
主从 reactor
模式的外围流程:
- 如果只监听一个端口,那么只需一个主
reactor
干活儿,所以通常看到boosGroup
只配置一个线程。主reactor
运行在独立的线程中,该线程中只负责与客户端的连贯申请 - 从
reactor
在服务器端能够不止一个,通常运行多个从reactor
, 每个从reactor
也运行在一个独立的线程中,负责与客户端的读写操作 - 主
reactor
检测到客户端的链接后,创立NioSocketChannel
,依照肯定的算法循环选取(负载平衡)一个从reactor
,并把刚创立的NioSocketChannel
注册到这个从reactor
中,这样建连和读写事件互不影响。 - 一个
reactor
中可被注册多个NioSocketChannel
,这个reactor
监听所有的被调配的NioSocketChannel
的读写事件 , 如果监听到客户端的数据发送事件 , 将对应的业务逻辑转发给NioSocketChannel
中的pipeline
里的handler
链进行解决 handler
最好只负责响应 I/O 事件,不解决具体的与客户端交互的业务逻辑 , 这样不会长时间阻塞 , 其read
办法读取客户端数据后 , 将音讯数据交给业务线程池去解决相干业务逻辑- 业务线程池实现相干业务逻辑的解决后,将后果返回,通过
NioSocketChannel
的的pipeline
里的handler
链将后果音讯写回给客户端 - 当
buffer
不满足将后果音讯写回给客户端时的条件时,注册写事件,期待可写时再写
三、Seata Server 端 的 reactor 模式利用
Seata Server 采纳了 主从 reactor
多线程模型,对应这个模型的话是有四个线程池,其中自定义业务线程池是两个。
性能 | 线程池对象 | 备注 |
---|---|---|
接管客户端连贯 | NettyServerBootstrap#eventLoopGroupBoss |
|
解决 IO 事件 | NettyServerBootstrap#eventLoopGroupWorker |
局部 RPC 音讯在这里解决 |
解决客户端的 request 音讯 | AbstractNettyRemoting#messageExecutor |
客户端被动发给的音讯 |
解决客户端的 response 音讯 | NettyRemotingServer#branchResultMessageExecutor |
服务端被动发给客户端音讯,客户端解决后给服务端响应 |
3.1、NettyServerBootstrap#eventLoopGroupBoss
笔者的环境未启用 epoll
,要害信息如下:
- 线程数:1,只监听一个端口
- 线程名前缀:“NettyBoss”
this.eventLoopGroupBoss = new NioEventLoopGroup(//CONFIG.getInt("transport.threadFactory.bossThreadSize", 1);
nettyServerConfig.getBossThreadSize(),
new NamedThreadFactory(// CONFIG.getConfig("transport.threadFactory.bossThreadPrefix", "NettyBoss");
nettyServerConfig.getBossThreadPrefix(),
//CONFIG.getConfig("transport.threadFactory.bossThreadSize", 1);
nettyServerConfig.getBossThreadSize())
);
3.2、NettyServerBootstrap#eventLoopGroupWorker
笔者的环境未启用 epoll
,要害信息如下:
- 线程数:默认值是 cpu 核数 * 2
- 线程名前缀:“NettyServerNIOWorker”
this.eventLoopGroupWorker = new NioEventLoopGroup(// System.getProperty("transport.serverWorkerThreads", String.valueOf(WORKER_THREAD_SIZE)));// 默认值 cpu 核数 *2
nettyServerConfig.getServerWorkerThreads(),
new NamedThreadFactory(
// CONFIG.getConfig("transport.threadFactory.workerThreadPrefix",
// enableEpoll() ? EPOLL_WORKER_THREAD_PREFIX : DEFAULT_NIO_WORKER_THREAD_PREFIX);
// 默认值 NettyServerNIOWorker,没有启用 epoll
nettyServerConfig.getWorkerThreadPrefix(),
//System.getProperty("transport.serverWorkerThreads", String.valueOf(WORKER_THREAD_SIZE)));// 默认值 cpu 核数 *2
nettyServerConfig.getServerWorkerThreads())
);
3.3、AbstractNettyRemoting#messageExecutor
此线程池解决客户端的 request 音讯,要害参数信息如下:
- 线程数:50 ~ 500
- keepAlive:500 秒
- 线程名字前缀:“ServerHandlerThread”
- 队列长度:500
- 回绝策略:CallerRunsPolicy(),饱和的状况下,调用者来执行该工作,即 Netty 的 I/O 线程
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(//Integer.parseInt(System.getProperty("transport.minServerPoolSize", "50"));
NettyServerConfig.getMinServerPoolSize(),
//Integer.parseInt(System.getProperty("transport.maxServerPoolSize", "500"));
NettyServerConfig.getMaxServerPoolSize(),
//Integer.parseInt(System.getProperty("transport.keepAliveTime", "500"));
NettyServerConfig.getKeepAliveTime(),
TimeUnit.SECONDS,
//Integer.parseInt(System.getProperty("transport.maxTaskQueueSize", "500"));
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory(
"ServerHandlerThread",
//Integer.parseInt(System.getProperty("transport.maxServerPoolSize", "500"));
NettyServerConfig.getMaxServerPoolSize()),
// 饱和的状况下,调用者来执行该工作,即 Netty 的 IO 线程
new ThreadPoolExecutor.CallerRunsPolicy());
3.4、NettyRemotingServer#branchResultMessageExecutor
此线程池解决客户端的 response 音讯,要害参数信息如下:
- 线程数:cpu 核数2 ~ cpu 核数2
- keepAlive:500 秒
- 线程名字前缀:“BranchResultHandlerThread”
- 队列长度:20000
- 回绝策略:
CallerRunsPolicy()
,饱和的状况下,调用者来执行该工作,即 Netty 的 IO 线程
private ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(//System.getProperty("transport.minBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默认值 cpu 核数 *2
NettyServerConfig.getMinBranchResultPoolSize(),
//System.getProperty("transport.maxBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默认值 cpu 核数 *2
NettyServerConfig.getMaxBranchResultPoolSize(),
// System.getProperty("transport.keepAliveTime", "500"),默认值 500
NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(//System.getProperty("transport.maxTaskQueueSize", "20000"),默认值 20000
NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory(
// 分支响应音讯的解决线程的名字前缀 BranchResultHandlerThread
"BranchResultHandlerThread",
// System.getProperty("transport.maxBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默认值 cpu 核数 *2
NettyServerConfig.getMaxBranchResultPoolSize()),
// 饱和的状况下,调用者来执行该工作,即 Netty 的 IO 线程
new ThreadPoolExecutor.CallerRunsPolicy());
3.5、业务线程池如何解决音讯
3.5.1、注销音讯处理器
Seata 音讯解决的外围逻辑是:定义好什么类型的音讯,应用哪个音讯处理器,这个音讯处理器的音讯解决逻辑在哪个线程池中执行。这个映射关系通过 AbstractNettyRemoting#processorTable
来存储。
/**
* 能够接管什么类型的音讯,以及应用哪个音讯处理器和线程池来解决音讯
* HashMap< 音讯类型, Pair< 音讯处理器, 线程池 >>
* processor type {@link MessageType}
*/
protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
各模块 Netty 组件启动前,通过 AbstractNettyRemotingServer#registerProcessor
办法注销到这个构造中。
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
this.processorTable.put(messageType, pair);
}
拿 Seata Server 来说,如在 ServerBootStrap
启动前,通过 NettyRemotingServer#registerProcessor
注册好消息处理器。不同音讯对应的处理器的线程池也不同,也有一些音讯没有指定业务线程池(没必要),状况如下:
private void registerProcessor() {
// 1\. registry on request message processor
ServerOnRequestProcessor onRequestProcessor =
new ServerOnRequestProcessor(this, getHandler());
ShutdownHook.getInstance().addDisposable(onRequestProcessor);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
// 2\. registry on response message processor
ServerOnResponseProcessor onResponseProcessor =
new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
// 3\. registry rm message processor
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
// 4\. registry tm message processor
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
// 5\. registry heartbeat message processor
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}
3.5.2、解决音讯
当 Seata Server 收到客户端发送的 RPC 音讯后,会进入 AbstractNettyRemotingServer.ServerHandler#channelRead
中,在这里对音讯类型简略判断后,委托给 processMessage
解决。
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {if (!(msg instanceof RpcMessage)) {return;}
// 收到音讯后,委托给 processMessage 解决
processMessage(ctx, (RpcMessage) msg);
}
processMessage
中通过音讯类型找到音讯处理器进行业务层解决:
- 如果音讯处理器有指定的业务线程池,在指定的业务线程池中解决音讯
- 若音讯处理器没有指定的业务线程池,则在 I/O 线程中间接解决。
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
...
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {MessageTypeAware messageTypeAware = (MessageTypeAware) body;
// 通过音讯类型找到音讯处理器
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
// 如果音讯处理器有指定的业务线程池
if (pair.getSecond() != null) {
try {
// 在指定的业务线程池中解决音讯
pair.getSecond().execute(() -> {
...
pair.getFirst().process(ctx, rpcMessage);
...
});
} catch (RejectedExecutionException e) {...}
} else {
try {
// 若音讯处理器没有指定的业务线程池,则在 I / O 现成中间接解决。pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {...}
}
} else {...}
} else {...}
}
四、Seata client 端的 reactor 模式利用
Seata client 端也采纳了 reactor
多线程模型,在初始化的时候有 RmNettyRemotingClient
和TmNettyRemotingClient
两个对象,别离会创立各自的 Bootstrap
,RM 和 TM 各有本人的 I/O 线程池和业务线程池。
性能 | 线程池对象 | 备注 |
---|---|---|
解决 IO 事件 | NettyClientBootstrap#eventLoopGroupWorker |
|
解决业务音讯 | AbstractNettyRemoting#messageExecutor |
源码里还有个 NettyClientBootstrap#defaultEventExecutorGroup
,没看进去哪里有用。TmNettyRemotingClient#getInstance()
中构建了 TM 的业务线程池,赋值给 NettyClientBootstrap#messageExecutor
,同样RmNettyRemotingClient#getInstance()
中构建了 RM 的业务线程池
4.1、NettyClientBootstrap#eventLoopGroupWorker
客户端此线程池要害信息如下:
- 线程数:1
-
线程名字前缀:
- TM:”NettyClientSelector_TMROLE”
- RM:”NettyClientSelector_RMROLE”
// 单 I / O 线程
this.eventLoopGroupWorker = new NioEventLoopGroup(//CONFIG.getInt("transport.threadFactory.clientSelectorThreadSize", 1)
selectorThreadSizeThreadSize,
new NamedThreadFactory(// CONFIG.getConfig("transport.threadFactory.clientSelectorThreadPrefix", "NettyClientSelector");
// 再拼上角色后默认值为:"NettyClientSelector_TMROLE"
getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
//CONFIG.getInt("transport.threadFactory.clientSelectorThreadSize", 1)
selectorThreadSizeThreadSize)
);
4.2、AbstractNettyRemoting#messageExecutor
TmNettyRemotingClient#getInstance()
和 RmNettyRemotingClient#getInstance()
创立各自的线程池,配置并不相同。
1)TmNettyRemotingClient#getInstance()
中所创立线程池的要害信息如下:
- 线程数:默认值是 cpu 核数 2 ~ cpu 核数 2
- keepAlive:Integer.MAX_VALUE 秒
- 线程名字前缀:rpcDispatch_TMROLE
- 队列长度:2000
- 回绝策略:
runsOldestTaskPolicy()
,饱和的状况下,增加新工作并由投递工作的线程运行最早的工作。
public static TmNettyRemotingClient getInstance() {if (instance == null) {synchronized (TmNettyRemotingClient.class) {if (instance == null) {NettyClientConfig nettyClientConfig = new NettyClientConfig();
// 自定义 TM 业务线程池
final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(nettyClientConfig.getClientWorkerThreads(), // 默认是 cpu 核数 * 2
nettyClientConfig.getClientWorkerThreads(), // 默认是 cpu 核数 * 2
KEEP_ALIVE_TIME, TimeUnit.SECONDS,//Integer.MAX_VALUE;
new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),//2000
new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),// TM 的线程名是:rpcDispatch_TMROLE
nettyClientConfig.getClientWorkerThreads()),// 默认是 cpu 核数 * 2
RejectedPolicies.runsOldestTaskPolicy());// 增加新工作并由主线程运行最早的工作。instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
}
}
}
return instance;
}
2)RmNettyRemotingClient#getInstance()
中所创立线程池的要害信息如下:
- 线程数:默认是 cpu 核数 2 ~ cpu 核数 2
- keepAlive:Integer.MAX_VALUE 秒
- 线程名字前缀:rpcDispatch_RMROLE
- 队列长度:20000
- 回绝策略:
CallerRunsPolicy()
,饱和的状况下,调用者来执行该工作,即 Netty 的 IO 线程。
public static RmNettyRemotingClient getInstance() {if (instance == null) {synchronized (RmNettyRemotingClient.class) {if (instance == null) {NettyClientConfig nettyClientConfig = new NettyClientConfig();
// 自定义 RM 业务线程池
final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(nettyClientConfig.getClientWorkerThreads(), // 默认是 cpu 核数 * 2
nettyClientConfig.getClientWorkerThreads(), // 默认是 cpu 核数 * 2
KEEP_ALIVE_TIME, TimeUnit.SECONDS,//Integer.MAX_VALUE;
new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),//20000
new NamedThreadFactory(nettyClientConfig.getRmDispatchThreadPrefix(),// RM 的线程名是:rpcDispatch_RMROLE,
nettyClientConfig.getClientWorkerThreads()),// 默认是 cpu 核数 * 2
new ThreadPoolExecutor.CallerRunsPolicy());//// 饱和的状况下,调用者来执行该工作,即 Netty 的 IO 线程
instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
}
}
}
return instance;
}
4.3、音讯解决
TmNettyRemotingClient
和 RmNettyRemotingClient
在init()
办法中会调用 registerProcessor()
办法注册各自的 RPC 音讯处理器。收到 RPC 音讯后就由这些处理器 + 对应的线程池做后续解决,音讯的相干业务属性在后续的事务流程中介绍。
五、撑持非凡能力的业务线程池
1)AbstractNettyRemotingClient#mergeSendExecutorService
用于批量发送申请,多个音讯合并,缩小通信次数。实现逻辑比拟清晰,当容许发送批量音讯时,音讯首先分桶保留到 basketMap,在一个周期性的无线循环中,把 basketMap 中的音讯队列取出来,把每个队列的音讯都放到 mergeMessage 中,最初把 mergeMessage 发送进来。
- 线程数:1
- 线程名前缀:”rpcMergeMessageSend“
-
AbstractNettyRemotingClient
中性能相干的属性介绍:Object mergeLock
:发送申请的锁对象。Map<Integer, MergeMessage> mergeMsgMap
:当发送音讯的类型是 MergeMessage,那么就将音讯保留到 mergeMsgMap。ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap
:当容许发送批量音讯时,音讯首先分桶保留到 basketMap,而后通过定时工作将保留 basketMap 的音讯发送进来。basketMap 的是服务器的地址,value 是保留的发送个服务器的音讯。依照地址分桶是将要发给同一个服务器的多个音讯合并到一个MergedWarpMessage
后发送。
- 有配置开关,默认值如下:
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
对应的要害代码逻辑如下:
- 在
AbstractNettyRemotingClient#sendSyncRequest
中,同步发送时将音讯缓存起来,默认配置看只有 RM 开启了音讯合并发送,另外同步发送超时设定,默认 TM 30 秒,RM 15 秒。依照 IP 地址分桶,同一个指标实例的音讯才能够合并发送
public Object sendSyncRequest(Object msg) throws TimeoutException {String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
// 同步发送超时设定,默认 TM 30 秒,RM 15 秒
long timeoutMillis = this.getRpcRequestTimeout();
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
// send batch message
// put message into basketMap, @see MergedSendRunnable
// 默认只有 RM 开启了音讯合并发送,TM 并未开启零售送
if (this.isEnableClientBatchSendRequest()) {
// send batch message is sync request, needs to create messageFuture and put it in futures.
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);
// put message into basketMap
// 依照指标地址分桶,同一个 TC 实例的音讯才能够合并发送
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
if (!basket.offer(rpcMessage)) {LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
serverAddress, rpcMessage);
return null;
}
if (LOGGER.isDebugEnabled()) {LOGGER.debug("offer message: {}", rpcMessage.getBody());
}
// 告诉合并发送线程 有音讯要发送,醒来干活儿
if (!isSending) {synchronized (mergeLock) {mergeLock.notifyAll();
}
}
try {
// 阻塞期待音讯的响应。return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception exx) {LOGGER.error("wait response error:{},ip:{},request:{}",
exx.getMessage(), serverAddress, rpcMessage.getBody());
if (exx instanceof TimeoutException) {throw (TimeoutException) exx;
} else {throw new RuntimeException(exx);
}
}
} else {
// 不合并发送的话,就获取指定 IP 的 channel,并立刻发送。Channel channel = clientChannelManager.acquireChannel(serverAddress);
return super.sendSync(channel, rpcMessage, timeoutMillis);
}
}
- 在
AbstractNettyRemotingClient#init
中构建线程池mergeSendExecutorService
,在这个线程池中执行音讯的批处理(音讯合并、音讯发送)。
public void init() {
...
// 通过线程池有 1 个线程,执行音讯合并发送
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(
MAX_MERGE_SEND_THREAD,//1
MAX_MERGE_SEND_THREAD,//1
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(
//TM : rpcMergeMessageSend_TMROLE
//RM : rpcMergeMessageSend_RMROLE
//SERVER : rpcMergeMessageSend_SERVERROLE
getThreadPrefix(),
MAX_MERGE_SEND_THREAD)//1
);
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();}
- 批处理工作
MergedSendRunnable
中,实现了音讯合并和音讯发送
private class MergedSendRunnable implements Runnable {
@Override
public void run() {while (true) {
//mergeLock 用于生产 - 生产的合作
synchronized (mergeLock) {
try {
// MAX_MERGE_SEND_MILLS = 1,还有线程休眠的成果
mergeLock.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {}}
isSending = true;
// 发送音讯,音讯是依照 IP 地址分组
basketMap.forEach((address, basket) -> {if (basket.isEmpty()) {return;}
MergedWarpMessage mergeMessage = new MergedWarpMessage();
// 如果 basket 队列不为空,将其中的音讯全取出来,增加到 mergeMessage 中
while (!basket.isEmpty()) {RpcMessage msg = basket.poll();
mergeMessage.msgs.add((AbstractMessage) msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
// debug 打印本次发送的音讯个数和每个音讯的 Id,以及此时在 futures 中做超时管控的所有音讯的 Id,// 两个音讯 Id 比对,可晓得音讯积压状况 9666
if (mergeMessage.msgIds.size() > 1) {printMergeMessageLog(mergeMessage);
}
Channel sendChannel = null;
try {
// 获取指定地址的 channel 对象, 异步发送音讯
// 发送批量音讯是同步的申请,然而这里不须要失去返回的值,在音讯保留到 basketMap 之前,曾经创立了 messageFuture 了,// 返回值将会从 ClientOnResponseProcessor 中失去
sendChannel = clientChannelManager.acquireChannel(address);
// 因为原始音讯的发送曾经退出过超时管控,所以批量发送环节不再须要退出额定的超时管制
AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {destroyChannel(address, sendChannel);
}
// fast fail
// 产生异样,疾速将保留在 mergeMessage 的音讯清理掉
for (Integer msgId : mergeMessage.msgIds) {MessageFuture messageFuture = futures.remove(msgId);
if (messageFuture != null) {
messageFuture.setResultMessage(new RuntimeException(String.format("%s is unreachable", address), e));
}
}
LOGGER.error("client merge call failed: {}", e.getMessage(), e);
}
});
isSending = false;
}
}
2)AbstractNettyRemoting#timerExecutor
Netty 的 I/O 操作异步的,RPC 音讯的发送操作会对应一个 Future
对象,在 Seata 中这个 Futrue
对象被封装为 MessageFuture
,需同步发送的音讯,其对应的 MessageFuture
被放入 map 缓存起来,当收到音讯的 response 后,将音讯从 map 中移除。AbstractNettyRemoting#timerExecutor
里的这个线程定时巡检 map 中的音讯,若超时未收到 response 则认定为发送超时。
- 线程数:1
- 线程名前缀:”timeoutChecker“
- scheduleAtFixedRate:提早 3 秒,频率 3 秒
-
AbstractNettyRemoting
中的性能相干的属性介绍:ScheduledExecutorService timerExecutor
:执行定时工作,音讯发送当前,到了过期工夫还没有返回,则会对音讯进行清理。ConcurrentHashMap<Integer, MessageFuture> futures
:保留着不同音讯,timerExecutor 会清理 futures 中过期的音讯。
对应的要害代码逻辑如下:
- 构建定时工作的线程池
AbstractNettyRemoting#timerExecutor
,只用 1 个线程
/**
* 定时器,用于巡检音讯的发送是否超时
*/
protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("timeoutChecker", 1, true));
复制代码
- 通过
AbstractNettyRemoting#sendSync
同步发送音讯,构建MessageFuture
并放入futures
这个 map 中,发送过程配置监听器 用于解决channel
异样,指定失败起因并从futures
中移除,还要销毁channel
。
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
...
// 构建 MessageFuture
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
// 放入 futures 这个 map 中
futures.put(rpcMessage.getId(), messageFuture);
// 查看通道是否能够写
channelWritableCheck(channel, rpcMessage.getBody());
String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
// 在申请发送之前执行钩子
doBeforeRpcHooks(remoteAddr, rpcMessage);
// 发送申请,并配置监听器 用于解决 channel 异样
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
// 这里响应不胜利,根本是 channel 不失常了
if (!future.isSuccess()) {
// 移除音讯
MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
if (messageFuture1 != null) {messageFuture1.setResultMessage(future.cause());
}
// 响应不胜利,则销毁 channel
destroyChannel(future.channel());
}
});
...
// 获取响应后果
Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
// 响应之后执行钩子
doAfterRpcHooks(remoteAddr, rpcMessage, result);
...
}
- 失常收到 response 后,给
MessageFuture
对象赋值,从futures
中移除,如ClientOnResponseProcessor#process
中的实现
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
...
// 从 futures 中移除
MessageFuture messageFuture = futures.remove(rpcMessage.getId());
if (messageFuture != null) {
// 赋值后果
messageFuture.setResultMessage(rpcMessage.getBody());
}
}
- 在
AbstractNettyRemoting#init
中开启定时工作,巡检出futures
这个 map 中的超时对象后从futures
中移除,不再查看,并指定后果为TimeoutException
public void init() {// 检测音讯同步发送 (sendSync(xxx)) 是否超时,
// 定时工作默认是提早 3 秒,距离 3 秒
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {MessageFuture future = entry.getValue();
if (future.isTimeout()) {
// 如果过期了则将发送后果设置为 TimeoutException
// 从 futures 中移除,不再查看
futures.remove(entry.getKey());
RpcMessage rpcMessage = future.getRequestMessage();
future.setResultMessage(new TimeoutException(String
.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
if (LOGGER.isDebugEnabled()) {LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
}
}
nowMills = System.currentTimeMillis();}
}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
}
还有线程池跟服务注册发现和建连相干,会后边篇章再介绍。