前言
Kafka是一个分布式的流解决平台(0.10.x版本),在kafka0.8.x版本的时候,kafka次要是作为一个分布式的、可分区的、具备大数据培训正本数的日志服务零碎(Kafka™ is a distributed, partitioned, replicated commit log service), 具备高水平扩展性、高容错性、访问速度快、分布式等个性;次要利用场景是:日志收集零碎和音讯零碎
为什么应用Kafka 它有什么劣势
有以下特点:大数据畛域、高吞吐量、低提早、可扩展性、持久性、可靠性、容错性、高并发
Kafka适宜以下利用场景
大数据畛域、日志收集、音讯零碎、流动跟踪、数据处理、行为日志、等等方面
Kafka常常被用到的模式
有点对点或者 公布/订阅模式:点对点很容易了解 一对一的公布承受音讯,公布和订阅模式 相似于播送 就比方 微信公众号推文,每天要给这一个公众号外面的所有用户推文,不能一对一的发送音讯,要一对一的发送万一有几百万粉丝订阅,这得创立多少队列在录入数据的时候得有多麻烦,所以就要应用公布/订阅模式去解决,公布/订阅模式外面波及了一个比拟好的点就是 我能够依据场景去抉择我要哪一种模式,一种是能够让消费者主动去拉取数据本人去管制流量运行的速度,然而这里有一个毛病就是kafka被动被消费者程序拉去,被动询问是否有新的音讯 这里防止不了有一个循环,始终去询问kafka有没有新数据,比拟浪费资源。 一种是我被动推送音讯到消费者那里,微信公众号就是这样的逻辑,被动推送音讯给粉丝。
架构图

生产者:用来生产音讯并推送到对应的主题外面。
kafka集群:能够了解为部署多个kafka,能够在一台机器下面 也能够不输在多台机器下面。
broker:能够了解为单个kafka,也就是图下面机器N 的官网名称。
topic:主题,生产者/消费者订阅主题进行收发音讯,次要为了解决某一kafka下面能够反对多个程序处理不同的事。
leader/follower:一个是备份 一个是leader,当leader出问题的时候,follower会上移进化成leader,防止咱们的服务挂掉。
分区:次要是用来削峰,能让咱们的程序能平均的解决音讯。
消费者:消费者外面有一个概念,就是有一个消费者组的这么一说,外面有一个要留神的中央就是 某一个主题只能被某一个消费者组外面某一个用户用来生产,就是同一个组外面消费者不能订阅同一个主题,最好是主题与消费者组外面消费者个数统一 否则会造成资源节约,造成闲暇的消费者。
zk:用于kafka注册音讯应用,9.0以下版本 音讯解决的偏移量放到zk外面存储,9.0以上偏移量就放到kafka topic外面存储,次要就是为了解决不频繁的与zk交互。
常用命令
zk 装置:去官网下载对应压缩包:https://zookeeper.apache.org/...
cd /tmp/zookeeper/zookeeper-node1/conf
cp /tmp/zookeeper/zookeeper-node1/conf/zoo_sample.cfg /tmp/zookeeper/zookeeper-node1/conf/zoo.cfg

data与log 默认zookeeper外面没有须要手动创立

zoo.cfg配置文件须要批改的中央:dataDir=/tmp/zookeeper/zookeeper-node1/data #音讯缓存门路

dataLogDir=/tmp/zookeeper/zookeeper-node1/log #音讯log门路

zookeeper集群须要配置server 伪集群只须要改一下对应的端口就OK了

server.1=47.94.2.151:2888:3888

server.2=47.94.2.151:2889:3889

server.3=47.94.2.151:2890:3890启动zk的时候要查看是否有java环境,没有的话须要装置一下 以centos7为例:yum install java

启动zkServer.sh (start|stop|restart)等

启动命令行调试:bash zkCli.sh

ls / 查看是否有kafka注册上来

ls /broker 查看机器

broker 上面与主题等kafka 应用docker装置

这里应用 docker-compose,具体配置文件 docker-compose.yml 内容如下
version: '3'
services:
zookeeper:

