kafka3部分原理

13次阅读

共计 6930 个字符,预计需要花费 18 分钟才能阅读完成。

kafka 部分设计及原理

topic & partition & segment

  1. 定义

    topic:主题名,用于对消息进行分类,是一个逻辑上的概念

    partition:是物理上的一个概念,一个 topic 可以对应多个 partition,消息实际存储在 partition

    partition 是一个有序,不可变的记录序列。partition 中的每一条消息都有一个序列号,称之为offset,offset 在一个 partition 内唯一,用于区别消息

    segment:partition 被分为多个 segment 文件进行存储

  2. partition 的物理存储结构

    创建 topic:test.show.log, 副本数 2,分区数 3,segment 文件大小 512byte

    # 创建 topic
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 3 --topic test.show.log --config segment.bytes=512
    # 查看 topic 状态
    bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test.show.log
    
     Topic:test.show.log     PartitionCount:3        ReplicationFactor:2     Configs:segment.bytes=512
            Topic: test.show.log    Partition: 0    Leader: 1       Replicas: 1,0   Isr: 1,0
            Topic: test.show.log    Partition: 1    Leader: 0       Replicas: 0,2   Isr: 0,2
            Topic: test.show.log    Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1

    可以看到 broker.0 服务持有 partition 0,1 的副本,并且为 1 partition 的 leader

    打开其日志保存目录可以看到

    test.show.log-0
    test.show.log-1

    broker 对于 topic 的每一个 partition 使用单独的目录保存,每个目录下初始有

    00000000000000000000.index  00000000000000000000.log  00000000000000000000.timeindex  leader-epoch-checkpoint

    .log 文件:segment 日志文件

    .index 文件:segment offset 索引文件

    .timeindex 文件:segment timestamp 索引文件

    leader-epoch-checkpoint:用于副本备份机制

    每一个分区的日志文件被分为多个 segment 文件,segment 文件的命名规则:

    • 第一个 segment 文件名都是 0
    • <u> 后续 segment 文件名是上一个 segment 文件最后一条消息的 offset 值 </u>

segment 文件记录的信息

.index 文件:偏移量 -> 消息在.log 文件中的物理位置

.timeindex 文件:时间戳 T ->.index 文件中的偏移量 offset,表示表示比 T 晚的所有消息偏移量都比 offset 大

根据 offset 查找文件的步骤

.index 使用 稀疏索引,kafka 会在内存中维护一份索引,通过二分查找定位到消息所在的.log 文件,以及在.log 文件中的位置。

  1. 分区的意义

    • 有利于水平扩展
    • 负载均衡
  2. 分段的意义

    • 快速定位消息
    • 日志快速清除
  3. timeindex 的意义

    • 根据时间戳来定位消息
    • <u> 基于时间戳的日志切分策略 </u>
    • <u> 基于时间戳的日志清除策略 </u>

消息格式

消息格式经过了几个版本的变更

具体见:

https://www.cnblogs.com/qwang…

https://kafka.apache.org/docu…

挑几个字段来说明

key:每个消息都可以指定一个 key。可以通过给多个消息指定同一个 key 分发到同一个 partition 中,从而保证消息的有序性。

headers:可变长消息头,可以不需要解析 payload 而拿到一些消息的属性信息。

timestamp: 时间戳。时间戳有两种类型:CreateTime,LogAppendTime。

可以通过 broker 配置 log.message.timestamp.type 来指定全局 topic 时间戳类型;

也可以通过命令行创建 topic 时单独指定该 topic 的时间戳类型。

生产者发送消息时,可以指定消息的事件戳,如果未指定,则使用生产者客户端当前时间。

leader & 集群发现机制

kafka client 对 partition 的读写都是直接访问 leader。那么客户端是如何找到 leader 的?

主要是通过发送一个称之为 TopicMetadataRequest 的请求来获取。

TopicMetadataRequest => [TopicName]
  TopicName => string
Field Type Description
TopicName []string 想要获取的 topic 的元数组,如果为空,则下发所有 topic 的元数据
MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port  (any number of brokers may be returned)
    NodeId => int32
    Host => string
    Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
    TopicErrorCode => int16
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
    PartitionErrorCode => int16
    PartitionId => int32
    Leader => int32
    Replicas => [int32]
    Isr => [int32]
