关于kafka:Kafka详细学习资料

43次阅读

共计 30552 个字符,预计需要花费 77 分钟才能阅读完成。

Kafka

一. 消息中间件的益处

1. 解耦

​ 容许你独立的扩大或批改两边的处理过程,只有确保它们恪守同样的接口束缚。如果为以能解决这类峰值拜访为规范来投入资源随时待命无疑是微小的节约。应用音讯队列可能使要害组件顶住突发的拜访压力,而不会因为突发的超负荷的申请而齐全解体。

2. 异步

​ 很多时候,用户不想也不须要立刻解决音讯。音讯队列提供了异步解决机制,容许用户把一个音讯放入队列,但并不立刻解决它。想向队列中放入多少音讯就放多少,而后在须要的时候再去解决它们。

3. 灵活性 / 削峰

​ 在访问量剧增的状况下,利用依然须要持续发挥作用,然而这样的突发流量并不常见。如果为以能解决这类峰值拜访为规范来投入资源随时待命无疑是微小的节约。应用音讯队列可能使要害组件顶住突发的拜访压力,而不会因为突发的超负荷的申请而齐全解体。

4. 可恢复性

​ 零碎的一部分组件生效时,不会影响到整个零碎。音讯队列升高了过程间的耦合度,所以即便一个解决音讯的过程挂掉,退出队列中的音讯依然能够在零碎复原后被解决。

5. 缓冲

​ 有助于管制和优化数据流通过零碎的速度, 解决生产音讯和生产音讯的处理速度不统一的状况。

二. 音讯队列通信的模式

1. 点对点模式(一对一,消费者被动拉取数据,音讯收到后音讯革除)

2. 公布订阅模式(一对多,消费者生产数据之后不会革除音讯)

kafka 个别应用的是生产方拉取,会始终轮询,浪费资源。(能够设置一个等待时间,在没有拉取到信息的时候,会期待设置的工夫)

三. Kafka

Kafka 是由 Apache 软件基金会开发的一个开源流解决平台,由 Scala(一品种 java 语言)和 Java 编写。Kafka 是一种高吞吐量的分布式的基于公布 / 订阅模式的音讯队列,次要利用于大数据实时处理畛域(spark 实时剖析框架)。

1.Kafka 的个性

  • 高吞吐量、低提早:kafka 每秒能够解决几十万条音讯,它的提早最低只有几毫秒,每个 topic 能够分多个 partition, consumer group 对 partition 进行 consume 操作。
  • 可扩展性:kafka 集群反对热扩大
  • 持久性、可靠性:音讯被长久化到本地磁盘,并且反对数据备份避免数据失落
  • 容错性:容许集群中节点失败(若正本数量为 n, 则容许 n - 1 个节点失败)
  • 高并发:反对数千个客户端同时读写

2.Kafka 的根本架构

1)Producer:音讯生产者,就是向 kafka broker 发消息的客户端;2)Consumer:音讯消费者,向 kafka broker 取音讯的客户端;3)Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责生产不同分区的数据,一个分区只能由一个组内消费者生产;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。4)Broker:一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 能够包容多个 topic。5)Topic:能够了解为一个队列,生产者和消费者面向的都是一个 topic;6)Partition:为了实现扩展性,一个十分大的 topic 能够散布到多个 broker(即服务器)上,一个 topic 能够分为多个 partition,每个 partition 是一个有序的队列;7)Replica:正本,为保障集群中的某个节点产生故障时,该节点上的 partition 数据不失落,且 kafka 依然可能持续工作,kafka 提供了正本机制,一个 topic 的每个分区都有若干个正本,一个 leader 和若干个 follower。8)leader:每个分区多个正本的“主”,生产者发送数据的对象,以及消费者生产数据的对象都是 leader。9)follower:每个分区多个正本中的“从”,实时从 leader 中同步数据,放弃和 leader 数据的同步。leader 产生故障时,某个 follower 会成为新的 follower

​ 最优的设计就是,consumer group 下的 consumer thread 的数量等于 partition 数量,这样效率是最高的。

​ 所以咱们线上的分布式多个 service 服务,每个 service 外面的 kafka consumer 数量都小于对应的 topic 的 partition 数量,然而所有服务的 consumer 数量只和等于 partition 的数量,这是因为分布式 service 服务的所有 consumer 都来自一个 consumer group,如果来自不同的 consumer group 就会解决反复的 message 了(同一个 consumer group 下的 consumer 不能解决同一个 partition,不同的 consumer group 能够解决同一个 topic,那么都是程序解决 message,肯定会解决反复的。个别这种状况都是两个不同的业务逻辑,才会启动两个 consumer group 来解决一个 topic)。

