关于大数据:大数据开发之Kafka

6次阅读

共计 5824 个字符,预计需要花费 15 分钟才能阅读完成。

前言
Kafka 是一个分布式的流解决平台(0.10.x 版本),在 kafka0.8.x 版本的时候,kafka 次要是作为一个分布式的、可分区的、具备大数据培训正本数的日志服务零碎(Kafka™ is a distributed, partitioned, replicated commit log service), 具备高水平扩展性、高容错性、访问速度快、分布式等个性;次要利用场景是:日志收集零碎和音讯零碎
为什么应用 Kafka 它有什么劣势
有以下特点:大数据畛域、高吞吐量、低提早、可扩展性、持久性、可靠性、容错性、高并发
Kafka 适宜以下利用场景
大数据畛域、日志收集、音讯零碎、流动跟踪、数据处理、行为日志、等等方面
Kafka 常常被用到的模式
有点对点或者 公布 / 订阅模式:点对点很容易了解 一对一的公布承受音讯,公布和订阅模式 相似于播送 就比方 微信公众号推文,每天要给这一个公众号外面的所有用户推文,不能一对一的发送音讯,要一对一的发送万一有几百万粉丝订阅,这得创立多少队列在录入数据的时候得有多麻烦,所以就要应用公布 / 订阅模式去解决,公布 / 订阅模式外面波及了一个比拟好的点就是 我能够依据场景去抉择我要哪一种模式,一种是能够让消费者主动去拉取数据本人去管制流量运行的速度,然而这里有一个毛病就是 kafka 被动被消费者程序拉去,被动询问是否有新的音讯 这里防止不了有一个循环,始终去询问 kafka 有没有新数据,比拟浪费资源。一种是我被动推送音讯到消费者那里,微信公众号就是这样的逻辑,被动推送音讯给粉丝。
架构图

生产者:用来生产音讯并推送到对应的主题外面。
kafka 集群:能够了解为部署多个 kafka,能够在一台机器下面 也能够不输在多台机器下面。
broker:能够了解为单个 kafka,也就是图下面机器 N 的官网名称。
topic:主题,生产者 / 消费者订阅主题进行收发音讯,次要为了解决某一 kafka 下面能够反对多个程序处理不同的事。
leader/follower:一个是备份 一个是 leader,当 leader 出问题的时候,follower 会上移进化成 leader,防止咱们的服务挂掉。
分区:次要是用来削峰,能让咱们的程序能平均的解决音讯。
消费者:消费者外面有一个概念,就是有一个消费者组的这么一说,外面有一个要留神的中央就是 某一个主题只能被某一个消费者组外面某一个用户用来生产,就是同一个组外面消费者不能订阅同一个主题,最好是主题与消费者组外面消费者个数统一 否则会造成资源节约,造成闲暇的消费者。
zk:用于 kafka 注册音讯应用,9.0 以下版本 音讯解决的偏移量放到 zk 外面存储,9.0 以上偏移量就放到 kafka topic 外面存储,次要就是为了解决不频繁的与 zk 交互。
常用命令
zk 装置:去官网下载对应压缩包:https://zookeeper.apache.org/…
cd /tmp/zookeeper/zookeeper-node1/conf
cp /tmp/zookeeper/zookeeper-node1/conf/zoo_sample.cfg /tmp/zookeeper/zookeeper-node1/conf/zoo.cfg

data 与 log 默认 zookeeper 外面没有须要手动创立

zoo.cfg 配置文件须要批改的中央:dataDir=/tmp/zookeeper/zookeeper-node1/data #音讯缓存门路

dataLogDir=/tmp/zookeeper/zookeeper-node1/log #音讯 log 门路

zookeeper 集群须要配置 server 伪集群只须要改一下对应的端口就 OK 了

server.1=47.94.2.151:2888:3888

server.2=47.94.2.151:2889:3889

server.3=47.94.2.151:2890:3890 启动 zk 的时候要查看是否有 java 环境,没有的话须要装置一下 以 centos7 为例:yum install java

启动 zkServer.sh(start|stop|restart)等

启动命令行调试:bash zkCli.sh

ls / 查看是否有 kafka 注册上来

ls /broker 查看机器

broker 上面与主题等 kafka 应用 docker 装置

这里应用 docker-compose,具体配置文件 docker-compose.yml 内容如下
version: ‘3’
services:
zookeeper:

image: wurstmeister/zookeeper
volumes:
  - /etc/localtime:/etc/localtime:ro
