乐趣区

关于kafka:二Kafka命令行操作和架构原理

一.kafka 命令行操作

1.1 创立 topic

[v2admin@hadoop10 kafka]$ bin/kafka-topics.sh --zookeeper hadoop10:2181  --create --replication-factor 2 --partitions 1 --topic  first
Created topic first.

1.2 查看 topic

[v2admin@hadoop10 kafka]$ bin/kafka-topics.sh --zookeeper hadoop10:2181 --list
demo
first

1.3 删除 topic

[v2admin@hadoop10 kafka]$ bin/kafka-topics.sh --zookeeper hadoop10:2181 -delete --topic first
Topic first is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

1.4 发送音讯

[v2admin@hadoop10 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop10:9092 --topic demo
>hello
>world
>haha
>women
>tintian

1.5 生产音讯

[v2admin@hadoop11 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop10:9092 --from-beginning --topic demo
world
women
hello
haha
tintian

1.6 查看某个 topic

^C[v2admin@hadoop11 kafka]$ bin/kafka-topics.sh --zookeeper hadoop10:2181 --describe --topic demo
Topic: demo    PartitionCount: 2    ReplicationFactor: 2    Configs: 
    Topic: demo    Partition: 0    Leader: 11    Replicas: 11,12    Isr: 12,11
    Topic: demo    Partition: 1    Leader: 12    Replicas: 12,10    Isr: 10,12

二.Kafka 架构图

一个典型的 Kafka 集群由以下角色组成:
1)若干 Producer,能够是 web 前端产生的 Page View,或者是服务器日志,零碎 CPU、Memory 等,
2)若干 broker(Kafka 反对程度扩大,个别 broker 数量越多,集群吞吐率越高)
3)若干 Consumer Group
4)一个 Zookeeper 集群

Kafka 通过 Zookeeper 治理集群配置,选举 leader,以及在 Consumer Group 发生变化时进行 rebalance。Producer 应用 push 模式将音讯公布到 broker,Consumer 应用 pull 模式从 broker 订阅并生产音讯。

Kafka 中音讯是以 topic 进行分类的,生产者生产音讯,消费者生产音讯,都是面向 topic 的。
首先明确的是,topic 是逻辑上的概念,而 partition 则是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据。

Producer 生产的数据会被一直追加到该 log 文件末端,且每条数据都有本人的 offset。

消费者组中的每个消费者,都会实时记录本人生产到了哪个 offset,以便出错复原时,从上次的地位持续生产。

生产者生产的音讯会一直追加到 log 文件开端,这样 log 文件会一直增大,为避免 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。
每个 segment 对应两个文件——“.index”文件和“.log”文件。
如下就是示例

-rw-rw-r--. 1 v2admin v2admin 10485760 1 月   6 21:17 00000000000000000000.index
-rw-rw-r--. 1 v2admin v2admin        0 1 月   6 21:17 00000000000000000000.log
-rw-rw-r--. 1 v2admin v2admin 10485756 1 月   6 21:17 00000000000000000000.timeindex
-rw-rw-r--. 1 v2admin v2admin        8 1 月   6 21:17 leader-epoch-checkpoint

三. 分区准则

通过后面晓得,Producer 生产的数据进入 kafka 中,会保留到一个 topic 中,但 topic 是逻辑上的概念,实际上,一个 topic 中,在物理是宰割成一个或者多个 Partition 保留的。

那生产者的数据是怎么进行分区的?这个分区准则是什么?
Producer 发送的数据封装成一个 ProducerRecord 对象,咱们来看下 API 中,它的源码

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {if (topic == null) {throw new IllegalArgumentException("Topic cannot be null.");
        } else if (timestamp != null && timestamp < 0L) {throw new IllegalArgumentException(String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        } else if (partition != null && partition < 0) {throw new IllegalArgumentException(String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        } else {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = new RecordHeaders(headers);
        }
    }

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {this(topic, partition, timestamp, key, value, (Iterable)null);
    }

    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {this(topic, partition, (Long)null, key, value, headers);
    }

    public ProducerRecord(String topic, Integer partition, K key, V value) {this(topic, partition, (Long)null, key, value, (Iterable)null);
    }

    public ProducerRecord(String topic, K key, V value) {this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
    }

    public ProducerRecord(String topic, V value) {this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null);
    }
    

1)在指定 Partition 时,间接将 Partition 的值作为 Partition 值
2)在没指定 Partition 时,但指定了 key 的状况下,则将 key 的 hash 与 topic 的 Partition 值进行取余,失去的值作为 Partition 值
3)都没有指定状况下,第一次调用就随机生成一个整数,前面每次调用在这个整数上自增。
这个随机整数与 topic 可用的 Partition 总数取余失去 Partition 值,这个也就是 roundp-robin 算法。