​ 如果 producer 的流量增大,以后的 topic 的 parition 数量 =consumer 数量,这时候的应答形式就是很想扩大:减少 topic 下的 partition,同时减少这个 consumer group 下的 consumer。

3.Kafka 装置部署

3.1 jar 包下载

http://kafka.apache.org/downloads.html

我这里用的是 kafka_2.11-0.11.0.0.tgz 版本

3.2 集群部署

1)解压安装包

[atguigu@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

2)批改解压后的文件名称

[atguigu@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka

3)在 /opt/module/kafka 目录下创立 logs 文件夹

[atguigu@hadoop102 kafka]$ mkdir logs

4)批改配置文件

[atguigu@hadoop102 kafka]$ cd config/

[atguigu@hadoop102 config]$ vi server.properties

输出以下内容:

broker 的全局惟一编号,不能反复

broker.id=0

删除 topic 性能开启

delete.topic.enable=true

解决网络申请的线程数量

num.network.threads=3

用来解决磁盘 IO 的现成数量

num.io.threads=8

发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

接管套接字的缓冲区大小

socket.receive.buffer.bytes=102400

申请套接字的缓冲区大小

socket.request.max.bytes=104857600

kafka 运行日志寄存的门路

log.dirs=/opt/module/kafka/logs

topic 在以后 broker 上的分区个数

num.partitions=1

用来复原和清理 data 下数据的线程数量

num.recovery.threads.per.data.dir=1

segment 文件保留的最长工夫,超时将被删除

log.retention.hours=168

配置连贯 Zookeeper 集群地址

zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

5)配置环境变量

[atguigu@hadoop102 module]$ sudo vi /etc/profile

KAFKA_HOME

export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

[atguigu@hadoop102 module]$ source /etc/profile

6) 同理批改另外两个 Kafka

server.properties 中的 broker.id=1、broker.id=2
注:broker.id 不得反复

7) 启动集群

[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties \[atguigu@hadoop103 kafka\]$ bin/kafka-server-start.sh -daemon config/server.properties [atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

8) 敞开集群

[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh stop

9)Kafka 群起脚本

for i in hadoop102 hadoop103 hadoop104
do
echo “========== $i ==========”
ssh $i ‘/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties’
done

4.Kafka 根本命令

1)查看以后服务器中的所有 topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh –zookeeper hadoop102:2181 –list

2)创立 topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh –zookeeper hadoop102:2181 –create –replication-factor 3 –partitions 1 -topic first

–topic 定义 topic 名 –replication-factor 定义正本数 –partitions 定义分区数

3)删除 topic

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh –zookeeper hadoop102:2181 –delete –topic first

须要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除。

4)发送音讯

[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh –brokerlist hadoop102:9092 –topic first
>hello world
>atguigu zywx

5)生产音讯

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh –zookeeper hadoop102:2181 –topic first

连贯 zookeeper 办法曾经过期,临时还能够应用,但不举荐

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh –bootstrap-server hadoop102:9092 –topic first

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh –bootstrap-server hadoop102:9092 –from-beginning –topic first

–from-beginning:会把主题中以往所有的数据都读取进去

6)查看某个 Topic 的详情

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh –zookeeper hadoop102:2181 –describe –topic first

7)批改分区数

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh –zookeeper hadoop102:2181 –alter –topic first –partitions 6

四. Kafka 架构深刻

1.Kafka 生产者

1.1 分区策略

1)分区的起因(1)不便在集群中扩大,每个 Partition 能够通过调整以适应它所在的机器,而一个 topic 又能够有多个 Partition 组成,因而整个集群就能够适应任意大小的数据了;(2)能够进步并发,因为能够以 Partition 为单位读写了。2)分区的准则 咱们须要将 producer 发送的数据封装成一个 ProducerRecord 对象。

​(1)指明 partition 的状况下,间接将指明的值间接作为 partiton 值;(2)没有指明 partition 值但有 key 的状况下,将 key 的 hash 值与 topic 的 partition 数进行取余失去 partition 值;(3)既没有 partition 值又没有 key 值的状况下,第一次调用时随机生成一个整数(前面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余失去 partition 值,也就是常说的 round-robin 算法。

1.2 数据可靠性保障

​ 为保障 producer 发送的数据,能牢靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后,都须要向 producer 发送 ack(acknowledgement 确认收到),如果 producer 收到 ack,就会进行下一轮的发送,否则从新发送数据。

1)正本数据同步策略

