乐趣区

关于java:20000-字彻底搞懂-Kafka

1、为什么有音讯零碎

1、解耦合
2、异步解决

例如电商平台,秒杀流动。

个别流程会分为:

  1. 危险管制
  2. 库存锁定
  3. 生成订单
  4. 短信告诉
  5. 更新数据

通过音讯零碎将秒杀流动业务拆离开,将不急需处理的业务放在前面缓缓解决;

流程改为:

  1. 危险管制
  2. 库存锁定
  3. 音讯零碎
  4. 生成订单
  5. 短信告诉
  6. 更新数据
3、流量的管制

3.1 网关在承受到申请后,就把申请放入到音讯队列外面

3.2 后端的服务从音讯队列外面获取到申请,实现后续的秒杀解决流程。而后再给用户返回后果。

  • 长处:管制了流量
  • 毛病:会让流程变慢

举荐一个开源收费的 Spring Boot 实战我的项目:

https://github.com/javastacks/spring-boot-best-practice

2、Kafka 外围概念

  • 生产者:Producer 往 Kafka 集群生成数据
  • 消费者:Consumer 往 Kafka 外面去获取数据,解决数据、生产数据

Kafka 的数据是由消费者本人去拉去 Kafka 外面的数据

  • 主题:topic
  • 分区:partition

默认一个 topic 有一个分区(partition),本人可设置多个分区(分区扩散存储在服务器不同节点上)

解决了一个海量数据如何存储的问题

例如:有 2T 的数据,一台服务器有 1T,一个 topic 能够分多个区,别离存储在多台服务器上,解决海量数据存储问题

3、Kafka 的集群架构

Kafka 集群中,一个 kafka 服务器就是一个 broker,Topic 只是逻辑上的概念,partition 在磁盘上就体现为一个目录。

Consumer Group:生产组,生产数据的时候,都必须指定一个 group id,指定一个组的 id

假设程序 A 和程序 B 指定的 group id 号一样,那么两个程序就属于同一个生产组

非凡:

  • 比方,有一个主题 topicA,程序 A 去生产了这个 topicA,那么程序 B 就不能再去生产 topicA(程序 A 和程序 B 属于一个生产组)
  • 再比方程序 A 曾经生产了 topicA 外面的数据,当初还是从新再次生产 topicA 的数据,是不能够的,然而从新指定一个 group id 号当前,能够生产。

不同生产组之间没有影响。生产组需自定义,消费者名称程序主动生成(举世无双)。

Controller:Kafka 节点外面的一个主节点。借助 zookeeper

4、Kafka 磁盘程序写保障写数据性能

kafka 写数据:

程序写,往磁盘上写数据时,就是追加数据,没有随机写的操作。

教训:

如果一个服务器磁盘达到肯定的个数,磁盘也达到肯定转数,往磁盘外面程序写(追加写)数据的速度和写内存的速度差不多。

生产者生产音讯,通过 kafka 服务先写到 os cache 内存中,而后通过 sync 程序写到磁盘上

5、Kafka 零拷贝机制保障读数据高性能

消费者读取数据流程:

  1. 消费者发送申请给 kafka 服务
  2. kafka 服务去 os cache 缓存读取数据(缓存没有就去磁盘读取数据)
  3. 从磁盘读取了数据到 os cache 缓存中
  4. os cache 复制数据到 kafka 应用程序中
  5. kafka 将数据(复制)发送到 socket cache 中
  6. socket cache 通过网卡传输给消费者

kafka linux sendfile 技术 — 零拷贝

  1. 消费者发送申请给 kafka 服务
  2. kafka 服务去 os cache 缓存读取数据(缓存没有就去磁盘读取数据)
  3. 从磁盘读取了数据到 os cache 缓存中
  4. os cache 间接将数据发送给网卡
  5. 通过网卡将数据传输给消费者

6、Kafka 日志分段保留

Kafka 中一个主题,个别会设置分区;比方创立了一个 topic_a,而后创立的时候指定了这个主题有三个分区。

其实在三台服务器上,会创立三个目录。

服务器 1(kafka1):

  • 创立目录 topic_a-0:
  • 目录上面是咱们文件(存储数据),kafka 数据就是 message,数据存储在 log 文件里
  • .log 结尾的就是日志文件,在 kafka 中把数据文件就叫做日志文件。

一个分区上面默认有 n 多个日志文件(分段存储),一个日志文件默认 1G

服务器 2(kafka2):

  • 创立目录 topic_a-1:

服务器 3(kafka3):

  • 创立目录 topic_a-2:

7、Kafka 二分查找定位数据

Kafka 外面每一条音讯,都有本人的 offset(绝对偏移量),存在物理磁盘下面,在 position

