1、为什么有音讯零碎

1、解耦合
2、异步解决

例如电商平台,秒杀流动。

个别流程会分为:

  1. 危险管制
  2. 库存锁定
  3. 生成订单
  4. 短信告诉
  5. 更新数据

通过音讯零碎将秒杀流动业务拆离开,将不急需处理的业务放在前面缓缓解决;

流程改为:

  1. 危险管制
  2. 库存锁定
  3. 音讯零碎
  4. 生成订单
  5. 短信告诉
  6. 更新数据
3、流量的管制

3.1 网关在承受到申请后,就把申请放入到音讯队列外面

3.2 后端的服务从音讯队列外面获取到申请,实现后续的秒杀解决流程。而后再给用户返回后果。

  • 长处:管制了流量
  • 毛病:会让流程变慢

举荐一个开源收费的 Spring Boot 实战我的项目:

https://github.com/javastacks/spring-boot-best-practice

2、Kafka外围概念

  • 生产者:Producer 往Kafka集群生成数据
  • 消费者:Consumer 往Kafka外面去获取数据,解决数据、生产数据

Kafka的数据是由消费者本人去拉去Kafka外面的数据

  • 主题:topic
  • 分区:partition

默认一个topic有一个分区(partition),本人可设置多个分区(分区扩散存储在服务器不同节点上)

解决了一个海量数据如何存储的问题

例如:有2T的数据,一台服务器有1T,一个topic能够分多个区,别离存储在多台服务器上,解决海量数据存储问题

3、Kafka的集群架构

Kafka集群中,一个kafka服务器就是一个broker,Topic只是逻辑上的概念,partition在磁盘上就体现为一个目录。

Consumer Group:生产组,生产数据的时候,都必须指定一个group id,指定一个组的id

假设程序A和程序B指定的group id号一样,那么两个程序就属于同一个生产组

非凡:

  • 比方,有一个主题topicA, 程序A去生产了这个topicA,那么程序B就不能再去生产topicA(程序A和程序B属于一个生产组)
  • 再比方程序A曾经生产了topicA外面的数据,当初还是从新再次生产topicA的数据,是不能够的,然而从新指定一个group id号当前,能够生产。

不同生产组之间没有影响。生产组需自定义,消费者名称程序主动生成(举世无双)。

Controller:Kafka节点外面的一个主节点。借助zookeeper

4、Kafka磁盘程序写保障写数据性能

kafka写数据:

程序写,往磁盘上写数据时,就是追加数据,没有随机写的操作。

教训:

如果一个服务器磁盘达到肯定的个数,磁盘也达到肯定转数,往磁盘外面程序写(追加写)数据的速度和写内存的速度差不多。

生产者生产音讯,通过kafka服务先写到os cache 内存中,而后通过sync程序写到磁盘上

5、Kafka零拷贝机制保障读数据高性能

消费者读取数据流程:

  1. 消费者发送申请给kafka服务
  2. kafka服务去os cache缓存读取数据(缓存没有就去磁盘读取数据)
  3. 从磁盘读取了数据到os cache缓存中
  4. os cache复制数据到kafka应用程序中
  5. kafka将数据(复制)发送到socket cache中
  6. socket cache通过网卡传输给消费者

kafka linux sendfile技术 — 零拷贝

  1. 消费者发送申请给kafka服务
  2. kafka服务去os cache缓存读取数据(缓存没有就去磁盘读取数据)
  3. 从磁盘读取了数据到os cache缓存中
  4. os cache间接将数据发送给网卡
  5. 通过网卡将数据传输给消费者

6、Kafka日志分段保留

Kafka中一个主题,个别会设置分区;比方创立了一个topic_a,而后创立的时候指定了这个主题有三个分区。

其实在三台服务器上,会创立三个目录。

服务器1(kafka1):

  • 创立目录topic_a-0:
  • 目录上面是咱们文件(存储数据),kafka数据就是message,数据存储在log文件里
  • .log结尾的就是日志文件,在kafka中把数据文件就叫做日志文件。

一个分区上面默认有n多个日志文件(分段存储),一个日志文件默认1G

服务器2(kafka2):

  • 创立目录topic_a-1:

服务器3(kafka3):

  • 创立目录topic_a-2:

7、Kafka二分查找定位数据

Kafka外面每一条音讯,都有本人的offset(绝对偏移量),存在物理磁盘下面,在position

Position:物理地位(磁盘下面那个中央)