计划

长处

毛病

半数以上 follower 实现同步,就发送 ack

提早低(follower 同步有快有慢,半数以上同步实现就发送 ack 能把满的那些给过滤掉)

选举新的 leader 时,容忍 n 台节点的故障,须要 2n+ 1 个正本(半数以上参加投票示意须要 n + 1 台节点存活,总共则须要 n +1+ n 个正本)

全副的 follower 实现同步,才发送 ack

选举新的 leader 时,容忍 n 台节点的故障,须要 n + 1 个正本

提早高(同步快的须要等同步满的,导致提早高)

Kafka抉择了第二种计划(全副实现同步,才发送 ack),起因如下:

  1. 同样为了容忍 n 台节点的故障,第一种计划须要 2n+ 1 个正本,而第二种计划只须要 n + 1 个正本,而 Kafka 的每个分区都有大量的数据,第一种计划会造成大量数据的冗余。
  2. 尽管第二种计划的网络提早会比拟高,但网络提早对 Kafka 的影响较小。

2)ISR

​ 采纳第二种计划之后,构想以下情景:leader 收到数据,所有 follower 都开始同步数据,但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要始终等上来,直到它实现同步,能力发送 ack。这个问题怎么解决呢?

​ Leader 保护了一个动静的 in-sync replica set (ISR- 同步正本列表),意为 和 leader 放弃同步的 follower 汇合 。当 ISR 中的 follower 实现数据的同步之后,leader 就会给 follower 发送 ack。如果 follower 长时间未向 leader 同步数据,则该 follower 将被踢出 ISR,该工夫阈值由replica.lag.time.max.ms 参数设定。Leader 产生故障之后,就会从 ISR 中选举新的 leader。

3)ack 应答机制

​ 对于某些不太重要的数据,对数据的可靠性要求不是很高,可能容忍数据的大量失落,所以没必要等 ISR 中的 follower 全副接管胜利。

​ 所以 Kafka 为用户提供了三种可靠性级别,用户 依据对可靠性和提早的要求进行衡量,抉择以下的配置。

acks 参数配置:

  • 0:producer 不期待 broker 的 ack,这一操作提供了一个最低的提早,broker 一接管到还没有写入磁盘就曾经返回,当 broker 故障时有可能 失落数据
  • 1:producer 期待 broker 的 ack,partition 的 leader 落盘胜利后返回 ack,如果在 follower 同步胜利之前 leader 故障,而因为曾经返回了 ack,零碎默认新选举的 leader 曾经有了数据,从而不会进行失败重试,那么将会 失落数据

    对于某些不太重要的数据,对数据的可靠性要求不是很高,可能容忍数据的大量失落,所以没必要等 ISR 中的 follower 全副接管胜利。

    所以 Kafka 为用户提供了三种可靠性级别,用户 依据对可靠性和提早的要求进行衡量,抉择以下的配置。

    acks 参数配置:

    • 0:producer 不期待 broker 的 ack,这一操作提供了一个最低的提早,broker 一接管到还没有写入磁盘就曾经返回,当 broker 故障时有可能 失落数据
    • 1:producer 期待 broker 的 ack,partition 的 leader 落盘胜利后返回 ack,如果在 follower 同步胜利之前 leader 故障,而因为曾经返回了 ack,零碎默认新选举的 leader 曾经有了数据,从而不会进行失败重试,那么将会 失落数据
    • -1(all):producer 期待 broker 的 ack,partition 的 leader 和 follower 全副落盘胜利后才返回 ack。然而如果在 follower 同步实现后,broker 发送 ack 之前,leader 产生故障,导致没有返回 ack 给 Producer,因为失败重试机制,又会给新选举进去的 leader 发送数据,造成 数据反复

4)故障解决细节

​ LEO:指的是每个正本最大的 offset;HW:指的是消费者能见到的最大的 offset,ISR 队列中最小的 LEO

(1)follower 故障 follower 产生故障后会被长期踢出 ISR,待 该 follower 复原后,follower 会读取本地磁盘记录的上次的 HW,并将 log 文件高于 HW 的局部截取掉,从 HW 开始向 leader 进行同步。等该 follower 的 LEO 大于等于该 Partition 的 HW,即 follower 追上 leader 之后,就能够重新加入 ISR 了。(2)leader 故障 leader 产生故障之后,会从 ISR 中选出一个新的 leader,之后,为保障多个正本之间的数据一致性,其余的 follower 会先将各自的 log 文件高于 HW 的局部截掉,而后从新的 leader 同步数据。留神:这只能保障正本之间的数据一致性,并不能保证数据不失落或者不反复。