Position:物理地位(磁盘下面那个中央)

也就是说一条音讯就有两个地位:

  • offset:绝对偏移量(绝对地位)
  • position:磁盘物理地位

稠密索引:

  • Kafka 中采纳了稠密索引的形式读取索引,kafka 每当写入了 4k 大小的日志(.log),就往 index 里写入一个记录索引。

其中会采纳二分查找

8、高并发网络设计(先理解 NIO)

网络设计局部是 kafka 中设计最好的一个局部,这也是保障 Kafka 高并发、高性能的起因

对 kafka 进行调优,就得对 kafka 原理比拟理解,尤其是网络设计局部

Reactor 网络设计模式 1:

Reactor 网络设计模式 2:

Reactor 网络设计模式 3:

Kafka 超高并发网络设计:

9、Kafka 冗余正本保障高可用

在 kafka 外面分区是有正本的,注:0.8 以前是没有正本机制的。创立主题时,能够指定分区,也能够指定正本个数。正本是有角色的:

leader partition:

  • 写数据、读数据操作都是从 leader partition 去操作的。
  • 会保护一个 ISR(in-sync- replica)列表,然而会依据肯定的规定删除 ISR 列表外面的值

生产者发送来一个音讯,音讯首先要写入到 leader partition 中

写完了当前,还要把音讯写入到 ISR 列表外面的其它分区,写完后才算这个音讯提交

follower partition:从 leader partition 同步数据。

10、优良架构思考 - 总结

Kafka — 高并发、高可用、高性能

  • 高可用:多正本机制
  • 高并发:网络架构设计 三层架构:多 selector -> 多线程 -> 队列的设计(NIO)
  • 高性能:

写数据:

  1. 把数据先写入到 OS Cache
  2. 写到磁盘下面是程序写,性能很高

读数据:

  1. 依据稠密索引,疾速定位到要生产的数据
  2. 零拷贝机制
    • 缩小数据的拷贝
    • 缩小了应用程序与操作系统上下文切换

11、Kafka 生产环境搭建

11.1 需要场景剖析

电商平台,须要每天 10 亿申请都要发送到 Kafka 集群下面。二八反正,个别评估进去问题都不大。

10 亿申请 -> 24 过去的,个别状况下,每天的 12:00 到早上 8:00 这段时间其实是没有多大的数据量的。80% 的申请是用的另外 16 小时的解决的。16 个小时解决 -> 8 亿的申请。16 * 0.2 = 3 个小时 解决了 8 亿申请的 80% 的数据

也就是说 6 亿的数据是靠 3 个小时解决完的。咱们简略的算一下高峰期时候的 qps

 6 亿 / 3 小时 =5.5 万 /s qps=5.5 万

10 亿申请 * 50kb = 46T 每天须要存储 46T 的数据

个别状况下,咱们都会设置两个正本 46T * 2 = 92T,Kafka 外面的数据是有保留的工夫周期,保留最近 3 天的数据。

92T * 3 天 = 276T

我这儿说的是 50kb 不是说一条音讯就是 50kb 不是(把日志合并了,多条日志合并在一起),通常状况下,一条音讯就几 b,也有可能就是几百字节。

11.2 物理机数量评估

1)首先剖析一下是须要虚拟机还是物理机

像 Kafka mysql hadoop 这些集群搭建的时候,咱们生产外面都是应用物理机。

2)高峰期须要解决的申请总的申请每秒 5.5 万个,其实一两台物理机相对是能够抗住的。个别状况下,咱们评估机器的时候,是依照高峰期的 4 倍的去评估。

如果是 4 倍的话,大略咱们集群的能力要筹备到 20 万 qps。这样子的集群才是比拟平安的集群。大略就须要 5 台物理机。每台接受 4 万申请。

场景总结:

  • 搞定 10 亿申请,高峰期 5.5 万的 qps,276T 的数据,须要 5 台物理机。

11.3 磁盘抉择

搞定 10 亿申请,高峰期 5.5 万的 qps,276T 的数据,须要 5 台物理机。

1)SSD 固态硬盘,还是须要一般的机械硬盘

  • SSD 硬盘:性能比拟好,然而价格贵
  • SAS 盘:某方面性能不是很好,然而比拟便宜。

SSD 硬盘性能比拟好,指的是它随机读写的性能比拟好。适宜 MySQL 这样集群。

然而其实他的程序写的性能跟 SAS 盘差不多。

kafka 的了解:就是用的程序写。所以咱们就用一般的【机械硬盘】就能够了。

2)须要咱们评估每台服务器须要多少块磁盘

5 台服务器,一共须要 276T,大概每台服务器 须要存储 60T 的数据。咱们公司外面服务器的配置用的是 11 块硬盘,每个硬盘 7T。11 * 7T = 77T