也就是说一条音讯就有两个地位:

  • offset:绝对偏移量(绝对地位)
  • position:磁盘物理地位

稠密索引:

  • Kafka中采纳了稠密索引的形式读取索引,kafka每当写入了4k大小的日志(.log),就往index里写入一个记录索引。

其中会采纳二分查找

8、高并发网络设计(先理解NIO)

网络设计局部是kafka中设计最好的一个局部,这也是保障Kafka高并发、高性能的起因

对kafka进行调优,就得对kafka原理比拟理解,尤其是网络设计局部

Reactor网络设计模式1:

Reactor网络设计模式2:

Reactor网络设计模式3:

Kafka超高并发网络设计:

9、Kafka冗余正本保障高可用

在kafka外面分区是有正本的,注:0.8以前是没有正本机制的。创立主题时,能够指定分区,也能够指定正本个数。正本是有角色的:

leader partition:

  • 写数据、读数据操作都是从leader partition去操作的。
  • 会保护一个ISR(in-sync- replica )列表,然而会依据肯定的规定删除ISR列表外面的值

生产者发送来一个音讯,音讯首先要写入到leader partition中

写完了当前,还要把音讯写入到ISR列表外面的其它分区,写完后才算这个音讯提交

follower partition:从leader partition同步数据。

10、优良架构思考-总结

Kafka — 高并发、高可用、高性能

  • 高可用:多正本机制
  • 高并发:网络架构设计 三层架构:多selector -> 多线程 -> 队列的设计(NIO)
  • 高性能:

写数据:

  1. 把数据先写入到OS Cache
  2. 写到磁盘下面是程序写,性能很高

读数据:

  1. 依据稠密索引,疾速定位到要生产的数据
  2. 零拷贝机制
    • 缩小数据的拷贝
    • 缩小了应用程序与操作系统上下文切换

11、Kafka生产环境搭建

11.1 需要场景剖析

电商平台,须要每天10亿申请都要发送到Kafka集群下面。二八反正,个别评估进去问题都不大。

10亿申请 -> 24 过去的,个别状况下,每天的12:00 到早上8:00 这段时间其实是没有多大的数据量的。80%的申请是用的另外16小时的解决的。16个小时解决 -> 8亿的申请。16 * 0.2 = 3个小时 解决了8亿申请的80%的数据

也就是说6亿的数据是靠3个小时解决完的。咱们简略的算一下高峰期时候的qps

6亿/3小时 =5.5万/s qps=5.5万

10亿申请 * 50kb = 46T 每天须要存储46T的数据

个别状况下,咱们都会设置两个正本 46T * 2 = 92T,Kafka外面的数据是有保留的工夫周期,保留最近3天的数据。

92T * 3天 = 276T

我这儿说的是50kb不是说一条音讯就是50kb不是(把日志合并了,多条日志合并在一起),通常状况下,一条音讯就几b,也有可能就是几百字节。

11.2 物理机数量评估

1)首先剖析一下是须要虚拟机还是物理机

像Kafka mysql hadoop这些集群搭建的时候,咱们生产外面都是应用物理机。

2)高峰期须要解决的申请总的申请每秒5.5万个,其实一两台物理机相对是能够抗住的。个别状况下,咱们评估机器的时候,是依照高峰期的4倍的去评估。

如果是4倍的话,大略咱们集群的能力要筹备到 20万qps。这样子的集群才是比拟平安的集群。大略就须要5台物理机。每台接受4万申请。

场景总结:

  • 搞定10亿申请,高峰期5.5万的qps,276T的数据,须要5台物理机。

11.3 磁盘抉择

搞定10亿申请,高峰期5.5万的qps,276T的数据,须要5台物理机。

1)SSD固态硬盘,还是须要一般的机械硬盘

  • SSD硬盘:性能比拟好,然而价格贵
  • SAS盘:某方面性能不是很好,然而比拟便宜。

SSD硬盘性能比拟好,指的是它随机读写的性能比拟好。适宜MySQL这样集群。

然而其实他的程序写的性能跟SAS盘差不多。

kafka的了解:就是用的程序写。所以咱们就用一般的【机械硬盘】就能够了。

2)须要咱们评估每台服务器须要多少块磁盘

5台服务器,一共须要276T ,大概每台服务器 须要存储60T的数据。咱们公司外面服务器的配置用的是 11块硬盘,每个硬盘 7T。11 * 7T = 77T