image: wurstmeister/zookeepervolumes:  - /etc/localtime:/etc/localtime:roports:  - "2181:2181"restart: always

kafka:

image: wurstmeister/kafkaports:  - "9092:9092"environment:  KAFKA_ADVERTISED_HOST_NAME: localhost  KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181volumes:  - /var/run/docker.sock:/var/run/docker.sock  - /data/db/kafka:/kafka/KafkaLog  - /etc/localtime:/etc/localtime:rodepends_on:  - zookeeperrestart: alwaysdocker-compose根底命令

docker-compose up -d 在后盾启动
docker-compose down 进行kafka分布式装置
去官网下载对应压缩包:http://kafka.apache.org/downl...
vim /tmp/kafka/config/server.properties
机器的ID必须惟一且为int
broker.id=0
kafka地址
listeners=PLAINTEXT://127.0.0.1:9092
音讯存储门路
log.dirs=/tmp/kafka/logs
zk 门路
zookeeper.connect=127.0.0.1:2181kafka启动敞开:
start开启 stop进行
bash /bin/kafka-server-start.sh -daemon ../config/server.propertieskafka cmd操作:
所有主题:
bash kafka-topics.sh --list --zookeeper 127.0.0.1:2181创立:
bash kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1
--topic test删除:
bash kafka-topics.sh --delete --zookeeper localhost:2181 --topic testkafka 生产者:
./kafka-c testkafka 消费者:
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic testkafka消费者组:
bash kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic deamon --
consumer.config ../config/consumer.properties
bash kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic deamon --
consumer.config ../config/consumer.properties生产形式+分区调配策略
注:消费者组外面的消费者不可反复订阅 主题外面的分区
生产形式:
次要有两种:

  1. 音讯队列被动推送音讯给对应订阅的消费者,不好之处就是不晓得消费者那面解决的速度如何,还有就是消费者没方法管制音讯发送的速度。
  2. 消费者被动拉取音讯队列中的数据,不好之处是消费者因为须要常常去询问是否有数据须要始终有一个循环去询问,倡议在循环外面减少sleep,如果没有拉取到数据就让程序劳动一下,不然始终空转耗费比拟大。

分区调配策略
次要有两种:

  1. 轮训,他的不好之处就是在有消费者组和多个分区的状况下有可能会生产到不是本消费者订阅的数据,因为他的解决逻辑是把该消费者组订阅的所有主题都当为一个主题,来轮训,所有有的消费者订阅了有的没订阅,就会呈现生产多的状况,然而个别的状况下也不会这么设计,最好是在全副都订阅雷同主题的状况上来应用这种形式
  2. 范畴,底层逻辑 用分区数磨消费者

文件存储机制
简略形容:每一个主题上面都会有不同多个分区,分区的存储是依照 主题名称+序列,序列是从0开始的。

上图的左半局部是索引文件,外面存储的是一对一对的key-value,其中key是音讯在数据文件(对应的log文件)中的编号,比方“1,3,6,8……”,
别离示意在log文件中的第1条音讯、第3条音讯、第6条音讯、第8条音讯……,那么为什么在index文件中这些编号不是间断的呢?
这是因为index文件中并没有为数据文件中的每条音讯都建设索引,而是采纳了稠密存储的形式,每隔肯定字节的数据建设一条索引。
这样防止了索引文件占用过多的空间,从而能够将索引文件保留在内存中。
但毛病是没有建设索引的Message也不能一次定位到其在数据文件的地位,从而须要做一次程序扫描,然而这次程序扫描的范畴就很小了。其中以索引文件中元数据3,497为例,其中3代表在左边log数据文件中从上到下第3个音讯(在全局partiton示意第368772个音讯),
其中497示意该音讯的物理偏移地址(地位)为497。
生产者(数据一致性与分区策略)选举机制ISR 故障细节解决
分区策略,生产者 在生产音讯 往 分区外面打数据的时候有三种模式:

  1. 能够指定partition 去生产数据。
  2. 能够指定对应的key值,kafka会依据key值算hash 而后除以 分区数据 再取余数。
  3. partition没有指定 key也没有指定 会随机选取一个分区 而后在之后的发送数据会轮询着来发送。

