关于java:rocketmq2

75次阅读

共计 7896 个字符,预计需要花费 20 分钟才能阅读完成。

Broker 解决 Topic 创立

1. 更改本地 topic 配置缓存 topicConfigTable
2. 将缓存 topicConfigTable 配置信息写入磁盘
3. 向 NameServer 上报变更信息
4. 主从同步变更信息

源码入口
broker 端 AdminBrokerProcessor#processRequest

broker 定时工作

ScheduleMessageService:每隔 10S 长久化每个延时队列的投递进度

ConsumerOffsetManager:Broker 每隔 5S 长久化生产进度, 将 ConsumerOffsetManager#offsetTable 属性序列化到 consumerOffset.json 文件,以笼罩的模式从新写入,offsetTable 是一个 Map 类型的属性,key 是:topic@consumeGroup,value 是每个 ConsumeQueue 的生产进度,也是一个汇合,key 是 id,value 是 offset

FlushConsumeQueueService:每隔 1S 执行刷新 ConsumeQueue,当某个 ConsumeQueue 新写入的数据超过 2 页(8kb),强制 Flush 数据至磁盘;同时每隔 60S 对所有的 ConsumeQueue 执行一次 flush,不论新写入数据量

ReputMessageService:每隔 1ms 进行一次 Reput 工作,将新音讯的地位信息存入 ConsumeQueue,key 信息存入 IndexFile,同时唤醒那些订阅了新音讯所属队列的消费者申请,让它们执行音讯的拉取工作

CommitRealTimeService(开启写入缓冲池):将缓冲池中的数据 Commit 到 CommitLog 的 FileChannel 中

FlushRealTimeService(异步写):每 500ms 对 CommitLog 进行一次 Flush,当新写入数据超过 16KB,或者间隔上次 Flush 的工夫距离超过 10S,将 CommitLog 位于内存中的数据同步到磁盘文件

CleanCommitLogService:每隔 10S 执行一次清理生效 CommitLog 日志文件,默认清理 72h 之前的

CleanConsumeQueueService:每隔 10S 执行一次清理生效 ConsumeQueue 和 IndexFile 文件

PullRequestHoldService:持有针对每个 ConsumeQueue 的音讯 PullRequest,每隔 5S,依据条件:maxOffset > pullFromOffset 来确定是否要唤醒订阅相应 ConsumeQueue 的 PullRequest

ClientHousekeepingService:每隔 10S 扫描持有的 ProducerChannel,ConsumerChannel,FilterChannel,将那些超过 2m 没有发送心跳的连贯敞开掉

每隔 30S 向指定的一个或多个 Namesrc 注册 Broker 信息

consumeQueue 生成

ReputMessageService 继承 ServiceThread 是一个线程服务,服务启动后每距离 1 毫秒调用一次 doReput 办法,doReput 办法会调用 CommitLogDispatcher 进行音讯散发。步骤:获取 index 文件的 mapFile
mapFile 放入 msg

consumerQueue 是帮忙消费者找到音讯的索引文件。外面寄存的就是每条音讯的起始位点和音讯的大小还有 tag 的 hashCode。

index 生成及构造

index 文件依据 key 能够疾速的检索到对应的音讯,key 分为两种,一种是 UniqKey,零碎主动生成的 (createUniqID() 函数生成, 相似 uuid)。还有一种是自定义的 key,放在 message 的 property 中的,生产者指定的。总的来说就是解决一个文件中查找一条音讯。入口:ReputMessageService 继承 ServiceThread 是一个线程服务,服务启动后每距离 1 毫秒调用一次 doReput 办法,doReput 办法会调用 CommitLogDispatcher 进行音讯散发。步骤:获取 index 文件的 mapFile
mapFile 放入 msg

MappedFileQueue