77T * 5 台服务器 = 385T

场景总结:

  • 搞定10亿申请,须要5台物理机,11(SAS) * 7T

11.4 内存评估 搞定10亿申请,须要5台物理机,11(SAS) * 7T

咱们发现kafka读写数据的流程 都是基于os cache,换句话说假如咱们的os cashe无限大那么整个kafka是不是相当于就是基于内存去操作,如果是基于内存去操作,性能必定很好。内存是无限的。

  • 尽可能多的内存资源要给 os cache
  • Kafka的代码用 外围的代码用的是scala写的,客户端的代码java写的。都是基于jvm。所以咱们还要给一部分的内存给jvm。

Kafka的设计,没有把很多数据结构都放在jvm外面。所以咱们的这个jvm不须要太大的内存。依据教训,给个10G就能够了。

NameNode:jvm外面还放了元数据(几十G),JVM肯定要给得很大。比方给个100G。

假如咱们这个10申请的这个我的项目,一共会有100个topic。100 topic * 5 partition * 2 = 1000 partition

一个partition其实就是物理机下面的一个目录,这个目录上面会有很多个.log的文件。

  • .log就是存储数据文件,默认状况下一个.log文件的大小是1G。

咱们如果要保障 1000个partition 的最新的.log 文件的数据 如果都在内存外面,这个时候性能就是最好。1000 * 1G = 1000G内存.

咱们只须要把以后最新的这个log 保障外面的25%的最新的数据在内存外面。250M * 1000 = 0.25 G* 1000 =250G的内存。

  • 250内存 / 5 = 50G内存
  • 50G+10G = 60G内存

64G的内存,另外的4G,操作系统本生是不是也须要内存。其实Kafka的jvm也能够不必给到10G这么多。评估进去64G是能够的。当然如果能给到128G的内存的服务器,那就最好。

我刚刚评估的时候用的都是一个topic是5个partition,然而如果是数据量比拟大的topic,可能会有10个partition。

总结:

  • 搞定10亿申请,须要5台物理机,11(SAS) * 7T ,须要64G的内存(128G更好)

11.5 CPU压力评估

评估一下每台服务器须要多少cpu core(资源很无限)

咱们评估须要多少个cpu ,根据就是看咱们的服务外面有多少线程去跑。线程就是依靠cpu 去运行的。如果咱们的线程比拟多,然而cpu core比拟少,这样的话,咱们的机器负载就会很高,性能不就不好。

评估一下,kafka的一台服务器 启动当前会有多少线程?

  • Acceptor线程 1
  • processor线程 3 6~9个线程
  • 解决申请线程 8个 32个线程
  • 定时清理的线程,拉取数据的线程,定时查看ISR列表的机制 等等。

所以大略一个Kafka的服务启动起来当前,会有一百多个线程。

  • cpu core = 4个,一遍来说,几十个线程,就必定把cpu 打满了。
  • cpu core = 8个,应该很轻松的能反对几十个线程。

如果咱们的线程是100多个,或者差不多200个,那么8 个 cpu core是搞不定的。

所以咱们这儿倡议:

  • CPU core = 16个。如果能够的话,能有32个cpu core 那就最好。

论断:

  • kafka集群,最低也要给16个cpu core,如果能给到32 cpu core那就更好。
  • 2cpu * 8 =16 cpu core
  • 4cpu * 8 = 32 cpu core

总结:

  • 搞定10亿申请,须要5台物理机, 11(SAS) * 7T ,须要64G的内存(128G更好),须要16个cpu core(32个更好)

11.6 网络需要评估

评估咱们须要什么样网卡?

个别要么是千兆的网卡(1G/s),还有的就是万兆的网卡(10G/s)

高峰期的时候 每秒会有5.5万的申请涌入,5.5/5 = 大概是每台服务器会有1万个申请涌入。咱们之前说的,10000 * 50kb = 488M 也就是每条服务器,每秒要承受488M的数据。数据还要有正本,正本之间的同步,也是走的网络的申请。488 * 2 = 976m/s

阐明一下:

  • 很多公司的数据,一个申请外面是没有50kb这么大的,咱们公司是因为主机在生产端封装了数据,而后把多条数据合并在一起了,所以咱们的一个申请才会有这么大。
  • 个别状况下,网卡的带宽是达不到极限的,如果是千兆的网卡,咱们能用的个别就是700M左右。然而如果最好的状况,咱们还是应用万兆的网卡。
  • 如果应用的是万兆的,那就是很轻松。

