乐趣区

关于java:一文说透kafka底层架构

底层架构

先停一下,学习之前,先看下如何学习,两篇不错的干货文章分享给你,肯定要点开看下

  • 如何从一般程序员,进阶架构师!
  • 工作几年?如何疾速降职架构师!!

6.1 存储架构

6.1.1 分段存储

开篇讲过,kafka 每个主题能够有多个分区,每个分区在它所在的 broker 上创立一个文件夹

每个分区又分为多个段,每个段两个文件,log 文件里程序存音讯,index 文件里存音讯的索引

段的命名间接以以后段的第一条音讯的 offset 为名

留神是偏移量,不是序号!第几条音讯 = 偏移量 + 1。相似数组长度和下标。

所以 offset 从 0 开始(能够开新队列新 groupid 生产第一条音讯打印 offset 失去验证)

例如:

0.log -> 有 8 条,offset 为 0-7

8.log -> 有两条,offset 为 8-9

10.log -> 有 xx 条,offset 从 10-xx

6.1.2 日志索引

每个 log 文件装备一个索引文件 *.index

文件格式为:(offset , 内存偏移地址)

综合上述,来看一个音讯的查找:

  • consumer 发动申请要求从 offset= 6 的音讯开始生产
  • kafka 间接依据文件名大小,发现 6 号音讯在 00000.log 这个文件里
  • 那文件找到了,它在文件的哪个地位呢?
  • 依据 index 文件,发现 6,9807,阐明音讯藏在这里!
  • 从 log 文件的 9807 地位开始读取。
  • 那读多长呢?简略,读到下一条音讯的偏移量进行就能够了

6.1.3 日志删除

Kafka 作为消息中间件,数据须要依照肯定的规定删除,否则数据量太大会把集群存储空间占满。

删除数据形式:

  • 依照工夫,超过一段时间后删除过期音讯
  • 依照音讯大小,音讯数量超过肯定大小后删除最旧的数据

Kafka 删除数据的最小单位:segment,也就是间接干掉文件!一删就是一个 log 和 index 文件

6.1.4 存储验证

1)数据筹备

将 broker 2 和 3 停掉,只保留 1

docker pause kafka-2 kafka-3

2)删掉 test 主题,通过 km 新建一个 test 主题,加 2 个分区

新建时,留神上面的选项:

segment.bytes = 1000,即:每个 log 文件达到 1000byte 时,开始创立新文件

删除策略:

retention.bytes = 2000,即:超出 2000byte 的旧日志被删除

retention.ms = 60000,即:超出 1 分钟后的旧日志被删除

以上任意一条满足,就会删除。

3)进入 kafka- 1 这台容器

docker exec -it kafka-1 sh

#查看容器中的文件信息
/ # ls /
bin    dev    etc    home   kafka  lib    lib64  media  mnt    opt    proc   root   run    sbin   srv    sys    tmp    usr    var

/ # cd /kafka/

/kafka # ls
kafka-logs-d0b9c75080d6

/kafka # cd kafka-logs-d0b9c75080d6/
/kafka/kafka-logs-d0b9c75080d6 # ls -l | grep test
drwxr-xr-x    2 root     root          4096 Jan 15 14:35 test-0
drwxr-xr-x    2 root     root          4096 Jan 15 14:35 test-1

#2 个分区的日志文件清单,留神以后还没有任何音讯写进来
#timeindex:日志的工夫信息
#leader-epoch,上面会讲到
/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 4
-rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r--    1 root     root             0 Jan 15 14:35 00000000000000000000.log
-rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint

test-1:
total 4
-rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r--    1 root     root             0 Jan 15 14:35 00000000000000000000.log
-rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint

4)往里灌数据。启动我的项目通过 swagger 发送音讯

留神!边发送边查看上一步的文件列表信息!

# 先发送 2 条,音讯开始进来,log 文件变大!音讯在两个分区之间一一减少。/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 8
-rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r--    1 root     root           875 Jan 15 14:46 00000000000000000000.log
-rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint

test-1:
total 8
-rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r--    1 root     root           875 Jan 15 14:46 00000000000000000000.log
-rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint

#持续逐条发送,返回再来看文件,大小为 1000,达到边界!/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 8
-rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint

test-1:
total 8
-rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint

#持续发送音讯!1 号分区的 log 文件开始决裂
#阐明第 8 条音讯曾经进入了第二个 log
/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 8
-rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint

test-1:
total 20
-rw-r--r--    1 root     root             0 Jan 15 14:46 00000000000000000000.index
-rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r--    1 root     root            12 Jan 15 14:46 00000000000000000000.timeindex
-rw-r--r--    1 root     root      10485760 Jan 15 14:46 00000000000000000008.index
-rw-r--r--    1 root     root           125 Jan 15 14:46 00000000000000000008.log   #第二个 log 文件!-rw-r--r--    1 root     root            10 Jan 15 14:46 00000000000000000008.snapshot
-rw-r--r--    1 root     root      10485756 Jan 15 14:46 00000000000000000008.timeindex
-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint

#继续发送,另一个分区也开始拆散
/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0: 
total 20
-rw-r--r--    1 root     root             0 Jan 15 15:55 00000000000000000000.index
-rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r--    1 root     root            12 Jan 15 15:55 00000000000000000000.timeindex
-rw-r--r--    1 root     root      10485760 Jan 15 15:55 00000000000000000008.index
-rw-r--r--    1 root     root           625 Jan 15 15:55 00000000000000000008.log
-rw-r--r--    1 root     root            10 Jan 15 15:55 00000000000000000008.snapshot
-rw-r--r--    1 root     root      10485756 Jan 15 15:55 00000000000000000008.timeindex
-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint

test-1:
total 20
-rw-r--r--    1 root     root             0 Jan 15 14:46 00000000000000000000.index
-rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r--    1 root     root            12 Jan 15 14:46 00000000000000000000.timeindex
-rw-r--r--    1 root     root      10485760 Jan 15 14:46 00000000000000000008.index
-rw-r--r--    1 root     root           750 Jan 15 15:55 00000000000000000008.log
-rw-r--r--    1 root     root            10 Jan 15 14:46 00000000000000000008.snapshot
-rw-r--r--    1 root     root      10485756 Jan 15 14:46 00000000000000000008.timeindex
-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint


#继续发送音讯,分区越来越多。#过一段时间后再来查看,清理工作将会执行,超出的日志被删除!(默认调度距离 5min)#log.retention.check.interval.ms 参数指定

/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 8
-rw-r--r--    1 root     root      10485760 Jan 15 19:12 00000000000000000119.index
-rw-r--r--    1 root     root             0 Jan 15 19:12 00000000000000000119.log
-rw-r--r--    1 root     root            10 Jan 15 19:12 00000000000000000119.snapshot
-rw-r--r--    1 root     root      10485756 Jan 15 19:12 00000000000000000119.timeindex
-rw-r--r--    1 root     root            10 Jan 15 19:12 leader-epoch-checkpoint

test-1:
total 8
-rw-r--r--    1 root     root      10485760 Jan 15 19:12 00000000000000000119.index
-rw-r--r--    1 root     root             0 Jan 15 19:12 00000000000000000119.log
-rw-r--r--    1 root     root            10 Jan 15 19:12 00000000000000000119.snapshot
-rw-r--r--    1 root     root      10485756 Jan 15 19:12 00000000000000000119.timeindex
-rw-r--r--    1 root     root            10 Jan 15 19:12 leader-epoch-checkpoint

6.2 零拷贝

Kafka 在执行音讯的写入和读取这么快,其中的一个起因是零拷贝(Zero-copy)技术

6.2.1 传统文件读写

传统读写,波及到 4 次数据的复制。然而这个过程中,数据齐全没有变动,咱们仅仅是想从磁盘把数据送到网卡。

那有没有方法不绕这一圈呢?让磁盘和网卡之类的外围设备间接拜访内存,而不通过 cpu?

有!这就是 DMA(Direct Memory Access 间接内存拜访)。

6.2.2 DMA

DMA 其实是由 DMA 芯片(硬件反对)来管制的。通过 DMA 管制芯片,能够让网卡等外部设备间接去读取内存,而不是由 cpu 来回拷贝传输。这就是所谓的零拷贝

目前计算机支流硬件根本都反对 DMA,就包含咱们的硬盘和网卡。

kafka 就是调取操作系统的 sendfile,借助 DMA 来实现零拷贝数据传输的

6.2.3 java 实现

