从节点的分流判断
针对音讯沉积量过大会切换到Slave进行查问。maxOffsetPy 为以后最大物理偏移量,maxPhyOffsetPulling 为本次音讯拉取最大物理偏移量,他们的差即可示意音讯沉积量TOTAL_PHYSICAL_MEMORY_SIZE 示意以后零碎物理内存,accessMessageInMemoryMaxRatio 的默认值为 40,以上逻辑即可算出以后音讯沉积量是否大于物理内存的 40%,如果大于则将 suggestPullingFromSlave 设置为 true。
//https://www.jianshu.com/p/cd138e67dca0
master宕机
1状况:nameServer未检测到broker掉线,无论客户端是否产生了rebalance,producer、consumer不晓得broker挂了
2状况:nameServer检测到了broker掉线,然而客户端未产生rebalance,producer、consumer不晓得broker挂了。
3状况:nameServer检测到了broker掉线,客户端产生了rebalance,producer、consumer感知到了broker掉线。
# nameServer治理broker。在broker注册的时候,会通过registerBrokerAll把topic信息(蕴含queue的信息)、地址、brokerId、brokerName上报到nameServer。broker在启动的时候,定时工作调用registerBrokerAll。在更新broker配置、创立topic的时候,也会调用registerBrokerAll。
nameServer启动时会开启一个定时工作,定时检测broker的心跳。如果某broker心跳不在了,会进行移除操作。只有同一个brokerName的所有broker都没了,才会移除对应的queue信息。
发送者:在1、2时,对此broker上的queue发送音讯时,会失败,同步发送的话,会重试,重试也可能失败,最终会抛出异样。等到在3时,producer端会发现此queue的masterId的broker挂了,就会抛出异样。
消费者:在1、2时,拉取此broker上的queue音讯时,会失败,在push模式下会稍后接着拉去,等到在3时,consumer会发现此queue的masterId的broker挂了,就会寻找同brokerName的broker的地址拉去音讯,也就是会去从拉取音讯。
对于producer来说,主挂了,再对此broker上的queue发消息都会失败抛异样给用户,用户能够指定queue选择器(或本人实现)来排除对此broker上的queue发送音讯。
对于consumer来说,主挂了,等到rebanlance后,就会从此broker的从拉取音讯。
多线程写磁盘速度是否晋升
https://blog.csdn.net/u013043103/article/details/84326462
(1)读写最好还是不要多线程,硬盘读写的速度无限,单线程时曾经满负荷了,多线程又会减少线程之间的切换,会减少工夫。
如果想减少读写速度,应该减少硬盘,做raid
(2)首先是硬盘的写入是串行的,CPU的计算才是并行的,如果你并重计算那么多线程能进步,要不怎么叫做并行计算呢;
如果偏重存储,除非数据量达到足以体现劣势的水平,否则加上线程之间切换的损耗当然会效率更加公开。
(3)这个是依照算法来说的,目前来说大多数的算法都是很快的,瓶颈都在磁盘的IO上,咱们针对大多数的算法都进行过测试,根本一半以上的工夫都消耗在磁盘的IO上。
比方我解决一个影像,解决数据用了1分钟,写入图像用了2分钟,那你把你的算法优化的很牛逼,10秒中搞定,你的效率进步了多少,然而如果我多线程写入的话,
我效率进步一倍,也就是写入图像用了1分钟,那这个效率显著比你优化你的算法来的实惠。这个货色还是要针对算法来说的。
(4)磁盘IO单线程程序写时最快的,如果多线程写,磁盘的磁头要一直从新寻址,所以写入速度反而会慢。
依据msgid做幂等不牢靠
当存在网络稳定,网路延时等诸多问题时,音讯从客户端发送至服务端过程中,服务端失常写入了commit-log,可在响应客户端(ACK)的时候失败了。导致两条一样的音讯内容,却有了不一样的MsgId跟OffsetMsgId,最终它还是反复生产了
消费者如果生产失败,调用sendMessageBack办法将音讯发给broker,从新生产时的音讯msgId不变,offsetMsgId会变(因为新音讯贮存的地址已变),uniq_key属性保留原音讯msgId
开启长期内存transientStorePoolEnable
mappedFile有两种模式,
一种是只有映射内存,写和读都是从映射的内存中读。然而读写都是间接操作pagecache,会导致在大量读冷数据的时候,pagecache吃紧的时候,读和写会相互影响,可能会导致写超时之类(默认超过200ms)。
一种是长期内存+映射内存。长期内存是有jvm向操作系统申请的间接内存,间接会锁住,避免内核将此块内存置换到硬盘。这种模式下,写入间接写入长期内存就返回,读的话仍旧读映射内存,间接内存会有一个服务commit到映射内存,写入间接内存然而没有commit的数据是不能够读的。
commit的策略,每次在put音讯完结后都会唤醒刷盘服务,如果未commit的数据超过leastPages(默认4k)或者间隔上次commit的工夫超过commitDataThoroughInterval(默认200ms)就会commit一次。
开启长期内存会优化读写的抵触,晋升性能,等于批量写pagecache,尤其在读冷数据pachecache吃紧的状况下。然而写入音讯不频繁(200ms内,音讯量达不到4k),音讯会产生较为显著额定的提早。在写入压力失常的状况下,根本不会有额定的提早。
默认路由
默认路由TBW102阐明一下
broker如果开启了autoCreateTopicEnable(默认就是true),broker会向namesrv注册默认路由topic:TBW102
当producer从namesrv取topic路由如果不存在,就会取TBW102的路由信息,查看哪些broker反对主动创立topic,而后结构新的topic路由信息。
这里有一个坑点,如果应用默认路由,可能呈现topic只在一个broker中创立,例如如下状况
一开始broker-a和broker-b反对主动创立路由,
producer本地更新了本地路由表[topicnew->broker-a; topicnew->broker-b],而后发送音讯给broker-a,broker-a更新了topicnew到namesrv,这时的namesrv上只有[topicnew->broker-a]一个路由信息,如果一段时间producer没有发送音讯给broker-b,而后从namesrv更新了topicnew的路由信息,则最终就会呈现新增的topic只被路由到一个broker中,没有高可用。
所以在音讯不是很频繁的状况下,最好手动创立topic。
起始位点逻辑
默认状况下为:最新位点
所属的消费者组是新上线的,订阅的音讯,最早的音讯(offset=0)还在内存中。rocketmq的设计者认为,你这是一个新上线的业务,会强制从第一条音讯开始生产。
如果订阅的音讯,曾经产生了过期音讯。那么才会从咱们这个client启动的工夫点开始生产。
设置为起始位点:
客户端计算逻辑会为0,而后服务端发现位点比最小位点要小,就会更正位点,设置为最小位点
设置工夫戳:
如果消费者组以前生产过某个topic,setConsumeFromWhere这个参数是不起效的。只有broker找的到生产位点,就是依照broker的来
RebalancePushImpl#computePullFromWhereWithException
ConsumerManageProcessor#queryConsumerOffset
DefaultMessageStore#getMessage
事务回查实现
TransactionalMessageCheckService类中,此类蕴含一个线程,此线程默认每分钟触发一次事务查看,在其onWaitEnd办法中,实际上还是调用了TransactionalMessageService的check办法。
大抵的回查逻辑:Half Topic 对应队列中寄存着 prepare 音讯,Operation Topic 对应的队列则寄存了 prepare message 对应的 commit/rollback 音讯,音讯体中则是 prepare message 对应的 offset,服务端通过比对两个队列的差值来找到尚未提交的超时事务,进行回查。
https://www.jianshu.com/p/feda710d9716
刷盘服务
https://blog.csdn.net/yecong111/article/details/103858172
位点保留逻辑
每个queue都会有ProcessQueue来保护生产状况
每次拉取一批音讯,都会把音讯放到一个treeMap中,key为offset,value为msg
每次生产完某条音讯,就会把这条信息在treeMap中移除。而后将内存中的offsetTable的信息更新,更新为treeMap的树顶。
而后在MQClientInstance启动的时候会启动一个定时工作,定时的上报位点到近程。依据queue获取brokerName,而后再获取地址,而后发送申请。
在pull音讯的时候,如果本位置点大于0,就设置能够提交位点,把本位置点提交下来。broker如果是从,不会理睬,如果是主,就会更新位点到内存。
broker端内存保护各个生产组的生产位点,定时工作长久化(BrokerController#init开启的定时工作)。
ps:位点的这条音讯是没有生产的,须要生产。
提早音讯
提早队列的外围思路是:
Producer发送音讯是,指定delayLevel;或者Consumer生产音讯时,返回RECONSUME_LATER,或者被动的sendMessageBack(…,int delayLevel)时,会将音讯发回给Broker,Broker对音讯做个封装,指定topic为SCHEDULE_TOPIC_XXXX,QueudId=delayLevel-1,若未指定delayLevel,默认是ReConsumeTimes + 3,将封装后的音讯存入CommitLog,ReputMessageService为其生成PositionInfo,tagsCode存储延时投递工夫,存入"SCHEDULE_TOPIC_XXXX"的ConsumeQueue中。delayLevel有16个,因而最多状况下SCHEDULE_TOPIC_XXXX会有16个ConsumeQueue。Broker启动时,ScheduleMessageService会启动16个线程对应16个delayLevel的读取服务,有序的读取ConsumeQueue里的PositionInfo。ScheduleMessageService会在 [以后工夫<=延时投递工夫] 时从CommitLog中提取这音讯,去除封装,抹去delayLevel属性,从新存入CommitLog,并马上更新延时投递偏移量dealyOffset。ReputMessageService再次为以后音讯生成PositionInfo,因为不存在delayLevel,PositionInfo存入Topic为%RETRY%+consumeGroup,queueId为0的ConsumeQueue中。每个消费者在启动时都订阅了本身消费者组的重试队列,当重试队列里有地位信息时,拉取相应音讯进行从新生产。音讯的第一次重试会发回给原始的消费者(执行sendMessageBack的消费者),之后的多次重试对立由订阅了QueueId = 0 的消费者生产。
消费者发回音讯时,能够指定提早级别,默认级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,也就是说delayLevel = 3代表提早10秒后重投递,最大重试次数16对应着2h后投递,每多生产一次投递工夫就增长到下个阶段。当提早级别delayLevel < 0时,放入Dead Letter Queue。
重置位点
重置位点的执行程序依照admin 到 broker 到 consumer的程序顺次触发,admin负责构建参数告诉broker,broker负责查问consumeQueue的具体位移,broker负责告诉consumer进行位移重置。
依据工夫戳查找consumeQueue对应的位移,而后由broker告诉consumer来长久化生产位移,最终会长久化到broker的生产位移。
重置位点操作实质上是在consumer端执行,consumer端负责长久化新的生产位移而后由定时工作告诉broker更新生产位移。
consumer在整个位移重置过程中会设置ProcessQueue的状态为Dropped,从而阻断音讯拉取工作ConsumeRequest的执行阻断音讯拉取,其次会在consumer侧批改生产位移通过心跳告诉broker批改consumer的生产位移,最初通过从新的rebalance过程开始从新生产音讯
主从数据不统一
造成的起因是落盘落后于主从复制,导致当主挂了,可能数据没有落盘,然而这些数据能够从从生产到,而主复原后,这一部分数据失落,导致数据不统一。
主从同步
RocketMQ 的主从同步机制如下:
(1) slave启动,跟master建设连贯
(2) slave 以5秒的距离,向master拉取音讯,如果是第一次拉取的话,先获取本地commitlog文件中最大的偏移量,以该偏移量向服务端拉取音讯;
(3) master将数据返回给slave
(4) slave将数据写入本身commitLog中,更新偏移量;反复以上步骤
从服务器定时调度syncAll办法,定时向主服务器同步生产组订阅信息、生产位点、提早音讯位点、topic配置。
BrokerController#handleSlaveSynchronize
发表回复