11.7 集群布局

  • 申请量
  • 布局物理机的个数
  • 剖析磁盘的个数,抉择应用什么样的磁盘
  • 内存
  • cpu core
  • 网卡

就是通知大家,当前要是公司外面有什么需要,进行资源的评估,服务器的评估,大家依照我的思路去评估。

一条音讯的大小 50kb -> 1kb 500byte 1M

ip 主机名

  • 192.168.0.100 hadoop1
  • 192.168.0.101 hadoop2
  • 192.168.0.102 hadoop3

主机的布局:kafka集群架构的时候:主从式的架构:

  • controller -> 通过zk集群来治理整个集群的元数据。

zookeeper集群

  • hadoop1
  • hadoop2
  • hadoop3

kafka集群

  • 实践上来讲,咱们不应该把kafka的服务于zk的服务装置在一起。
  • 然而咱们这儿服务器无限。所以咱们kafka集群也是装置在hadoop1 haadoop2 hadoop3

11.8 zookeeper集群搭建

11.9 外围参数详解

11.10 集群压力测试

12、kafka运维

12.1 常见运维工具介绍

KafkaManager — 页面管理工具

12.2 常见运维命令

场景一:topic数据量太大,要减少topic数

一开始创立主题的时候,数据量不大,给的分区数不多。

kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic test6kafka-topics.sh --alter --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --partitions 3 --topic test6

broker id:

  • hadoop1:0
  • hadoop2:1
  • hadoop3:2

假如一个partition有三个正本:partition0:

a,b,c
  • a:leader partition
  • b,c:follower partition
ISR:{a,b,c}

如果一个follower分区 超过10秒 没有向leader partition去拉取数据,那么这个分区就从ISR列表外面移除。

场景二:外围topic减少正本因子

如果对外围业务数据须要减少正本因子

vim test.json脚本,将上面一行json脚本保留

{“version”:1,“partitions”:[{“topic”:“test6”,“partition”:0,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:1,“replicas”:[0,1,2]},{“topic”:“test6”,“partition”:2,“replicas”:[0,1,2]}]}

执行下面json脚本:

kafka-reassign-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --reassignment-json-file test.json --execute

场景三:负载不平衡的topic,手动迁徙

vi topics-to-move.json

{“topics”: [{“topic”: “test01”}, {“topic”: “test02”}], “version”: 1}// 把你所有的topic都写在这里kafka-reassgin-partitions.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topics-to-move-json-file topics-to-move.json --broker-list “5,6” --generate// 把你所有的包含新退出的broker机器都写在这里,就会说是把所有的partition平均的扩散在各个broker上,包含新进来的broker

此时会生成一个迁徙计划,能够保留到一个文件里去:expand-cluster-reassignment.json

kafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --executekafka-reassign-partitions.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

这种数据迁徙操作肯定要在早晨低峰的时候来做,因为他会在机器之间迁徙数据,十分的占用带宽资源

  • generate: 依据给予的Topic列表和Broker列表生成迁徙打算。generate并不会真正进行音讯迁徙,而是将音讯迁徙打算计算出来,供execute命令应用。
  • execute: 依据给予的音讯迁徙打算进行迁徙。
  • verify: 查看音讯是否曾经迁徙实现。

场景四:如果某个broker leader partition过多

失常状况下,咱们的leader partition在服务器之间是负载平衡。

  • hadoop1 4
  • hadoop2 1
  • hadoop3 1

当初各个业务方能够自行申请创立topic,分区数量都是主动调配和后续动静调整的,kafka自身会主动把leader partition平均扩散在各个机器上,这样能够保障每台机器的读写吞吐量都是平均的。

然而也有例外,那就是如果某些broker宕机,会导致leader partition过于集中在其余少部分几台broker上,这会导致多数几台broker的读写申请压力过高,其余宕机的broker重启之后都是folloer partition,读写申请很低。

造成集群负载不平衡有一个参数,auto.leader.rebalance.enable,默认是true,每隔300秒(leader.imbalance.check.interval.seconds)查看leader负载是否均衡

如果一台broker上的不平衡的leader超过了10%,leader.imbalance.per.broker.percentage,就会对这个broker进行选举。

配置参数:

  • auto.leader.rebalance.enable 默认是true
  • leader.imbalance.per.broker.percentage: 每个broker容许的不均衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的均衡。这个值示意百分比。10%
  • leader.imbalance.check.interval.seconds:默认值300秒