为加深了解,类比为 java 中的零拷贝:

  • 在 Java 中的零拷贝是通过 java.nio.channels.FileChannel 中的 transferTo 办法来实现的
  • transferTo 办法底层通过 native 调操作系统的 sendfile
  • 操作系统 sendfile 负责把数据从某个 fd(linux file descriptor)传输到另一个 fd

    备注:linux 下所有的设施都是一个文件描述符 fd

代码参考:

File file = new File("0.log");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
// 文件通道,起源
FileChannel fileChannel = raf.getChannel();
// 网络通道,去处
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("1.1.1.1", 1234));
// 对接上,通过 transfer 间接送过来
fileChannel.transferTo(0, fileChannel.size(), socketChannel);

6.3 分区一致性

6.3.1 水位值

1)先回顾两个值:

2)再看下几个值的存储地位:

留神!分区是有 leader 和 follower 的,最新写的音讯会进入 leader,follower 从 leader 不停的同步

无论 leader 还是 follower,都有本人的 HW 和 LEO,存储在各自分区所在的磁盘上

leader 多一个 Remote LEO,它示意针对各个 follower 的 LEO,leader 又额定记了一份!

3)为什么这么做呢?

leader 会拿这些 remote 值里最小的来更新本人的 hw,具体过程咱们具体往下看

6.3.2 同步原理

咱们来看这几个值是如何更新的:

1)leader.LEO

这个很简略,每次 producer 有新音讯发过来,就会减少

2)其余值

另外的 4 个值初始化都是 0

他们的更新由 follower 的 fetch(同步音讯线程)失去的数据来决定!

如果把 fetch 看做是 leader 上提供的办法,由 follower 近程申请调用,那么它的伪代码大略是这个样子:

//java 伪代码!//follower 端的操作,不停的申请从 leader 获取最新数据
class Follower{
  private List<Message> messages;
  private HW hw;
  private LEO leo;
  
  @Schedule("不停的向 leader 发动同步申请")
  void execute(){
    // 向 leader 发动 fetch 申请,将本人的 leo 传过来
    //leader 返回 leo 之后最新的音讯,以及 leader 的 hw
    LeaderReturn lr = leader.fetch(this.leo) ;
    
    // 存音讯
    this.messages.addAll(lr.newMsg);
    // 减少 follower 的 leo 值
    this.leo = this.leo + lr.newMsg.length;
    // 比拟本人的 leo 和 leader 的 hw,取两者小的,作为 follower 的 hw
    this.hw = min(this.leo , lr.leaderHW);
  }
}



//leader 返回的报文
class LeaderReturn{
  // 新增的音讯
  List<Messages> newMsg;
  //leader 的 hw
  HW leaderHW;
}
//leader 在接到 follower 的 fetch 申请时,做的逻辑
class Leader{
  private List<Message> messages;
  private LEO leo;
  private HW hw;
  //Leader 比 follower 多了个 Remote!
  // 留神!如果有多个正本,那么 RemoteLEO 也有多个,每个正本对应一个
  private RemoteLEO remoteLEO;
  
  // 接到 follower 的 fetch 申请时,leader 做的事件
  LeaderReturn fetch(LEO followerLEO){
    // 依据 follower 传过来的 leo,来更新 leader 的 remote
    this.remoteLEO = followerLEO ;
    // 而后取 ISR(所有可用正本)的最小 leo 作为 leader 的 hw
    this.hw = min(this.leo , this.remoteLEO) ;
    
    // 从 leader 的音讯列表里,查找大于 follower 的 leo 的所有新音讯
    List<Message> newMsg = queryMsg(followerLEO) ;
    
    // 将最新的音讯(大于 follower leo 的那些),以及 leader 的 hw 返回给 follower
    LeaderReturn lr = new LeaderReturn(newMsg , this.hw)
    return lr;
  }
  
}

6.3.3 Leader Epoch

1)产生的背景

0.11 版本之前的 kafka,齐全借助 hw 作为音讯的基准,不论 leo。

产生故障后的规定:

  • follower 故障再次复原后,从磁盘读取 hw 的值并从 hw 开始剔除前面的音讯,并同步 leader 音讯
  • leader 故障后,新入选的 leader 的 hw 作为新的分区 hw,其余节点依照此 hw 进行剔除数据,并从新同步
  • 上述依据 hw 进行数据恢复会呈现数据失落和不统一的状况,上面离开来看

假如:

