底层架构


先停一下,学习之前,先看下如何学习,两篇不错的干货文章分享给你,肯定要点开看下

  • 如何从一般程序员,进阶架构师!
  • 工作几年?如何疾速降职架构师!!

6.1 存储架构

6.1.1 分段存储

开篇讲过,kafka每个主题能够有多个分区,每个分区在它所在的broker上创立一个文件夹

每个分区又分为多个段,每个段两个文件,log文件里程序存音讯,index文件里存音讯的索引

段的命名间接以以后段的第一条音讯的offset为名

留神是偏移量,不是序号! 第几条音讯 = 偏移量 + 1。相似数组长度和下标。

所以offset从0开始(能够开新队列新groupid生产第一条音讯打印offset失去验证)

例如:

0.log -> 有8条,offset为 0-7

8.log -> 有两条,offset为 8-9

10.log -> 有xx条,offset从10-xx

6.1.2 日志索引

每个log文件装备一个索引文件 *.index

文件格式为: (offset , 内存偏移地址)

综合上述,来看一个音讯的查找:

  • consumer发动申请要求从offset=6的音讯开始生产
  • kafka间接依据文件名大小,发现6号音讯在00000.log这个文件里
  • 那文件找到了,它在文件的哪个地位呢?
  • 依据index文件,发现 6,9807,阐明音讯藏在这里!
  • 从log文件的 9807 地位开始读取。
  • 那读多长呢?简略,读到下一条音讯的偏移量进行就能够了

6.1.3 日志删除

Kafka作为消息中间件,数据须要依照肯定的规定删除,否则数据量太大会把集群存储空间占满。

删除数据形式:

  • 依照工夫,超过一段时间后删除过期音讯
  • 依照音讯大小,音讯数量超过肯定大小后删除最旧的数据

Kafka删除数据的最小单位:segment,也就是间接干掉文件!一删就是一个log和index文件

6.1.4 存储验证

1)数据筹备

将broker 2和3 停掉,只保留1

docker pause kafka-2 kafka-3

2)删掉test主题,通过km新建一个test主题,加2个分区

新建时,留神上面的选项:

segment.bytes = 1000 ,即:每个log文件达到1000byte时,开始创立新文件

删除策略:

retention.bytes = 2000,即:超出2000byte的旧日志被删除

retention.ms = 60000,即:超出1分钟后的旧日志被删除

以上任意一条满足,就会删除。

3)进入kafka-1这台容器

docker exec -it kafka-1 sh#查看容器中的文件信息/ # ls /bin    dev    etc    home   kafka  lib    lib64  media  mnt    opt    proc   root   run    sbin   srv    sys    tmp    usr    var/ # cd /kafka//kafka # lskafka-logs-d0b9c75080d6/kafka # cd kafka-logs-d0b9c75080d6//kafka/kafka-logs-d0b9c75080d6 # ls -l | grep testdrwxr-xr-x    2 root     root          4096 Jan 15 14:35 test-0drwxr-xr-x    2 root     root          4096 Jan 15 14:35 test-1#2个分区的日志文件清单,留神以后还没有任何音讯写进来#timeindex:日志的工夫信息#leader-epoch,上面会讲到/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*test-0:total 4-rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index-rw-r--r--    1 root     root             0 Jan 15 14:35 00000000000000000000.log-rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpointtest-1:total 4-rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index-rw-r--r--    1 root     root             0 Jan 15 14:35 00000000000000000000.log-rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint

4)往里灌数据。启动我的项目通过swagger发送音讯

留神!边发送边查看上一步的文件列表信息!