13、Kafka生产者

13.1 消费者发送音讯原理

13.2 消费者发送音讯原理—根底案例演示

13.3 如何晋升吞吐量

如何晋升吞吐量:参数一:buffer.memory:

设置发送音讯的缓冲区,默认值是33554432,就是32MB

参数二:compression.type:

默认是none,不压缩,然而也能够应用lz4压缩,效率还是不错的,压缩之后能够减小数据量,晋升吞吐量,然而会加大producer端的cpu开销

参数三:batch.size:

  • 设置batch的大小,如果batch太小,会导致频繁网络申请,吞吐量降落;
  • 如果batch太大,会导致一条音讯须要期待很久能力被发送进来,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里,默认值是:16384,就是16kb,也就是一个batch满了16kb就发送进来,个别在理论生产环境,这个batch的值能够增大一些来晋升吞吐量,如果一个批次设置大了,会有提早。个别依据一条音讯大小来设置。
  • 如果咱们音讯比拟少。配合应用的参数linger.ms,这个值默认是0,意思就是音讯必须立刻被发送,然而这是不对的,个别设置一个100毫秒之类的,这样的话就是说,这个音讯被发送进来后进入一个batch,如果100毫秒内,这个batch满了16kb,天然就会发送进来。

13.4 如何解决异样

1、LeaderNotAvailableException:

这个就是如果某台机器挂了,此时leader正本不可用,会导致你写入失败,要期待其余follower正本切换为leader正本之后,能力持续写入,此时能够重试发送即可;如果说你平时重启kafka的broker过程,必定会导致leader切换,肯定会导致你写入报错,是LeaderNotAvailableException

2、NotControllerException:

这个也是同理,如果说Controller所在Broker挂了,那么此时会有问题,须要期待Controller从新选举,此时也是一样就是重试即可。

3、NetworkException:网络异样 timeout

  • 配置retries参数,他会主动重试的
  • 然而如果重试几次之后还是不行,就会提供Exception给咱们来解决了,咱们获取到异样当前,再对这个音讯进行独自解决。咱们会有备用的链路。发送不胜利的音讯发送到Redis或者写到文件系统中,甚至是抛弃。

13.5 重试机制

重试会带来一些问题:

音讯反复

有的时候一些leader切换之类的问题,须要进行重试,设置retries即可,然而音讯重试会导致,反复发送的问题,比如说网络抖动一下导致他认为没胜利,就重试了,其实人家都胜利了.

音讯乱序音讯重试是可能导致音讯的乱序的,因为可能排在你前面的音讯都发送进来了。所以能够应用" max.in.flight.requests.per.connection"参数设置为1,这样能够保障producer同一时间只能发送一条音讯。

两次重试的距离默认是100毫秒,用"retry.backoff.ms"来进行设置,基本上在开发过程中,靠重试机制根本就能够搞定95%的异样问题。

13.6 ACK参数详解

producer端

request.required.acks=0;
  • 只有申请已发送进来,就算是发送完了,不关怀有没有写胜利。
  • 性能很好,如果是对一些日志进行剖析,能够接受丢数据的状况,用这个参数,性能会很好。
request.required.acks=1;
  • 发送一条音讯,当leader partition写入胜利当前,才算写入胜利。
  • 不过这种形式也有丢数据的可能。
request.required.acks=-1;
  • 须要ISR列表外面,所有正本都写完当前,这条音讯才算写入胜利。
  • ISR:1个正本。1 leader partition 1 follower partition

kafka服务端:

min.insync.replicas:1

如果咱们不设置的话,默认这个值是1,一个leader partition会保护一个ISR列表,这个值就是限度ISR列表外面,至多得有几个正本,比方这个值是2,那么当ISR列表外面只有一个正本的时候。往这个分区插入数据的时候会报错。

设计一个不丢数据的计划:

  • 分区正本 >=2
  • acks = -1
  • min.insync.replicas >=2

还有可能就是发送有异样:对异样进行解决

13.7 自定义分区

分区:

  • 没有设置key

咱们的音讯就会被轮训的发送到不同的分区。

  • 设置了key

kafka自带的分区器,会依据key计算出来一个hash值,这个hash值会对应某一个分区。

如果key雷同的,那么hash值必然雷同,key雷同的值,必然是会被发送到同一个分区。

然而有些比拟非凡的时候,咱们就须要自定义分区