咱们有两个正本:leader(A),follower(B)

场景一:丢数据

  • 某个工夫点 B 挂了。当它复原后,以挂之前的 hw 为准,设置 leo = hw
  • 这就造成一个问题:事实中,leo 很可能是 大于 hw 的。leo 被回退了!
  • 如果这时候,恰好 A 也挂掉了。kafka 会重选 leader,B 被选中。
  • 过段时间,A 复原后变成 follower,从 B 开始同步数据。
  • 问题来了!下面说了,B 的数据是被回退过的,以它为基准会有问题
  • 最终后果:两者的数据都产生失落,没有中央能够找回!

场景二:数据不统一

  • 这次假如 AB 全挂了。比拟惨
  • B 先复原。然而它的 hw 有可能挂之前没从 A 同步过去(原来 A 是 leader)
  • 咱们假如,A.hw = 2 , B.hw = 1
  • B 复原后,集群里只有它本人,所以被选为 leader,开始承受新音讯
  • B.hw 上涨,变成 2
  • 而后,A 复原,原来 A.hw = 2,复原后以 B 的 hw,也就是 2 为基准开始同步。
  • 问题来了!B 当 leader 后新接到的 2 号音讯是不会同步给 A 的,A 始终保留着它当 leader 时的旧数据
  • 最终后果:数据不统一了!

2)改良思路

0.11 之后,kafka 改良了 hw 做主的规定,这就是 leader epoch

leader epoch 给 leader 节点带了一个版本号,相似于乐观锁的设计。

它的思维是,一旦产生机器故障,重启之后,不再机械的将 leo 退回 hw

而是借助 epoch 的版本信息,去申请以后 leader,让它去算一算 leo 应该是什么

3)实现原理

比照下面丢数据的问题:

  • A 为(leo=2 , hw=2),B 为(leo=2 , hw=1)
  • B 重启,然而 B 不再焦急将 leo 打回 hw,而是发动一个 Epoch 申请给以后 leader,也就是 A
  • A 收到 LE= 0 后,发现和本人的 LE 一样,阐明 B 在挂掉前后,leader 没变,都是 A 本人
  • 那么 A 就将本人的 leo 值返回给 B,也就是数字 2
  • B 收到 2 后和本人的 leo 比对取较小值,发现也是 2,那么不再退回到 hw 的 1
  • 没有回退,也就是信息 1 的地位没有被笼罩,最大水平的爱护了数据
  • 如果和下面一样的场景,A 挂掉,B 被选为 leader
  • 那么 A 再次启动时后,从 B 开始同步数据
  • 因为 B 之前没有回退,1 号信息失去了保留
  • 同时,B 的 LE(epoch 号码)开始减少,从 0 变成 1,offset 记录为 B 当 leader 时的地位,也就是 2
  • A 传过来的 epoch 为 0,B 是 1,不相等。那么取大于 0 的所有 epoch 里最小的

    (事实中可能产生了屡次从新选主,有多条 epoch)

  • 其实就是 LE= 1 的那条。事实中可能有多条。并找到它对应的 offset(也就是 2)给 A 返回去
  • 最终 A 失去了 B 同步过去的数据

再来看一致性问题的解决:

  • 还是下面的场景,AB 同时挂掉,然而 hw 还没同步,那么 A.hw=2 , B.hw=1
  • B 先启动被选成了 leader,新 leader 选举后,epoch 加了一条记录(参考下图,LE=1,这时候 offset=1)
  • 示意 B 从 1 开始往后持续写数据,新来了条信息,内容为 m3,写到 1 号位
  • A 启动前,集群只有 B 本人,音讯被确认,hw 上涨到 2,变成上面的样子
  • A 开始复原,启动后向 B 发送 epoch 申请,将本人的 LE= 0 通知 leader,也就是 B
  • B 发现自己的 LE 不同,同样去大于 0 的 LE 里最小的那条,也就是 1 , 对应的 offset 也是 1,返回给 A
  • A 从 1 开始同步数据,将本人本地的数据截断、笼罩,hw 回升到 2
  • 那么最新的写入的 m3 从 B 给同步到了 A,并笼罩了 A 上之前的旧数据 m2
  • 后果:数据放弃了统一

附:epochRequest 的具体流程图

本文由传智教育博学谷 – 狂野架构师教研团队公布,转载请注明出处!

如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源

退出移动版