Kafka学习笔记

9次阅读

共计 7142 个字符,预计需要花费 18 分钟才能阅读完成。

Kafka 学习笔记

极客时间 – 胡夕

Kafka 使用一个叫 Franz Kafka 的文学家的名字用来命名的。

Kafka 是一款开源的消息引擎系统。也是一个分布式流处理平台。

Kafka 同时支持点对点模型以及发布 / 订阅模型。

为什么要使用 Kakfa?四个字:削峰填谷!

Kafka 术语

  • Record: 消息,指 Kafka 处理对象
  • Topic: 主题,用来承载消息的容器
  • Partition: 分区,一个有序不变的消息队列,一个主题下可以有多个分区
  • Offset: 消息位移,表示分区中每条信息的位置,是一个单调递增不变的值
  • Replica, 副本,数据冗余。

    • 领导者副本:对外提供服务,与客户端进行交互
    • 追随者副本:不能与外界进行交互,只是被动地追随领导者副本
  • Producer: 生产者,向主题发布新消息的应用程序
  • Consumer: 消费者,向主题订阅新消息的应用程序
  • Consumer Offset: 消费者位移,表示消费者消费进度
  • Consumer Group: 消费者组,多个消费者实例共同组成的一个组,同时消费多个分区来实现高吞吐。
  • Rebalance: 重平衡,消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。它是 Kafka 消费者端实现高可用的重要手段。

Kafka 种类

  • Apache Kafka: 也称社区版 Kafka,迭代速度快,社区响应度高,使用它可以让你有更高的把控度;缺陷在于仅仅提供基础核心组件,缺失一些高级特性
  • Confluent Kafka: 优势在于集成了很多高级特性且由 Kafka 原班人马打造,质量保证;缺陷在于国内相关资料不全,普及率较低,没有太多可参考的范例。
  • CDH/HDP Kafka: 优势在于操作简单,节省运维成本;缺陷在于把控度低,演进速度慢

Kafka 版本号

一个题外话

Kafka 新版本客户端代码开始完全由 java 语言编写,于是有些人开始“JAVA VS SCALA”的大讨论。并从语言特性上分析为什么社区摈弃 Scala 转而投向 Java 的怀抱。

其实事情没有那么复杂,仅仅是因为社区来了一批 Java 程序猿,而以前老的 scala 程序猿隐退了罢了。

版本演进

Kafka 总共演进了 7 个大版本

  • 0.7 版本:上古版本,一旦有人向你推荐这个版本,怼他。
  • 0.8 版本:开始引入副本机制,另外老版本需要制定 zookeeper 地址而不是 Broker 地址。在 0.8.2.0 版本社区引入了新版本 Producer API, 即指定 Broker 地址的 Producer。
  • 0.9 版本:重量级的大版本更迭。增加了基础的安全认证 / 权限功能,引入了 Kafka Connect, 新版本 Producer API 稳定。
  • 0.10.0.0: 里程碑的大版本。该版本又有两个小版本,0.10.1 和 0.10.2。引入 Kafka streams,正式升级为分布式流处理平台。0.10.2.2 新 Consumer API 稳定。
  • 0.11.0.0: 目前最主流的版本之一。引入两个重量级功能变更:一个是提供幂等性 Producer API 以及事务 API, 另一个是对 Kafka 消息格式做了重构。
  • 1.0 和 2.0: 如果你是 Kafka Stream 用户,至少选择 2.0.0 版本吧。

最后还有个建议,不论你使用的是哪个版本,都请尽量保持服务端版本和客户端版本一致,否则你将损失很多 Kafka 为你提供的性能优化收益。

江湖经验:不要轻易成为新版本的小白鼠。

集群部署

磁盘容量举例:

假设公司有个业务需要每天向 Kafka 集群发送 1 亿条信息。每条消息保存两份来防止数据丢失。消息默认保存两周时间。并假设消息的平均大小是 1KB。问你的 Kafka 集群需要为这个业务预留多少磁盘空间?

总大小:1 亿 1KB 2 备份 * 14 ~= 2800G
加上 Kafka 的一些索引数据,为它预留 10%,那么总大小变为 2800 *(1 + 10%)~= 3TB

Kafka 支持数据压缩,压缩比 0.75 的话,那么应该预留的存储空间为 2.25TB 左右。

带宽举例

与其说是带宽资源的规划,其实真正要规划的是 Kafka 服务器的数量。

假设公司机房环境 1Gbps, 现有个业务,需要在 1 小时内处理 1TB 的业务数据。

一般单台服务器 规划使用 70% 的带宽资源的 1 /3 ~= 240Mbps。

1TB 需要 1 小时处理,则每秒差不多需要处理 2336Mbps 的数据,除 240Mbps,则差不多需要 10 台机器。如果消息还需要额外复制的话,那么还要对应乘上备份数。

集群配置参数