2. 消费者

2.1 生产形式

​ consumer 采纳 pull(拉)模式从 broker 中读取数据。push(推)模式很难适应生产速率不同的消费者,因为音讯发送速率是由 broker 决定的。它的指标是尽可能以最快速度传递音讯,然而这样很容易造成 consumer 来不及解决音讯,典型的体现就是拒绝服务以及网络拥塞。而 pull 模式则能够依据 consumer 的生产能力以适当的速率生产音讯。

​ pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,始终返回空数据。针对这一点,Kafka 的消费者在生产数据时会传入一个时长参数 timeout,如果以后没有数据可供生产,consumer 会期待一段时间之后再返回,这段时长即为 timeout。

2.2 分区调配策略

​ 一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会波及到 partition 的调配问题,即确定那个 partition 由哪个 consumer 来生产。Kafka 有两种调配策略,一是 RoundRobin(依据生产组来调配),一是 Range(依据主题 Topic 调配)。

​ 在启动消费者,减少缩小消费者时候进行调配。

2.3 offset 的保护

​ 因为 consumer 在生产过程中可能会呈现断电宕机等故障,consumer 复原后,须要从故障前的地位的持续生产,所以 consumer 须要实时记录本人生产到了哪个 offset,以便故障复原后持续生产。

​ Kafka 0.9 版本之前,consumer 默认将 offset 保留在 Zookeeper 中,从 0.9 版本开始,consumer 默认将 offset 保留在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。
1)批改配置文件

consumer.properties exclude.internal.topics=false

2)读取 offset

0.11.0.0 之前版本:

bin/kafka-console-consumer.sh –topic __consumer_offsets -zookeeper hadoop102:2181 –formatter “kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter” –consumer.config config/consumer.properties –from-beginning

0.11.0.0 之后版本(含):

bin/kafka-console-consumer.sh –topic __consumer_offsets -zookeeper hadoop102:2181 –formatter “kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageForm atter” –consumer.config config/consumer.properties –from-beginning

2.4 消费者组案例

1)需要:测试同一个消费者组中的消费者,同一时刻只能有一个消费者生产。2)案例实操(1)在 hadoop102、hadoop103 上批改 consumer.properties 配置文件中的 group.id 属性为任意组名。

[atguigu@hadoop103 config]$ vi consumer.properties
group.id=zywx

​(2)在 hadoop102、hadoop103 上别离启动消费者

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh –bootstrap-server hadoop103:9092 –topic first –consumer.config config/consumer.properties
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh –bootstrap-server hadoop102:9092 –topic first –consumer.config config/consumer.properties

​(3)在 hadoop104 上启动生产者

[atguigu@hadoop104 kafka]$ bin/kafka-console-producer.sh \ –broker-list hadoop102:9092 –topic first
>hello world

​(4)查看 hadoop102 和 hadoop103 的接收者。同一时刻只有一个消费者接管到音讯。

五. Kafka API

5.1 Producer API

5.1.1 音讯发送流程

Kafka 的 Producer 发送音讯采纳的是异步发送的形式。在音讯发送的过程中,波及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。main 线程将音讯发送 RecordAccumulator,Sender 线程一直从 RecordAccumulator 中拉取音讯发送到 Kafka broker。

5.1.2 异步发送 API

1)导入依赖

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>

2)编写代码

1. 不带回调函数的 API

package com.zywx.producer;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class MyProducer {
public static void main(String[] args) {
//1. 创立 kafka 生产者的配置信息
Properties properties = new Properties();
// 指定连贯的 kafka 集群
properties.put(“bootstrap.servers”,”192.168.25.128:9091″);

//ACK 应答级别
properties.put(“acks”, “all”);

// 重试次数
properties.put(“retries”, 1);

// 批次大小
properties.put(“batch.size”, 16384);

// 等待时间
properties.put(“linger.ms”, 1);

//RecordAccumulator 缓冲区大小
properties.put(“buffer.memory”, 33554432);

//key,value 的序列化类
properties.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
properties.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);

// 创立生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

// 发送数据
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>(“first”,”zywx—-” + i));
}

// 敞开资源
producer.close();
}
}

2. 带回调函数的 API

​ 回调函数会在 producer 收到 ack 时调用,为异步调用,该办法有两个参数,别离是 RecordMetadata 和 Exception,如果 Exception 为 null,阐明音讯发送胜利,如果 Exception 不为 null,阐明音讯发送失败。