CommitLog 音讯存储、ConsumeQueue 等通常会记录大量的数据,一个 MappedFile 具备固定大小(默认 1G),所以一个 MappedFile 不能记录所有的内容,于是 CommitLog、ConsumeQueue 通常会应用多个 MappedFile 记录数据,RocketMQ 则应用 MappedFileQueue 组织一系列的 MappedFile,处在 MappedFileQueue 队尾的通常是刚写满的或者还有局部空间或者刚调配的 MappedFile,每次写操作时,都会从队尾拿到最初一个 MappedFile 进行写。如果启动时配置了启用 transientStorePoolEnable,那么在 DefaultMessageStore 构造函数中会调用 TransientStorePool.init 办法,预调配 ByteBuffer 并放入队列中,并且会锁住这些内存,避免操作系统进行置换。只有主 Broker、刷盘形式为异步刷盘且 transientStorePoolEnable 为 true 才会启用暂存池。没有开启 transientStorePoolEnable,创立的时候就会生成一个文件,而后对文件进行映射。如果 transientStorePoolEnable,那么每次在创立的时候,岂但会生成一个文件,而后对文件进行映射,还会从 TransientStorePool 外面 "借" 一块堆外内存(堆外内存曾经申请好了)作为 writeBuffer,而后每次写数据的时候,优先写入 writeBuffer。每次写数据时,都会从 MappedFileQueue 中获取最初一个 MappedFile,如果 MappedFileQueue 为空,或者最初一个 MappedFile 曾经写满,则会重新分配一个新的 MappedFile。如果写数据的时候,残余的空间不够写入(写入的数据大小 + 完结标记 > 残余容量),就会把残余的空间写入文件完结标记,而后返回 END_OF_FILE。而后从新申请一个 mappedFile,从新写入。MappedFile 刷盘操作依据具体配置分为同步和异步刷盘两种形式,这里不论同步异步,其操作相似,都是通过 MappedFile.commit 和 MappedFile.flush, 如果启用了暂存池 TransientStorePool 则会先调用 MappedFile.commit 把 writeBuffer 中的数据写入 fileChannel 中,而后再调用 MappedFile.flush;而 MappedFile.flush 通过 fileChannel.force 或者 mappedByteBuffer.force()进行理论的刷盘动作。如果 writeBuffer 写满了,就会归还给 TransientStorePool,writeBuffer 置为 null。

mq 贮存比照

https://www.zhihu.com/question/346540432

mq 事务

服务端
1:收到事务
批改音讯 topic 为 RMQ_SYS_TRANS_HALF_TOPIC,并备份音讯原有 topic,供后续 commit 音讯时还原音讯 topic 应用
批改音讯 queueId 为 0,并备份音讯原有 queueId,供后续 commit 音讯时还原音讯 queueId 应用
失常的存储信息。因为 topic 被扭转,所以无奈生产
2:事务响应
判断来自事务查看 是则 进行执行操作
判断事务状态 是提交 还是回滚 还是 pending 状态
结构 OperationResult 依据提交事务还是回滚事务进行提交或者回滚音讯
    提交事务
        查看筹备音讯 返回 remotingCommand
        完结音讯事务 endMessageTransaction 应用之前存储的实在 topic 和 queueId 从新构建一个新的音讯
        刷盘解决 将新的音讯写入到 commitLog
        刷盘胜利 进行删除 deletePrepareMessage,将音讯的 offset 当作音讯提放入 opQueue
    回滚事务
        查看筹备音讯 返回 remotingCommand
        返回胜利状态
        不须要做什么 因为实在的音讯始终还败落盘 故也不须要删除
        进行删除 deletePrepareMessage,将音讯的 offset 当作音讯提放入 opQueue
返回 OperationResult 后果
3:回查
开了一个定时工作定时回查。一直地生产 halfQueue 外面的音讯,如果发现 opQueue 中含有这个音讯的 offset,代表曾经解决过了。没有发现,就检测重插次数、事务超时工夫、立马检测事务的工夫等,把这条音讯从新放回 halfQueue,发送回查申请给客户端。客户端依据本地事务状态发送提交、回滚。https://blog.csdn.net/hosaos/article/details/90240260

nameserver

nameServer 治理 broker。在 broker 注册的时候,会通过 registerBrokerAll 把 topic 信息(蕴含 queue 的信息)、地址、brokerId、brokerName 上报到 nameServer。broker 在启动的时候,定时工作调用 registerBrokerAll。在更新 broker 配置、创立 topic 的时候,也会调用 registerBrokerAll。brokerName->broker 地址
topic->queueData
cluster->brokerName
brokerAddr->brokerl 心跳信息
brokerAddr->filter

https://www.jianshu.com/p/3d8d594d9161

pagecache 的读写