#先发送2条,音讯开始进来,log文件变大!音讯在两个分区之间一一减少。/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*test-0:total 8-rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index-rw-r--r--    1 root     root           875 Jan 15 14:46 00000000000000000000.log-rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpointtest-1:total 8-rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index-rw-r--r--    1 root     root           875 Jan 15 14:46 00000000000000000000.log-rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint#持续逐条发送,返回再来看文件,大小为1000,达到边界!/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*test-0:total 8-rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index-rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log-rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpointtest-1:total 8-rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index-rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log-rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint#持续发送音讯!1号分区的log文件开始决裂#阐明第8条音讯曾经进入了第二个log/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*test-0:total 8-rw-r--r--    1 root     root      10485760 Jan 15 14:35 00000000000000000000.index-rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log-rw-r--r--    1 root     root      10485756 Jan 15 14:35 00000000000000000000.timeindex-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpointtest-1:total 20-rw-r--r--    1 root     root             0 Jan 15 14:46 00000000000000000000.index-rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log-rw-r--r--    1 root     root            12 Jan 15 14:46 00000000000000000000.timeindex-rw-r--r--    1 root     root      10485760 Jan 15 14:46 00000000000000000008.index-rw-r--r--    1 root     root           125 Jan 15 14:46 00000000000000000008.log   #第二个log文件!-rw-r--r--    1 root     root            10 Jan 15 14:46 00000000000000000008.snapshot-rw-r--r--    1 root     root      10485756 Jan 15 14:46 00000000000000000008.timeindex-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint#继续发送,另一个分区也开始拆散/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*test-0: total 20-rw-r--r--    1 root     root             0 Jan 15 15:55 00000000000000000000.index-rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log-rw-r--r--    1 root     root            12 Jan 15 15:55 00000000000000000000.timeindex-rw-r--r--    1 root     root      10485760 Jan 15 15:55 00000000000000000008.index-rw-r--r--    1 root     root           625 Jan 15 15:55 00000000000000000008.log-rw-r--r--    1 root     root            10 Jan 15 15:55 00000000000000000008.snapshot-rw-r--r--    1 root     root      10485756 Jan 15 15:55 00000000000000000008.timeindex-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpointtest-1:total 20-rw-r--r--    1 root     root             0 Jan 15 14:46 00000000000000000000.index-rw-r--r--    1 root     root          1000 Jan 15 14:46 00000000000000000000.log-rw-r--r--    1 root     root            12 Jan 15 14:46 00000000000000000000.timeindex-rw-r--r--    1 root     root      10485760 Jan 15 14:46 00000000000000000008.index-rw-r--r--    1 root     root           750 Jan 15 15:55 00000000000000000008.log-rw-r--r--    1 root     root            10 Jan 15 14:46 00000000000000000008.snapshot-rw-r--r--    1 root     root      10485756 Jan 15 14:46 00000000000000000008.timeindex-rw-r--r--    1 root     root             8 Jan 15 14:35 leader-epoch-checkpoint#继续发送音讯,分区越来越多。#过一段时间后再来查看,清理工作将会执行,超出的日志被删除!(默认调度距离5min)#log.retention.check.interval.ms 参数指定/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*test-0:total 8-rw-r--r--    1 root     root      10485760 Jan 15 19:12 00000000000000000119.index-rw-r--r--    1 root     root             0 Jan 15 19:12 00000000000000000119.log-rw-r--r--    1 root     root            10 Jan 15 19:12 00000000000000000119.snapshot-rw-r--r--    1 root     root      10485756 Jan 15 19:12 00000000000000000119.timeindex-rw-r--r--    1 root     root            10 Jan 15 19:12 leader-epoch-checkpointtest-1:total 8-rw-r--r--    1 root     root      10485760 Jan 15 19:12 00000000000000000119.index-rw-r--r--    1 root     root             0 Jan 15 19:12 00000000000000000119.log-rw-r--r--    1 root     root            10 Jan 15 19:12 00000000000000000119.snapshot-rw-r--r--    1 root     root      10485756 Jan 15 19:12 00000000000000000119.timeindex-rw-r--r--    1 root     root            10 Jan 15 19:12 leader-epoch-checkpoint

6.2 零拷贝

Kafka 在执行音讯的写入和读取这么快,其中的一个起因是零拷贝(Zero-copy)技术

6.2.1 传统文件读写

传统读写,波及到 4 次数据的复制。然而这个过程中,数据齐全没有变动,咱们仅仅是想从磁盘把数据送到网卡。

那有没有方法不绕这一圈呢?让磁盘和网卡之类的外围设备间接拜访内存,而不通过cpu?