public class CallBackProducer {
public static void main(String[] args) {
//1. 创立配置信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,”192.168.25.128:9091″);

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
“org.apache.kafka.common.serialization.StringSerializer”);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
“org.apache.kafka.common.serialization.StringSerializer”);

//2. 创立生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

//3. 发送数据
for (int i = 0; i < 10 ; i++) {
producer.send(new ProducerRecord<String, String>(“lol”,”zywx===” + i), (recordMetadata, e) -> {
if (e == null){
System.out.println(recordMetadata.partition()+”======”+recordMetadata.offset());
}else {
e.printStackTrace();
}
});
}

//4. 敞开资源
producer.close();
}
}

package com.zywx.producer;

import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;
import java.util.Properties;

public class CallBackProducer {
public static void main(String[] args) {
//1. 创立配置信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,”192.168.25.128:9091″);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,”org.apache.kafka.common.serialization.StringSerializer”);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,”org.apache.kafka.common.serialization.StringSerializer”);

//2. 创立生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

ArrayList<String> list = new ArrayList<>();
list.add(“a”);
list.add(“b”);
list.add(“c”);
//3. 发送数据
for (int i = 0; i < 10 ; i++) {
producer.send(new ProducerRecord<String, String>(“lol”, list.get(i%3),”zywx===” + i), (recordMetadata, e) -> {
if (e == null){
System.out.println(recordMetadata.partition()+”======”+recordMetadata.offset());
}else {
e.printStackTrace();
}
});
}

//4. 敞开资源
producer.close();
}
}

5.1.3 自定义分区器

编写自定义分区器

package com.zywx.partitioner;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPartitioner implements Partitioner {

@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
// Integer count = cluster.partitionCountForTopic(topic);
// return key.toString().hashCode() % count;
return 1;
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> map) {

}
}

自定义分区器实现的接口

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;

public interface Partitioner extends Configurable {
int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);

void close();
}

接口的实现类(默认的分区办法)

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.apache.kafka.clients.producer.internals;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class DefaultPartitioner implements Partitioner {


private final AtomicInteger counter = new AtomicInteger((new Random()).nextInt());

public DefaultPartitioner() {}

public void configure(Map<String, ?> configs) {
}

public int partition(String topic, Object key, byte\[\] keyBytes, Object value, byte\[\] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {int nextValue = this.counter.getAndIncrement();
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();} else {return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

public void close() {}

}

测试自定义分区器

package com.zywx.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class PartitionProducer {

public static void main(String\[\] args) {        
    //1. 创立 kafka 生产者的配置信息
    Properties properties = new Properties();       
   // 指定连贯的 kafka 集群
   properties.put("bootstrap.servers","192.168.25.128:9091");
    //ACK 应答级别
    properties.put("acks", "all");

    // 重试次数
    properties.put("retries", 1);

    // 批次大小
    properties.put("batch.size", 16384);

    // 等待时间
    properties.put("linger.ms", 1);

    //RecordAccumulator 缓冲区大小
    properties.put("buffer.memory", 33554432);

    //key,value 的序列化类
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // 增加分区器
    properties.put(ProducerConfig.PARTITIONER\_CLASS\_CONFIG, "com.zywx.partitioner.MyPartitioner");

    // 创立生产者对象
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    // 发送数据
    for (int i = 0; i < 10; i++) {producer.send(new ProducerRecord<String, String>("first", "zywx----" + i), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e == null){System.out.println(recordMetadata.partition());
                }
            }
        });
    }

    // 敞开资源
    producer.close();}

}

5.1.4 同步发送 API

​ 同步发送的意思就是,一条音讯发送之后,会阻塞以后线程,直至返回 ack。

因为 send 办法返回的是一个 Future 对象,依据 Futrue 对象的特点,咱们也能够实现同步发送的成果,只需在调用 Future 对象的 get 方发即可。

    …………
    
    // 发送数据
    for (int i = 0; i < 10; i++) {//Future 的 get 办法会阻塞其余线程, 实现同步.(个别不应用,效率太低)
        Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("first", "zywx----" + i));
        try {RecordMetadata recordMetadata = future.get();
        } catch (InterruptedException e) {e.printStackTrace();
        } catch (ExecutionException e) {e.printStackTrace();
        }
    }
    
    ……………


5.2 Consumer API

5.2.1 根本的音讯监听