读:加 pagecache 锁,从 hash 表中查找 page,如果存在
    page 援用计数加一,放锁(咱们只在对 PageCache 相干的数据结构进行读写时须要加 pagecache 锁,当咱们将 page 取出来,并且援用计数加了一以保障 page 不会被回收,之后的 page 读写,就会以 page 为粒度进行并发管制)如果 page 是最新的:读它!而后援用缩小一
    如果 page 不是最新的
        对 page 加独占锁(可能会 sleep)拿到锁后,查看 page 是否是最新的(因而 sleep 时,其余过程可能曾经读了此 page)调用 mapping->a_ops->readpage 来读入 page
            如果出错:开释援用,并返回。留神,如果出错,page 锁会被间接放掉
            否则 wait_on_page:这个读是由底层实现的,读期间,page 是锁着的,当读操作实现,page 会被放锁,于是 wait_on_page 会醒来
            当初 page 是最新的了,化归成之前的状况了
如果不存在
    放 pagecache 锁
    应用 page_cache_alloc 调配一个 page
    加 pagecache 锁
    再次查找 hash 表,如果存在,则能够化归成之前的状况了。留神,为什么这里要再次查找呢?因为调配 page 期间放开了 pagecache 锁,此时,其余线程可能曾经调配了须要的 page,于是,咱们须要进行一次双重查看
    将 page 退出到 pagecache
    放 pagecache 锁
    将 page 退出 lru,留神,lru 应用独自的锁
    于是,问题又化归为下面的状况(代码中用了 goto)写:计算要写的 page 在文件地址空间中的 index
__grab_cache_page(mapping, index, &cached_page):从 PageCache 中取出须要的 page,并加锁,如果 page 不存在,会调配一个,并且退出到 PageCache 中
留神,此时,以后线程持有 page 的援用,并且 page 处于加锁状态,此过程和读过程相似,只不过还没有放锁。mapping->a_ops->prepare_write(file, page, offset, offset+bytes)
__copy_from_user
mapping->a_ops->commit_write(file, page, offset, offset+bytes)
对 page 放锁
开释 page 的援用计数

抵触:读和写在寻找对应 page 的时候都须要加 pagecache 锁。在写申请对 page 加锁后,此一页的读申请须要等到写申请锁开释能力读。//https://zhuanlan.zhihu.com/p/42364591

put 音讯时加锁

RocketMQ 在写入音讯到 CommitLog 中时,应用了锁机制,即同一时刻只有一个线程能够写 CommitLog 文件

同步刷盘时,刷盘一次须要的工夫绝对较长,锁竞争强烈,会有较多的线程处于期待阻塞期待锁的状态,如果采纳自旋锁会节约很多的 CPU 工夫,所以“同步刷盘倡议应用重入锁”。异步刷盘是距离肯定的工夫刷一次盘,写入隐射内存就返回,对锁的持有不久,所以锁竞争不强烈,不会存在大量阻塞期待锁的线程,偶然锁期待就自旋期待一下很短的工夫,不要进行上下文切换了,所以采纳自旋锁更适合。

request_reply

Producer 发送 request 时创立一个 RequestResponseFuture,以 correlationId 为 key,RequestResponseFuture 为 value 存入 map,同时申请中带上 RequestResponseFuture 中的 correlationId 曾经客户端的 clientId,收到回包后依据 correlationId 拿到对应的 RequestResponseFuture,并设置回包内容。Producer 端还启动了一个定时工作扫描 map,检测 request 是否超时。request 就是用 send 发送。同步 request: 每个 RequestResponseFuture 外面有一个闭锁 countDownLatch,当收到此条音讯的 reply 后解锁。producer 在发送音讯的时候,会给每条音讯生成惟一的标识符,同时还带上了 producer 的 clientId。当 consumer 收到并生产音讯后,从音讯中取出音讯的标识符 correlationId 和 producer 的标识符 clientId,放入响应音讯,用来确定此响应音讯是哪条申请音讯的回包,以及此响应音讯应该发给哪个 producer。同时响应音讯中设置了音讯的类型以及响应音讯的 topic,而后 consumer 将音讯发给 broker。服务端承受 request 和承受 send 没区别。服务端有解决 reply 申请的逻辑,依据配置还能够存储 reply 音讯。broker 收到响应音讯后,须要将音讯发回给指定的 producer。Broker 如何晓得发回给哪个 producer?因为音讯中蕴含了 producer 的标识符 clientId,在 ProducerManager 中,保护了标识符和 channel 信息的对应关系,通过这个对应关系,就能把回包发给对应的 producer。

