关于java:rocketmq3

53次阅读

共计 5909 个字符,预计需要花费 15 分钟才能阅读完成。

从节点的分流判断

 针对音讯沉积量过大会切换到 Slave 进行查问。maxOffsetPy 为以后最大物理偏移量,maxPhyOffsetPulling 为本次音讯拉取最大物理偏移量,他们的差即可示意音讯沉积量 TOTAL_PHYSICAL_MEMORY_SIZE 示意以后零碎物理内存,accessMessageInMemoryMaxRatio 的默认值为 40,以上逻辑即可算出以后音讯沉积量是否大于物理内存的 40%,如果大于则将 suggestPullingFromSlave 设置为 true。//https://www.jianshu.com/p/cd138e67dca0

master 宕机

 1 状况:nameServer 未检测到 broker 掉线,无论客户端是否产生了 rebalance,producer、consumer 不晓得 broker 挂了
2 状况:nameServer 检测到了 broker 掉线,然而客户端未产生 rebalance,producer、consumer 不晓得 broker 挂了。3 状况:nameServer 检测到了 broker 掉线,客户端产生了 rebalance,producer、consumer 感知到了 broker 掉线。# nameServer 治理 broker。在 broker 注册的时候,会通过 registerBrokerAll 把 topic 信息(蕴含 queue 的信息)、地址、brokerId、brokerName 上报到 nameServer。broker 在启动的时候,定时工作调用 registerBrokerAll。在更新 broker 配置、创立 topic 的时候,也会调用 registerBrokerAll。nameServer 启动时会开启一个定时工作,定时检测 broker 的心跳。如果某 broker 心跳不在了,会进行移除操作。只有同一个 brokerName 的所有 broker 都没了,才会移除对应的 queue 信息。发送者:在 1、2 时,对此 broker 上的 queue 发送音讯时,会失败,同步发送的话,会重试,重试也可能失败,最终会抛出异样。等到在 3 时,producer 端会发现此 queue 的 masterId 的 broker 挂了,就会抛出异样。消费者:在 1、2 时,拉取此 broker 上的 queue 音讯时,会失败,在 push 模式下会稍后接着拉去,等到在 3 时,consumer 会发现此 queue 的 masterId 的 broker 挂了,就会寻找同 brokerName 的 broker 的地址拉去音讯,也就是会去从拉取音讯。对于 producer 来说,主挂了,再对此 broker 上的 queue 发消息都会失败抛异样给用户,用户能够指定 queue 选择器(或本人实现)来排除对此 broker 上的 queue 发送音讯。对于 consumer 来说,主挂了,等到 rebanlance 后,就会从此 broker 的从拉取音讯。

多线程写磁盘速度是否晋升

https://blog.csdn.net/u013043103/article/details/84326462(1)读写最好还是不要多线程,硬盘读写的速度无限,单线程时曾经满负荷了,多线程又会减少线程之间的切换,会减少工夫。如果想减少读写速度,应该减少硬盘,做 raid(2)首先是硬盘的写入是串行的,CPU 的计算才是并行的,如果你并重计算那么多线程能进步,要不怎么叫做并行计算呢;如果偏重存储,除非数据量达到足以体现劣势的水平,否则加上线程之间切换的损耗当然会效率更加公开。(3)这个是依照算法来说的,目前来说大多数的算法都是很快的,瓶颈都在磁盘的 IO 上,咱们针对大多数的算法都进行过测试,根本一半以上的工夫都消耗在磁盘的 IO 上。比方我解决一个影像,解决数据用了 1 分钟,写入图像用了 2 分钟,那你把你的算法优化的很牛逼,10 秒中搞定,你的效率进步了多少,然而如果我多线程写入的话,我效率进步一倍,也就是写入图像用了 1 分钟,那这个效率显著比你优化你的算法来的实惠。这个货色还是要针对算法来说的。(4)磁盘 IO 单线程程序写时最快的,如果多线程写,磁盘的磁头要一直从新寻址,所以写入速度反而会慢。

依据 msgid 做幂等不牢靠

当存在网络稳定,网路延时等诸多问题时,音讯从客户端发送至服务端过程中,服务端失常写入了 commit-log,可在响应客户端(ACK)的时候失败了。导致两条一样的音讯内容,却有了不一样的 MsgId 跟 OffsetMsgId,最终它还是反复生产了