package com.zywx.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {

public static void main(String\[\] args) {
    //1. 创立消费者的配置信息
    Properties properties = new Properties();

    //2. 给配置信息赋值
    // 连贯的集群
    properties.put(ConsumerConfig.BOOTSTRAP\_SERVERS\_CONFIG,"192.168.25.128:9091");
    // 开启主动提交
    properties.put(ConsumerConfig.ENABLE\_AUTO\_COMMIT\_CONFIG,true);
    // 主动提交 offset 的提早
    properties.put(ConsumerConfig.AUTO\_COMMIT\_INTERVAL\_MS\_CONFIG,"1000");
    
    //Key/Value 反序列化        properties.put(ConsumerConfig.KEY\_DESERIALIZER\_CLASS\_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE\_DESERIALIZER\_CLASS\_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    
    // 消費者組
    properties.put(ConsumerConfig.GROUP\_ID\_CONFIG,"demodata");

    //3. 创立消费者
    KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

    //4. 订阅主题
    consumer.subscribe(Arrays.asList("first","lol"));

    while (true){
        //5. 获取数据
        ConsumerRecords<String, String> records = consumer.poll(100);

        //6. 解析并打印
        for (ConsumerRecord<String, String> record : records) {System.out.println(record.key()+"======="+record.value());
        }
    }

}

}

从新读取之前发送的音讯

(只能生产到保留的最近七天音讯,从七天中最早的音讯开始生产)

// 该消费者应属于一个新的消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG,”demodata”);
// 配置 重置消费者的 offset
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,”earliest”);

上述代码中应用的是主动提交,主动提交会呈现漏数据或者数据反复生产的问题

5.2.2 手动提交 offset

1)同步提交

…………

properties.put("enable.auto.commit", "false");// 敞开主动提交 offset

…………

    while (true){
        //5. 获取数据
        ConsumerRecords<String, String> records = consumer.poll(100);

        //6. 解析并打印
        for (ConsumerRecord<String, String> record : records) {System.out.println(record.key()+"======="+record.value());
        }
        
        // 同步提交,以后线程会阻塞直到 offset 提交胜利
         consumer.commitSync();}

…………

2)异步提交

…………

properties.put("enable.auto.commit", "false");// 敞开主动提交 offset

…………

    while (true){
        //5. 获取数据
        ConsumerRecords<String, String> records = consumer.poll(100);

        //6. 解析并打印
        for (ConsumerRecord<String, String> record : records) {System.out.println(record.key()+"======="+record.value());
        }
        
        // 异步提交
         consumer.commitAsync(new OffsetCommitCallback() {
             @Override
            public void onComplete(Map<TopicPartition,OffsetAndMetadata> offsets,Exception                 exception) {if (exception != null) {System.err.println("Commit failed for" +offsets);
                 }
             }
         });

    }

…………

异步提交能够防止漏数据的问题,然而一样无奈防止数据反复生产的问题。

5.2.3 自定义提交 offset

​ offset 的保护是相当繁琐的,因为须要思考到消费者的 Rebalace。

​ 当有新的消费者退出消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。

​ Consumer Rebalance 的触发条件:(1)Consumer 减少或删除会触发 Consumer Group 的 Rebalance(2)Broker 的减少或者缩小都会触发 Consumer Rebalance

​ 消费者产生 Rebalance 之后,每个消费者生产的分区就会发生变化。因而消费者要首先获取到本人被重新分配到的分区,并且定位到每个分区最近提交的 offset 地位持续生产。