77T * 5 台服务器 = 385T

场景总结:

  • 搞定 10 亿申请,须要 5 台物理机,11(SAS)* 7T

11.4 内存评估 搞定 10 亿申请,须要 5 台物理机,11(SAS)* 7T

咱们发现 kafka 读写数据的流程 都是基于 os cache, 换句话说假如咱们的 os cashe 无限大那么整个 kafka 是不是相当于就是基于内存去操作,如果是基于内存去操作,性能必定很好。内存是无限的。

  • 尽可能多的内存资源要给 os cache
  • Kafka 的代码用 外围的代码用的是 scala 写的,客户端的代码 java 写的。都是基于 jvm。所以咱们还要给一部分的内存给 jvm。

Kafka 的设计,没有把很多数据结构都放在 jvm 外面。所以咱们的这个 jvm 不须要太大的内存。依据教训,给个 10G 就能够了。

NameNode:jvm 外面还放了元数据(几十 G),JVM 肯定要给得很大。比方给个 100G。

假如咱们这个 10 申请的这个我的项目,一共会有 100 个 topic。100 topic * 5 partition * 2 = 1000 partition

一个 partition 其实就是物理机下面的一个目录,这个目录上面会有很多个.log 的文件。

  • .log 就是存储数据文件,默认状况下一个.log 文件的大小是 1G。

咱们如果要保障 1000 个 partition 的最新的.log 文件的数据 如果都在内存外面,这个时候性能就是最好。1000 * 1G = 1000G内存.

咱们只须要把以后最新的这个 log 保障外面的 25% 的最新的数据在内存外面。250M * 1000 = 0.25 G* 1000 =250G的内存。

  • 250 内存 / 5 = 50G内存
  • 50G+10G = 60G内存

64G 的内存,另外的 4G,操作系统本生是不是也须要内存。其实 Kafka 的 jvm 也能够不必给到 10G 这么多。评估进去 64G 是能够的。当然如果能给到 128G 的内存的服务器,那就最好。

我刚刚评估的时候用的都是一个 topic 是 5 个 partition,然而如果是数据量比拟大的 topic,可能会有 10 个 partition。

总结:

  • 搞定 10 亿申请,须要 5 台物理机,11(SAS)* 7T,须要 64G 的内存(128G 更好)

11.5 CPU 压力评估

评估一下每台服务器须要多少 cpu core(资源很无限)

咱们评估须要多少个 cpu,根据就是看咱们的服务外面有多少线程去跑。线程就是依靠 cpu 去运行的。如果咱们的线程比拟多,然而 cpu core 比拟少,这样的话,咱们的机器负载就会很高,性能不就不好。

评估一下,kafka 的一台服务器 启动当前会有多少线程?

  • Acceptor 线程 1
  • processor 线程 3 6~9 个线程
  • 解决申请线程 8 个 32 个线程
  • 定时清理的线程,拉取数据的线程,定时查看 ISR 列表的机制 等等。

所以大略一个 Kafka 的服务启动起来当前,会有一百多个线程。

  • cpu core = 4 个,一遍来说,几十个线程,就必定把 cpu 打满了。
  • cpu core = 8 个,应该很轻松的能反对几十个线程。

如果咱们的线程是 100 多个,或者差不多 200 个,那么 8 个 cpu core 是搞不定的。

所以咱们这儿倡议:

  • CPU core = 16 个。如果能够的话,能有 32 个 cpu core 那就最好。

论断:

  • kafka 集群,最低也要给 16 个 cpu core,如果能给到 32 cpu core 那就更好。
  • 2cpu * 8 =16 cpu core
  • 4cpu * 8 = 32 cpu core

总结:

  • 搞定 10 亿申请,须要 5 台物理机,11(SAS)* 7T,须要 64G 的内存(128G 更好),须要 16 个 cpu core(32 个更好)

11.6 网络需要评估

评估咱们须要什么样网卡?

个别要么是千兆的网卡(1G/s),还有的就是万兆的网卡(10G/s)

高峰期的时候 每秒会有 5.5 万的申请涌入,5.5/5 = 大概是每台服务器会有 1 万个申请涌入。咱们之前说的,10000 * 50kb = 488M 也就是每条服务器,每秒要承受 488M 的数据。数据还要有正本,正本之间的同步,也是走的网络的申请。488 * 2 = 976m/s

阐明一下:

  • 很多公司的数据,一个申请外面是没有 50kb 这么大的,咱们公司是因为主机在生产端封装了数据,而后把多条数据合并在一起了,所以咱们的一个申请才会有这么大。
  • 个别状况下,网卡的带宽是达不到极限的,如果是千兆的网卡,咱们能用的个别就是 700M 左右。然而如果最好的状况,咱们还是应用万兆的网卡。
  • 如果应用的是万兆的,那就是很轻松。