ports:
  - "2181:2181"
restart: always

kafka:

image: wurstmeister/kafka
ports:
  - "9092:9092"
environment:
  KAFKA_ADVERTISED_HOST_NAME: localhost
  KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
  - /var/run/docker.sock:/var/run/docker.sock
  - /data/db/kafka:/kafka/KafkaLog
  - /etc/localtime:/etc/localtime:ro
depends_on:
  - zookeeper
restart: alwaysdocker-compose 根底命令

docker-compose up -d 在后盾启动
docker-compose down 进行 kafka 分布式装置
去官网下载对应压缩包:http://kafka.apache.org/downl…
vim /tmp/kafka/config/server.properties
机器的 ID 必须惟一且为 int
broker.id=0
kafka 地址
listeners=PLAINTEXT://127.0.0.1:9092
音讯存储门路
log.dirs=/tmp/kafka/logs
zk 门路
zookeeper.connect=127.0.0.1:2181kafka 启动敞开:
start 开启 stop 进行
bash /bin/kafka-server-start.sh -daemon ../config/server.propertieskafka cmd 操作:
所有主题:
bash kafka-topics.sh –list –zookeeper 127.0.0.1:2181 创立:
bash kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1
–topic test 删除:
bash kafka-topics.sh –delete –zookeeper localhost:2181 –topic testkafka 生产者:
./kafka-c testkafka 消费者:
./kafka-console-consumer.sh –bootstrap-server 127.0.0.1:9092 –from-beginning –topic testkafka 消费者组:
bash kafka-console-consumer.sh –bootstrap-server 127.0.0.1:9092 –topic deamon —
consumer.config ../config/consumer.properties
bash kafka-console-consumer.sh –bootstrap-server 127.0.0.1:9092 –topic deamon —
consumer.config ../config/consumer.properties 生产形式 + 分区调配策略
注:消费者组外面的消费者不可反复订阅 主题外面的分区
生产形式:
次要有两种:

  1. 音讯队列被动推送音讯给对应订阅的消费者,不好之处就是不晓得消费者那面解决的速度如何,还有就是消费者没方法管制音讯发送的速度。
  2. 消费者被动拉取音讯队列中的数据,不好之处是消费者因为须要常常去询问是否有数据须要始终有一个循环去询问,倡议在循环外面减少 sleep,如果没有拉取到数据就让程序劳动一下,不然始终空转耗费比拟大。

分区调配策略
次要有两种:

  1. 轮训,他的不好之处就是在有消费者组和多个分区的状况下有可能会生产到不是本消费者订阅的数据,因为他的解决逻辑是把该消费者组订阅的所有主题都当为一个主题,来轮训,所有有的消费者订阅了有的没订阅,就会呈现生产多的状况,然而个别的状况下也不会这么设计,最好是在全副都订阅雷同主题的状况上来应用这种形式
  2. 范畴,底层逻辑 用分区数磨消费者

文件存储机制
简略形容:每一个主题上面都会有不同多个分区,分区的存储是依照 主题名称 + 序列,序列是从 0 开始的。

上图的左半局部是索引文件,外面存储的是一对一对的 key-value,其中 key 是音讯在数据文件(对应的 log 文件)中的编号,比方“1,3,6,8……”,
别离示意在 log 文件中的第 1 条音讯、第 3 条音讯、第 6 条音讯、第 8 条音讯……,那么为什么在 index 文件中这些编号不是间断的呢?
这是因为 index 文件中并没有为数据文件中的每条音讯都建设索引,而是采纳了稠密存储的形式,每隔肯定字节的数据建设一条索引。
这样防止了索引文件占用过多的空间,从而能够将索引文件保留在内存中。
但毛病是没有建设索引的 Message 也不能一次定位到其在数据文件的地位,从而须要做一次程序扫描,然而这次程序扫描的范畴就很小了。其中以索引文件中元数据 3,497 为例,其中 3 代表在左边 log 数据文件中从上到下第 3 个音讯 (在全局 partiton 示意第 368772 个音讯),
其中 497 示意该音讯的物理偏移地址(地位)为 497。
生产者(数据一致性与分区策略)选举机制 ISR 故障细节解决
分区策略,生产者 在生产音讯 往 分区外面打数据的时候有三种模式:

  1. 能够指定 partition 去生产数据。
  2. 能够指定对应的 key 值,kafka 会依据 key 值算 hash 而后除以 分区数据 再取余数。
  3. partition 没有指定 key 也没有指定 会随机选取一个分区 而后在之后的发送数据会轮询着来发送。