配置名称 示例 建议值
log.dirs /home/kafka1,/home/kafka2 kafka 写日志多路径,不仅能提升写性能,在 1.1 版本中还能支持故障转移功能。
zookeeper.connect zk1:2181,zk2:2181,zk3:2181/kafka1
listens listeners=PLAINTEXT://dn1.ambari:6667
auto.create.topics.enable true false, 不建议可以自动创建主题
unclean.leader.election.enable false false, 如果设置为 true 有丢数据风险
auto.leader.rebalance.enable false false,不定期进行 leader 副本的选举
log.retention.hours 168 默认保持 7 天数据
log.retention.bytes -1 保存多少数据都可以
message.max.bytes 1000000 默认值建议调大。该值代表 Broker 能处理的最大消息大小

生产者分区策略

轮询策略

随机策略

按消息保存键策略

自定义策略

生产者压缩

压缩配置

compression.type

压缩算法

总结一下压缩和解压缩,Producer 端压缩,Broker 端保持,Consumer 端解压缩。

无消息丢失最佳实践

  1. 不要使用 producer.send(msg), 而要使用 producer.send(msg,callback)
  2. 设置 acks=all, 表明所有副本 Broker 都要接受消息,该消息才算是“已提交”
  3. 设置 retries>0, 表明 Producer 自动重试,当网络顺断时,防止消息丢失。
  4. 设置 unclean.leader.election.enable=false
  5. 设置 replication.factor >=3,增加副本数,保证数据冗余
  6. 设置 min.insync.replicas > 1, 控制的是消息至少要被写入多少个副本才算是 已提交。
  7. 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。推荐设置 replication.factor = min.insync.replicas + 1
  8. 确保消息消费完再提交。设置 enable.aoto.commit=false

Kafka 拦截器

分为生产者拦截器和消费者拦截器。

典型的应用场景可以应用于客户端监控、端到端系统性能测试、消息审计等多种功能在内的场景。

Kafka 是如何管理 TCP 连接的

java 生产者是如何管理 TCP 连接的

  1. KafkaProducer 实例创建时启动 Sender 线程,从而创建与 bootstrap.servers 中所有的 Broker 的 TCP 连接。
  2. KafkaProducer 实例首次更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接
  3. 如果 Producer 端发送信息到某台 Broker 时,发现没有与该 Broker 的 TCP 连接,那么也会创建连接
  4. 如果设置connections.max.idle.ms > 0, 则步骤一中的 TCP 连接会被自动关闭;如果设置该参数 -1,那么步骤一中创建的连接无法被关闭,会成为僵尸进程。

Java 消费者是如何管理 TCP 连接的

创建的 3 个时机

  1. 发起 FindCoordinator 请求时
  2. 连接协调者时
  3. 消费数据时

消费者程序会创建 3 类 TCP 连接

  1. 确定协调者和获取集群元数据
  2. 连接协调者,令其执行组成员管理操作
  3. 执行实际的消息获取

幂等生产者和事务生产者

消息交付可靠性保障,常见的承诺有以下三种

  1. 最多一次:消息可能会丢失,但绝不会重复发送
  2. 至少一次:消息不会丢失,但有可能被重复发送
  3. 精确一次:消息不会丢失,也不会被重复发送

Kafka 默认是最少一次

要保证精确一次,就需要幂等和事务。不过性能会想对较差。

幂等生产者

幂等性有很多好处。其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们不会破坏我们的系统状态。

在 0.11.0.0 版本引入了幂等生产者,只要更改配置props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)

使用幂等生产者要注意

  1. 它只能保证单分区的幂等,多分区无法实现
  2. 只能实现单会话上的幂等,重启之后幂等消失

事务生产者

设置事务型 Producer

  1. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)
  2. 设置 producer 端参数transctional.id。最好为其设置一个有意义的名字

此外代码也要做一些调整变化。

producer.initTransactions();
try {producer.beginTransaction();
            producer.send(record1);
            producer.send(record2);
            producer.commitTransaction();} catch (KafkaException e) {producer.abortTransaction();
}

重平衡

怎么避免 Rebalance

Rebalance 发生的时机有三个

  1. 组成员数据量发生变化
  2. 订阅主题数量发生变化
  3. 订阅主题的分区数发生变化

后面两个通常是运维的主动操作,无法避免。主要还是针对组成员数量减少的情况。增加一般也是人为主动的。

那么避免因为参数或逻辑不合理而导致的成员退出,与之相关的主要参数

  1. session.timeout.ms, 推荐设置 6s
  2. heartbeat.interval.ms, 推荐设置 2s
  3. max.poll.interval.ms, 推荐设置比你的业务逻辑处理要长
  4. GC 参数,避免频繁的 FULL GC

重平衡通知

重平衡过程是通过 消费者端的心跳线程来通知到其他消费者实例的。

0.10.1.0 版本之前,发送心跳请求是在消费者主线程完成的,也就是 kafkaConsumer.poll 方法的那个线程。这样做有诸多弊端,因为消息处理也是在这个线程中完成的。因此当业务逻辑处理消耗了较长时间,心跳请求就无法及时发送到协调者那边了。导致协调者 错误地认为该消费者已经死了。

0.10.1.0 版本开始,社区引入了一个单独的线程来专门执行心跳发送。