消费者如果生产失败,调用 sendMessageBack 办法将音讯发给 broker,从新生产时的音讯 msgId 不变,offsetMsgId 会变(因为新音讯贮存的地址已变),uniq_key 属性保留原音讯 msgId

开启长期内存 transientStorePoolEnable

mappedFile 有两种模式,一种是只有映射内存,写和读都是从映射的内存中读。然而读写都是间接操作 pagecache,会导致在大量读冷数据的时候,pagecache 吃紧的时候,读和写会相互影响,可能会导致写超时之类(默认超过 200ms)。一种是长期内存 + 映射内存。长期内存是有 jvm 向操作系统申请的间接内存,间接会锁住,避免内核将此块内存置换到硬盘。这种模式下,写入间接写入长期内存就返回,读的话仍旧读映射内存,间接内存会有一个服务 commit 到映射内存,写入间接内存然而没有 commit 的数据是不能够读的。commit 的策略,每次在 put 音讯完结后都会唤醒刷盘服务,如果未 commit 的数据超过 leastPages(默认 4k)或者间隔上次 commit 的工夫超过 commitDataThoroughInterval(默认 200ms)就会 commit 一次。开启长期内存会优化读写的抵触,晋升性能,等于批量写 pagecache,尤其在读冷数据 pachecache 吃紧的状况下。然而写入音讯不频繁(200ms 内,音讯量达不到 4k),音讯会产生较为显著额定的提早。在写入压力失常的状况下,根本不会有额定的提早。

默认路由

默认路由 TBW102 阐明一下

broker 如果开启了 autoCreateTopicEnable(默认就是 true),broker 会向 namesrv 注册默认路由 topic:TBW102

当 producer 从 namesrv 取 topic 路由如果不存在,就会取 TBW102 的路由信息,查看哪些 broker 反对主动创立 topic,而后结构新的 topic 路由信息。这里有一个坑点,如果应用默认路由,可能呈现 topic 只在一个 broker 中创立,例如如下状况

一开始 broker- a 和 broker- b 反对主动创立路由,producer 本地更新了本地路由表 [topicnew->broker-a;   topicnew->broker-b],而后发送音讯给 broker-a,broker- a 更新了 topicnew 到 namesrv,这时的 namesrv 上只有[topicnew->broker-a] 一个路由信息,如果一段时间 producer 没有发送音讯给 broker-b,而后从 namesrv 更新了 topicnew 的路由信息,则最终就会呈现新增的 topic 只被路由到一个 broker 中,没有高可用。所以在音讯不是很频繁的状况下,最好手动创立 topic。

起始位点逻辑

默认状况下为:最新位点
    所属的消费者组是新上线的,订阅的音讯,最早的音讯(offset=0)还在内存中。rocketmq 的设计者认为,你这是一个新上线的业务,会强制从第一条音讯开始生产。如果订阅的音讯,曾经产生了过期音讯。那么才会从咱们这个 client 启动的工夫点开始生产。设置为起始位点:客户端计算逻辑会为 0,而后服务端发现位点比最小位点要小,就会更正位点,设置为最小位点
设置工夫戳:如果消费者组以前生产过某个 topic,setConsumeFromWhere 这个参数是不起效的。只有 broker 找的到生产位点,就是依照 broker 的来

RebalancePushImpl#computePullFromWhereWithException
ConsumerManageProcessor#queryConsumerOffset
DefaultMessageStore#getMessage

事务回查实现

TransactionalMessageCheckService 类中,此类蕴含一个线程,此线程默认每分钟触发一次事务查看,在其 onWaitEnd 办法中,实际上还是调用了 TransactionalMessageService 的 check 办法。大抵的回查逻辑:Half Topic 对应队列中寄存着 prepare 音讯,Operation Topic 对应的队列则寄存了 prepare message 对应的 commit/rollback 音讯,音讯体中则是 prepare message 对应的 offset,服务端通过比对两个队列的差值来找到尚未提交的超时事务,进行回查。https://www.jianshu.com/p/feda710d9716

刷盘服务

https://blog.csdn.net/yecong111/article/details/103858172

位点保留逻辑