四. 数据可靠性

Producer 发送的数据,如何保障可能达到指定的 topic?可靠性如何保障?
topic 的每个 partition 收到 producer 发送的数据后,都要向 producer 发送 ack,示意确认收到,如果 producer 收到 ack,就会进行下一轮的发送,否则从新发送数据。

那 ack 什么时候发送呢?
有以下两种同步策略

计划 长处 毛病
半数以上实现同步,就发送 ack 提早低 选举新的 leader 时,容忍 n 台节点的故障,须要 2n+ 1 个正本
全副实现同步,才发送 ack 选举新的 leader 时,容忍 n 台节点的故障,须要 n + 1 个正本 提早高

kafka 采纳了第二种计划,
1)如果是第一种计划,为了容忍 n 个节点的故障,第一种计划须要 2n+ 1 个正本,而第二种计划则须要 n + 1 个正本,Kafka 的每个分区都有大量的数据,那第一种计划会造成大量数据的冗余,资源节约。
2)第二种计划的网络提早会比拟高,不过网络提早对 Kafka 的影响不大,能够忽略不计。

kafka 采纳第二种计划后,那还有一个问题,
那看下这个场景,leader 收到数据,这时候所有 follower 开始同步数据,但这个过程中,有一个 follower,因为某种原因 (自身故障或者网络故障缘故等),长时间不能与 leader 进行同步,那 leader 就要始终等上来,直到它实现同步,能力发送 ack。
为了解决这个问题,Leader 保护了一个动静的 in-sync replica set (ISR),意为和 leader 放弃同步的 follower 汇合。当 ISR 中的 follower 实现数据的同步之后,leader 就会给 producer 发送 ack。如果 follower 长时间未向 leader 同步数据,则该 follower 将被踢出 ISR,该工夫阈值由 replica.lag.time.max.ms 参数设定。Leader 产生故障之后,就会从 ISR 中选举新的 leader。

但还有一个问题,有些时候,咱们发送的数据并不是那么多重要,对可靠性要求也就不高,所以没必要等 ISR 中的 Follower 全副接管胜利。
Kafka 提供三种牢靠级别,咱们能够自行抉择:

acks 参数 形容
0 producer 不期待 broker 的 ack,这一操作提供了一个最低的提早,broker 一接管到还没有写入磁盘就曾经返回,当 broker 故障时有可能失落数据;
1 producer 期待 broker 的 ack,partition 的 leader 落盘胜利后返回 ack,如果在 follower 同步胜利之前 leader 故障,那么将会失落数据;
-1(all) producer 期待 broker 的 ack,partition 的 leader 和 follower 全副落盘胜利后才返回 ack。然而如果在 follower 同步实现后,broker 发送 ack 之前,leader 产生故障,那么会造成数据反复。

那么具体故障是如何解决的呢?如下图示例

故障 解决细节
Follower 故障 follower 产生故障,会被长期踢出 ISR,待该 follower 复原后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的局部截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就能够重新加入 ISR 了
Leader 故障 leader 产生故障之后,会从 ISR 中选出一个新的 leader,之后,为保障多个正本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的局部截掉,而后从新的 leader 同步数据。

五.Exactly Once 和事务

将服务器 ACK 级别设置为 0,能够保障生产者每条音讯只会被发送一次,即 At Most Once 语义,但不能保证数据不失落。
将服务器的 ACK 级别设置为 -1,能够保障 Producer 到 Server 之间不会失落数据,即 At Least Once 语义,但不不能保证数据不反复。

这就难堪了,咱们对于一些重要的数据,既要保证数据不失落,也要保证数据不反复,也就是 Exactly Once 语义。

在 0.11 版本之前的 Kafka 是没啥方法的,只能在生产者发送数据保证数据不丢,而后在消费者时,对数据去重。
在 0.11 版本之后的 Kafka 引入一个个性:幂等性,就是用户对于同一操作发动的一次申请或者屡次申请的后果是统一的,不会因为屡次点击而产生了副作用。
在 kafka 中,就是指 Producer 不管向 Server 发送多少次反复数据,Server 端都只会长久化一条。
幂等性与 At Least Once 联合,就形成 kafka 的 Exactly Once 语义:
At Least Once + 幂等性 = Exactly Once

启动幂等性,要将 Producer 的参数中 enable.idompotence 设置为 true 即可。
开启幂等性的 Producer 在初始化的时候会被调配一个 PID,发往同一 Partition 的音讯会附带 Sequence Number。而 Broker 端会对 <PID, Partition, SeqNumber> 做缓存,当具备雷同主键的音讯提交时,Broker 只会长久化一条。