Field Description
Broker kafka broker 的信息,包括 broker id, hostname, port
Isr 与 leader 保持同步的 broker id
Leader leader 的 broker id,如果当前不存在 leader(leader 正在选举中),则为 -1
Replicas 所有当前存活的 follower

服务器 MetaData 的存储后续研究

kafka 协议文档地址:https://cwiki.apache.org/conf…

consumer & consumer group & offset commit

  1. pull vs push

    push :broker 控制传输速度,如果消费者处理速度较慢,会积压大量消息,最终导致消费者拒绝提供服务

    pull : 适用于向消费者批量发送大量数据

    pull 方式的不足:

    消费者可能需要轮训等待消息的到达,为避免这种情况,拉取请求中可以指示等待给定数量的数据到达。

  2. consumer 默认从哪个位置开始消费

        auto.offset.reset

  1. consumer group && offset commit

    consumer 作为组进行消费时,需要记录每个分区消费的 offset,以便于进行重平衡后,新的消费者可以从上一个位置继续消费。

    这个 offset 是由 consumer 主动提交的,broker 会记录在称之为__consumer_offsets 的 topic 中,其对应的 partition 为 hash(group.id)%mode

    提交方式

    1. 自动提交

      enable.auto.commit=true

    2. 手动提交

    一个 partition,同一时间只会被同一 group 内的 consumer 消费,如果 consumer 数量大于 partition 数量,则多余的 consumer 一直空闲

  2. rebalance

    consumer 的离开加入,partition 数量的变化,以及订阅 topic 数发生变更(可以通过正则表达式订阅多个 topic)都会导致 rebalance,rebalance 是由一个称之为 coordinator 的 broker 来负责的。

    coordinator 的选取:

    1. 看 offset 保存在哪个 partition 中
    2. 该 partition 的 leader 作为该 group 的 coordinator

    rebalance 大致流程:

    1. coordinator 随机选取一个 consumer 作为 leader,并将角色信息发给 consumers,还会把 follower 的信息发送给 leader。
    2. consumer leader 根据得到的信息分配 partition
    3. consumers 发送同步请求至 coordinator,consumer leader 发送的请求中包含分配情况
    4. coordinator 将分配情况告诉所有 consumer

ack & replica & leader election

  1. replica

    kafka 默认使用副本机制,将不需要备份的 topic 看做副本数为 1。kafka 备份的单元是 topic partition。

    follower 作为 consumer 从 leader 拉取数据,然后应用到自己的 log 中。拉取方式可以使 follower 批量写入自己的 log。

    Isr (in sync)状态

    需同时满足以下两个条件

    1. 节点的 session 必须在 zookeeper 中(必须和 zk 保持连接,通过 zookeeper 的心跳机制)
    2. 如果是一个 follwer,必须同步 leader 的写入,并且不能太落后

    leader 跟踪 Isr 节点集,如果 follower 死亡,卡住,或落后,leader 就会将他从 Isr 列表中删除。卡住或滞后副本的确定由 replica.lag.time.max.ms 配置项控制。

    replica.lag.time.max.ms

    If a follower hasn’t sent any fetch requests or hasn’t consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isr

    long

    default 10000

  2. message committed 状态

    定义:Isr 集合内的所有节点将其写入自己的 log 中

    只有确认 commited 的消息才会被 consumer 消费,消费者不必担心会得到一条会丢失的消息,也就是说只要消费者得到了一条消息,那么即使 leader 挂掉,该消息也不会丢失(因为已同步至 Isr 集合)。

  3. acks

    对于 producer 来说,可以在发送请求的即时性和消息的持久化之间进行权衡,来选择是否等待消息 commited。

    这个选项可以通过 acks 配置。

    acks=-1|all 与 min.insync.replicas 配合可以最大程度保证消息的可靠性。

    acks=-1|all 表示消息被追加到 Isr 集合内所有节点后才返回,如果当前 Isr 只有一个 leader(follower 由于某些原因掉线),那么也会返回成功,随后 leader 挂掉,其他 follower 被选举为 leader,那么该条消息就会丢失。min.insync.replicas 可以指定,如果当前 Isr 内节点数量小于 min.insync.replicas 指定数量,则 producer 直接抛出异常。

  4. leader election

    kafka controller.brokers 中的一个节点会担当 controller 的角色(据说是通过 zk 创建节点,创建成功则为 controller,失败则监听该节点,如果 controller 挂掉,则再次竞争)

    controller 负责管理整个集群中分区和副本的状态。

    leader 选举

    如果 Isr 中有至少一个 replica 幸存,则选择其中一个为 leader。

    否则选择该 partition 中的任意一个幸存的 replica 为 leader。

    replica 都不工作

    1. 等待 ISR 中的任一个 replica 活过来,并且选他作为 leader。
    2. 选择第一个活过来的 replica 作为 leader。

    第一种等待的时间可能会比较长,或者不可用。第二种不保证包含了所有已 commited 的消息。

    需要在可用性和一致性当中做出折中。

    unclean.leader.election.enable 配置指定使用哪种方案,默认是 true,使用第 2 种。

  5. HW(高水位)

    数据的一致性

    待研究

