如题所示,本文围绕程序音讯,延时音讯与音讯过滤来开展。
一,程序音讯
RocketMQ只能保障队列级别的音讯有序,如果要实现某一类音讯的程序执行,就必须将这类音讯发送到同一个队列,能够在音讯发送时应用 MessageQueueSelector,通过指定sharding key进而将同一类音讯发送到同一队列里,这样在CommitLog文件里音讯的程序就与发送时统一了。broker端抉择发送的队列能够参考之前是的文章:RocketMQ学习五-抉择队列等个性。
上面再说下生产端的解决。
程序音讯生产的事件监听器是MessageListenerOrderly。咱们晓得PullMessageService依据偏移量拉取一批音讯后会存入ProcessQueue中,而后应用线程池进行解决。要保障生产端对单队列中的音讯程序解决,故在多线程场景下须要依照音讯生产队列进行加锁。程序生产在生产端的并发度并不取决生产端线程池的大小,而是取决于分给给消费者的队列数量,故如果一个 Topic 是用在程序生产场景中,倡议消费者的队列数设置增多,能够适当为非程序生产的 2~3 倍,这样有利于进步生产端的并发度,不便横向扩容。
生产端的横向扩容或 Broker 端队列个数的变更都会触发音讯生产队列的从新负载,并发生产时一个生产队列有可能被多个消费者同时生产,但程序生产时并不会呈现这种状况,因为程序音讯不仅仅在生产音讯时会锁定音讯生产队列,在调配到音讯队列时,能从该队列拉取音讯还须要在 Broker 端申请该生产队列的锁,即同一个工夫只有一个消费者能拉取该队列中的音讯,确保程序生产的语义。
流程:
- PullMessageService单线程的从Broker获取音讯
- PullMessageService将音讯增加到ProcessQueue中(ProcessMessage是一个音讯的缓存),之后提交一个生产工作到ConsumeMessageOrderService
- ConsumeMessageOrderService多线程执行,每个线程在生产音讯时须要拿到MessageQueue的锁
- 拿到锁之后从ProcessQueue中获取音讯
那如果程序生产的过程中生产失败了怎么解决呢?并发生产模式在生产失败是有重试机制,默认重试 16 次,而且重试时是先将音讯发送到 Broker,而后再次拉取到音讯,这种机制就会丢失其生产的程序性。还有如果一条音讯如果始终不能生产胜利,其音讯生产进度就会始终无奈向前推动,即会造成音讯积压景象,所以程序生产时咱们肯定要捕获异样。
二,延时音讯
在开源版本的RocketMQ中延时音讯并不反对任意工夫的延时,目前默认设置为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,从1s到2h别离对应着等级1到18,而阿里云中的付费版本是能够反对40天内的任何时刻(毫秒级别)。
延时音讯的流程图:
- Producer在本人发送的音讯上设置好须要延时的级别(比方设置3等级的提早:message.setDelayTimeLevel(3))。
- Broker发现此音讯是延时音讯(音讯的 delayLevel 大于0),将Topic进行替换成延时Topic(SCHEDULE_TOPIC_XXXX),每个延时级别都会作为一个独自的queue(delayLevel-1),将本人的Topic作为额定信息存储(CommitLog#putMessage办法里)。
- 构建ConsumerQueue
- 定时工作定时每隔1s扫描每个延时级别的ConsumerQueue。
- 拿到ConsumerQueue中的CommitLog的Offset,获取音讯,判断是否曾经达到执行工夫
- 如果达到,那么将音讯的Topic复原,进行从新投递。如果没有达到则提早没有达到的这段时间执行工作。
三,音讯过滤
RocketMQ反对SQL过滤与TAG过滤两种形式。
- SQL过滤:在broker端进行,能够缩小无用数据的网络传输但broker压力会大,性能低,反对应用SQL语句简单的过滤逻辑。
- TAG过滤:在broker与consumer端进行,减少无用数据的网络传输但broker压力小,性能高,只反对简略的过滤。
SQL过滤先不剖析了,能够参考文章:RocketMQ源码解析:音讯过滤是如何实现的?
TAG过滤的流程大略是,broker获取对应ConsuemrQueue里hashcode(tag)后依据生产端传入的tag进行比拟,如果不匹配则将此音讯跳过;如果匹配生产端还要进行一次tag的比拟,因为会有可能呈现了hash抵触。
broker端的过滤:
//查问音讯入口 public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final MessageFilter messageFilter) { //tag过滤,在consumerQueue里 if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } continue; } //tag过滤,在commitlog里 if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } // release... selectResult.release(); continue; }}
consumer过滤:
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult, final SubscriptionData subscriptionData) { PullResultExt pullResultExt = (PullResultExt) pullResult; this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId()); if (PullStatus.FOUND == pullResult.getPullStatus()) { ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary()); List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer); List<MessageExt> msgListFilterAgain = msgList; if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) { msgListFilterAgain = new ArrayList<MessageExt>(msgList.size()); for (MessageExt msg : msgList) { if (msg.getTags() != null) { if (subscriptionData.getTagsSet().contains(msg.getTags())) { msgListFilterAgain.add(msg); } } } } if (this.hasHook()) { FilterMessageContext filterMessageContext = new FilterMessageContext(); filterMessageContext.setUnitMode(unitMode); filterMessageContext.setMsgList(msgListFilterAgain); this.executeHook(filterMessageContext); } ...... } pullResultExt.setMessageBinary(null); return pullResult; }
音讯过滤还能够通过topic来实现,咱们在应用topic进行过滤还是应用tag过滤能够依据具体的业务场景进行抉择,一般来说,不同的 Topic 之间的音讯没有必然的分割,而 Tag 则用来辨别同一个 Topic 下互相关联的音讯。
参考文章
程序音讯参考:13 结合实际场景程序生产、音讯过滤实战
聊一聊程序音讯(RocketMQ程序音讯的实现机制)
延时音讯参考:rocketmq一个topic多个group_你须要晓得的RocketMQ
音讯过滤参考:rocketMQ音讯Tag过滤原理