11.7 集群布局

  • 申请量
  • 布局物理机的个数
  • 剖析磁盘的个数,抉择应用什么样的磁盘
  • 内存
  • cpu core
  • 网卡

就是通知大家,当前要是公司外面有什么需要,进行资源的评估,服务器的评估,大家依照我的思路去评估。

一条音讯的大小 50kb -> 1kb 500byte 1M

ip 主机名

  • 192.168.0.100 hadoop1
  • 192.168.0.101 hadoop2
  • 192.168.0.102 hadoop3

主机的布局:kafka 集群架构的时候:主从式的架构:

  • controller -> 通过 zk 集群来治理整个集群的元数据。

zookeeper 集群

  • hadoop1
  • hadoop2
  • hadoop3

kafka 集群

  • 实践上来讲,咱们不应该把 kafka 的服务于 zk 的服务装置在一起。
  • 然而咱们这儿服务器无限。所以咱们 kafka 集群也是装置在 hadoop1 haadoop2 hadoop3

11.8 zookeeper 集群搭建

11.9 外围参数详解

11.10 集群压力测试

12、kafka 运维

12.1 常见运维工具介绍

KafkaManager — 页面管理工具

12.2 常见运维命令

场景一:topic 数据量太大,要减少 topic 数

一开始创立主题的时候,数据量不大,给的分区数不多。

kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic test6

kafka-topics.sh --alter --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --partitions 3 --topic test6

broker id:

  • hadoop1:0
  • hadoop2:1
  • hadoop3:2

假如一个 partition 有三个正本:partition0:

a,b,c

  • a:leader partition
  • b,c:follower partition
ISR:{a,b,c}

如果一个 follower 分区 超过 10 秒 没有向 leader partition 去拉取数据,那么这个分区就从 ISR 列表外面移除。

场景二:外围 topic 减少正本因子

如果对外围业务数据须要减少正本因子

vim test.json 脚本,将上面一行 json 脚本保留

{“version”:1,“partitions”:[{“topic”:“test6”,“partition”:0,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:1,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:2,“replicas”:[0,1,2]}]}

执行下面 json 脚本:

kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --reassignment-json-file test.json --execute

场景三:负载不平衡的 topic,手动迁徙

vi topics-to-move.json

{“topics”: [{“topic”:“test01”}, {“topic”:“test02”}],“version”: 1}
// 把你所有的 topic 都写在这里

kafka-reassgin-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topics-to-move-json-file topics-to-move.json --broker-list“5,6”--generate
// 把你所有的包含新退出的 broker 机器都写在这里,就会说是把所有的 partition 平均的扩散在各个 broker 上,包含新进来的 broker

此时会生成一个迁徙计划,能够保留到一个文件里去:expand-cluster-reassignment.json

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --execute

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

这种数据迁徙操作肯定要在早晨低峰的时候来做,因为他会在机器之间迁徙数据,十分的占用带宽资源

  • generate: 依据给予的 Topic 列表和 Broker 列表生成迁徙打算。generate 并不会真正进行音讯迁徙,而是将音讯迁徙打算计算出来,供 execute 命令应用。
  • execute: 依据给予的音讯迁徙打算进行迁徙。
  • verify: 查看音讯是否曾经迁徙实现。

场景四:如果某个 broker leader partition 过多

失常状况下,咱们的 leader partition 在服务器之间是负载平衡。

  • hadoop1 4
  • hadoop2 1
  • hadoop3 1

当初各个业务方能够自行申请创立 topic,分区数量都是主动调配和后续动静调整的,kafka 自身会主动把 leader partition 平均扩散在各个机器上,这样能够保障每台机器的读写吞吐量都是平均的。

然而也有例外,那就是如果某些 broker 宕机,会导致 leader partition 过于集中在其余少部分几台 broker 上,这会导致多数几台 broker 的读写申请压力过高,其余宕机的 broker 重启之后都是 folloer partition,读写申请很低。

造成集群负载不平衡有一个参数,auto.leader.rebalance.enable,默认是 true,每隔 300 秒(leader.imbalance.check.interval.seconds)查看 leader 负载是否均衡

如果一台 broker 上的不平衡的 leader 超过了 10%,leader.imbalance.per.broker.percentage,就会对这个 broker 进行选举。

配置参数:

  • auto.leader.rebalance.enable 默认是 true
  • leader.imbalance.per.broker.percentage: 每个 broker 容许的不均衡的 leader 的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的均衡。这个值示意百分比。10%
  • leader.imbalance.check.interval.seconds:默认值 300 秒

