共计 7142 个字符,预计需要花费 18 分钟才能阅读完成。
Kafka 学习笔记
极客时间 – 胡夕
Kafka 使用一个叫 Franz Kafka 的文学家的名字用来命名的。
Kafka 是一款开源的消息引擎系统。也是一个分布式流处理平台。
Kafka 同时支持点对点模型以及发布 / 订阅模型。
为什么要使用 Kakfa?四个字:削峰填谷!
Kafka 术语
- Record: 消息,指 Kafka 处理对象
- Topic: 主题,用来承载消息的容器
- Partition: 分区,一个有序不变的消息队列,一个主题下可以有多个分区
- Offset: 消息位移,表示分区中每条信息的位置,是一个单调递增不变的值
-
Replica, 副本,数据冗余。
- 领导者副本:对外提供服务,与客户端进行交互
- 追随者副本:不能与外界进行交互,只是被动地追随领导者副本
- Producer: 生产者,向主题发布新消息的应用程序
- Consumer: 消费者,向主题订阅新消息的应用程序
- Consumer Offset: 消费者位移,表示消费者消费进度
- Consumer Group: 消费者组,多个消费者实例共同组成的一个组,同时消费多个分区来实现高吞吐。
- Rebalance: 重平衡,消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。它是 Kafka 消费者端实现高可用的重要手段。
Kafka 种类
- Apache Kafka: 也称社区版 Kafka,迭代速度快,社区响应度高,使用它可以让你有更高的把控度;缺陷在于仅仅提供基础核心组件,缺失一些高级特性
- Confluent Kafka: 优势在于集成了很多高级特性且由 Kafka 原班人马打造,质量保证;缺陷在于国内相关资料不全,普及率较低,没有太多可参考的范例。
- CDH/HDP Kafka: 优势在于操作简单,节省运维成本;缺陷在于把控度低,演进速度慢
Kafka 版本号
一个题外话
Kafka 新版本客户端代码开始完全由 java 语言编写,于是有些人开始“JAVA VS SCALA”的大讨论。并从语言特性上分析为什么社区摈弃 Scala 转而投向 Java 的怀抱。
其实事情没有那么复杂,仅仅是因为社区来了一批 Java 程序猿,而以前老的 scala 程序猿隐退了罢了。
版本演进
Kafka 总共演进了 7 个大版本
- 0.7 版本:上古版本,一旦有人向你推荐这个版本,怼他。
- 0.8 版本:开始引入副本机制,另外老版本需要制定 zookeeper 地址而不是 Broker 地址。在 0.8.2.0 版本社区引入了新版本 Producer API, 即指定 Broker 地址的 Producer。
- 0.9 版本:重量级的大版本更迭。增加了基础的安全认证 / 权限功能,引入了 Kafka Connect, 新版本 Producer API 稳定。
- 0.10.0.0: 里程碑的大版本。该版本又有两个小版本,0.10.1 和 0.10.2。引入 Kafka streams,正式升级为分布式流处理平台。0.10.2.2 新 Consumer API 稳定。
- 0.11.0.0: 目前最主流的版本之一。引入两个重量级功能变更:一个是提供幂等性 Producer API 以及事务 API, 另一个是对 Kafka 消息格式做了重构。
- 1.0 和 2.0: 如果你是 Kafka Stream 用户,至少选择 2.0.0 版本吧。
最后还有个建议,不论你使用的是哪个版本,都请尽量保持服务端版本和客户端版本一致,否则你将损失很多 Kafka 为你提供的性能优化收益。
江湖经验:不要轻易成为新版本的小白鼠。
集群部署
磁盘容量举例:
假设公司有个业务需要每天向 Kafka 集群发送 1 亿条信息。每条消息保存两份来防止数据丢失。消息默认保存两周时间。并假设消息的平均大小是 1KB。问你的 Kafka 集群需要为这个业务预留多少磁盘空间?
总大小:1 亿 1KB 2 备份 * 14 ~= 2800G
加上 Kafka 的一些索引数据,为它预留 10%,那么总大小变为 2800 *(1 + 10%)~= 3TB
Kafka 支持数据压缩,压缩比 0.75 的话,那么应该预留的存储空间为 2.25TB 左右。
带宽举例
与其说是带宽资源的规划,其实真正要规划的是 Kafka 服务器的数量。
假设公司机房环境 1Gbps, 现有个业务,需要在 1 小时内处理 1TB 的业务数据。
一般单台服务器 规划使用 70% 的带宽资源的 1 /3 ~= 240Mbps。
1TB 需要 1 小时处理,则每秒差不多需要处理 2336Mbps 的数据,除 240Mbps,则差不多需要 10 台机器。如果消息还需要额外复制的话,那么还要对应乘上备份数。
集群配置参数
配置名称 | 示例 | 建议值 |
---|---|---|
log.dirs | /home/kafka1,/home/kafka2 | kafka 写日志多路径,不仅能提升写性能,在 1.1 版本中还能支持故障转移功能。 |
zookeeper.connect | zk1:2181,zk2:2181,zk3:2181/kafka1 | |
listens | listeners=PLAINTEXT://dn1.ambari:6667 | |
auto.create.topics.enable | true | false, 不建议可以自动创建主题 |
unclean.leader.election.enable | false | false, 如果设置为 true 有丢数据风险 |
auto.leader.rebalance.enable | false | false,不定期进行 leader 副本的选举 |
log.retention.hours | 168 | 默认保持 7 天数据 |
log.retention.bytes | -1 | 保存多少数据都可以 |
message.max.bytes | 1000000 | 默认值建议调大。该值代表 Broker 能处理的最大消息大小 |
生产者分区策略
轮询策略
随机策略
按消息保存键策略
自定义策略
生产者压缩
压缩配置
compression.type
压缩算法
总结一下压缩和解压缩,Producer 端压缩,Broker 端保持,Consumer 端解压缩。
无消息丢失最佳实践
- 不要使用 producer.send(msg), 而要使用 producer.send(msg,callback)
- 设置 acks=all, 表明所有副本 Broker 都要接受消息,该消息才算是“已提交”
- 设置 retries>0, 表明 Producer 自动重试,当网络顺断时,防止消息丢失。
- 设置 unclean.leader.election.enable=false
- 设置 replication.factor >=3,增加副本数,保证数据冗余
- 设置 min.insync.replicas > 1, 控制的是消息至少要被写入多少个副本才算是 已提交。
- 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。推荐设置 replication.factor = min.insync.replicas + 1
- 确保消息消费完再提交。设置 enable.aoto.commit=false
Kafka 拦截器
分为生产者拦截器和消费者拦截器。
典型的应用场景可以应用于客户端监控、端到端系统性能测试、消息审计等多种功能在内的场景。
Kafka 是如何管理 TCP 连接的
java 生产者是如何管理 TCP 连接的
- KafkaProducer 实例创建时启动 Sender 线程,从而创建与 bootstrap.servers 中所有的 Broker 的 TCP 连接。
- KafkaProducer 实例首次更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接
- 如果 Producer 端发送信息到某台 Broker 时,发现没有与该 Broker 的 TCP 连接,那么也会创建连接
- 如果设置
connections.max.idle.ms > 0
, 则步骤一中的 TCP 连接会被自动关闭;如果设置该参数 -1,那么步骤一中创建的连接无法被关闭,会成为僵尸进程。
Java 消费者是如何管理 TCP 连接的
创建的 3 个时机
- 发起 FindCoordinator 请求时
- 连接协调者时
- 消费数据时
消费者程序会创建 3 类 TCP 连接
- 确定协调者和获取集群元数据
- 连接协调者,令其执行组成员管理操作
- 执行实际的消息获取
幂等生产者和事务生产者
消息交付可靠性保障,常见的承诺有以下三种
- 最多一次:消息可能会丢失,但绝不会重复发送
- 至少一次:消息不会丢失,但有可能被重复发送
- 精确一次:消息不会丢失,也不会被重复发送
Kafka 默认是最少一次
要保证精确一次,就需要幂等和事务。不过性能会想对较差。
幂等生产者
幂等性有很多好处。其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们不会破坏我们的系统状态。
在 0.11.0.0 版本引入了幂等生产者,只要更改配置props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)
。
使用幂等生产者要注意
- 它只能保证单分区的幂等,多分区无法实现
- 只能实现单会话上的幂等,重启之后幂等消失
事务生产者
设置事务型 Producer
- props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)
- 设置 producer 端参数
transctional.id
。最好为其设置一个有意义的名字
此外代码也要做一些调整变化。
producer.initTransactions();
try {producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();} catch (KafkaException e) {producer.abortTransaction();
}
重平衡
怎么避免 Rebalance
Rebalance 发生的时机有三个
- 组成员数据量发生变化
- 订阅主题数量发生变化
- 订阅主题的分区数发生变化
后面两个通常是运维的主动操作,无法避免。主要还是针对组成员数量减少的情况。增加一般也是人为主动的。
那么避免因为参数或逻辑不合理而导致的成员退出,与之相关的主要参数
- session.timeout.ms, 推荐设置 6s
- heartbeat.interval.ms, 推荐设置 2s
- max.poll.interval.ms, 推荐设置比你的业务逻辑处理要长
- GC 参数,避免频繁的 FULL GC
重平衡通知
重平衡过程是通过 消费者端的心跳线程来通知到其他消费者实例的。
0.10.1.0 版本之前,发送心跳请求是在消费者主线程完成的,也就是 kafkaConsumer.poll 方法的那个线程。这样做有诸多弊端,因为消息处理也是在这个线程中完成的。因此当业务逻辑处理消耗了较长时间,心跳请求就无法及时发送到协调者那边了。导致协调者 错误地认为该消费者已经死了。
0.10.1.0 版本开始,社区引入了一个单独的线程来专门执行心跳发送。
消费者组状态机
定义了 5 种状态
各个状态的流转
一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置为 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。
当有新成员或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。
当所有成员都退出组后,消费者组状态变更为 Empty。
Kafka 自动定期删除过期位移的条件就是,组要处于 Empty 状态。
重平衡流程
消费者端重平衡流程
JoinGroup 请求
SyncGroup 请求
Broker 端重平衡场景分析
- 新成员入组
- 组成员主动离组
- 组成员崩溃离组
- 重平衡时协调者对组内成员提交位移的处理
位移提交
CommitFailedException 怎么处理?
- 缩短消息处理的时间,该方法优先处理
- 增加 Consumer 端允许下游系统消费一批数据的最大时长。设置参数
max.poll.interval.ms
,新版本默认是 5 分钟。 - 减少下游系统一次性消费的消息总数。
max.poll.records
- 下游系统使用多线程来加速消费
多消费者实例
鉴于 KafkaConsumer 不是线程安全的事实,制定两套多线程方案。
- 每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程
核心代码
```
public class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public void run() {
try {consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records =
consumer.poll(Duration.ofMillis(10000));
// 执行消息处理逻辑
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {closed.set(true);
consumer.wakeup();}
```
- 消费者程序使用单或多线程获取消息,创建多个消费者线程执行消息处理逻辑
核心代码
```
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...
private int workerNum = ...;
executors = new ThreadPoolExecutor(
workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
...
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
for (final ConsumerRecord record : records) {executors.submit(new Worker(record));
}
}
```
两种方案各有特点。
监控消费进度的 3 种方法
- 使用 Kafka 自带命令行工具 kafka-consumer-groups 脚本
- 使用 Kafka Consumer API
- 使用 Kafka 自带的 JMX 监控指标
Kafka 副本详解
副本机制的好处:
- 提供数据冗余
- 提供高伸缩性
- 改善数据局部性
但 Kafka 只有第一种好处,原因是这样的设计,Kafka 有两点好处
- 方便实现 Read-your-writes
指当你用生产者 API 向 Kafka 成功写入消息后,马上使用消费者 API 去读取刚才生产的消息
- 方便实现单调读(Monotonic Reads)
在多次消费信息时,不会看到该消息一会存在一会不存在的情况。
判断 Follower 副本与 Leader 副本是否同步的标准,Broker 参数 replia.lag.time.max.ms
的参数值。Kafka 有一个 in-sync Replicas(ISR)集合的概念。
Kafka 控制器
控制器组件(Controller), 是 Kafka 的核心组件,它的主要作用是在 Apache Zookeeper 的帮助下管理和协调整个 Kafka 集群。
控制器是怎么被选出来的
每台 Broker 都能充当控制器,在 Broker 启动时,会尝试去 Zookeeper 中创建 /controller 节点。Kafka 当前选举规则,第一个成功创建 /controller 节点的 Broker 会被指定为控制器。
控制器能做什么?
- 主题管理
- 分区重分配
- Prefered 领导者选举
- 集群成员管理
- 数据服务,控制器上保存最全的集群元数据信息
控制器保存了什么数据?
这些数据其实也在 Zookeeper 中存储了一份。
控制器的故障转移
总结
小窍门分享:当你觉得控制器出现问题时,比如主题无法删除了,重分区 hang 住了,你可以不用重启 broker 或者控制器,快速简便的方法,直接去 Zookeeper 手动删除 /controller 节点。
这样做的好处是,既可以引发控制器的重选举,又可以避免重启 Broker 导致的消息中断。
Kafka 请求处理
请求方案
Kafka 方案类似于 Reactor 模式
那么 Kafka 类似的方案是这样的。网络线程池默认参数num.network.threads=3
好了,客户端发来的请求会被 Aceptor 线程分发到任意一个网络线程中,由他们进行处理。你可能会认为,网络线程池是顺序处理不就好了?实际上,Kafka 在这个环节上又做了一层异步线程池的处理。
IO 线程池执行真正的处理。如果是 PRODUCER 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。当 IO 请求处理完请求后,会将生成的响应放入网络线程池的响应队列中,并由对应的网络线程负责将 Response 反还给客户端。
请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。
IO 线程池默认参数num.io.threads=8
图中还有一个 Purgatory 的组件,这是 Kafka 中著名的“炼狱”组件。
它是用来缓存延时请求的,所谓延时请求,就是那些一时未满足条件的不可立刻处理的请求。