public class HotDataPartitioner implements Partitioner {private Random random;@Overridepublic void configure(Map<String, ?> configs) {random = new Random();}@Overridepublic int partition(String topic, Object keyObj, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {String key = (String)keyObj;List partitionInfoList = cluster.availablePartitionsForTopic(topic);//获取到分区的个数 0,1,2int partitionCount = partitionInfoList.size();//最初一个分区int hotDataPartition = partitionCount - 1;return !key.contains(“hot_data”) ? random.nextInt(partitionCount - 1) : hotDataPartition;}}

如何应用:

配置上这个类即可:props.put(”partitioner.class”, “com.zhss.HotDataPartitioner”);

13.8 综合案例演示

需要剖析:

电商背景 -》 二手的电商平台

【欢畅送】的我的项目,用户购买了货色当前会有【星星】,用星星去换物品。一块钱一个星星。

订单零碎(音讯的生产),发送一条音讯(领取订单,勾销订单) -> Kafka <- 会员零碎,从kafak外面去生产数据,找到对应用户生产的金额,而后给该用户更新星星的数量。

剖析一下:

发送音讯的时候,能够指定key,也能够不指定key。

1)如果不指定key

  • zhangsan ->下订单 -> 100 -> +100
  • zhangsan -> 勾销订单 -> -100 -> -100
  • 会员零碎生产数据的时候,有可能先生产到的是 勾销订单的数据。

2)如果指定key,key -> hash(数字) -> 对应分区号 -> 发送到对应的分区外面。

  • 如果key雷同的 -> 数据必定会被发送到同一个分区(有序的)

这个我的项目须要指定key,把用户的id指定为key.

14、Kafka消费者

14.1 生产组概念

groupid雷同就属于同一个生产组

1)每个consumer都要属于一个consumer.group,就是一个生产组,topic的一个分区只会调配给一个生产组下的一个consumer来解决,每个consumer可能会调配多个分区,也有可能某个consumer没有调配到任何分区。

2)如果想要实现一个播送的成果,那只须要应用不同的group id去生产就能够。

topicA:

  • partition0、partition1

groupA:

  • consumer1:生产 partition0
  • consuemr2:生产 partition1
  • consuemr3:生产不到数据

groupB:

  • consuemr3:生产到partition0和partition1

3)如果consumer group中某个消费者挂了,此时会主动把调配给他的分区交给其余的消费者,如果他又重启了,那么又会把一些分区从新交还给他

14.2 根底案例演示

14.3 偏移量治理

每个consumer内存里数据结构保留对每个topic的每个分区的生产offset,定期会提交offset,老版本是写入zk,然而那样高并发申请zk是不合理的架构设计,zk是做分布式系统的协调的,轻量级的元数据存储,不能负责高并发读写,作为数据存储。

当初新的版本提交offset发送给kafka外部topic:__consumer_offsets,提交过来的时候, key是group.id+topic+分区号,value就是以后offset的值,每隔一段时间,kafka外部会对这个topic进行compact(合并),也就是每个group.id+topic+分区号就保留最新数据。

__consumer_offsets可能会接管高并发的申请,所以默认分区50个(leader partitiron -> 50 kafka),这样如果你的kafka部署了一个大的集群,比方有50台机器,就能够用50台机器来抗offset提交的申请压力。

  • 消费者 -> broker端的数据
  • message -> 磁盘 -> offset 程序递增
  • 从哪儿开始生产?-> offset
  • 消费者(offset)

14.4 偏移量监控工具介绍

web页面治理的一个管理软件(kafka Manager)

  • 批改bin/kafka-run-class.sh脚本,第一行减少JMX_PORT=9988
  • 重启kafka过程

另一个软件:次要监控的consumer的偏移量。

就是一个jar包java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar

com.quantifind.kafka.offsetapp.OffsetGetterWeb
  • offsetStorage kafka \(依据版本:偏移量存在kafka就填kafka,存在zookeeper就填zookeeper)
  • zk hadoop1:2181
  • port 9004
  • refresh 15.seconds
  • retain 2.days

写了一段程序 ,生产kafka外面的数据(consumer,解决数据 -> 业务代码) -> Kafka 如何去判断你的这段代码真的是实时的去生产的呢?

提早几亿条数据 -> 阈值(20万条的时候 发送一个告警。)

14.5 生产异样感知