然而留神,不同的 Partition 也有不同主键,所以幂等性无奈保障跨分区的 Exactly Once,而且 PID 重启也会发生变化。

那这个怎么解决?就是事务,事务,就是要么全胜利,要么全失败,kafka 从 0.11 版本开始引入事务反对,这个事务能够保障 Kafka 在 Exactly Once 语义根底上,生产和消费者能够跨分区和会话,要么全胜利,要么全失败。
1)Producer 事务
为了实现跨分区跨会话的事务,这里引入一个全局惟一的 Transaction ID,并将 Producer 取得的 PID 和 Transaction ID 绑定。这样当 Producer 重启后就能够通过正在进行的 Transaction ID 取得原来的 PID。
为了治理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。
Producer 就是通过和 Transaction Coordinator 交互取得 Transaction ID 对应的工作状态。
Transaction Coordinator 还负责将事务所有写入 Kafka 的一个外部 Topic,这样即便整个服务重启,因为事务状态失去保留,进行中的事务状态能够失去复原,从而持续进行。

2)Consumer 事务
对于 Consumer 而言,事务的保障就会绝对较弱,很难保障 Commit 的信息被准确生产。
这是因为 Consumer 能够通过 offset 拜访任意信息,而不同的 Segment File 生命周期不同,同一事务的音讯可能会呈现重启后被删除的状况。
解决这个问题,下一章 Kafka API 上会说。

六.Consumer 消费者

6.1 生产形式

Consumer 采纳 pull 模式从 broker 中读取数据。
为什么不采纳 push 模式呢?
因为采纳 push 模式,音讯发送速率由 broker 决定,所以 push 模式很难适应生产速率不同的 Consumer。
比方说,push 模式下
broker 的推送音讯速率每秒 100 条,但 Consumer 解决音讯的速率是每秒 10 条,那每秒就有 90 条音讯来不及解决,典型的结果就是拒绝服务、网络梗塞。
采纳 pull 模式,Consumer 能够依据本人的生产能力以适当的速率生产音讯。

pull 模式也有不足之处,那就是 broker 没有数据,Consumer 很可能会陷入循环中,始终返回空数据。
针对这一点,Kafka 的消费者在生产数据时会传入一个时长参数 timeout,如果以后没有数据可供生产,consumer 会期待一段时间之后再返回,这段时长即为 timeout。

6.2 分区调配策略

一个 topic 中有多个 Partition,一个 Consumer group 中也会有多个 Consumer,哪个 Partition 由哪个 Consumer 来解决?这就是 Partition 的调配问题。
kafka 中有两种调配形式,round-robin 和 range。
1)round-bin 轮询分区
比方我 1,2,3,4,5,6,7 条数据,有 a、b、c 三个消费者,那怎么调配呢?
这种模式下这样调配
1 给 a,2 给 b,3 给 c,4 给 a,5 给 b,6 给 c,7 给 a,就这样轮询调配。
最初调配
a:1,4,7;b:2,5 c:3,6

2)range 模式
如果有 10 个分区,3 个消费者,把分区依照序号排列 0,1,2,3,4,5,6,7,8,9;消费者为 a,b,c,那么用分区数除以消费者数来决定每个 Consumer 生产几个 Partition,除不尽的后面几个消费者将会多生产一个
最初调配后果如下

a:0,1,2,3
b:4,5,6
c:7,8,9

如果有 11 个分区将会是:

C1:0,1,2,3
C2:4,5,6,7
C3:8,9,10

如果咱们有两个主题 T1,T2,别离有 10 个分区,最初的调配后果将会是这样:

C1:T1(0,1,2,3)T2(0,1,2,3)
C2:T1(4,5,6)T2(4,5,6)
C3:T1(7,8,9)T2(7,8,9)

在这种状况下,C1 多生产了两个分区

6.3 offset

consumer 在生产过程中可能会呈现断电宕机等故障,consumer 复原后,须要从故障前的地位的持续生产,所以 consumer 须要实时记录本人生产到了哪个 offset,以便故障复原后持续生产。
Kafka 0.9 版本之前,consumer 默认将 offset 保留在 Zookeeper 中,从 0.9 版本开始,consumer 默认将 offset 保留在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。

6.4 Zookeeper 的作用

Kafka 集群中 broker 的上线下线、topic 的分区正本调配、Leader 的选举等等,这些须要有一个角色来治理,这个角色就是 Controller,Controller 是从集群中的 broker 中选举进去的,这个 Controller 的工作是依赖 Zookeeper 实现的。

退出移动版