有! 这就是DMA(Direct Memory Access 间接内存拜访)。

6.2.2 DMA

DMA其实是由DMA芯片(硬件反对)来管制的。通过DMA管制芯片,能够让网卡等外部设备间接去读取内存,而不是由cpu来回拷贝传输。这就是所谓的零拷贝

目前计算机支流硬件根本都反对DMA,就包含咱们的硬盘和网卡。

kafka就是调取操作系统的sendfile,借助DMA来实现零拷贝数据传输的

6.2.3 java实现

为加深了解,类比为java中的零拷贝:

  • 在Java中的零拷贝是通过java.nio.channels.FileChannel中的transferTo办法来实现的
  • transferTo办法底层通过native调操作系统的sendfile
  • 操作系统sendfile负责把数据从某个fd(linux file descriptor)传输到另一个fd

    备注:linux下所有的设施都是一个文件描述符fd

代码参考:

File file = new File("0.log");RandomAccessFile raf = new RandomAccessFile(file, "rw");//文件通道,起源FileChannel fileChannel = raf.getChannel();//网络通道,去处SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("1.1.1.1", 1234));//对接上,通过transfer间接送过来fileChannel.transferTo(0, fileChannel.size(), socketChannel);

6.3 分区一致性

6.3.1 水位值

1)先回顾两个值:

2)再看下几个值的存储地位:

留神!分区是有leader和follower的,最新写的音讯会进入leader,follower从leader不停的同步

无论leader还是follower,都有本人的HW和LEO,存储在各自分区所在的磁盘上

leader多一个Remote LEO,它示意针对各个follower的LEO,leader又额定记了一份!

3)为什么这么做呢?

leader会拿这些remote值里最小的来更新本人的hw,具体过程咱们具体往下看

6.3.2 同步原理

咱们来看这几个值是如何更新的:

1)leader.LEO

这个很简略,每次producer有新音讯发过来,就会减少

2)其余值

另外的4个值初始化都是 0

他们的更新由follower的fetch(同步音讯线程)失去的数据来决定!

如果把fetch看做是leader上提供的办法,由follower近程申请调用,那么它的伪代码大略是这个样子:

//java伪代码!//follower端的操作,不停的申请从leader获取最新数据class Follower{  private List<Message> messages;  private HW hw;  private LEO leo;    @Schedule("不停的向leader发动同步申请")  void execute(){    //向leader发动fetch申请,将本人的leo传过来    //leader返回leo之后最新的音讯,以及leader的hw    LeaderReturn lr = leader.fetch(this.leo) ;        //存音讯    this.messages.addAll(lr.newMsg);    //减少follower的leo值    this.leo = this.leo + lr.newMsg.length;    //比拟本人的leo和leader的hw,取两者小的,作为follower的hw    this.hw = min(this.leo , lr.leaderHW);  }}//leader返回的报文class LeaderReturn{  //新增的音讯  List<Messages> newMsg;  //leader的hw  HW leaderHW;}
//leader在接到follower的fetch申请时,做的逻辑class Leader{  private List<Message> messages;  private LEO leo;  private HW hw;  //Leader比follower多了个Remote!  //留神!如果有多个正本,那么RemoteLEO也有多个,每个正本对应一个  private RemoteLEO remoteLEO;    //接到follower的fetch申请时,leader做的事件  LeaderReturn fetch(LEO followerLEO){    //依据follower传过来的leo,来更新leader的remote    this.remoteLEO = followerLEO ;    //而后取ISR(所有可用正本)的最小leo作为leader的hw    this.hw = min(this.leo , this.remoteLEO) ;        //从leader的音讯列表里,查找大于follower的leo的所有新音讯    List<Message> newMsg = queryMsg(followerLEO) ;        //将最新的音讯(大于follower leo的那些),以及leader的hw返回给follower    LeaderReturn lr = new LeaderReturn(newMsg , this.hw)    return lr;  }  }

6.3.3 Leader Epoch

1)产生的背景

0.11版本之前的kafka,齐全借助hw作为音讯的基准,不论leo。