heartbeat.interval.ms

  • consumer心跳工夫距离,必须得与coordinator放弃心跳能力晓得consumer是否故障了,
  • 而后如果故障之后,就会通过心跳下发rebalance的指令给其余的consumer告诉他们进行rebalance的操作

session.timeout.ms

  • kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒

max.poll.interval.ms

  • 如果在两次poll操作之间,超过了这个工夫,那么就会认为这个consume解决能力太弱了,会被踢出生产组,分区调配给他人去生产,一般来说联合业务解决的性能来设置就能够了。

14.6 外围参数解释

fetch.max.bytes

获取一条音讯最大的字节数,个别倡议设置大一些,默认是1M 其实咱们在之前多个中央都见到过这个相似的参数,意思就是说一条信息最大能多大?

  1. Producer:发送的数据,一条音讯最大多大, -> 10M
  2. Broker:存储数据,一条音讯最大能承受多大 -> 10M
  3. Consumer:

max.poll.records:

一次poll返回音讯的最大条数,默认是500条

connection.max.idle.ms

consumer跟broker的socket连贯如果闲暇超过了肯定的工夫,此时就会主动回收连贯,然而下次生产就要从新建设socket连贯,这个倡议设置为-1,不要去回收

enable.auto.commit:

开启主动提交偏移量

auto.commit.interval.ms:

每隔多久提交一次偏移量,默认值5000毫秒

auto.offset.reset

  • earliest:当各分区下有已提交的offset时,从提交的offset开始生产;无提交的offset时,从头开始生产
  • latest:当各分区下有已提交的offset时,从提交的offset开始生产;无提交的offset时,生产新产生的该分区下的数据
  • none:topic各分区都存在已提交的offset时,从offset后开始生产;只有有一个分区不存在已提交的offset,则抛出异样

14.7 综合案例演示

引入案例:二手电商平台(欢畅送),依据用户生产的金额,对用户星星进行累计。

  • 订单零碎(生产者) -> Kafka集群外面发送了音讯。
  • 会员零碎(消费者) -> Kafak集群外面生产音讯,对音讯进行解决。

14.8 group coordinator原理

面试题:消费者是如何实现rebalance的?— 依据coordinator实现

什么是coordinator

每个consumer group都会抉择一个broker作为本人的coordinator,他是负责监控这个生产组里的各个消费者的心跳,以及判断是否宕机,而后开启rebalance的

如何抉择coordinator机器

首先对groupId进行hash(数字),接着对__consumer_offsets的分区数量取模,默认是50,_consumer_offsets的分区数能够通过offsets.topic.num.partitions来设置,找到分区当前,这个分区所在的broker机器就是coordinator机器。

比如说:groupId,“myconsumer_group” -> hash值(数字)-> 对50取模 -> 8__consumer_offsets 这个主题的8号分区在哪台broker下面,那一台就是coordinator 就晓得这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,

运行流程

  • 每个consumer都发送JoinGroup申请到Coordinator,
  • 而后Coordinator从一个consumer group中抉择一个consumer作为leader,
  • 把consumer group状况发送给这个leader,
  • 接着这个leader会负责制订生产计划,
  • 通过SyncGroup发给Coordinator
  • 接着Coordinator就把生产计划下发给各个consumer,他们会从指定的分区的

leader broker开始进行socket连贯以及生产音讯

14.9 rebalance策略

consumer group靠coordinator实现了Rebalance

这里有三种rebalance的策略:range、round-robin、sticky

比方咱们生产的一个主题有12个分区:

p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11

假如咱们的消费者组外面有三个消费者

range策略

  • range策略就是依照partiton的序号范畴
  • p0~3 consumer1
  • p4~7 consumer2
  • p8~11 consumer3
  • 默认就是这个策略;

round-robin策略

  • 就是轮询调配
  • consumer1:0,3,6,9
  • consumer2:1,4,7,10
  • consumer3:2,5,8,11

然而后面的这两个计划有个问题:12 -> 2 每个消费者会生产6个分区

假如consuemr1挂了:p0-5调配给consumer2,p6-11调配给consumer3,这样的话,本来在consumer2上的的p6,p7分区就被调配到了 consumer3上。

sticky策略

最新的一个sticky策略,就是说尽可能保障在rebalance的时候,让本来属于这个consumer的分区还是属于他们,而后把多余的分区再平均调配过来,这样尽可能维持原来的分区调配的策略

  • consumer1:0-3
  • consumer2: 4-7
  • consumer3: 8-11