13、Kafka 生产者

13.1 消费者发送音讯原理

13.2 消费者发送音讯原理—根底案例演示

13.3 如何晋升吞吐量

如何晋升吞吐量:参数一:buffer.memory:

设置发送音讯的缓冲区,默认值是 33554432,就是 32MB

参数二:compression.type:

默认是 none,不压缩,然而也能够应用 lz4 压缩,效率还是不错的,压缩之后能够减小数据量,晋升吞吐量,然而会加大 producer 端的 cpu 开销

参数三:batch.size:

  • 设置 batch 的大小,如果 batch 太小,会导致频繁网络申请,吞吐量降落;
  • 如果 batch 太大,会导致一条音讯须要期待很久能力被发送进来,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里,默认值是:16384,就是 16kb,也就是一个 batch 满了 16kb 就发送进来,个别在理论生产环境,这个 batch 的值能够增大一些来晋升吞吐量,如果一个批次设置大了,会有提早。个别依据一条音讯大小来设置。
  • 如果咱们音讯比拟少。配合应用的参数 linger.ms,这个值默认是 0,意思就是音讯必须立刻被发送,然而这是不对的,个别设置一个 100 毫秒之类的,这样的话就是说,这个音讯被发送进来后进入一个 batch,如果 100 毫秒内,这个 batch 满了 16kb,天然就会发送进来。

13.4 如何解决异样

1、LeaderNotAvailableException:

这个就是如果某台机器挂了,此时 leader 正本不可用,会导致你写入失败,要期待其余 follower 正本切换为 leader 正本之后,能力持续写入,此时能够重试发送即可;如果说你平时重启 kafka 的 broker 过程,必定会导致 leader 切换,肯定会导致你写入报错,是LeaderNotAvailableException

2、NotControllerException:

这个也是同理,如果说 Controller 所在 Broker 挂了,那么此时会有问题,须要期待 Controller 从新选举,此时也是一样就是重试即可。

3、NetworkException:网络异样 timeout

  • 配置 retries 参数,他会主动重试的
  • 然而如果重试几次之后还是不行,就会提供 Exception 给咱们来解决了, 咱们获取到异样当前,再对这个音讯进行独自解决。咱们会有备用的链路。发送不胜利的音讯发送到 Redis 或者写到文件系统中,甚至是抛弃。

13.5 重试机制

重试会带来一些问题:

音讯反复

有的时候一些 leader 切换之类的问题,须要进行重试,设置 retries 即可,然而音讯重试会导致, 反复发送的问题,比如说网络抖动一下导致他认为没胜利,就重试了,其实人家都胜利了.

音讯乱序 音讯重试是可能导致音讯的乱序的,因为可能排在你前面的音讯都发送进来了。所以能够应用 ” max.in.flight.requests.per.connection“ 参数设置为 1,这样能够保障 producer 同一时间只能发送一条音讯。

两次重试的距离默认是 100 毫秒,用 ”retry.backoff.ms“ 来进行设置,基本上在开发过程中,靠重试机制根本就能够搞定 95% 的异样问题。

13.6 ACK 参数详解

producer 端

request.required.acks=0;
  • 只有申请已发送进来,就算是发送完了,不关怀有没有写胜利。
  • 性能很好,如果是对一些日志进行剖析,能够接受丢数据的状况,用这个参数,性能会很好。
request.required.acks=1;
  • 发送一条音讯,当 leader partition 写入胜利当前,才算写入胜利。
  • 不过这种形式也有丢数据的可能。
request.required.acks=-1;
  • 须要 ISR 列表外面,所有正本都写完当前,这条音讯才算写入胜利。
  • ISR:1 个正本。1 leader partition 1 follower partition

kafka 服务端:

min.insync.replicas:1

如果咱们不设置的话,默认这个值是 1,一个 leader partition 会保护一个 ISR 列表,这个值就是限度 ISR 列表外面,至多得有几个正本,比方这个值是 2,那么当 ISR 列表外面只有一个正本的时候。往这个分区插入数据的时候会报错。

设计一个不丢数据的计划:

  • 分区正本 >=2
  • acks = -1
  • min.insync.replicas >=2

还有可能就是发送有异样:对异样进行解决

13.7 自定义分区

分区:

  • 没有设置 key

咱们的音讯就会被轮训的发送到不同的分区。

  • 设置了 key

kafka 自带的分区器,会依据 key 计算出来一个 hash 值,这个 hash 值会对应某一个分区。

如果 key 雷同的,那么 hash 值必然雷同,key 雷同的值,必然是会被发送到同一个分区。

然而有些比拟非凡的时候,咱们就须要自定义分区

