使用多线程增加kafka消费能力

63次阅读

共计 3055 个字符,预计需要花费 8 分钟才能阅读完成。

前提:本例适合那些没有顺序要求的消息主题。
kafka 通过一系列优化,写入和读取速度能够达到数万条 / 秒。通过增加分区数量,能够通过部署多个消费者增加并行消费能力。但还是有很多情况下,某些业务的执行速度实在是太慢,这个时候我们就要用到多线程去消费,提高应用机器的利用率,而不是一味的给 kafka 增加压力。使用 Spring 创建一个 kafka 消费者是非常简单的。我们选择的方式是继承 kafka 的 ShutdownableThread,然后实现它的 doWork 方法即可。

参考:https://github.com/apache/kaf…
多线程消费某个分区的数据
即然是使用多线程,我们就需要新建一个线程池。我们创建了一个最大容量为 20 的线程池,其中有两个参数需要注意一下。(参考《JAVA 多线程使用场景和注意事项简版》)。
我们使用了了零容量的 SynchronousQueue,一进一出,避免队列里缓冲数据,这样在系统异常关闭时,就能排除因为阻塞队列丢消息的可能。然后使用了 CallerRunsPolicy 饱和策略,使得多线程处理不过来的时候,能够阻塞在 kafka 的消费线程上。
然后,我们将真正处理业务的逻辑放在任务中多线程执行,每次执行完毕,我们都手工的 commit 一次 ack,表明这条消息我已经处理了。由于是线程池认领了这些任务,顺序性是无法保证的,可能有些任务没有执行完毕,后面的任务就已经把它的 offset 给提交了。o.O
不过这暂时不重要,首先让它并行化运行就好。可惜的是,当我们运行程序,直接抛出了异常,无法进行下去。程序直接说了:
KafkaConsumer is not safe for multi-threaded access
显然,kafka 的消费端不是线程安全的,它拒绝你这么调用它的 api。kafka 的初衷是好的,想要避免一些并发环境的问题,但我确实需要使用多线程处理。
kafka 消费者通过比较调用者的线程 id 来判断是否是由外部线程发起请求。
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException(“KafkaConsumer is not safe for multi-threaded access”);
refcount.incrementAndGet();
}

得,只能将 commitSync 函数放在线程外面了,先提交 ack、再执行任务。
加入管道
我们获取的消息,可能在真正被执行之前,会进行一些过滤,比如一些空值或者特定条件的判断。虽然可以直接放在消费者线程里运行,但显的特别的乱,可以加入一个生产者消费者模型(你可以认为这是画蛇添足)。这里采用的是阻塞队列依然是 SynchronousQueue,它充当了管道的功能。

我们把任务放入管道后,立马 commit。如果线程池已经满了,将一直阻塞在消费者线程里,直到有空缺。然后,我们单独启动了一个线程,用来接收这些数据,然后提交到这部分的代码看起来大概这样。
应用能够启动了,消费速度贼快。
参数配置
kafka 的参数非常的多,我们比较关心的有以下几个参数。
max.poll.records
调用一次 poll,返回的最大条数。这个值设置的大,那么处理的就慢,很容易超出 max.poll.interval.ms 的值(默认 5 分钟),造成消费者的离线。在耗时非常大的消费中,是需要特别注意的。
enable.auto.commit
是否开启自动提交(offset)如果开启,consumer 已经消费的 offset 信息将会间歇性的提交到 kafka 中(持久保存)
当开启 offset 自动提交时,提交请求的时间频率由参数 `auto.commit.interval.ms` 控制。
fetch.max.wait.ms
如果 broker 端反馈的数据量不足时(fetch.min.bytes),fetch 请求等待的最长时间。如果数据量满足需要,则立即返回。
session.timeout.ms
consumer 会话超时时长,如果在此时间内,server 尚未接收到 consumer 任何请求(包括心跳检测),那么 server 将会判定此 consumer 离线。此值越大,server 等待 consumer 失效、rebalance 时间就越长。
heartbeat.interval.ms
consumer 协调器与 kafka 集群之间,心跳检测的时间间隔。kafka 集群通过心跳判断 consumer 会话的活性,以判断 consumer 是否在线,如果离线则会把此 consumer 注册的 partition 分配(assign)给相同 group 的其他 consumer。此值必须小于“session.timeout.ms”, 即会话过期时间应该比心跳检测间隔要大,通常为 session.timeout.ms 的三分之一,否则心跳检测就失去意义。

在本例中,我们的参数简单的设置如下,主要调整了每次获取的条数和检测时间。其他的都是默认。
消息保证
仔细的同学可能会看到,我们的代码依然不是完全安全的。这是由于我们提前提交了 ack 导致的。程序正常运行下,这无伤大雅。但在应用异常关闭的时候,那些正在执行中的消息,很可能会丢失,对于一致性要求非常高的应用,我们要从两个手段上进行保证。
使用关闭钩子
第一种就是考虑 kill -15 的情况。这种方式比较简单,只要覆盖 ShutdownableThread 的 shutdown 方法即可,应用将有机会执行线程池中的任务,确保消费完毕再关闭应用。
@Override
public void shutdown() {
super.shutdown();
executor.shutdown();
}
使用日志处理
应用 oom,或者直接 kill - 9 了,事情就变得麻烦起来。
维护一个单独的日志文件(或者本地 db),在 commit 之前写入一条日志,然后在真正执行完毕之后写入一条对应的日志。当系统启动时,读取这些日志文件,获取没有执行成功的任务,重新执行。
想要效率,还想要可靠,是得下点苦力气的。
借助 redis 处理
这种方式与日志方式类似,但由于 redis 的效率很高(可达数万),而且方便,是优于日志方式的。
可以使用 Hash 结构,提交任务的同时写入 Redis,任务执行完毕删掉这个值,那么剩下的就是出现问题的消息。
在系统启动时,首先检测一下 redis 中是否有异常数据。如果有,首先处理这些数据,然后正常消费。
End
多线程是为了增加效率,redis 等是为了增加可靠性。业务代码是非常好编写的,搞懂了逻辑就搞定了大部分;业务代码有时候又是困难的,你要编写大量辅助功能增加它的效率、照顾它的边界。
以程序员的角度来说,最有竞争力的代码都是为了照顾小概率发生的边界异常。
kafka 在吞吐量和可靠性方面,有各种的权衡,很多都是鱼和熊掌的关系。不必纠结于它本身,我们可以借助外部的工具,获取更大的收益。在这种情况下,redis 当机与应用同时当机的概率还是比较小的。5 个 9 的消息保证是可以做到的,剩下的那点不完美问题消息,你为什么不从日志里找呢?

扩展阅读:
1、JAVA 多线程使用场景和注意事项简版
2、Kafka 基础知识索引
3、360 度测试:KAFKA 会丢数据么?其高可用是否满足需求?

正文完
 0