作者:徒钟
什么是音讯过滤
在消息中间件的应用过程中,一个主题对应的消费者想要通过规定只生产这个主题下具备某些特色的音讯,过滤掉本人不关怀的音讯,这个性能就叫音讯过滤。
就如上图所形容的,生产者会向主题中写入不拘一格的音讯,有橙色的、黄色的、还有灰色的,而这个主题有两个消费者,第一个消费者只想要生产橙色的音讯,第二个消费者只想要生产黄色的和灰色的音讯,那么这个成果就须要通过音讯过滤来实现。
音讯过滤的利用场景
咱们以常见的电商场景为例,来看看音讯过滤在理论利用过程中起到的作用。
电商平台在设计时,往往存在零碎拆分细、功能模块多、调用链路长、零碎依赖简单等特点,消息中间件在其中就起到了异步解耦、异步通信的作用,特地是在双十一这样的流量高峰期,消息中间件还起到了削峰填谷的作用。
而在消息中间件应用方面,电商平台因为笼罩的畛域泛滥会产生很多的音讯主题,音讯收发量也随着交易量和订阅零碎的减少而增大。随着业务零碎的程度拆解和垂直减少,相干的音讯呈现出高订阅比和低投递比的状态,比方一个主题订阅比是 300:1,即 1 个主题的订阅者有 300 个,然而投递比却只有 15:300,即一条音讯只有 15 个订阅者须要投递,其余 285 个订阅者全副过滤了这条音讯。那解决这些场景,就须要应用到音讯过滤。
举例来说,在交易链路中,一个订单的解决流程分为下单、扣减库存、领取等流程,这个流程会波及订单操作和状态机的变动。上游的零碎,如积分、物流、告诉、实时计算等,他们会通过消息中间件监听订单的变更音讯。然而它们对订单不同操作和状态的音讯有着不同的需要,如积分零碎只关怀下单音讯,只有下单就扣减积分。物流零碎只关系领取和收货音讯,领取就发动物流订单,收货就实现物流订单。实时计算零碎会统计订单不同状态的数据,所有音讯都要接管。
试想一下如果没有音讯过滤这个性能,咱们会怎么反对以上音讯过滤的性能呢?能想到的个别有以下两个计划:
1. 通过将主题进行拆分,将不同的音讯发送到不同主题上。
对于生产者来说,这意味着消费者有多少生产场景,就须要新建多少个 Topic,这无疑会给生产者带来微小的保护老本。对消费者来说,消费者有可能须要同时订阅多个 Topic,这同样带来了很大的保护老本。另外,音讯被主题拆分后,他们之间的生产程序就无奈保障了,比方对于一个订单,它的下单、领取等操作显然是要被程序解决的。
2. 消费者收到音讯后,依据音讯体对音讯依照规定硬编码自行过滤。
这意味着所有的音讯都会推送到消费者端进行计算,这无疑减少了网络带宽,也减少了消费者在内存和 CPU 上的耗费。
有了音讯过滤这个性能,生产者只需向一个主题进行投递音讯,服务端依据订阅规定进行计算,并按需投递给每个消费者。这样对生产者和消费者的代码保护就十分敌对,同时也能很大水平上升高网络带宽,同时缩小消费者的内存占用和 CPU 的耗费。
RocketMQ 音讯过滤的模式
RocketMQ 是泛滥消息中间件中为数不多反对音讯过滤的零碎。这也是其作为业务集成音讯首选计划的重要根底之一。
在性能层面,RocketMQ 反对两种过滤形式,Tag 标签过滤和 SQL 属性过滤,上面我来这两个过滤形式应用形式和技术原理进行介绍
Tag 标签过滤
- 性能介绍
Tag 标签过滤形式是 RocketMQ 提供的根底音讯过滤能力,基于生产者为音讯设置的 Tag 标签进行匹配。生产者在发送音讯时,设置音讯的 Tag 标签,消费者按需指定已有的 Tag 标签来进行匹配订阅。
- 过滤语法
- 单 Tag 匹配:过滤表达式为指标 Tag,示意只有音讯标签为指定指标 Tag 的音讯合乎匹配条件,会被发送给消费者;
- 多 Tag 匹配:多个 Tag 之间为或的关系,不同 Tag 间应用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,示意标签为 Tag1 或 Tag2 或 Tag3 的音讯都满足匹配条件,都会被发送给消费者进行生产;
- 全 Tag 匹配:应用星号(*)作为全匹配表达式。示意主题下的所有音讯都将被发送给消费者进行生产。
- 应用形式
- 发送音讯,设置 Tag 标签
Message message = provider.newMessageBuilder()
.setTopic("TopicA")
.setKeys("messageKey")
// 设置音讯 Tag,用于生产端依据指定 Tag 过滤音讯
.setTag("TagA")
.setBody("messageBody".getBytes())
.build();
- 订阅音讯,匹配单个 Tag 标签
// 只订阅音讯标签为“TagA”的音讯
FilterExpression filterExpression = new FilterExpression("TagA", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);
- 订阅音讯,匹配多个 Tag 标签
// 只订阅音讯标签为“TagA”、“TagB”或“TagC”的音讯
FilterExpression filterExpression = new FilterExpression("TagA||TagB||TagC", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);
- 订阅音讯,匹配所有 Tag 标签,即不过滤
// 应用 Tag 标签过滤音讯,订阅所有音讯
FilterExpression filterExpression = new FilterExpression("*", FilterExpressionType.TAG);
pushConsumer.subscribe("TopicA", filterExpression);
- 技术原理
RocketMQ 在存储音讯的时候,是通过 Append-Only 的形式将所有主题的音讯都写在同一个 CommitLog 文件中,这能够无效的晋升了音讯的写入速率。为了生产时可能疾速检索音讯,它会在后盾启动异步形式将音讯所在位点、音讯的大小,以及音讯的标签哈希值存储到 ConsumeQueue 索引文件中。将标签存储到这个索引文件中,就是为了在通过标签进行音讯过滤的时候,能够在索引层面就能够获取到音讯的标签,不须要从 CommitLog 文件中读取,这样就缩小音讯读取产生的零碎 IO 和内存开销。标签存储哈希值,次要是为了保障 ConsumeQueue 索引文件可能定长解决,这样能够无效较少存储空间,晋升这个索引文件的读取效率。
整个 Tag 标签过滤的流程如下:
- 生产者对音讯打上本人的业务标签,发送给咱们的服务端 Broker;
- Broker 将音讯写入 CommitLog 中,而后通过异步线程将音讯散发到 ConsumeQueue 索引文件中;
- 消费者启动后,定时向 Broker 发送心跳申请,将订阅关系上传到 Broker 端,Broker 将订阅关系及标签的哈希值保留在内存中;
- 消费者向 Broker 拉取音讯,Broker 会通过订阅关系和队列去 ConsumeQueue 中检索音讯,将订阅关系中的标签哈希值和音讯中的标签哈希值做比拟,如果匹配就返回给消费者;
- 消费者收到音讯后,会将音讯中的标签值和本地订阅关系中标签值做准确匹配,匹配胜利才会交给生产线程进行生产。
SQL 属性过滤
- 性能介绍
SQL 属性过滤是 RocketMQ 提供的高级音讯过滤形式,通过生产者为音讯设置的属性(Key)及属性值(Value)进行匹配。生产者在发送音讯时可设置多个属性,消费者订阅时可设置 S QL 语法的过滤表达式过滤多个属性。
- 过滤语法
- 数值比拟:>, >=, <, <=, BETWEEN, =
- 字符比拟:=, <>, IN
- 判空运算:IS NULL or IS NOT NULL
- 逻辑运算:AND, OR, NOT
- 应用形式
- 发送音讯,设置属性
Message message = provider.newMessageBuilder()
.setTopic("TopicA")
.setKeys("messageKey")
// 设置音讯属性,用于生产端依据指定属性过滤音讯。.addProperty("Channel", "TaoBao")
.addProperty("Price", "5999")
.setBody("messageBody".getBytes())
.build();
- 订阅音讯,匹配单个属性
FilterExpression filterExpression = new FilterExpression("Channel='TaoBao'", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);
- 订阅音讯,匹配多个属性
FilterExpression filterExpression = new FilterExpression("Channel='TaoBao'AND Price>5000", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);
- 订阅音讯,匹配所有属性
FilterExpression filterExpression = new FilterExpression("True", FilterExpressionType.SQL92);
pushConsumer.subscribe("TopicA", filterExpression);
- 技术原理
因为 SQL 过滤须要将音讯的属性和 SQL 表达式进行匹配,这会对服务端的内存和 CPU 减少很大的开销。为了升高这个开销,RocketMQ 采纳了布隆过滤器进行优化。当 Broker 在收到音讯后,会事后对所有的订阅者进行 SQL 匹配,并将匹配后果生成布隆过滤器的位图存储在 ConsumeQueueExt 索引扩大文件中。在生产时,Broker 就会应用应用这个过滤位图,通过布隆过滤器对消费者的 SQL 进行过滤,这能够防止音讯在肯定不匹配的时候,不须要去 CommitLog 中将音讯的属性拉取到内存进行计算,能够无效地升高属性和 SQL 进行匹配的音讯量,缩小服务端的内存和 CPU 开销。
整个 SQL 过滤的解决流程如下:
- 消费者通过心跳上传订阅关系,Broker 判断如果是 SQL 过滤,就会通过布隆过滤器的算法,生成这个 SQL 对应的布隆过滤匹配参数;
- 生产者对音讯设置上本人的业务属性,发送给咱们的服务端 Broker;
- Broker 收到后将音讯写入 CommitLog 中,而后通过异步线程将音讯散发到 ConsumeQueue 索引文件中。在写入之前,会将这条音讯的属性和以后所有订阅关系中 SQL 进行匹配,如果通过,则将 SQL 对应的布隆过滤匹配参数合并成一个残缺的布隆过滤位图;
- 消费者生产音讯的时候,Broker 会先获取事后生成的布隆过滤匹配参数,而后通过布隆过滤器对 ConsumeQueueExt 的布隆过滤位图和消费者的布隆过滤匹配参数进行匹配;
- 布隆过滤器返回匹配胜利只能阐明音讯属性和 SQL 可能匹配,Broker 还须要从 CommitLog 中将音讯属性取出来,再做一次和 SQL 的准确匹配,这个时候匹配胜利才会将音讯投递给消费者
差别及比照
最佳实际
主题划分及音讯定义
主题和音讯背地的实质其实就是业务实体的属性、行为或状态产生了变动。只有产生了变动,生产者才会往主题外面发送音讯,消费者才须要监听这些的音讯,去实现本身的业务逻辑。
那么如何做好主题划分和音讯定义呢,咱们以订单实体为例,来看看主题划分和音讯定义的准则。
- 主题划分的准则
- 业务畛域是否统一
不同的业务畛域背地有不同的业务实体,其属性、行为及状态的定义天差地别。比方商品和订单,他们属于两个齐全独立且不同的畛域,就不能定义成同一个主题。
- 业务场景是否统一
同一个业务畛域不同的业务场景或者技术场景,不能定义一个主题。如订单流程和订单缓存刷新都和订单有关系,然而订单缓存刷新可能须要被不同的流程触发,放在一起就会导致局部场景订单缓存不刷新的状况。
- 音讯类型是否统一
同一个业务畛域和业务场景,对音讯类型有不同需要,比方订单处理过程中,咱们须要发送一个事务音讯,同时也须要发送一个定时音讯,那么这两个音讯就不能共用一个主题。
- 音讯定义的准则
- 无标签无属性
对于业务实体极其简略的音讯,是能够不须要定义标签和属性,比方 MySQLBinlog 的同步。所有的消费者都没有音讯过滤需要的,也无需定义标签和属性。
- 如何定义标签
标签过滤是 RocketMQ 中应用最简略,且过滤性能最好的一种过滤形式。为了施展其微小的劣势,能够思考优先应用。在应用时,咱们须要确认这个字段在业务实体和业务流程中是否是惟一定义的,并且它是被绝大多数消费者作为过滤条件的,那么能够将它作为标签来定义。比方订单中有下单渠道和订单操作这两个字段,并且在单次音讯发送过程中都是惟一定义,然而订单操作被绝大多数消费者利用为过滤条件,那么它最合适作为标签。
- 如何定义属性
属性过滤的开销绝对比拟大,所以只有在标签过滤无奈满足时,才举荐应用。比方标签曾经被其余字段占用,或者过滤条件不可枚举,须要反对多属性简单逻辑的过滤,就只能应用属性过滤了。
放弃订阅关系统一
订阅关系统一是指同一个消费者组上面的所有的消费者所订阅的 Topic 和过滤表达式都必须完全一致。
正如上图所示,一个消费者组蕴含两个消费者,他们同时订阅了 Topic-A 这个主题,然而消费者一订阅的是 Tag-A 这个标签的音讯,消费者二订阅的是 Tag-B 这个标签的音讯,那么他们两者的订阅关系就存在不统一。
- 导致的问题:
那么订阅关系不统一会导致什么问题呢?
- 频繁简单平衡
在 RocketMQ 实现中,消费者客户端默认每 30 秒向 Broker 发送一次心跳,这个过程会上传订阅关系,Broker 发现变动了就进行订阅关系笼罩,同时会触发客户端进行负载平衡。那么订阅关系不统一的两个客户端会穿插上传本人的订阅关系,从而导致客户端频繁进行负载平衡。
- 生产速率降落
客户端触发了负载平衡,会导致消费者所持有的生产队列发生变化,呈现间断性暂停音讯拉取,导致整体生产速率降落,甚至呈现音讯积压。
- 音讯反复生产
客户端触发了负载平衡,会导致曾经生产胜利的音讯因为生产队列发生变化而放弃向 Broker 提交生产位点。Broker 会认为这条音讯没有生产胜利而从新向消费者发动投递,从而导致音讯反复生产。
- 音讯未生产
订阅关系的不统一,会有两种场景会导致音讯未生产。第一种是消费者的订阅关系和 Broker 以后订阅关系不统一,导致音讯在 Broker 服务端就被过滤了。第二种是消费者的订阅关系和 Broker 以后的尽管统一,然而 Broker 投递给了其余的消费者,被其余消费者本地过滤了。
- 应用的倡议
在音讯过滤应用中,有以下倡议:
- 不要共用消费者组
不同业务零碎千万不要应用同一个消费者组订阅同一个主题的音讯。个别不同业务零碎由不同团队保护,很容易产生一个团队批改了订阅关系而没有告诉到其余团队,从而导致订阅关系不统一的状况。
- 不频繁变更订阅关系
频繁变更订阅关系这种状况比拟少,但也存在局部用户实现在线规定或者动静参数来设置订阅关系。这有可能导致订阅关系发生变化,触发客户端负载平衡的状况。
- 变更做好危险评估
因为业务的倒退,需要的变更,订阅关系不可能始终不变,然而变更订阅关系过程中,须要思考整体公布实现须要的总体工夫,以及公布过程中订阅关系不统一而对业务可能带来的危险。
- 生产做好幂等解决
不论是订阅关系不统一,还是客户端高低线,都会导致音讯的反复投递,所以音讯幂等解决永远是音讯生产的黄金法令。在业务逻辑中,消费者须要保障对曾经解决过的音讯间接返回胜利,防止二次生产对业务造成的侵害,如果返回失败就会导致音讯始终反复投递直到进死信。
到此,本文对于音讯过滤的分享就到此结束了,非常感谢大家可能破费贵重的工夫浏览,有不对的中央麻烦斧正,感激大家对 RocketMQ 的关注,心愿大家可能多多参加社区的探讨和奉献。
如果您对 RocketMQ 感兴趣,也欢迎您扫描下方二维码退出钉钉群一起沟通交流~
点击此处,进入官网理解更多详情~