public class HotDataPartitioner implements Partitioner {
private Random random;
@Override
public void configure(Map<String, ?> configs) {random = new Random();
}
@Override
public int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String key = (String)keyObj;
List partitionInfoList = cluster.availablePartitionsForTopic(topic);
// 获取到分区的个数 0,1,2
int partitionCount = partitionInfoList.size();
// 最初一个分区
int hotDataPartition = partitionCount - 1;
return !key.contains(“hot_data”) ? random.nextInt(partitionCount - 1) : hotDataPartition;
}
}

如何应用:

配置上这个类即可:props.put(”partitioner.class”,“com.zhss.HotDataPartitioner”);

13.8 综合案例演示

需要剖析:

电商背景 -》二手的电商平台

【欢畅送】的我的项目,用户购买了货色当前会有【星星】,用星星去换物品。一块钱一个星星。

订单零碎(音讯的生产),发送一条音讯(领取订单,勾销订单)-> Kafka <- 会员零碎,从 kafak 外面去生产数据,找到对应用户生产的金额,而后给该用户更新星星的数量。

剖析一下:

发送音讯的时候,能够指定 key,也能够不指定 key。

1)如果不指定 key

  • zhangsan -> 下订单 -> 100 -> +100
  • zhangsan -> 勾销订单 -> -100 -> -100
  • 会员零碎生产数据的时候,有可能先生产到的是 勾销订单的数据。

2)如果指定 key,key -> hash(数字)-> 对应分区号 -> 发送到对应的分区外面。

  • 如果 key 雷同的 -> 数据必定会被发送到同一个分区(有序的)

这个我的项目须要指定 key,把用户的 id 指定为 key.

14、Kafka 消费者

14.1 生产组概念

groupid 雷同就属于同一个生产组

1)每个 consumer 都要属于一个 consumer.group,就是一个生产组,topic 的一个分区只会调配给一个生产组下的一个 consumer 来解决,每个 consumer 可能会调配多个分区,也有可能某个 consumer 没有调配到任何分区。

2)如果想要实现一个播送的成果,那只须要应用不同的 group id 去生产就能够。

topicA:

  • partition0、partition1

groupA:

  • consumer1: 生产 partition0
  • consuemr2: 生产 partition1
  • consuemr3: 生产不到数据

groupB:

  • consuemr3: 生产到 partition0 和 partition1

3)如果 consumer group 中某个消费者挂了,此时会主动把调配给他的分区交给其余的消费者,如果他又重启了,那么又会把一些分区从新交还给他

14.2 根底案例演示

14.3 偏移量治理

每个 consumer 内存里数据结构保留对每个 topic 的每个分区的生产 offset,定期会提交 offset,老版本是写入 zk,然而那样高并发申请 zk 是不合理的架构设计,zk 是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储。

当初新的版本提交 offset 发送给 kafka 外部 topic:__consumer_offsets,提交过来的时候,key 是group.id+topic+ 分区号,value 就是以后 offset 的值,每隔一段时间,kafka 外部会对这个 topic 进行 compact(合并),也就是每个 group.id+topic+ 分区号就保留最新数据。

__consumer_offsets可能会接管高并发的申请,所以默认分区 50 个(leader partitiron -> 50 kafka),这样如果你的 kafka 部署了一个大的集群,比方有 50 台机器,就能够用 50 台机器来抗 offset 提交的申请压力。

  • 消费者 -> broker 端的数据
  • message -> 磁盘 -> offset 程序递增
  • 从哪儿开始生产?-> offset
  • 消费者(offset)

14.4 偏移量监控工具介绍

web 页面治理的一个管理软件(kafka Manager)

  • 批改 bin/kafka-run-class.sh 脚本,第一行减少JMX_PORT=9988
  • 重启 kafka 过程

另一个软件:次要监控的 consumer 的偏移量。

就是一个 jar 包java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar

com.quantifind.kafka.offsetapp.OffsetGetterWeb
  • offsetStorage kafka \(依据版本:偏移量存在 kafka 就填 kafka,存在 zookeeper 就填 zookeeper)
  • zk hadoop1:2181
  • port 9004
  • refresh 15.seconds
  • retain 2.days

写了一段程序 , 生产 kafka 外面的数据(consumer,解决数据 -> 业务代码)-> Kafka 如何去判断你的这段代码真的是实时的去生产的呢?

提早几亿条数据 -> 阈值(20 万条的时候 发送一个告警。)

14.5 生产异样感知

heartbeat.interval.ms

  • consumer 心跳工夫距离,必须得与 coordinator 放弃心跳能力晓得 consumer 是否故障了,
  • 而后如果故障之后,就会通过心跳下发 rebalance 的指令给其余的 consumer 告诉他们进行 rebalance 的操作

