Broker解决Topic创立
1.更改本地topic配置缓存topicConfigTable
2.将缓存topicConfigTable配置信息写入磁盘
3.向NameServer上报变更信息
4.主从同步变更信息
源码入口
broker端AdminBrokerProcessor#processRequest
broker定时工作
ScheduleMessageService:每隔10S长久化每个延时队列的投递进度
ConsumerOffsetManager:Broker每隔5S长久化生产进度,将 ConsumerOffsetManager#offsetTable 属性序列化到consumerOffset.json 文件,以笼罩的模式从新写入,offsetTable 是一个Map类型的属性,key 是:topic@consumeGroup ,value是每个ConsumeQueue的生产进度,也是一个汇合,key是id,value是offset
FlushConsumeQueueService: 每隔1S执行刷新ConsumeQueue,当某个ConsumeQueue新写入的数据超过2页(8kb),强制Flush数据至磁盘;同时每隔60S对所有的ConsumeQueue执行一次flush,不论新写入数据量
ReputMessageService:每隔1ms进行一次Reput工作,将新音讯的地位信息存入ConsumeQueue,key信息存入IndexFile,同时唤醒那些订阅了新音讯所属队列的消费者申请,让它们执行音讯的拉取工作
CommitRealTimeService(开启写入缓冲池):将缓冲池中的数据Commit到CommitLog的FileChannel中
FlushRealTimeService(异步写):每500ms对CommitLog进行一次Flush,当新写入数据超过16KB,或者间隔上次Flush的工夫距离超过10S,将CommitLog位于内存中的数据同步到磁盘文件
CleanCommitLogService:每隔10S执行一次清理生效CommitLog日志文件,默认清理72h之前的
CleanConsumeQueueService:每隔10S执行一次清理生效ConsumeQueue和IndexFile文件
PullRequestHoldService:持有针对每个ConsumeQueue的音讯PullRequest,每隔5S,依据条件:maxOffset > pullFromOffset 来确定是否要唤醒订阅相应ConsumeQueue的PullRequest
ClientHousekeepingService:每隔10S扫描持有的ProducerChannel,ConsumerChannel,FilterChannel,将那些超过2m没有发送心跳的连贯敞开掉
每隔30S向指定的一个或多个Namesrc注册 Broker信息
consumeQueue生成
ReputMessageService继承ServiceThread是一个线程服务,服务启动后每距离1毫秒调用一次doReput办法,doReput办法会调用CommitLogDispatcher进行音讯散发。
步骤:
获取index文件的mapFile
mapFile放入msg
consumerQueue是帮忙消费者找到音讯的索引文件。外面寄存的就是每条音讯的起始位点和音讯的大小还有tag的hashCode。
index生成及构造
index文件依据key能够疾速的检索到对应的音讯,key分为两种,一种是UniqKey,零碎主动生成的(createUniqID()函数生成,相似uuid)。还有一种是自定义的key,放在message的property中的,生产者指定的。总的来说就是解决一个文件中查找一条音讯。
入口:
ReputMessageService继承ServiceThread是一个线程服务,服务启动后每距离1毫秒调用一次doReput办法,doReput办法会调用CommitLogDispatcher进行音讯散发。
步骤:
获取index文件的mapFile
mapFile放入msg
MappedFileQueue
CommitLog音讯存储、ConsumeQueue等通常会记录大量的数据,一个MappedFile具备固定大小(默认1G),所以一个MappedFile不能记录所有的内容,于是CommitLog、ConsumeQueue通常会应用多个MappedFile记录数据,RocketMQ则应用MappedFileQueue组织一系列的MappedFile,处在MappedFileQueue队尾的通常是刚写满的或者还有局部空间或者刚调配的MappedFile,每次写操作时,都会从队尾拿到最初一个MappedFile进行写。
如果启动时配置了启用transientStorePoolEnable,那么在DefaultMessageStore构造函数中会调用TransientStorePool.init办法,预调配ByteBuffer并放入队列中,并且会锁住这些内存,避免操作系统进行置换。只有主Broker、刷盘形式为异步刷盘且transientStorePoolEnable为true才会启用暂存池。
没有开启transientStorePoolEnable,创立的时候就会生成一个文件,而后对文件进行映射。
如果transientStorePoolEnable,那么每次在创立的时候,岂但会生成一个文件,而后对文件进行映射,还会从TransientStorePool外面"借"一块堆外内存(堆外内存曾经申请好了)作为writeBuffer,而后每次写数据的时候,优先写入writeBuffer。
每次写数据时,都会从MappedFileQueue中获取最初一个MappedFile,如果MappedFileQueue为空,或者最初一个MappedFile曾经写满,则会重新分配一个新的MappedFile。如果写数据的时候,残余的空间不够写入(写入的数据大小+完结标记>残余容量),就会把残余的空间写入文件完结标记,而后返回 END_OF_FILE。而后从新申请一个mappedFile,从新写入。
MappedFile刷盘操作依据具体配置分为同步和异步刷盘两种形式,这里不论同步异步,其操作相似,都是通过MappedFile.commit和MappedFile.flush,如果启用了暂存池TransientStorePool则会先调用MappedFile.commit把writeBuffer中的数据写入fileChannel中,而后再调用MappedFile.flush;而MappedFile.flush通过fileChannel.force或者mappedByteBuffer.force()进行理论的刷盘动作。如果writeBuffer写满了,就会归还给TransientStorePool,writeBuffer置为null。
mq贮存比照
https://www.zhihu.com/question/346540432
mq事务
服务端
1:收到事务
批改音讯topic为RMQ_SYS_TRANS_HALF_TOPIC,并备份音讯原有topic,供后续commit音讯时还原音讯topic应用
批改音讯queueId为0,并备份音讯原有queueId,供后续commit音讯时还原音讯queueId应用
失常的存储信息。因为topic被扭转,所以无奈生产
2:事务响应
判断来自事务查看 是则 进行执行操作
判断事务状态 是提交 还是回滚 还是pending状态
结构OperationResult 依据提交事务还是回滚事务进行提交或者回滚音讯
提交事务
查看筹备音讯 返回remotingCommand
完结音讯事务 endMessageTransaction应用之前存储的实在topic和queueId从新构建一个新的音讯
刷盘解决 将新的音讯写入到commitLog
刷盘胜利 进行删除 deletePrepareMessage,将音讯的offset当作音讯提放入opQueue
回滚事务
查看筹备音讯 返回remotingCommand
返回胜利状态
不须要做什么 因为实在的音讯始终还败落盘 故也不须要删除
进行删除 deletePrepareMessage,将音讯的offset当作音讯提放入opQueue
返回OperationResult后果
3:回查
开了一个定时工作定时回查。一直地生产halfQueue外面的音讯,
如果发现opQueue中含有这个音讯的offset,代表曾经解决过了。
没有发现,就检测重插次数、事务超时工夫、立马检测事务的工夫等,把这条音讯从新放回halfQueue,发送回查申请给客户端。客户端依据本地事务状态发送提交、回滚。
https://blog.csdn.net/hosaos/article/details/90240260
nameserver
nameServer治理broker。在broker注册的时候,会通过registerBrokerAll把topic信息(蕴含queue的信息)、地址、brokerId、brokerName上报到nameServer。broker在启动的时候,定时工作调用registerBrokerAll。在更新broker配置、创立topic的时候,也会调用registerBrokerAll。
brokerName->broker地址
topic->queueData
cluster->brokerName
brokerAddr->brokerl心跳信息
brokerAddr->filter
https://www.jianshu.com/p/3d8d594d9161
pagecache的读写
读:
加pagecache锁,从hash表中查找page,
如果存在
page援用计数加一,放锁(咱们只在对PageCache相干的数据结构进行读写时须要加pagecache锁,当咱们将page取出来,并且援用计数加了一以保障page不会被回收,之后的page读写,就会以page为粒度进行并发管制)
如果page是最新的:读它!而后援用缩小一
如果page不是最新的
对page加独占锁(可能会sleep)
拿到锁后,查看page是否是最新的(因而sleep时,其余过程可能曾经读了此page)
调用mapping->a_ops->readpage来读入page
如果出错:开释援用,并返回。留神,如果出错,page锁会被间接放掉
否则wait_on_page:这个读是由底层实现的,读期间,page是锁着的,当读操作实现,page会被放锁,于是wait_on_page会醒来
当初page是最新的了,化归成之前的状况了
如果不存在
放pagecache锁
应用page_cache_alloc调配一个page
加pagecache锁
再次查找hash表,如果存在,则能够化归成之前的状况了。留神,为什么这里要再次查找呢?因为调配page期间放开了pagecache锁,此时,其余线程可能曾经调配了须要的page,于是,咱们须要进行一次双重查看
将page退出到pagecache
放pagecache锁
将page退出lru,留神,lru应用独自的锁
于是,问题又化归为下面的状况(代码中用了goto)
写:
计算要写的page在文件地址空间中的index
__grab_cache_page(mapping, index, &cached_page):从PageCache中取出须要的page,并加锁,如果page不存在,会调配一个,并且退出到PageCache中
留神,此时,以后线程持有page的援用,并且page处于加锁状态,此过程和读过程相似,只不过还没有放锁。
mapping->a_ops->prepare_write(file, page, offset, offset+bytes)
__copy_from_user
mapping->a_ops->commit_write(file, page, offset, offset+bytes)
对page放锁
开释page的援用计数
抵触:
读和写在寻找对应page的时候都须要加pagecache锁。
在写申请对page加锁后,此一页的读申请须要等到写申请锁开释能力读。
//https://zhuanlan.zhihu.com/p/42364591
put音讯时加锁
RocketMQ在写入音讯到CommitLog中时,应用了锁机制,即同一时刻只有一个线程能够写CommitLog文件
同步刷盘时,刷盘一次须要的工夫绝对较长,锁竞争强烈,会有较多的线程处于期待阻塞期待锁的状态,如果采纳自旋锁会节约很多的CPU工夫,所以“同步刷盘倡议应用重入锁”。
异步刷盘是距离肯定的工夫刷一次盘,写入隐射内存就返回,对锁的持有不久,所以锁竞争不强烈,不会存在大量阻塞期待锁的线程,偶然锁期待就自旋期待一下很短的工夫,不要进行上下文切换了,所以采纳自旋锁更适合。
request_reply
Producer发送request时创立一个RequestResponseFuture,以correlationId为key,RequestResponseFuture为value存入map,同时申请中带上RequestResponseFuture中的correlationId曾经客户端的clientId,收到回包后依据correlationId拿到对应的RequestResponseFuture,并设置回包内容。Producer端还启动了一个定时工作扫描map,检测request是否超时。request就是用send发送。
同步request:每个RequestResponseFuture外面有一个闭锁countDownLatch,当收到此条音讯的reply后解锁。
producer在发送音讯的时候,会给每条音讯生成惟一的标识符,同时还带上了producer的clientId。当consumer收到并生产音讯后,从音讯中取出音讯的标识符correlationId和producer的标识符clientId,放入响应音讯,用来确定此响应音讯是哪条申请音讯的回包,以及此响应音讯应该发给哪个producer。同时响应音讯中设置了音讯的类型以及响应音讯的topic,而后consumer将音讯发给broker。
服务端承受request和承受send没区别。服务端有解决reply申请的逻辑,依据配置还能够存储reply音讯。broker收到响应音讯后,须要将音讯发回给指定的producer。Broker如何晓得发回给哪个producer?因为音讯中蕴含了producer的标识符clientId,在ProducerManager中,保护了标识符和channel信息的对应关系,通过这个对应关系,就能把回包发给对应的producer。
retry音讯
Retry音讯即consumer生产失败,要求broker重发的音讯。失败的起因有两种,一种是业务端代码解决失败;还有一种是音讯在consumer的缓存队列中待的工夫超时,consumer会将音讯从队列中移除,而后退回给Broker重发
音讯会从新发往%RETRY%+consumerGroup topic,这个topic每个broker上有一个,读和写队列数量为1。音讯达到broker上,会放入SCHEDULE_TOPIC_XXXX下,而后定时工作会读取对应的consumerQueue,从commitlog中读出音讯放入retry topic的队列中,客户端都订阅了retry topic,此时就能将音讯拉取下来了。retry topic是一个consumer一个。每个broker上会有一个。
tag应用
同一个group 应用的tag必须统一,不而后注册的会笼罩后面注册的,导致音讯失落,tag过滤在服务端通过hash过滤,客户端通过equal过滤。会导致tag1的音讯在服务端被过滤了,tag2的局部音讯负载到了consumer1,导致consumer1过滤了。consumer2只可能生产局部音讯。
rebalance机会
触发条件这个rebalance的条件有:
每20s定时刷新(精确说上次刷新后等20s, @see RebalanceService#run
收到Broker告知的Consume变动告诉时@see ClientRemotingProcessor#notifyConsumerIdsChanged
每次Client启动时@see DefaultMQPushConsumerImpl#start。
每次client上报心跳,服务端会判断是否有配置变动或者是新的id退出。如果是,那么会告诉这个group下所有的client进行rebalance。client下线也会告诉这个group下所有的client进行rebalance
磁盘指标
rrqm/s:每秒进行merge的读操作数目。即delta(rmerge)/s
wrqm/s:每秒进行merge的写操作数目。即delta(wmerge)/s
r/s:每秒实现的读I/O设施次数。即delta(rio)/s
w/s:每秒实现的写I/0设施次数。即delta(wio)/s
rsec/s:每秒读扇区数。即delta(rsect)/s
wsec/s:每秒写扇区数。即delta(wsect)/s
rKB/s:每秒读K字节数。是rsec/s的一半,因为每扇区大小为512字节
wKB/s:每秒写K字节数。是wsec/s的一半
avgrq-sz:均匀每次设施I/O操作的数据大小(扇区)。即delta(rsect+wsect)/delta(rio+wio)
avgqu-sz:均匀I/O队列长度。即delta(aveq)/s/1000(因为aveq的单位为毫秒) duiw
await:均匀每次设施I/O操作的等待时间(毫秒)。即delta(ruse+wuse)/delta(rio+wio)
svctm:均匀每次设施I/O操作的服务工夫(毫秒)。即delta(use)/delta(rio+wio) !!!svctm越靠近于await则阐明等待时间少
%util:一秒中有百分之多少的工夫用于I/O操作,或者说一秒中有多少工夫I/O队列是非空的。80%示意设施曾经很忙了,即delta(usr)/s/1000(因为use的单位为毫秒)
rqsize:The average size (in sectors) of the requests that were issued to the device.
qusize:The average queue length of the requests that were issued to the device.
qusize = (rs+ws)*await/1000
rocketmq如果磁盘压力大,体现为:
1:io util 简直达到100%
2:r/s(每秒实现的读I/O设施次数),回升
解释:rocketmq如果失常状况下,都是读热数据,根本不须要去磁盘读,如果读历史音讯,会导致io指向磁盘,r/s读的次数减少,此时因为读写都用的是pagecache,所以写申请的解决会变慢,w/s写的次数会变多。
发表回复