共计 9769 个字符,预计需要花费 25 分钟才能阅读完成。
底层架构
先停一下,学习之前,先看下如何学习,两篇不错的干货文章分享给你,肯定要点开看下
- 如何从一般程序员,进阶架构师!
- 工作几年?如何疾速降职架构师!!
6.1 存储架构
6.1.1 分段存储
开篇讲过,kafka 每个主题能够有多个分区,每个分区在它所在的 broker 上创立一个文件夹
每个分区又分为多个段,每个段两个文件,log 文件里程序存音讯,index 文件里存音讯的索引
段的命名间接以以后段的第一条音讯的 offset 为名
留神是偏移量,不是序号!第几条音讯 = 偏移量 + 1。相似数组长度和下标。
所以 offset 从 0 开始(能够开新队列新 groupid 生产第一条音讯打印 offset 失去验证)
例如:
0.log -> 有 8 条,offset 为 0-7
8.log -> 有两条,offset 为 8-9
10.log -> 有 xx 条,offset 从 10-xx
6.1.2 日志索引
每个 log 文件装备一个索引文件 *.index
文件格式为:(offset , 内存偏移地址)
综合上述,来看一个音讯的查找:
- consumer 发动申请要求从 offset= 6 的音讯开始生产
- kafka 间接依据文件名大小,发现 6 号音讯在 00000.log 这个文件里
- 那文件找到了,它在文件的哪个地位呢?
- 依据 index 文件,发现 6,9807,阐明音讯藏在这里!
- 从 log 文件的 9807 地位开始读取。
- 那读多长呢?简略,读到下一条音讯的偏移量进行就能够了
6.1.3 日志删除
Kafka 作为消息中间件,数据须要依照肯定的规定删除,否则数据量太大会把集群存储空间占满。
删除数据形式:
- 依照工夫,超过一段时间后删除过期音讯
- 依照音讯大小,音讯数量超过肯定大小后删除最旧的数据
Kafka 删除数据的最小单位:segment,也就是间接干掉文件!一删就是一个 log 和 index 文件
6.1.4 存储验证
1)数据筹备
将 broker 2 和 3 停掉,只保留 1
docker pause kafka-2 kafka-3
2)删掉 test 主题,通过 km 新建一个 test 主题,加 2 个分区
新建时,留神上面的选项:
segment.bytes = 1000,即:每个 log 文件达到 1000byte 时,开始创立新文件
删除策略:
retention.bytes = 2000,即:超出 2000byte 的旧日志被删除
retention.ms = 60000,即:超出 1 分钟后的旧日志被删除
以上任意一条满足,就会删除。
3)进入 kafka- 1 这台容器
docker exec -it kafka-1 sh
#查看容器中的文件信息
/ # ls /
bin dev etc home kafka lib lib64 media mnt opt proc root run sbin srv sys tmp usr var
/ # cd /kafka/
/kafka # ls
kafka-logs-d0b9c75080d6
/kafka # cd kafka-logs-d0b9c75080d6/
/kafka/kafka-logs-d0b9c75080d6 # ls -l | grep test
drwxr-xr-x 2 root root 4096 Jan 15 14:35 test-0
drwxr-xr-x 2 root root 4096 Jan 15 14:35 test-1
#2 个分区的日志文件清单,留神以后还没有任何音讯写进来
#timeindex:日志的工夫信息
#leader-epoch,上面会讲到
/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 4
-rw-r--r-- 1 root root 10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r-- 1 root root 0 Jan 15 14:35 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint
test-1:
total 4
-rw-r--r-- 1 root root 10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r-- 1 root root 0 Jan 15 14:35 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint
4)往里灌数据。启动我的项目通过 swagger 发送音讯
留神!边发送边查看上一步的文件列表信息!
# 先发送 2 条,音讯开始进来,log 文件变大!音讯在两个分区之间一一减少。/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 8
-rw-r--r-- 1 root root 10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r-- 1 root root 875 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint
test-1:
total 8
-rw-r--r-- 1 root root 10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r-- 1 root root 875 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint
#持续逐条发送,返回再来看文件,大小为 1000,达到边界!/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 8
-rw-r--r-- 1 root root 10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r-- 1 root root 1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint
test-1:
total 8
-rw-r--r-- 1 root root 10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r-- 1 root root 1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint
#持续发送音讯!1 号分区的 log 文件开始决裂
#阐明第 8 条音讯曾经进入了第二个 log
/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 8
-rw-r--r-- 1 root root 10485760 Jan 15 14:35 00000000000000000000.index
-rw-r--r-- 1 root root 1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 Jan 15 14:35 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint
test-1:
total 20
-rw-r--r-- 1 root root 0 Jan 15 14:46 00000000000000000000.index
-rw-r--r-- 1 root root 1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 12 Jan 15 14:46 00000000000000000000.timeindex
-rw-r--r-- 1 root root 10485760 Jan 15 14:46 00000000000000000008.index
-rw-r--r-- 1 root root 125 Jan 15 14:46 00000000000000000008.log #第二个 log 文件!-rw-r--r-- 1 root root 10 Jan 15 14:46 00000000000000000008.snapshot
-rw-r--r-- 1 root root 10485756 Jan 15 14:46 00000000000000000008.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint
#继续发送,另一个分区也开始拆散
/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 20
-rw-r--r-- 1 root root 0 Jan 15 15:55 00000000000000000000.index
-rw-r--r-- 1 root root 1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 12 Jan 15 15:55 00000000000000000000.timeindex
-rw-r--r-- 1 root root 10485760 Jan 15 15:55 00000000000000000008.index
-rw-r--r-- 1 root root 625 Jan 15 15:55 00000000000000000008.log
-rw-r--r-- 1 root root 10 Jan 15 15:55 00000000000000000008.snapshot
-rw-r--r-- 1 root root 10485756 Jan 15 15:55 00000000000000000008.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint
test-1:
total 20
-rw-r--r-- 1 root root 0 Jan 15 14:46 00000000000000000000.index
-rw-r--r-- 1 root root 1000 Jan 15 14:46 00000000000000000000.log
-rw-r--r-- 1 root root 12 Jan 15 14:46 00000000000000000000.timeindex
-rw-r--r-- 1 root root 10485760 Jan 15 14:46 00000000000000000008.index
-rw-r--r-- 1 root root 750 Jan 15 15:55 00000000000000000008.log
-rw-r--r-- 1 root root 10 Jan 15 14:46 00000000000000000008.snapshot
-rw-r--r-- 1 root root 10485756 Jan 15 14:46 00000000000000000008.timeindex
-rw-r--r-- 1 root root 8 Jan 15 14:35 leader-epoch-checkpoint
#继续发送音讯,分区越来越多。#过一段时间后再来查看,清理工作将会执行,超出的日志被删除!(默认调度距离 5min)#log.retention.check.interval.ms 参数指定
/kafka/kafka-logs-d0b9c75080d6 # ls -lR test-*
test-0:
total 8
-rw-r--r-- 1 root root 10485760 Jan 15 19:12 00000000000000000119.index
-rw-r--r-- 1 root root 0 Jan 15 19:12 00000000000000000119.log
-rw-r--r-- 1 root root 10 Jan 15 19:12 00000000000000000119.snapshot
-rw-r--r-- 1 root root 10485756 Jan 15 19:12 00000000000000000119.timeindex
-rw-r--r-- 1 root root 10 Jan 15 19:12 leader-epoch-checkpoint
test-1:
total 8
-rw-r--r-- 1 root root 10485760 Jan 15 19:12 00000000000000000119.index
-rw-r--r-- 1 root root 0 Jan 15 19:12 00000000000000000119.log
-rw-r--r-- 1 root root 10 Jan 15 19:12 00000000000000000119.snapshot
-rw-r--r-- 1 root root 10485756 Jan 15 19:12 00000000000000000119.timeindex
-rw-r--r-- 1 root root 10 Jan 15 19:12 leader-epoch-checkpoint
6.2 零拷贝
Kafka 在执行音讯的写入和读取这么快,其中的一个起因是零拷贝(Zero-copy)技术
6.2.1 传统文件读写
传统读写,波及到 4 次数据的复制。然而这个过程中,数据齐全没有变动,咱们仅仅是想从磁盘把数据送到网卡。
那有没有方法不绕这一圈呢?让磁盘和网卡之类的外围设备间接拜访内存,而不通过 cpu?
有!这就是 DMA(Direct Memory Access 间接内存拜访)。
6.2.2 DMA
DMA 其实是由 DMA 芯片(硬件反对)来管制的。通过 DMA 管制芯片,能够让网卡等外部设备间接去读取内存,而不是由 cpu 来回拷贝传输。这就是所谓的零拷贝
目前计算机支流硬件根本都反对 DMA,就包含咱们的硬盘和网卡。
kafka 就是调取操作系统的 sendfile,借助 DMA 来实现零拷贝数据传输的
6.2.3 java 实现
为加深了解,类比为 java 中的零拷贝:
- 在 Java 中的零拷贝是通过 java.nio.channels.FileChannel 中的 transferTo 办法来实现的
- transferTo 办法底层通过 native 调操作系统的 sendfile
-
操作系统 sendfile 负责把数据从某个 fd(linux file descriptor)传输到另一个 fd
备注:linux 下所有的设施都是一个文件描述符 fd
代码参考:
File file = new File("0.log");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
// 文件通道,起源
FileChannel fileChannel = raf.getChannel();
// 网络通道,去处
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("1.1.1.1", 1234));
// 对接上,通过 transfer 间接送过来
fileChannel.transferTo(0, fileChannel.size(), socketChannel);
6.3 分区一致性
6.3.1 水位值
1)先回顾两个值:
2)再看下几个值的存储地位:
留神!分区是有 leader 和 follower 的,最新写的音讯会进入 leader,follower 从 leader 不停的同步
无论 leader 还是 follower,都有本人的 HW 和 LEO,存储在各自分区所在的磁盘上
leader 多一个 Remote LEO,它示意针对各个 follower 的 LEO,leader 又额定记了一份!
3)为什么这么做呢?
leader 会拿这些 remote 值里最小的来更新本人的 hw,具体过程咱们具体往下看
6.3.2 同步原理
咱们来看这几个值是如何更新的:
1)leader.LEO
这个很简略,每次 producer 有新音讯发过来,就会减少
2)其余值
另外的 4 个值初始化都是 0
他们的更新由 follower 的 fetch(同步音讯线程)失去的数据来决定!
如果把 fetch 看做是 leader 上提供的办法,由 follower 近程申请调用,那么它的伪代码大略是这个样子:
//java 伪代码!//follower 端的操作,不停的申请从 leader 获取最新数据
class Follower{
private List<Message> messages;
private HW hw;
private LEO leo;
@Schedule("不停的向 leader 发动同步申请")
void execute(){
// 向 leader 发动 fetch 申请,将本人的 leo 传过来
//leader 返回 leo 之后最新的音讯,以及 leader 的 hw
LeaderReturn lr = leader.fetch(this.leo) ;
// 存音讯
this.messages.addAll(lr.newMsg);
// 减少 follower 的 leo 值
this.leo = this.leo + lr.newMsg.length;
// 比拟本人的 leo 和 leader 的 hw,取两者小的,作为 follower 的 hw
this.hw = min(this.leo , lr.leaderHW);
}
}
//leader 返回的报文
class LeaderReturn{
// 新增的音讯
List<Messages> newMsg;
//leader 的 hw
HW leaderHW;
}
//leader 在接到 follower 的 fetch 申请时,做的逻辑
class Leader{
private List<Message> messages;
private LEO leo;
private HW hw;
//Leader 比 follower 多了个 Remote!
// 留神!如果有多个正本,那么 RemoteLEO 也有多个,每个正本对应一个
private RemoteLEO remoteLEO;
// 接到 follower 的 fetch 申请时,leader 做的事件
LeaderReturn fetch(LEO followerLEO){
// 依据 follower 传过来的 leo,来更新 leader 的 remote
this.remoteLEO = followerLEO ;
// 而后取 ISR(所有可用正本)的最小 leo 作为 leader 的 hw
this.hw = min(this.leo , this.remoteLEO) ;
// 从 leader 的音讯列表里,查找大于 follower 的 leo 的所有新音讯
List<Message> newMsg = queryMsg(followerLEO) ;
// 将最新的音讯(大于 follower leo 的那些),以及 leader 的 hw 返回给 follower
LeaderReturn lr = new LeaderReturn(newMsg , this.hw)
return lr;
}
}
6.3.3 Leader Epoch
1)产生的背景
0.11 版本之前的 kafka,齐全借助 hw 作为音讯的基准,不论 leo。
产生故障后的规定:
- follower 故障再次复原后,从磁盘读取 hw 的值并从 hw 开始剔除前面的音讯,并同步 leader 音讯
- leader 故障后,新入选的 leader 的 hw 作为新的分区 hw,其余节点依照此 hw 进行剔除数据,并从新同步
- 上述依据 hw 进行数据恢复会呈现数据失落和不统一的状况,上面离开来看
假如:
咱们有两个正本:leader(A),follower(B)
场景一:丢数据
- 某个工夫点 B 挂了。当它复原后,以挂之前的 hw 为准,设置 leo = hw
- 这就造成一个问题:事实中,leo 很可能是 大于 hw 的。leo 被回退了!
- 如果这时候,恰好 A 也挂掉了。kafka 会重选 leader,B 被选中。
- 过段时间,A 复原后变成 follower,从 B 开始同步数据。
- 问题来了!下面说了,B 的数据是被回退过的,以它为基准会有问题
- 最终后果:两者的数据都产生失落,没有中央能够找回!
场景二:数据不统一
- 这次假如 AB 全挂了。比拟惨
- B 先复原。然而它的 hw 有可能挂之前没从 A 同步过去(原来 A 是 leader)
- 咱们假如,A.hw = 2 , B.hw = 1
- B 复原后,集群里只有它本人,所以被选为 leader,开始承受新音讯
- B.hw 上涨,变成 2
- 而后,A 复原,原来 A.hw = 2,复原后以 B 的 hw,也就是 2 为基准开始同步。
- 问题来了!B 当 leader 后新接到的 2 号音讯是不会同步给 A 的,A 始终保留着它当 leader 时的旧数据
- 最终后果:数据不统一了!
2)改良思路
0.11 之后,kafka 改良了 hw 做主的规定,这就是 leader epoch
leader epoch 给 leader 节点带了一个版本号,相似于乐观锁的设计。
它的思维是,一旦产生机器故障,重启之后,不再机械的将 leo 退回 hw
而是借助 epoch 的版本信息,去申请以后 leader,让它去算一算 leo 应该是什么
3)实现原理
比照下面丢数据的问题:
- A 为(leo=2 , hw=2),B 为(leo=2 , hw=1)
- B 重启,然而 B 不再焦急将 leo 打回 hw,而是发动一个 Epoch 申请给以后 leader,也就是 A
- A 收到 LE= 0 后,发现和本人的 LE 一样,阐明 B 在挂掉前后,leader 没变,都是 A 本人
- 那么 A 就将本人的 leo 值返回给 B,也就是数字 2
- B 收到 2 后和本人的 leo 比对取较小值,发现也是 2,那么不再退回到 hw 的 1
- 没有回退,也就是信息 1 的地位没有被笼罩,最大水平的爱护了数据
- 如果和下面一样的场景,A 挂掉,B 被选为 leader
- 那么 A 再次启动时后,从 B 开始同步数据
- 因为 B 之前没有回退,1 号信息失去了保留
- 同时,B 的 LE(epoch 号码)开始减少,从 0 变成 1,offset 记录为 B 当 leader 时的地位,也就是 2
-
A 传过来的 epoch 为 0,B 是 1,不相等。那么取大于 0 的所有 epoch 里最小的
(事实中可能产生了屡次从新选主,有多条 epoch)
- 其实就是 LE= 1 的那条。事实中可能有多条。并找到它对应的 offset(也就是 2)给 A 返回去
- 最终 A 失去了 B 同步过去的数据
再来看一致性问题的解决:
- 还是下面的场景,AB 同时挂掉,然而 hw 还没同步,那么 A.hw=2 , B.hw=1
- B 先启动被选成了 leader,新 leader 选举后,epoch 加了一条记录(参考下图,LE=1,这时候 offset=1)
- 示意 B 从 1 开始往后持续写数据,新来了条信息,内容为 m3,写到 1 号位
- A 启动前,集群只有 B 本人,音讯被确认,hw 上涨到 2,变成上面的样子
- A 开始复原,启动后向 B 发送 epoch 申请,将本人的 LE= 0 通知 leader,也就是 B
- B 发现自己的 LE 不同,同样去大于 0 的 LE 里最小的那条,也就是 1 , 对应的 offset 也是 1,返回给 A
- A 从 1 开始同步数据,将本人本地的数据截断、笼罩,hw 回升到 2
- 那么最新的写入的 m3 从 B 给同步到了 A,并笼罩了 A 上之前的旧数据 m2
- 后果:数据放弃了统一
附:epochRequest 的具体流程图
本文由传智教育博学谷 – 狂野架构师教研团队公布,转载请注明出处!
如果本文对您有帮忙,欢送关注和点赞;如果您有任何倡议也可留言评论或私信,您的反对是我保持创作的能源