session.timeout.ms

  • kafka 多长时间感知不到一个 consumer 就认为他故障了,默认是 10 秒

max.poll.interval.ms

  • 如果在两次 poll 操作之间,超过了这个工夫,那么就会认为这个 consume 解决能力太弱了,会被踢出生产组,分区调配给他人去生产,一般来说联合业务解决的性能来设置就能够了。

14.6 外围参数解释

fetch.max.bytes

获取一条音讯最大的字节数,个别倡议设置大一些,默认是 1M 其实咱们在之前多个中央都见到过这个相似的参数,意思就是说一条信息最大能多大?

  1. Producer:发送的数据,一条音讯最大多大,-> 10M
  2. Broker:存储数据,一条音讯最大能承受多大 -> 10M
  3. Consumer:

max.poll.records:

一次 poll 返回音讯的最大条数,默认是 500 条

connection.max.idle.ms

consumer 跟 broker 的 socket 连贯如果闲暇超过了肯定的工夫,此时就会主动回收连贯,然而下次生产就要从新建设 socket 连贯,这个倡议设置为 -1,不要去回收

enable.auto.commit:

开启主动提交偏移量

auto.commit.interval.ms:

每隔多久提交一次偏移量,默认值 5000 毫秒

auto.offset.reset

  • earliest:当各分区下有已提交的 offset 时,从提交的 offset 开始生产;无提交的 offset 时,从头开始生产
  • latest:当各分区下有已提交的 offset 时,从提交的 offset 开始生产;无提交的 offset 时,生产新产生的该分区下的数据
  • none:topic 各分区都存在已提交的 offset 时,从 offset 后开始生产;只有有一个分区不存在已提交的 offset,则抛出异样

14.7 综合案例演示

引入案例:二手电商平台(欢畅送),依据用户生产的金额,对用户星星进行累计。

  • 订单零碎(生产者)-> Kafka 集群外面发送了音讯。
  • 会员零碎(消费者)-> Kafak 集群外面生产音讯,对音讯进行解决。

14.8 group coordinator 原理

面试题:消费者是如何实现 rebalance 的?— 依据 coordinator 实现

什么是 coordinator

每个 consumer group 都会抉择一个 broker 作为本人的 coordinator,他是负责监控这个生产组里的各个消费者的心跳,以及判断是否宕机,而后开启 rebalance 的

如何抉择 coordinator 机器

首先对 groupId 进行 hash(数字),接着对 __consumer_offsets 的分区数量取模,默认是 50,_consumer_offsets的分区数能够通过 offsets.topic.num.partitions 来设置,找到分区当前,这个分区所在的 broker 机器就是 coordinator 机器。

比如说:groupId,“myconsumer_group”-> hash 值(数字)-> 对 50 取模 -> 8__consumer_offsets 这个主题的 8 号分区在哪台 broker 下面,那一台就是 coordinator 就晓得这个 consumer group 下的所有的消费者提交 offset 的时候是往哪个分区去提交 offset,

运行流程

  • 每个 consumer 都发送 JoinGroup 申请到 Coordinator,
  • 而后 Coordinator 从一个 consumer group 中抉择一个 consumer 作为 leader,
  • 把 consumer group 状况发送给这个 leader,
  • 接着这个 leader 会负责制订生产计划,
  • 通过 SyncGroup 发给 Coordinator
  • 接着 Coordinator 就把生产计划下发给各个 consumer,他们会从指定的分区的

leader broker 开始进行 socket 连贯以及生产音讯

14.9 rebalance 策略

consumer group 靠 coordinator 实现了 Rebalance

这里有三种 rebalance 的策略:range、round-robin、sticky

比方咱们生产的一个主题有 12 个分区:

p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11

假如咱们的消费者组外面有三个消费者

range 策略

  • range 策略就是依照 partiton 的序号范畴
  • p0~3 consumer1
  • p4~7 consumer2
  • p8~11 consumer3
  • 默认就是这个策略;

round-robin 策略

  • 就是轮询调配
  • consumer1:0,3,6,9
  • consumer2:1,4,7,10
  • consumer3:2,5,8,11

然而后面的这两个计划有个问题:12 -> 2 每个消费者会生产 6 个分区

假如 consuemr1 挂了:p0- 5 调配给 consumer2,p6-11 调配给 consumer3,这样的话,本来在 consumer2 上的的 p6,p7 分区就被调配到了 consumer3 上。

sticky 策略

最新的一个 sticky 策略,就是说尽可能保障在 rebalance 的时候,让本来属于这个 consumer 的分区还是属于他们,而后把多余的分区再平均调配过来,这样尽可能维持原来的分区调配的策略

  • consumer1:0-3
  • consumer2: 4-7
  • consumer3: 8-11