package com.zywx.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class CustomConsumer {

 private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
public static void main(String\[\] args) {

// 创立配置信息
 Properties props = new Properties();
 
//Kafka 集群
 props.put("bootstrap.servers", "hadoop102:9092");
 
// 消费者组,只有 group.id 雷同,就属于同一个消费者组
 props.put("group.id", "test");
 
// 敞开主动提交 offset
 props.put("enable.auto.commit", "false");
 
 //Key 和 Value 的反序列化类
 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
 
 // 创立一个消费者
 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

 // 消费者订阅主题
 consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {

     // 该办法会在 Rebalance 之前调用
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {commitOffset(currentOffset);
     }
     // 该办法会在 Rebalance 之后调用
     @Override
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {currentOffset.clear();
         for (TopicPartition partition : partitions) {consumer.seek(partition, getOffset(partition));// 定位到最近提交的 offset 地位持续生产
         }
     }
 });
 while (true) {
     //5. 获取数据
    ConsumerRecords<String, String> records = consumer.poll(100);

    //6. 解析并打印
    for (ConsumerRecord<String, String> record : records) {System.out.println(record.key()+"======="+record.value());
    }
        
    // 异步提交
     consumer.commitAsync(new OffsetCommitCallback() {
         @Override
        public void onComplete(Map<TopicPartition,OffsetAndMetadata> offsets,Exception             exception) {if (exception != null) {System.err.println("Commit failed for" +offsets);
             }
         }
     });
 }
 // 获取某分区的最新 offset
 private static long getOffset(TopicPartition partition) {return 0;}
 // 提交该消费者所有分区的 offset
 private static void commitOffset(Map<TopicPartition, Long> currentOffset) {// 将 offset 信息入库办法}

}

简略说,就是在生产完数据,将数据入库的同时,把生产到的 offset 信息也存到一张专门的 offset 表中(这张表用消费者组 ID+Topic+ 分区名三列做复合主键)。在产生故障或者 rebalance 当前读取最新的 offset。

5.3 自定义拦截器

5.3.1 拦截器原理

​ 对于 producer 而言,interceptor 使得用户在音讯发送前以及 producer 回调逻辑前有机会 对音讯做一些定制化需要,比方批改音讯等。同时,producer 容许用户指定多个 interceptor 按序作用于同一条音讯从而造成一个拦挡链(interceptor chain)。

​ Intercetpor 的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的办法包含:

​(1)configure(configs)

​ 获取配置信息和初始化数据时调用。

​(2)onSend(ProducerRecord):

​ 该办法封装进 KafkaProducer.send 办法中,即它运行在用户主线程中。Producer 确保在音讯被序列化以及计算分区前调用该办法。用户能够在该办法中对音讯做任何操作,但最好保障不要批改音讯所属的 topic 和分区,否则会影响指标分区的计算。

​(3)onAcknowledgement(RecordMetadata, Exception):

​ 该办法会在音讯从 RecordAccumulator 胜利发送到 Kafka Broker 之后,或者在发送过程中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在 producer 的 IO 线程中,因而不要在该办法中放入很重的逻辑,否则会拖慢 producer 的音讯 发送效率。

​(4)close:

​ 敞开 interceptor,次要用于执行一些资源清理工作。

​ 如前所述,interceptor 可能被运行在多个线程中,因而在具体实现时用户须要自行确保线程平安。另外假使指定了多个 interceptor,则 producer 将依照指定顺序调用它们,并仅仅是捕捉每个 interceptor 可能抛出的异样记录到谬误日志中而非在向上传递。这在应用过程中要特地注意。

5.3.2 拦截器编码

(1)减少工夫戳拦截器

package com.zywx.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class TimeInterceptor implements ProducerInterceptor<String,String> {

@Override
public void configure(Map<String, ?> map) {

}

@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
    //1. 取出数据
    String value = producerRecord.value();
    //2. 创立新的 ProducerRecord(没有 setValue 办法)
    return new ProducerRecord<String, String>(producerRecord.topic(),producerRecord.partition(),producerRecord.key(),System.currentTimeMillis()+","+value);
}

@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

}

@Override
public void close() {}

}

(2)统计发送音讯胜利和发送失败音讯数,并在 producer 敞开时打印这两个计数器

package com.zywx.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
*
*/
public class CounterInterceptor implements ProducerInterceptor<String,String> {

int success;
int error;

/\*\*
 \* 默认返回的是 null, 要改成返回参数自身
 \* @param producerRecord
 \* @return
 \*/
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {return producerRecord;}

@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {if (recordMetadata != null){success++;}else {error++;}
}

@Override
public void close() {System.out.println("success:"+success);
    System.out.println("error:"+error);
}

@Override
public void configure(Map<String, ?> map) {}

}

(3)在 producer 中增加拦截器 / 拦挡链

…………

    // 增加拦截器
    ArrayList<String> interceptors = new ArrayList<>();
    interceptors.add("com.zywx.interceptor.TimeInterceptor");
    interceptors.add("com.zywx.interceptor.CounterInterceptor");
    properties.put(ProducerConfig.INTERCEPTOR\_CLASSES\_CONFIG,interceptors);
    

…………

后果:

六. Kafka 监控

Kafka Eagle

1)批改 kafka 启动命令

批改 kafka-server-start.sh 命令中

if [“x$KAFKA_HEAP_OPTS” = “x”]; then
export KAFKA_HEAP_OPTS=”-Xmx1G -Xms1G”
fi

批改为

if [“x$KAFKA_HEAP_OPTS” = “x”]; then
export KAFKA_HEAP_OPTS=”-server -Xms2G -Xmx2G -XX:PermSize=128m
-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 –
XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70″
export JMX_PORT=”9999″
#export KAFKA_HEAP_OPTS=”-Xmx1G -Xms1G”
fi

集群每个节点都要批改

2)上传压缩包 kafka-eagle-bin-1.3.7.tar.gz 到集群 /opt/software 目录

