关于rocketmq:RocketMQ学习十三顺序消息延时消息与消息过滤

39次阅读

共计 3810 个字符,预计需要花费 10 分钟才能阅读完成。

如题所示,本文围绕程序音讯,延时音讯与音讯过滤来开展。

一,程序音讯

RocketMQ 只能保障队列级别的音讯有序,如果要实现某一类音讯的程序执行,就必须将这类音讯发送到同一个队列,能够在音讯发送时应用 MessageQueueSelector,通过指定 sharding key 进而将同一类音讯发送到同一队列里,这样在 CommitLog 文件里音讯的程序就与发送时统一了。broker 端抉择发送的队列能够参考之前是的文章:RocketMQ 学习五 - 抉择队列等个性。

上面再说下生产端的解决。

程序音讯生产的事件监听器是 MessageListenerOrderly。咱们晓得 PullMessageService 依据偏移量拉取一批音讯后会存入 ProcessQueue 中,而后应用线程池进行解决。要保障生产端对单队列中的音讯程序解决,故在多线程场景下须要依照音讯生产队列进行加锁。程序生产在生产端的并发度并不取决生产端线程池的大小,而是取决于分给给消费者的队列数量,故如果一个 Topic 是用在程序生产场景中,倡议消费者的队列数设置增多,能够适当为非程序生产的 2~3 倍,这样有利于进步生产端的并发度,不便横向扩容。

生产端的横向扩容或 Broker 端队列个数的变更都会触发音讯生产队列的从新负载,并发生产时一个生产队列有可能被多个消费者同时生产,但程序生产时并不会呈现这种状况,因为程序音讯不仅仅在生产音讯时会锁定音讯生产队列,在调配到音讯队列时,能从该队列拉取音讯还须要在 Broker 端申请该生产队列的锁,即同一个工夫只有一个消费者能拉取该队列中的音讯,确保程序生产的语义。


流程:

  1. PullMessageService 单线程的从 Broker 获取音讯
  2. PullMessageService 将音讯增加到 ProcessQueue 中(ProcessMessage 是一个音讯的缓存),之后提交一个生产工作到 ConsumeMessageOrderService
  3. ConsumeMessageOrderService 多线程执行,每个线程在生产音讯时须要拿到 MessageQueue 的锁
  4. 拿到锁之后从 ProcessQueue 中获取音讯

那如果程序生产的过程中生产失败了怎么解决呢?并发生产模式在生产失败是有重试机制,默认重试 16 次,而且重试时是先将音讯发送到 Broker,而后再次拉取到音讯,这种机制就会丢失其生产的程序性。还有如果一条音讯如果始终不能生产胜利,其音讯生产进度就会始终无奈向前推动,即会造成音讯积压景象,所以程序生产时咱们肯定要捕获异样。

二,延时音讯

在开源版本的 RocketMQ 中延时音讯并不反对任意工夫的延时,目前默认设置为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,从 1s 到 2h 别离对应着等级 1 到 18,而阿里云中的付费版本是能够反对 40 天内的任何时刻(毫秒级别)。

延时音讯的流程图:

  1. Producer 在本人发送的音讯上设置好须要延时的级别(比方设置 3 等级的提早:message.setDelayTimeLevel(3))。
  2. Broker 发现此音讯是延时音讯(音讯的 delayLevel 大于 0),将 Topic 进行替换成延时 Topic(SCHEDULE_TOPIC_XXXX),每个延时级别都会作为一个独自的 queue(delayLevel-1),将本人的 Topic 作为额定信息存储(CommitLog#putMessage 办法里)。
  3. 构建 ConsumerQueue
  4. 定时工作定时每隔 1s 扫描每个延时级别的 ConsumerQueue。
  5. 拿到 ConsumerQueue 中的 CommitLog 的 Offset,获取音讯,判断是否曾经达到执行工夫
  6. 如果达到,那么将音讯的 Topic 复原,进行从新投递。如果没有达到则提早没有达到的这段时间执行工作。

三,音讯过滤

RocketMQ 反对 SQL 过滤与 TAG 过滤两种形式。

  • SQL 过滤:在 broker 端进行,能够缩小无用数据的网络传输但 broker 压力会大,性能低,反对应用 SQL 语句简单的过滤逻辑。
  • TAG 过滤:在 broker 与 consumer 端进行,减少无用数据的网络传输但 broker 压力小,性能高,只反对简略的过滤。

SQL 过滤先不剖析了,能够参考文章:RocketMQ 源码解析:音讯过滤是如何实现的?
TAG 过滤的流程大略是,broker 获取对应 ConsuemrQueue 里 hashcode(tag)后依据生产端传入的 tag 进行比拟,如果不匹配则将此音讯跳过;如果匹配生产端还要进行一次 tag 的比拟,因为会有可能呈现了 hash 抵触。

broker 端的过滤:

    // 查问音讯入口
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums,
        final MessageFilter messageFilter) {
 
        //tag 过滤,在 consumerQueue 里
       if (messageFilter != null
            && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.NO_MATCHED_MESSAGE;}
                continue;
        }
    //tag 过滤,在 commitlog 里
    if (messageFilter != null
        && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.NO_MATCHED_MESSAGE;}
         // release...
         selectResult.release();
         continue;
    }
}

consumer 过滤:

public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {PullResultExt pullResultExt = (PullResultExt) pullResult;

        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) {ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List<MessageExt> msgList = MessageDecoder.decodes(byteBuffer);

            List<MessageExt> msgListFilterAgain = msgList;
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {msgListFilterAgain = new ArrayList<MessageExt>(msgList.size());
                for (MessageExt msg : msgList) {if (msg.getTags() != null) {if (subscriptionData.getTagsSet().contains(msg.getTags())) {msgListFilterAgain.add(msg);
                        }
                    }
                }
            }

            if (this.hasHook()) {FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(unitMode);
                filterMessageContext.setMsgList(msgListFilterAgain);
                this.executeHook(filterMessageContext);
            }

            ......

        }

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }

音讯过滤还能够通过 topic 来实现,咱们在应用 topic 进行过滤还是应用 tag 过滤能够依据具体的业务场景进行抉择,一般来说,不同的 Topic 之间的音讯没有必然的分割,而 Tag 则用来辨别同一个 Topic 下互相关联的音讯。

参考文章
程序音讯参考:13 结合实际场景程序生产、音讯过滤实战
聊一聊程序音讯(RocketMQ 程序音讯的实现机制)
延时音讯参考:rocketmq 一个 topic 多个 group_你须要晓得的 RocketMQ
音讯过滤参考:rocketMQ 音讯 Tag 过滤原理

正文完
 0