retry 音讯

Retry 音讯即 consumer 生产失败,要求 broker 重发的音讯。失败的起因有两种,一种是业务端代码解决失败;还有一种是音讯在 consumer 的缓存队列中待的工夫超时,consumer 会将音讯从队列中移除,而后退回给 Broker 重发

音讯会从新发往 %RETRY%+consumerGroup topic,这个 topic 每个 broker 上有一个,读和写队列数量为 1。音讯达到 broker 上,会放入 SCHEDULE_TOPIC_XXXX 下,而后定时工作会读取对应的 consumerQueue,从 commitlog 中读出音讯放入 retry topic 的队列中,客户端都订阅了 retry topic,此时就能将音讯拉取下来了。retry topic 是一个 consumer 一个。每个 broker 上会有一个。

tag 应用

同一个 group 应用的 tag 必须统一,不而后注册的会笼罩后面注册的,导致音讯失落,tag 过滤在服务端通过 hash 过滤,客户端通过 equal 过滤。会导致 tag1 的音讯在服务端被过滤了,tag2 的局部音讯负载到了 consumer1,导致 consumer1 过滤了。consumer2 只可能生产局部音讯。

rebalance 机会

触发条件这个 rebalance 的条件有:

每 20s 定时刷新(精确说上次刷新后等 20s, @see RebalanceService#run
收到 Broker 告知的 Consume 变动告诉时 @see ClientRemotingProcessor#notifyConsumerIdsChanged
每次 Client 启动时 @see DefaultMQPushConsumerImpl#start。每次 client 上报心跳,服务端会判断是否有配置变动或者是新的 id 退出。如果是,那么会告诉这个 group 下所有的 client 进行 rebalance。client 下线也会告诉这个 group 下所有的 client 进行 rebalance

磁盘指标

rrqm/s: 每秒进行 merge 的读操作数目。即 delta(rmerge)/s 
wrqm/s: 每秒进行 merge 的写操作数目。即 delta(wmerge)/s 
r/s: 每秒实现的读 I / O 设施次数。即 delta(rio)/s 
w/s: 每秒实现的写 I / 0 设施次数。即 delta(wio)/s 
rsec/s: 每秒读扇区数。即 delta(rsect)/s 
wsec/s: 每秒写扇区数。即 delta(wsect)/s 
rKB/s: 每秒读 K 字节数。是 rsec/ s 的一半,因为每扇区大小为 512 字节 
wKB/s: 每秒写 K 字节数。是 wsec/ s 的一半 
avgrq-sz: 均匀每次设施 I / O 操作的数据大小(扇区)。即 delta(rsect+wsect)/delta(rio+wio) 
avgqu-sz: 均匀 I / O 队列长度。即 delta(aveq)/s/1000(因为 aveq 的单位为毫秒) duiw
await: 均匀每次设施 I / O 操作的等待时间(毫秒)。即 delta(ruse+wuse)/delta(rio+wio) 
svctm: 均匀每次设施 I / O 操作的服务工夫(毫秒)。即 delta(use)/delta(rio+wio)!!!svctm 越靠近于 await 则阐明等待时间少 
%util: 一秒中有百分之多少的工夫用于 I / O 操作, 或者说一秒中有多少工夫 I / O 队列是非空的。80% 示意设施曾经很忙了, 即 delta(usr)/s/1000(因为 use 的单位为毫秒)

rqsize:The average size (in sectors) of the requests that were issued to the device.
qusize:The average queue length of the requests that were issued to the device.
qusize = (rs+ws)*await/1000
 
 rocketmq 如果磁盘压力大,体现为:1:io util 简直达到 100%  
 2:r/s(每秒实现的读 I / O 设施次数),回升
 解释:rocketmq 如果失常状况下,都是读热数据,根本不须要去磁盘读,如果读历史音讯,会导致 io 指向磁盘,r/ s 读的次数减少,此时因为读写都用的是 pagecache,所以写申请的解决会变慢,w/ s 写的次数会变多。

正文完
 0