3)解压到本地

[atguigu@hadoop102 software]$ tar -zxvf kafka-eagle-bin1.3.7.tar.gz

4)进入方才解压的目录

[atguigu@hadoop102 kafka-eagle-bin-1.3.7]$ ll
总用量 82932
-rw-rw-r–. 1 atguigu atguigu 84920710 8 月 13 23:00 kafka-eagleweb-1.3.7-bin.tar.gz

5)将 kafka-eagle-web-1.3.7-bin.tar.gz 解压至 /opt/module

[atguigu@hadoop102 kafka-eagle-bin-1.3.7]$ tar -zxvf kafka-eagleweb-1.3.7-bin.tar.gz -C /opt/module/

6)批改名称

[atguigu@hadoop102 module]$ mv kafka-eagle-web-1.3.7/ eagle

7)给启动文件执行权限

[atguigu@hadoop102 eagle]$ cd bin/
[atguigu@hadoop102 bin]$ ll
总用量 12
-rw-r–r–. 1 atguigu atguigu 1848 8 月 22 2017 ke.bat
-rw-r–r–. 1 atguigu atguigu 7190 7 月 30 20:12 ke.sh
[atguigu@hadoop102 bin]$ chmod 777 ke.sh

8)批改配置文件

multi zookeeper&kafka cluster list

kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop102:2181,hadoop103:2181,hadoop104:2181

kafka offset storage

cluster1.kafka.eagle.offset.storage=kafka

enable kafka metrics

kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false

kafka jdbc driver address

kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://hadoop102:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=root

9)增加环境变量

export KE_HOME=/opt/module/eagle
export PATH=$PATH:$KE_HOME/bin

留神:source /etc/profile

10)启动

[atguigu@hadoop102 eagle]$ bin/ke.sh start

11)登录页面查看监控数据


七. Kafka 与 Spring-boot 整合

7.1 我的项目依赖

pom.xml

`<?xml version=”1.0″ encoding=”UTF-8″?>
<project xmlns=”http://maven.apache.org/POM/4.0.0″ xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”

     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.3.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>spring\_boot\_kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring\_boot\_kafka</name>
<description>Demo project for Spring Boot</description>`

<properties>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.44</version>
    </dependency>

</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

</project>

7.2 实体类 User

package com.example.spring_boot_kafka.entity;

import lombok.Data;
import lombok.experimental.Accessors;

@Data
@Accessors(chain = true)
public class User {

private Integer id;
private String name;
private Integer age;

}

7.3 音讯发送

package com.example.spring_boot_kafka.producer;

import com.alibaba.fastjson.JSON;
import com.example.spring_boot_kafka.entity.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class UserProducer {

@Autowired
private KafkaTemplate kafkaTemplate;

public void sendUser(Integer id){User user = new User();
    user.setId(id).setAge(17).setName("张三");
    System.err.println("发送用户日志:"+user);
    kafkaTemplate.send("user", JSON.toJSONString(user));
}

}

7.4 音讯接管(监听)

package com.example.spring_boot_kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
@Slf4j
public class UserConsumer {

@KafkaListener(topics = {"user"})
public void consumer(ConsumerRecord consumerRecord){
    // 判断是否为 null
    Optional kafkaMessage = Optional.ofNullable(consumerRecord.value());
    log.info(">>>>record="+kafkaMessage);
    if (kafkaMessage.isPresent()){
        // 失去 Optional 实例中的值
        Object message = kafkaMessage.get();
        System.err.println("生产音讯:"+message);
    }
}

}

7.5 配置文件

spring.application.name=kafka-user
server.port=8080

============== kafka ===================

指定 kafka 代理地址,能够多个

spring.kafka.bootstrap-servers=localhost:9092

=============== provider =======================

spring.kafka.producer.retries=0

每次批量发送音讯的数量

spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432

指定音讯 key 和音讯体的编解码形式

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

=============== consumer =======================

指定默认消费者 group id

spring.kafka.consumer.group-id=user-log-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

指定音讯 key 和音讯体的编解码形式

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

7.6 启动类

package com.example.spring_boot_kafka;

import com.example.spring_boot_kafka.producer.UserProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class SpringBootKafkaApplication {
@Autowired
private UserProducer kafkaSender;
@PostConstruct
public void init(){
for (int i = 0; i < 10; i++){
kafkaSender.sendUser(i);
}
}

public static void main(String[] args) {
SpringApplication.run(SpringBootKafkaApplication.class, args);
}

}

7.7 测试后果

正文完
 0