关于kafka:如何保证kafka消费的顺序性

89次阅读

共计 2501 个字符,预计需要花费 7 分钟才能阅读完成。

在 Kafka 中 Partition(分区)是真正保留音讯的中央,发送的音讯都寄存在这里。Partition(分区)又存在于 Topic(主题)中,并且一个 Topic(主题)能够指定多个 Partition(分区)。

在 Kafka 中,只保障 Partition(分区)内有序,不保障 Topic 所有分区都是有序的。

所以 Kafka 要保障音讯的生产程序,能够有 2 种办法:
一、1 个 Topic(主题)只创立 1 个 Partition(分区),这样生产者的所有数据都发送到了一个 Partition(分区),保障了音讯的生产程序。
二、生产者在发送音讯的时候指定要发送到哪个 Partition(分区)。

那么问题来了:在 1 个 topic 中,有 3 个 partition,那么如何保证数据的生产?

1、如程序生产中的 “ 第①点 ” 和 “Kafka 要保障音讯的生产程序第二个办法 ” 阐明,生产者在写的时候,能够指定一个 key,比如说咱们指定了某个订单 id 作为 key,那么这个订单相干的数据,肯定会被散发到同一个 partition 中去,而且这个 partition 中的数据肯定是有程序的。

2、消费者从 partition 中取出来数据的时候,也肯定是有程序的。到这里,程序还是 ok 的,没有错乱。

3、然而消费者里可能会有多个线程来并发来解决音讯。因为如果消费者是单线程生产数据,那么这个吞吐量太低了。而多个线程并发的话,程序可能就乱掉了。

解决方案:
写 N 个 queue,将具备雷同 key 的数据都存储在同一个 queue,而后对于 N 个线程,每个线程别离生产一个 queue 即可。

注:在单线程中,一个 topic,一个 partition,一个 consumer,外部单线程生产,这样的状态数据生产是有序的。但因为单线程吞吐量太低,在数据宏大的理论场景很少采纳。

然而以上生产线程模型,存在一个问题:

在生产过程中,如果 Kafka 生产组产生重均衡,此时的分区被调配给其它生产组了,如果拉取回来的音讯没有被生产,尽管 Kakfa 能够实现 ConsumerRebalanceListener 接口,在新一轮重均衡前被动提交生产偏移量,但这貌似解决不了未生产的音讯被打乱程序的可能性?

因而在生产前,还须要被动进行判断此分区是否被调配给其它消费者解决,并且还须要锁定该分区在生产当中不能被调配到其它消费者中(但 kafka 目前做不到这一点)。

参考 RocketMQ 的做法:

在生产前被动调用 ProcessQueue#isDropped 办法判断队列是否已过期,并且对该队列进行加锁解决(向 broker 端申请该队列加锁)。

RocketMQ
RocketMQ 不像 Kafka 那么“原生”,RocketMQ 早已为你筹备好了你的需要,它自身的生产模型就是单 consumer 实例 + 多 worker 线程模型,有趣味的小伙伴能够从以下办法观摩 RocketMQ 的生产逻辑:

org.apache.rocketmq.client.impl.consumer.PullMessageService#run
RocketMQ 会为每个队列调配一个 PullRequest,并将其放入 pullRequestQueue,PullMessageService 线程会一直轮询从 pullRequestQueue 中取出 PullRequest 去拉取音讯,接着将拉取到的音讯给到 ConsumeMessageService 解决,ConsumeMessageService 有两个子接口:

// 并发音讯生产逻辑实现类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// 程序音讯生产逻辑实现类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
其中,ConsumeMessageConcurrentlyService 外部有一个线程池,用于并发生产,同样地,如果须要程序生产,那么 RocketMQ 提供了 ConsumeMessageOrderlyService 类进行程序音讯生产解决。

通过对 Kafka 生产线程模型的思考之后,从 ConsumeMessageOrderlyService 源码中可能看出 RocketMQ 可能实现部分生产程序,我认为次要有以下两点:

1)RocketMQ 会为每个音讯队列建一个对象锁,这样只有线程池中有该音讯队列在解决,则需期待解决完能力进行下一次生产,保障在以后 Consumer 内,同一队列的音讯进行串行生产。

2)向 Broker 端申请锁定以后程序生产的队列,避免在生产过程中被调配给其它消费者解决从而打乱生产程序

总结
1)多分区的状况下:

如果想要保障 Kafka 在生产时要保障生产的程序性,能够应用每个线程保护一个 KafkaConsumer 实例的生产线程模型,并且是一条一条地去拉取音讯并进行生产(避免重均衡时有可能打乱生产程序)。(备注:每个 KafkaConsumer 会负责固定的分区,因而无奈晋升单个分区的生产能力,如果一个主题分区数量很多,只能通过减少 KafkaConsumer 实例进步生产能力,这样一来线程数量过多,导致我的项目 Socket 连贯开销微小,我的项目中个别不必该线程模型去生产。)

对于能容忍音讯短暂乱序的业务(话说回来,Kafka 集群也不能保障严格的音讯程序),能够应用单 KafkaConsumer 实例 + 多 worker 线程 + 一条线程对应一个阻塞队列生产线程模型(以上两图就是对此生产线程模型的解释)。

1)单分区的状况下:

因为单分区不存在重均衡问题,以上所提到的线程模型都能够保障生产的程序性。

另外如果是 RocketMQ,应用 MessageListenerOrderly 监听生产可保障音讯生产程序。

很多人也有这个疑难:既然 Kafka 和 RocketMQ 都不能保障严格的程序音讯,那么程序生产还有意义吗?

一般来说一般的的程序音讯可能满足大部分业务场景,如果业务可能容忍集群异样状态下音讯短暂不统一的状况,则不须要严格的程序音讯。

关键词:大数据培训

正文完
 0