数据一致性
生产者发送数据 如何保持数据一致性的问题:
都晓得 kafka 分区外面是有 leader 和 follower的,这块引出一个概念就是ack(确认已收到)的意思,这时候就有一个问题就是 如果有多个follower ack是等这个全副同步胜利后还是半数同步胜利后才返回给生产者这个信息阐明确认已收到,全副同步和半数同步外面有一个概念 就是 全副同步 须要n+1台机器同步胜利再返回ack 就是n台机器挂掉了起码还有一台存活,半数同步的必须 2n+1 台机器同步胜利后再返回ack 就是n台机器挂掉了起码还有n+1台机器存活 因为他是 半数同步的所以须要n+1个follower 同步胜利,kafka为此选了全副同步胜利的,全副胜利还有问题 如果有局部follower 解决很慢或者他挂了 岂不是 生产须要始终期待这个kafka返回ack。
kafka为了解决这个问题加了一个ISR,这个isr是做什么的呢?
次要就是选举哪些 follower同步胜利了,当leader挂了,会从ISR外面选取一个follower晋升级别为leader,不是所有的follower都可能晋升为leader,只有ISR外面的follower能力晋升为leader,想进入到 ISR外面有两个概念就是一个是工夫的同步数据,一个是条数的概念,然而起初版本把这两个其中一个给去掉了,就是那个条数的那个,为什么去掉次要是你如果设置在十条差距数据以外的就要被踢出ISR,如果当初消费者群发音讯 一次发送15条数据,当初所有的follower都相差15条数据比设置为10的预值多了五,就会被踢出去,而后等follower同步完 10条数据后又被加回来,就会呈现一个问题前脚你踢出去了紧跟着有加回来。
ack有一个应答机制:
0 生产者只管发送数据 不承受ack,会造成数据失落 因为我都不晓得这个leader的死活 有可能在网络连接的时候就挂掉了。
1 生产者发送数据leader写入胜利后,返回ack,这个外面还是会丢数据,如果这个时候leader挂了,他的follower没有同步完数据而后晋升为leader之前未同步数据就会失落。
-1(ALL) 生产者发送数据到leader写入胜利后再 等follower也写入胜利后返回ack给生产者,这个外面有一个问题就是数据反复,什么样的时候会造成数据反复呢就是follower写入胜利后刚要回传ack leader就挂了,这个时候生产者认为数据并没有发送胜利,kafka这面会把follower晋升为leader 这个时候的leader曾经同步完了然而生产者不晓得,又从新发送一个一遍数据,就反复了。
对于数据反复kafka外面有一个 解决方案 就是 幂等性 的问题其实就是能够了解为去重,具体就是在生产者连贯kafka的时候会生成一个pid 这个pid是 生产者ID不是过程id,对于幂等性只能存在与同一个连贯外面才会有这个概念,他会依据偏移量去判断是否全副胜利 胜利后会检测音讯是否已存在存在的不进行发送,这个幂等性是在ACK为-1的时候才会有这个概念,这个其实设置也非常简单就在配置文件的一个参数改为true就行了,默认会把ack的模式为-1。
故障细节解决
A 为leader 数据为1-10
B 为follower 已同步数据为 1-6
C 为follower 已同步数据为 1-8
如果leader挂掉了,B选举为leader了 c再去同步发现数据不一样一个是1-8 一个 是1-6 如果这个时候A复原了 为1-10,这个时候他们的音讯不等 消费者生产音讯就会呈现问题,kefka是用两个标识来解决的一个是HW(在所有音讯队列外面最短的一个LEO)为HW,LEO(log end office)最初一个音讯的偏移量,HW在选举为B的时候 他为1-6 他会发送命令 所有人给我截取到6 其余的数据都扔掉,消费者来生产音讯也只会生产到HW所在对应的那一个偏移量,如果这个时候选取的是C同步数据为1-8,他的HW为6的数据下面 会发送指定 都给我截取到6 卡卡卡都截取了,再给我A和B再去询问是否有新数据 发现有 7-8 就会同步下来