假如consumer3挂了

  • consumer1:0-3,+8,9
  • consumer2: 4-7,+10,11

15、Broker治理

15.1 Leo、hw含意

  • Kafka的外围原理
  • 如何去评估一个集群资源
  • 搭建了一套kafka集群 -》 介绍了简略的一些运维治理的操作。
  • 生产者(应用,外围的参数)
  • 消费者(原理,应用的,外围参数)
  • broker外部的一些原理,外围的概念:LEO,HW
LEO:是跟offset偏移量有关系。

LEO:

在kafka外面,无论leader partition还是follower partition对立都称作正本(replica)。

每次partition接管到一条音讯,都会更新本人的LEO,也就是log end offset,LEO其实就是最新的offset + 1

HW:高水位

LEO有一个很重要的性能就是更新HW,如果follower和leader的LEO同步了,此时HW就能够更新

HW之前的数据对消费者是可见,音讯属于commit状态。HW之后的音讯消费者生产不到。

15.2 Leo更新

15.3 hw更新

15.4 controller如何治理整个集群

1: 竞争controller的

  • /controller/id

2:controller服务监听的目录:

  • /broker/ids/ 用来感知 broker高低线
  • /broker/topics/ 创立主题,咱们过后创立主题命令,提供的参数,ZK地址。
  • /admin/reassign_partitions 分区重调配

15.5 延时工作

kafka的提早调度机制(扩大常识)

咱们先看一下kafka外面哪些地方须要有工作要进行提早调度。

第一类延时的工作:

比如说producer的acks=-1,必须期待leader和follower都写完能力返回响应。

有一个超时工夫,默认是30秒(request.timeout.ms)。

所以须要在写入一条数据到leader磁盘之后,就必须有一个延时工作,到期工夫是30秒延时工作 放到DelayedOperationPurgatory(延时管理器)中。

如果在30秒之前如果所有follower都写入正本到本地磁盘了,那么这个工作就会被主动触发昏迷,就能够返回响应后果给客户端了,否则的话,这个延时工作本人指定了最多是30秒到期,如果到了超时工夫都没等到,就间接超时返回异样。

第二类延时的工作:

follower往leader拉取音讯的时候,如果发现是空的,此时会创立一个延时拉取工作

延时工夫到了之后(比方到了100ms),就给follower返回一个空的数据,而后follower再次发送申请读取音讯,然而如果延时的过程中(还没到100ms),leader写入了音讯,这个工作就会主动昏迷,主动执行拉取工作。

海量的延时工作,须要去调度。

15.6 工夫轮机制

1.什么会有要设计工夫轮?

Kafka外部有很多延时工作,没有基于JDK Timer来实现,那个插入和删除工作的工夫复杂度是O(nlogn),而是基于了本人写的工夫轮来实现的,工夫复杂度是O(1),依附工夫轮机制,延时工作插入和删除,O(1)

2.工夫轮是什么?

其实工夫轮说白其实就是一个数组。

  • tickMs:工夫轮距离 1ms
  • wheelSize:工夫轮大小 20
  • interval:timckMS * whellSize,一个工夫轮的总的时间跨度。20ms
  • currentTime:过后工夫的指针。
    • a:因为工夫轮是一个数组,所以要获取外面数据的时候,靠的是index,工夫复杂度是O(1)
    • b:数组某个地位上对应的工作,用的是双向链表存储的,往双向链表外面插入,删除工作,工夫复杂度也是O(1)

3.多层级的工夫轮

比方:要插入一个110毫秒当前运行的工作。

  • tickMs:工夫轮距离 20ms
  • wheelSize:工夫轮大小 20
  • interval:timckMS * whellSize,一个工夫轮的总的时间跨度。20ms
  • currentTime:过后工夫的指针。
    • 第一层工夫轮:1ms * 20
    • 第二层工夫轮:20ms * 20
    • 第三层工夫轮:400ms * 20

作者:erainm \
起源:blog.csdn.net/eraining/article/details/115860664

近期热文举荐:

1.1,000+ 道 Java面试题及答案整顿(2022最新版)

2.劲爆!Java 协程要来了。。。

3.Spring Boot 2.x 教程,太全了!

4.别再写满屏的爆爆爆炸类了,试试装璜器模式,这才是优雅的形式!!

5.《Java开发手册(嵩山版)》最新公布,速速下载!

感觉不错,别忘了顺手点赞+转发哦!