产生故障后的规定:

  • follower故障再次复原后,从磁盘读取hw的值并从hw开始剔除前面的音讯,并同步leader音讯
  • leader故障后,新入选的leader的hw作为新的分区hw,其余节点依照此hw进行剔除数据,并从新同步
  • 上述依据hw进行数据恢复会呈现数据失落和不统一的状况,上面离开来看

假如:

咱们有两个正本:leader(A),follower(B)

场景一:丢数据

  • 某个工夫点B挂了。当它复原后,以挂之前的hw为准,设置 leo = hw
  • 这就造成一个问题:事实中,leo 很可能是 大于 hw的。leo被回退了!
  • 如果这时候,恰好A也挂掉了。kafka会重选leader,B被选中。
  • 过段时间,A复原后变成follower,从B开始同步数据。
  • 问题来了!下面说了,B的数据是被回退过的,以它为基准会有问题
  • 最终后果:两者的数据都产生失落,没有中央能够找回!

场景二:数据不统一

  • 这次假如AB全挂了。比拟惨
  • B先复原。然而它的hw有可能挂之前没从A同步过去(原来A是leader)
  • 咱们假如,A.hw = 2 , B.hw = 1
  • B复原后,集群里只有它本人,所以被选为leader,开始承受新音讯
  • B.hw上涨,变成2
  • 而后,A复原,原来A.hw = 2 ,复原后以B的hw,也就是2为基准开始同步。
  • 问题来了!B当leader后新接到的2号音讯是不会同步给A的,A始终保留着它当leader时的旧数据
  • 最终后果:数据不统一了!

2)改良思路

0.11之后,kafka改良了hw做主的规定,这就是leader epoch

leader epoch给leader节点带了一个版本号,相似于乐观锁的设计。

它的思维是,一旦产生机器故障,重启之后,不再机械的将leo退回hw

而是借助epoch的版本信息,去申请以后leader,让它去算一算leo应该是什么

3)实现原理

比照下面丢数据的问题:

  • A为(leo=2 , hw=2),B为(leo=2 , hw=1)
  • B重启,然而B不再焦急将leo打回hw,而是发动一个Epoch申请给以后leader,也就是A
  • A收到LE=0后,发现和本人的LE一样,阐明B在挂掉前后,leader没变,都是A本人
  • 那么A就将本人的leo值返回给B,也就是数字2
  • B收到2后和本人的leo比对取较小值,发现也是2,那么不再退回到hw的1
  • 没有回退,也就是信息1的地位没有被笼罩,最大水平的爱护了数据
  • 如果和下面一样的场景,A挂掉,B被选为leader

  • 那么A再次启动时后,从B开始同步数据
  • 因为B之前没有回退,1号信息失去了保留
  • 同时,B的LE(epoch号码)开始减少,从0变成1,offset记录为B当leader时的地位,也就是2
  • A传过来的epoch为0,B是1,不相等。那么取大于0的所有epoch里最小的

    (事实中可能产生了屡次从新选主,有多条epoch)

  • 其实就是LE=1的那条。事实中可能有多条。并找到它对应的offset(也就是2)给A返回去
  • 最终A失去了B同步过去的数据

再来看一致性问题的解决:

  • 还是下面的场景,AB同时挂掉,然而hw还没同步,那么A.hw=2 , B.hw=1
  • B先启动被选成了leader,新leader选举后,epoch加了一条记录(参考下图,LE=1,这时候offset=1)
  • 示意B从1开始往后持续写数据,新来了条信息,内容为m3,写到1号位
  • A启动前,集群只有B本人,音讯被确认,hw上涨到2,变成上面的样子

  • A开始复原,启动后向B发送epoch申请,将本人的LE=0通知leader,也就是B
  • B发现自己的LE不同,同样去大于0的LE里最小的那条,也就是1 , 对应的offset也是1,返回给A
  • A从1开始同步数据,将本人本地的数据截断、笼罩,hw回升到2
  • 那么最新的写入的m3从B给同步到了A,并笼罩了A上之前的旧数据m2
  • 后果:数据放弃了统一

附:epochRequest的具体流程图

本文由传智教育博学谷 - 狂野架构师教研团队公布,转载请注明出处!

如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源