假如 consumer3 挂了

  • consumer1:0-3,+8,9
  • consumer2: 4-7,+10,11

15、Broker 治理

15.1 Leo、hw 含意

  • Kafka 的外围原理
  • 如何去评估一个集群资源
  • 搭建了一套 kafka 集群 -》介绍了简略的一些运维治理的操作。
  • 生产者(应用,外围的参数)
  • 消费者(原理,应用的,外围参数)
  • broker 外部的一些原理,外围的概念:LEO,HW

LEO:是跟 offset 偏移量有关系。

LEO:

在 kafka 外面,无论 leader partition 还是 follower partition 对立都称作正本(replica)。

每次 partition 接管到一条音讯,都会更新本人的 LEO,也就是 log end offset,LEO 其实就是最新的 offset + 1

HW:高水位

LEO 有一个很重要的性能就是更新 HW,如果 follower 和 leader 的 LEO 同步了,此时 HW 就能够更新

HW 之前的数据对消费者是可见,音讯属于 commit 状态。HW 之后的音讯消费者生产不到。

15.2 Leo 更新

15.3 hw 更新

15.4 controller 如何治理整个集群

1: 竞争 controller 的

  • /controller/id

2:controller 服务监听的目录:

  • /broker/ids/ 用来感知 broker 高低线
  • /broker/topics/ 创立主题,咱们过后创立主题命令,提供的参数,ZK 地址。
  • /admin/reassign_partitions 分区重调配

15.5 延时工作

kafka 的提早调度机制(扩大常识)

咱们先看一下 kafka 外面哪些地方须要有工作要进行提早调度。

第一类延时的工作:

比如说 producer 的 acks=-1,必须期待 leader 和 follower 都写完能力返回响应。

有一个超时工夫,默认是 30 秒(request.timeout.ms)。

所以须要在写入一条数据到 leader 磁盘之后,就必须有一个延时工作,到期工夫是 30 秒延时工作 放到 DelayedOperationPurgatory(延时管理器)中。

如果在 30 秒之前如果所有 follower 都写入正本到本地磁盘了,那么这个工作就会被主动触发昏迷,就能够返回响应后果给客户端了,否则的话,这个延时工作本人指定了最多是 30 秒到期,如果到了超时工夫都没等到,就间接超时返回异样。

第二类延时的工作:

follower 往 leader 拉取音讯的时候,如果发现是空的,此时会创立一个延时拉取工作

延时工夫到了之后(比方到了 100ms),就给 follower 返回一个空的数据,而后 follower 再次发送申请读取音讯,然而如果延时的过程中(还没到 100ms),leader 写入了音讯,这个工作就会主动昏迷,主动执行拉取工作。

海量的延时工作,须要去调度。

15.6 工夫轮机制

1. 什么会有要设计工夫轮?

Kafka 外部有很多延时工作,没有基于 JDK Timer 来实现,那个插入和删除工作的工夫复杂度是 O(nlogn),而是基于了本人写的工夫轮来实现的,工夫复杂度是 O(1),依附工夫轮机制,延时工作插入和删除,O(1)

2. 工夫轮是什么?

其实工夫轮说白其实就是一个数组。

  • tickMs: 工夫轮距离 1ms
  • wheelSize:工夫轮大小 20
  • interval:timckMS * whellSize,一个工夫轮的总的时间跨度。20ms
  • currentTime:过后工夫的指针。
    • a: 因为工夫轮是一个数组,所以要获取外面数据的时候,靠的是 index,工夫复杂度是 O(1)
    • b: 数组某个地位上对应的工作,用的是双向链表存储的,往双向链表外面插入,删除工作,工夫复杂度也是 O(1)

3. 多层级的工夫轮

比方:要插入一个 110 毫秒当前运行的工作。

  • tickMs: 工夫轮距离 20ms
  • wheelSize:工夫轮大小 20
  • interval:timckMS * whellSize,一个工夫轮的总的时间跨度。20ms
  • currentTime:过后工夫的指针。
    • 第一层工夫轮:1ms * 20
    • 第二层工夫轮:20ms * 20
    • 第三层工夫轮:400ms * 20

作者:erainm \
起源:blog.csdn.net/eraining/article/details/115860664

近期热文举荐:

1.1,000+ 道 Java 面试题及答案整顿(2022 最新版)

2. 劲爆!Java 协程要来了。。。

3.Spring Boot 2.x 教程,太全了!

4. 别再写满屏的爆爆爆炸类了,试试装璜器模式,这才是优雅的形式!!

5.《Java 开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞 + 转发哦!

退出移动版