log compaction

  1. 日志删除策略

    kafka 对于过期日志的删除有两种策略

    • delete

      直接删除过期的 segment 文件

    • compact

      通过创建新的 segment 文件将相同 key 的最新一条消息保留下来(缩容,合并)

compact 效果图:

  1. 应用场景

    这个特性可以保证日志包含每个 key 的最终值的完整快照,消费者就可以从这个 topic 中恢复自己的状态,而不需要保留所有更改的完整日志。

    • 数据容灾,提高数据的可用性。定时将内存中的数据备份到 topic 中,当崩溃恢复后再从 topic 中读回来
  2. 源码解析

    待研究

  3. 启用 log compaction

    首先确保 broker 配置项 log.cleaner.enable 为 true.

    可以通过设置 broker 配置项 log.cleanup.policy 为 compact,默认是 delete.

    可以通过设置 topic 配置项 c leanup.policy 为 compact,默认是 delete.

    topic 配置项可以覆盖 broker 全局配置.

    log.cleanup.policy 和 cleanup.policy 取值可以是 compact 或 delete 或 compact,delete

  4. 参考

    https://kafka.apache.org/documentation/#compaction

    https://blog.csdn.net/u013256…

网络层

  1. NIO
  2. 批量发送数据集时使用 sendfile。延申:零拷贝技术,写时拷贝技术
  3. 线程模型

    一个线程作为 acceptor,接受 tcp 连接,n 个处理线程,每个处理线程处理固定数量的连接。

  4. 协议

    定长编码

    int8, int16, int32, int64。大端序

    变长编码

    length+content

    字符串 length 用 int16 表示,二进制数组 length 用 int32 表示。

    content 为空用 length=- 1 表示

    数组

    sizeof(array)+array[0]+array[1]….+array[n]

    数组大小使用 int32 表示

集群扩展

增加 broker,增加 partition,增加 replication

待研究

事务幂等

  1. producer 请求的幂等性

    幂等:

    在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同

    producer 引入幂等性的意义:

    防止生产者重复生产消息。生产者进行 retry 时重复产生消息,有了幂等性之后,在进行 retry 重试时,只会有一条消息 commited。

    实现方式:

    1. PID。每个 producer 在初始化时会被分配一个唯一的 PID,改 PID 对用户不可见
    2. Sequence Number。producer 在向每个 paritition 发送的每条数据都会携带一个 seq。其从 0 递增
    3. broker 会缓存 PID 及其 seq。如果收到的消息 seq 比缓存 seq 大 1(highwater mark)则接受,否则丢弃。(tcp ack)

    非幂等

    幂等

    PID 的生成:

    1. producer 从任意一个 broker 获取事务协调者(Transaction Coordinator)的信息
    2. producer 向 Transaction Coordinator 请求 PID

    Transaction Coordinator 的选择

    PID 的生成规则及保存失效规则

    注意事项:

    如果使用 kafka 的幂等性,则必须开启 topic 配置 enable.idempotence

    When set to ‘true’, the producer will ensure that exactly one copy of each message is written in the stream. If ‘false’, producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retriesto be greater than 0 and acks must be ‘all’. If these values are not explicitly set by the user, suitable values will be chosen. If incompatible values are set, a ConfigExceptionwill be thrown.

    前提条件 acks 必须是 all

  2. 事务

    待研究

dashboard

待研究

正文完
 0