数据一致性
生产者发送数据 如何保持数据一致性的问题:
都晓得 kafka 分区外面是有 leader 和 follower 的,这块引出一个概念就是 ack(确认已收到)的意思,这时候就有一个问题就是 如果有多个 follower ack 是等这个全副同步胜利后还是半数同步胜利后才返回给生产者这个信息阐明确认已收到,全副同步和半数同步外面有一个概念 就是 全副同步 须要 n + 1 台机器同步胜利再返回 ack 就是 n 台机器挂掉了起码还有一台存活,半数同步的必须 2n+1 台机器同步胜利后再返回 ack 就是 n 台机器挂掉了起码还有 n + 1 台机器存活 因为他是 半数同步的所以须要 n + 1 个 follower 同步胜利,kafka 为此选了全副同步胜利的,全副胜利还有问题 如果有局部 follower 解决很慢或者他挂了 岂不是 生产须要始终期待这个 kafka 返回 ack。
kafka 为了解决这个问题加了一个 ISR,这个 isr 是做什么的呢?
次要就是选举哪些 follower 同步胜利了,当 leader 挂了,会从 ISR 外面选取一个 follower 晋升级别为 leader,不是所有的 follower 都可能晋升为 leader,只有 ISR 外面的 follower 能力晋升为 leader,想进入到 ISR 外面有两个概念就是一个是工夫的同步数据,一个是条数的概念,然而起初版本把这两个其中一个给去掉了,就是那个条数的那个,为什么去掉次要是你如果设置在十条差距数据以外的就要被踢出 ISR,如果当初消费者群发音讯 一次发送 15 条数据,当初所有的 follower 都相差 15 条数据比设置为 10 的预值多了五,就会被踢出去,而后等 follower 同步完 10 条数据后又被加回来,就会呈现一个问题前脚你踢出去了紧跟着有加回来。
ack 有一个应答机制:
0 生产者只管发送数据 不承受 ack,会造成数据失落 因为我都不晓得这个 leader 的死活 有可能在网络连接的时候就挂掉了。
1 生产者发送数据 leader 写入胜利后,返回 ack,这个外面还是会丢数据,如果这个时候 leader 挂了,他的 follower 没有同步完数据而后晋升为 leader 之前未同步数据就会失落。
-1(ALL) 生产者发送数据到 leader 写入胜利后再 等 follower 也写入胜利后返回 ack 给生产者,这个外面有一个问题就是数据反复,什么样的时候会造成数据反复呢就是 follower 写入胜利后刚要回传 ack leader 就挂了,这个时候生产者认为数据并没有发送胜利,kafka 这面会把 follower 晋升为 leader 这个时候的 leader 曾经同步完了然而生产者不晓得,又从新发送一个一遍数据,就反复了。
对于数据反复 kafka 外面有一个 解决方案 就是 幂等性 的问题其实就是能够了解为去重,具体就是在生产者连贯 kafka 的时候会生成一个 pid 这个 pid 是 生产者 ID 不是过程 id,对于幂等性只能存在与同一个连贯外面才会有这个概念,他会依据偏移量去判断是否全副胜利 胜利后会检测音讯是否已存在存在的不进行发送,这个幂等性是在 ACK 为 - 1 的时候才会有这个概念,这个其实设置也非常简单就在配置文件的一个参数改为 true 就行了,默认会把 ack 的模式为 -1。
故障细节解决
A 为 leader 数据为 1 -10
B 为 follower 已同步数据为 1-6
C 为 follower 已同步数据为 1-8
如果 leader 挂掉了,B 选举为 leader 了 c 再去同步发现数据不一样一个是 1 -8 一个 是 1 -6 如果这个时候 A 复原了 为 1 -10,这个时候他们的音讯不等 消费者生产音讯就会呈现问题,kefka 是用两个标识来解决的一个是 HW(在所有音讯队列外面最短的一个 LEO)为 HW,LEO(log end office)最初一个音讯的偏移量,HW 在选举为 B 的时候 他为 1 -6 他会发送命令 所有人给我截取到 6 其余的数据都扔掉,消费者来生产音讯也只会生产到 HW 所在对应的那一个偏移量,如果这个时候选取的是 C 同步数据为 1 -8,他的 HW 为 6 的数据下面 会发送指定 都给我截取到 6 卡卡卡都截取了,再给我 A 和 B 再去询问是否有新数据 发现有 7-8 就会同步下来

正文完
 0