消费者组状态机

定义了 5 种状态

各个状态的流转

一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置为 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。

当有新成员或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。

当所有成员都退出组后,消费者组状态变更为 Empty。

Kafka 自动定期删除过期位移的条件就是,组要处于 Empty 状态。

重平衡流程

消费者端重平衡流程

JoinGroup 请求

SyncGroup 请求

Broker 端重平衡场景分析

  • 新成员入组

  • 组成员主动离组

  • 组成员崩溃离组

  • 重平衡时协调者对组内成员提交位移的处理

位移提交

CommitFailedException 怎么处理?

  1. 缩短消息处理的时间,该方法优先处理
  2. 增加 Consumer 端允许下游系统消费一批数据的最大时长。设置参数max.poll.interval.ms,新版本默认是 5 分钟。
  3. 减少下游系统一次性消费的消息总数。max.poll.records
  4. 下游系统使用多线程来加速消费

多消费者实例

鉴于 KafkaConsumer 不是线程安全的事实,制定两套多线程方案。

  1. 每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程

核心代码

```
public class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;


     public void run() {
         try {consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
            ConsumerRecords records = 
                consumer.poll(Duration.ofMillis(10000));
                 //  执行消息处理逻辑
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {consumer.close();
         }
     }


     // Shutdown hook which can be called from a separate thread
     public void shutdown() {closed.set(true);
         consumer.wakeup();}
```
  1. 消费者程序使用单或多线程获取消息,创建多个消费者线程执行消息处理逻辑

核心代码

```
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...


private int workerNum = ...;
executors = new ThreadPoolExecutor(
    workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<>(1000), 
    new ThreadPoolExecutor.CallerRunsPolicy());


...
while (true)  {
    ConsumerRecords<String, String> records = 
        consumer.poll(Duration.ofSeconds(1));
    for (final ConsumerRecord record : records) {executors.submit(new Worker(record));
    }
}

```

两种方案各有特点。

监控消费进度的 3 种方法

  1. 使用 Kafka 自带命令行工具 kafka-consumer-groups 脚本
  2. 使用 Kafka Consumer API
  3. 使用 Kafka 自带的 JMX 监控指标

Kafka 副本详解

副本机制的好处:

  1. 提供数据冗余
  2. 提供高伸缩性
  3. 改善数据局部性

但 Kafka 只有第一种好处,原因是这样的设计,Kafka 有两点好处

  1. 方便实现 Read-your-writes

    指当你用生产者 API 向 Kafka 成功写入消息后,马上使用消费者 API 去读取刚才生产的消息

  2. 方便实现单调读(Monotonic Reads)

    在多次消费信息时,不会看到该消息一会存在一会不存在的情况。

判断 Follower 副本与 Leader 副本是否同步的标准,Broker 参数 replia.lag.time.max.ms 的参数值。Kafka 有一个 in-sync Replicas(ISR)集合的概念。

Kafka 控制器

控制器组件(Controller), 是 Kafka 的核心组件,它的主要作用是在 Apache Zookeeper 的帮助下管理和协调整个 Kafka 集群。

控制器是怎么被选出来的

每台 Broker 都能充当控制器,在 Broker 启动时,会尝试去 Zookeeper 中创建 /controller 节点。Kafka 当前选举规则,第一个成功创建 /controller 节点的 Broker 会被指定为控制器。

控制器能做什么?

  1. 主题管理
  2. 分区重分配
  3. Prefered 领导者选举
  4. 集群成员管理
  5. 数据服务,控制器上保存最全的集群元数据信息

控制器保存了什么数据?

这些数据其实也在 Zookeeper 中存储了一份。

控制器的故障转移

总结

小窍门分享:当你觉得控制器出现问题时,比如主题无法删除了,重分区 hang 住了,你可以不用重启 broker 或者控制器,快速简便的方法,直接去 Zookeeper 手动删除 /controller 节点。

这样做的好处是,既可以引发控制器的重选举,又可以避免重启 Broker 导致的消息中断。

Kafka 请求处理

请求方案

Kafka 方案类似于 Reactor 模式

那么 Kafka 类似的方案是这样的。网络线程池默认参数num.network.threads=3

好了,客户端发来的请求会被 Aceptor 线程分发到任意一个网络线程中,由他们进行处理。你可能会认为,网络线程池是顺序处理不就好了?实际上,Kafka 在这个环节上又做了一层异步线程池的处理。

IO 线程池执行真正的处理。如果是 PRODUCER 生产请求,则将消息写入到底层的磁盘日志中;如果是 FETCH 请求,则从磁盘或页缓存中读取消息。当 IO 请求处理完请求后,会将生成的响应放入网络线程池的响应队列中,并由对应的网络线程负责将 Response 反还给客户端。

请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。

IO 线程池默认参数num.io.threads=8

图中还有一个 Purgatory 的组件,这是 Kafka 中著名的“炼狱”组件。

它是用来缓存延时请求的,所谓延时请求,就是那些一时未满足条件的不可立刻处理的请求。

正文完
 0