每个 queue 都会有 ProcessQueue 来保护生产状况
每次拉取一批音讯,都会把音讯放到一个 treeMap 中,key 为 offset,value 为 msg
每次生产完某条音讯,就会把这条信息在 treeMap 中移除。而后将内存中的 offsetTable 的信息更新,更新为 treeMap 的树顶。而后在 MQClientInstance 启动的时候会启动一个定时工作,定时的上报位点到近程。依据 queue 获取 brokerName,而后再获取地址,而后发送申请。在 pull 音讯的时候,如果本位置点大于 0,就设置能够提交位点,把本位置点提交下来。broker 如果是从,不会理睬,如果是主,就会更新位点到内存。broker 端内存保护各个生产组的生产位点,定时工作长久化(BrokerController#init 开启的定时工作)。ps:位点的这条音讯是没有生产的,须要生产。

提早音讯

提早队列的外围思路是:Producer 发送音讯是,指定 delayLevel;或者 Consumer 生产音讯时,返回 RECONSUME_LATER,或者被动的 sendMessageBack(…,int delayLevel)时,会将音讯发回给 Broker,Broker 对音讯做个封装,指定 topic 为 SCHEDULE_TOPIC_XXXX,QueudId=delayLevel-1,若未指定 delayLevel,默认是 ReConsumeTimes + 3,将封装后的音讯存入 CommitLog,ReputMessageService 为其生成 PositionInfo,tagsCode 存储延时投递工夫,存入 "SCHEDULE_TOPIC_XXXX" 的 ConsumeQueue 中。delayLevel 有 16 个,因而最多状况下 SCHEDULE_TOPIC_XXXX 会有 16 个 ConsumeQueue。Broker 启动时,ScheduleMessageService 会启动 16 个线程对应 16 个 delayLevel 的读取服务,有序的读取 ConsumeQueue 里的 PositionInfo。ScheduleMessageService 会在 [以后工夫 <= 延时投递工夫] 时从 CommitLog 中提取这音讯,去除封装,抹去 delayLevel 属性,从新存入 CommitLog,并马上更新延时投递偏移量 dealyOffset。ReputMessageService 再次为以后音讯生成 PositionInfo,因为不存在 delayLevel,PositionInfo 存入 Topic 为 %RETRY%+consumeGroup,queueId 为 0 的 ConsumeQueue 中。每个消费者在启动时都订阅了本身消费者组的重试队列,当重试队列里有地位信息时,拉取相应音讯进行从新生产。音讯的第一次重试会发回给原始的消费者(执行 sendMessageBack 的消费者),之后的多次重试对立由订阅了 QueueId = 0 的消费者生产。消费者发回音讯时,能够指定提早级别,默认级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,也就是说 delayLevel = 3 代表提早 10 秒后重投递,最大重试次数 16 对应着 2h 后投递,每多生产一次投递工夫就增长到下个阶段。当提早级别 delayLevel < 0 时,放入 Dead Letter Queue。

重置位点

重置位点的执行程序依照 admin 到 broker 到 consumer 的程序顺次触发,admin 负责构建参数告诉 broker,broker 负责查问 consumeQueue 的具体位移,broker 负责告诉 consumer 进行位移重置。依据工夫戳查找 consumeQueue 对应的位移,而后由 broker 告诉 consumer 来长久化生产位移,最终会长久化到 broker 的生产位移。重置位点操作实质上是在 consumer 端执行,consumer 端负责长久化新的生产位移而后由定时工作告诉 broker 更新生产位移。consumer 在整个位移重置过程中会设置 ProcessQueue 的状态为 Dropped,从而阻断音讯拉取工作 ConsumeRequest 的执行阻断音讯拉取,其次会在 consumer 侧批改生产位移通过心跳告诉 broker 批改 consumer 的生产位移,最初通过从新的 rebalance 过程开始从新生产音讯

主从数据不统一

造成的起因是落盘落后于主从复制,导致当主挂了,可能数据没有落盘,然而这些数据能够从从生产到,而主复原后,这一部分数据失落,导致数据不统一。

主从同步

RocketMQ 的主从同步机制如下:(1) slave 启动,跟 master 建设连贯
(2) slave 以 5 秒的距离,向 master 拉取音讯,如果是第一次拉取的话,先获取本地 commitlog 文件中最大的偏移量,以该偏移量向服务端拉取音讯;(3) master 将数据返回给 slave
(4) slave 将数据写入本身 commitLog 中,更新偏移量;反复以上步骤


从服务器定时调度 syncAll 办法,定时向主服务器同步生产组订阅信息、生产位点、提早音讯位点、topic 配置。BrokerController#handleSlaveSynchronize

正文完
 0