共计 13105 个字符,预计需要花费 33 分钟才能阅读完成。
音讯队列是什么
音讯队列是一种在应用程序之间进行通信的技术,容许将音讯从一个应用程序发送到另一个应用程序,而无需明确的连贯这些应用程序。音讯队列中的音讯被存储在一种称为队列的数据结构中,这些音讯在队列中保留,直到被消费者接管。这使得音讯的发送者和接收者可能异步地通信,而不用期待对方的响应,从而进步了零碎的可伸缩性和弹性。音讯队列还能够通过实现各种模式(例如公布 / 订阅模式、申请 / 响应模式等)来反对不同类型的应用程序通信。
音讯队列的要害概念
音讯队列中的要害概念包含:
- 音讯:要传递的数据或信息。
- 队列:用于存储音讯的数据结构,具备先进先出(FIFO)的个性。
- 生产者:向音讯队列发送音讯的应用程序。
- 消费者:从音讯队列接管音讯的应用程序。
- 绑定(Binding):将一个音讯队列绑定到一个交换机上,以确保音讯被路由到正确的队列。
- 交换机(Exchange):接管来自生产者的音讯并将其路由到一个或多个队列中。
- 路由键(Routing Key):用于将音讯从交换机路由到正确的队列。
“
这些概念组成了音讯队列的外围,使得生产者和消费者可能异步地通信,从而进步了零碎的可伸缩性和弹性。
”
音讯队列的利用场景
音讯队列的利用场景十分宽泛,以下是其中一些常见的利用场景:
- 异步工作:将须要执行的工作放入音讯队列中,由消费者异步地执行工作,进步零碎的响应速度和并发性。
- 分布式系统:在分布式系统中,通过音讯队列实现各个组件之间的异步通信,进步零碎的可伸缩性和弹性。
- 利用解耦:将音讯队列作为中间件,将各个应用程序解耦,防止间接依赖和影响,进步零碎的可维护性和可扩展性。
- 日志收集:通过将日志音讯发送到音讯队列,使得日志的收集和剖析能够异步地进行,防止间接影响业务解决。
- 音讯告诉:通过音讯队列向用户发送告诉音讯,如短信、邮件等,进步零碎的实时性和可靠性。
- 数据缓存:通过将热点数据缓存到音讯队列中,缩小零碎的拜访压力和响应工夫。
“
总之,音讯队列能够在各种分布式系统和异步场景中发挥作用,使得零碎更加高效、灵便和牢靠。
”
支流的音讯队列有哪些
以下是一些支流的音讯队列:
- Apache Kafka:Apache Kafka 是一种分布式公布 - 订阅音讯零碎,用于解决高吞吐量的实时数据。它是由 LinkedIn 开发的。
- RabbitMQ:RabbitMQ 是一个开源的音讯代理,用于反对多种音讯协定。它能够与多种编程语言一起应用,并且具备可扩展性和高可用性。
- Apache ActiveMQ:Apache ActiveMQ 是一个开源的音讯代理,它实现了 JMS(Java 音讯服务)标准,反对多种传输协定和音讯格局。
- Amazon SQS:Amazon SQS 是亚马逊提供的一种齐全托管的音讯队列服务,它具备高可用性和可扩展性。
- Microsoft Azure Service Bus:Microsoft Azure Service Bus 是微软提供的一种齐全托管的音讯队列服务,反对多种协定和语言,具备高可用性和可扩展性。
- Google Cloud Pub/Sub:Google Cloud Pub/Sub 是 Google 提供的一种齐全托管的音讯队列服务,用于解决大量的实时数据流。
支流的音讯队列比照
以下是一些常见的音讯队列零碎的比照:
- RabbitMQ:RabbitMQ 是一个风行的 AMQP(高级音讯队列协定)音讯代理。它具备良好的性能,可靠性和稳定性,并反对多种协定。RabbitMQ 是一个成熟的音讯队列零碎,具备宽泛的社区反对和丰盛的性能。它的治理界面易于应用,可用于监督和治理队列。
- Apache Kafka:Apache Kafka 是一个高性能的分布式流解决零碎和音讯队列平台。它设计用于解决大量的实时数据,并提供可扩展性,高吞吐量和低提早的个性。Kafka 的音讯模型是基于公布 / 订阅模式的,并反对多个消费者组,能够实现高效的音讯散发和负载平衡。
- ActiveMQ:Apache ActiveMQ 是一个风行的开源音讯代理,反对多种传输协定和编程语言,并提供高可用性和可扩展性的个性。它反对 JMS(Java 音讯服务)标准,并提供牢靠的消息传递和事务反对。
- Redis:Redis 是一种内存数据库,但它也能够用作音讯队列。Redis 的音讯队列模型是基于公布 / 订阅模式的,并反对多种数据结构和高级性能。Redis 的音讯队列性能较高,但可靠性和持久性取决于其配置和应用形式。
- Apache Pulsar:Apache Pulsar 是一个开源的分布式流解决零碎和音讯队列平台,具备高可用性和可扩展性。它反对多种协定和编程语言,并提供高效的消息传递和低提早的个性。Pulsar 的音讯模型是基于公布 / 订阅和队列模式的,并反对多个消费者组和程序音讯。
“
总的来说,每个音讯队列零碎都有本人的优缺点和实用场景,具体抉择取决于理论需要和条件。
”
支流的音讯队列利用场景
不同的音讯队列实用于不同的利用场景。以下是一些支流的音讯队列的利用场景:
- RabbitMQ:
- 高吞吐量的音讯队列
- 多种语言客户端库反对
- 反对多种音讯协定
- 反对简单的路由规定
- 反对音讯确认机制
- 适宜工作队列、日志解决、音讯通信等场景
- Kafka:
- 高吞吐量的音讯队列
- 分布式的设计,反对高可用和程度扩大
- 反对音讯的长久化和多正本备份
- 反对批量发送和生产音讯
- 适宜日志收集、流解决、音讯通信等场景
- ActiveMQ:
- 反对多种协定,如 AMQP、STOMP、OpenWire 等
- 反对多种音讯模式,如点对点、公布订阅等
- 反对事务、音讯确认等机制
- 反对集群和主从模式
- 适宜音讯通信、工作队列、日志解决等场景
- RocketMQ:
- 高吞吐量的音讯队列
- 反对分布式和高可用架构
- 反对多种音讯协定
- 反对批量发送和生产音讯
- 反对事务音讯和音讯轨迹
- 适宜大规模分布式系统、电商场景、金融场景等
- Redis:
- 反对多种数据结构和多种操作,如 list、pub/sub 等
- 反对长久化和复制
- 反对 Lua 脚本扩大
- 适宜高速缓存、音讯通信、工作队列等场景
- Pulsar:
- 适宜大规模分布式系统、物联网等场景
- 反对音讯的长久化和多正本备份
- 反对多租户、多协定和多种语言客户端
- 反对流解决和事务
- 反对动静扩大和缩减
“
总的来说,不同的音讯队列实用于不同的场景和需要,须要依据具体的业务需要抉择适合的音讯队列。
”
Kafka 入门实战
要害概念
Kafka 是一个分布式的流解决平台,罕用于高吞吐量的数据管道和实时流数据处理。以下是 Kafka 的要害概念:
- Topic(主题):Kafka 中的音讯都被公布到 topic,一个 topic 能够被认为是一个数据源,也能够被认为是一个音讯的分类。
- Partition(分区):每个 topic 能够被分为多个 partition,每个 partition 能够存储特定数量的音讯。每个 partition 都有一个惟一的标识符(partition id)。
- Offset:每个 partition 中的每个音讯都会被调配一个惟一的 offset,它是该音讯在 partition 中的惟一标识符。
- Producer(生产者):负责将音讯公布到指定的 topic。
- Consumer(消费者):消费者订阅了一个或多个 topic,并解决被公布到这些 topic 的音讯。
- Consumer Group:一组消费者能够组成一个消费者组,这些消费者一起生产一个 topic 中的所有 partition,每个 partition 只能由一个消费者组中的消费者进行生产。
- Broker:Kafka 集群中的每个节点都称为 broker,负责接管和解决音讯,一个 Kafka 集群能够由多个 broker 组成。
- ZooKeeper:Kafka 应用 ZooKeeper 来保护集群的元数据,如 broker 的状态、topic 和 partition 的状态等。
“
ZooKeeper 是一个开源的分布式协调服务,用于保护配置信息、命名、提供分布式同步和提供组服务等性能。它被设计为高性能、高可用、高扩展性的分布式协调服务,能够使分布式应用程序更加简略和牢靠。ZooKeeper 采纳 ZAB 协定(ZooKeeper Atomic Broadcast)实现主从复制,确保数据的强一致性。它的利用场景包含分布式锁、配置管理、服务发现、集群治理等。
”
Kafka 集群搭建
- 增加名称和 IP 的映射
cat >> /etc/hosts << EOF
192.168.11.247 kafka01
192.168.11.248 kafka02
192.168.11.249 kafka03
EOF
- 敞开防火墙
systemctl stop firewalld
systemctl disable firewalld
- 创立普通用户
groupadd kafka
useradd -g kafka kafka
- 敞开 SELinux
setenforce 0 # 长期敞开
sed -i 's/SELINUX=enforcing/SELINUX=disabled/g' /etc/selinux/config # 永恒敞开
- 装置 java 1.8 环境
# 可联网时应用 Yum 装置
yum install java-1.8.0-openjdk -y
[root@kafka01 ~]# java -version
openjdk version "1.8.0_362"
OpenJDK Runtime Environment (build 1.8.0_362-b08)
OpenJDK 64-Bit Server VM (build 25.362-b08, mixed mode)
[root@kafka01 ~]#
“
留神:接下来的操作请切换到普通用户 kafka 上面进行操作
”
- 下载 Kafka 和 ZooKeeper
wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.13-2.7.2.tgz
wget https://downloads.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
- 解压 Kafka 和 ZooKeeper
tar -zf kafka_2.13-2.7.2.tgz
tar -zxf apache-zookeeper-3.7.1-bin.tar.gz
- 配置 ZooKeeper 在 ZooKeeper 中,咱们须要创立一个配置文件,该文件应蕴含 ZooKeeper 集群中所有节点的 IP 地址。
在 ZooKeeper 的根目录中,创立一个名为 conf 的文件夹,并在其中创立一个名为 zoo.cfg 的文件。
在 zoo.cfg 中增加以下内容:
tickTime=2000
dataDir=/data/zookeeperData
clientPort=2181
initLimit=5
syncLimit=2
# server.1=IP_ADDRESS_NODE_1:2888:3888
# server.2=IP_ADDRESS_NODE_2:2888:3888
# server.3=IP_ADDRESS_NODE_3:2888:3888
server.1=192.168.11.247:2888:3888
server.2=192.168.11.248:2888:3888
server.3=192.168.11.249:2888:3888
将“IP_ADDRESS_NODE_1”、“IP_ADDRESS_NODE_2”和“IP_ADDRESS_NODE_3”替换为每个 ZooKeeper 节点的 IP 地址。
- 创立 myid 文件 别离在每台节点的 /data/zookeeperData 目录下创立 myid,并写入编号,编号只能为数字,编号对应 zoo.cfg 配置中的 server.1、server.2、server.3。上面是在节点 1 上的配置:
cd zookeeperData/
echo 1 > myid # 对应节点 1
“
另外两个节点请依据理论状况调整配置。
”
- 启动 ZooKeeper
cd apache-zookeeper-3.7.1-bin
./bin/zkServer.sh start
- 配置 Kafka 别离在每台节点的 Kafka 的根目录中,创立一个名为 config 的文件夹,并在其中创立一个名为 server.properties 的文件(如果有则间接编辑即可)这是在节点 1 的 server.properties 中增加以下内容:
broker.id=1
listeners=listener1://192.168.11.247:9092
log.dirs=/data/kafka-logs
# zookeeper.connect=IP_ADDRESS_NODE_1:2181,IP_ADDRESS_NODE_2:2181,IP_ADDRESS_NODE_3:2181
zookeeper.connect=192.168.11.247:2181,192.168.11.248:2181,192.168.11.249:2181
将“IP_ADDRESS_NODE_1”、“IP_ADDRESS_NODE_2”和“IP_ADDRESS_NODE_3”替换为每个 ZooKeeper 节点的 IP 地址。
- broker.id:每台节点都须要配置惟一的 broker.id,以便 Kafka 可能正确地辨认和治理节点。broker.id 是一个整数,用于标识 Kafka 集群中的每个节点。
- listeners:在 Kafka 集群中,listeners 参数用于配置 Kafka 节点侦听客户端申请的地址和端口号。每台节点可能有多个 listeners 参数,以便能够从多个地址或端口号接管客户端申请。
“
另外两个节点请依据理论状况调整配置。
”
- 每台节点设置并行垃圾回收线程的数量 关上 Kafka 启动脚本 kafka-server-start.sh,在文件中找到蕴含 KAFKA_HEAP_OPTS 的行,在 KAFKA_HEAP_OPTS 变量中增加 -XX:ParallelGCThreads 选项,例如 -XX:ParallelGCThreads=4,其中 4 示意您的零碎有 4 个处理器。如果您不确定处理器数量,能够应用 Runtime.getRuntime().availableProcessors() 命令来查问。
例如,如果我的零碎有 1 个处理器,您能够将 KAFKA_HEAP_OPTS 设置为:
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -XX:ParallelGCThreads=1"
- 在每个节点上启动 Kafka
cd kafka_2.13-2.7.2
nohup bin/kafka-server-start.sh config/server.properties &
留神:在生产环境中,倡议应用零碎服务启动 Kafka。
Kafka 基本操作
- 创立一个 Kafka 主题
bin/kafka-topics.sh --create --zookeeper 192.168.11.247:2181 --replication-factor 3 --partitions 5 --topic my_topic
其中,my_topic 是您要创立的主题的名称。在这个命令中,咱们指定了主题的复制因子和分区数。replication-factor 指定了主题的正本数,通常设置为大于 1 的值以实现数据冗余和高可用性。partitions 指定了主题的分区数,这将决定 Kafka 如何在不同的消费者之间调配数据。
- 查看主题
bin/kafka-topics.sh --list --bootstrap-server 192.168.11.247:9092
- 删除主题
bin/kafka-topics.sh --zookeeper 192.168.11.247:2181 --delete --topic my_topic
如果 Kafka 配置中没有启用 delete.topic.enable 参数,那么主题的删除操作不会失效。期待一段时间,直到所有 Kafka 服务器都确认主题已被删除。能够应用以下命令查看主题是否已被删除:
bin/kafka-topics.sh --zookeeper 192.168.11.247:2181 --list
“
请留神,在生产环境中,删除主题时须要分外审慎。删除主题将永恒删除所有与该主题相干的音讯和元数据。在删除主题之前,请确保备份了所有必要的数据并已告诉所有相干方。
”
- 查看 Kafka 主题详情
bin/kafka-topics.sh --describe --bootstrap-server 192.168.11.247:9092 --topic my_topic
- 发送音讯到 Kafka 主题
bin/kafka-console-producer.sh --broker-list 192.168.11.247:9092 --topic my_topic
输出以上命令后,会进入一个交互式的命令行界面。在该界面中,每行输出的文本将被作为一条音讯发送到指定的主题中。按下 Ctrl+C 即可退出该命令行工具。
在上述命令中,192.168.11.247:9092 是 Kafka 集群其中一个节点的地址,my-topic 是主题名称。如果 Kafka 集群有多个节点,则能够用逗号分隔的形式指定多个 Kafka 服务器地址,例如 –broker-list kafka1:9092,kafka2:9092,kafka3:9092。
除了 kafka-console-producer 工具,也能够在编程语言中应用 Kafka 客户端 API 发送音讯到 Kafka 主题。
- 从 Kafka 主题中读取音讯
bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.247:9092 --topic my_topic
输出以上命令后,该命令行工具将从指定的主题中读取音讯,并输入到命令行界面中。如果不指定 –from-beginning 参数,则该命令行工具将从最新的音讯开始读取;如果指定了 –from-beginning 参数,则该命令行工具将从最早的音讯开始读取。
- 查看以后 Kafka 服务器的健康状况
bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.11.247:9092
消费者组
在 Kafka 中,消费者组是一组具备雷同 Group ID 的消费者。消费者组能够订阅一个或多个主题,并独特生产这些主题的音讯。每个消费者组中的消费者能够独立地生产音讯,因而 Kafka 容许分布式解决音讯。
当一个音讯发送到一个订阅了该主题的消费者组时,Kafka 将该音讯发送到组中的一个消费者。如果组中有多个消费者,则 Kafka 会采纳一些算法来确定哪个消费者将接管音讯,例如轮询、范畴和散列等算法。一旦一个消费者接管到音讯并开始解决它,其余消费者将无奈接管该音讯。这样能够确保音讯仅被消费者组中的一个消费者解决,从而防止了反复解决音讯的问题。
应用消费者组的益处包含:
- 反对并行生产:应用消费者组,多个消费者能够并行生产同一个主题的音讯,从而进步音讯解决能力。
- 进步可靠性:当一个消费者呈现故障或离线时,其余消费者能够接替它来解决音讯。
- 管制生产进度:应用消费者组,能够管制消费者生产音讯的地位和进度,例如从特定的偏移量开始生产音讯,或者从最新的音讯开始生产。
须要留神的是,消费者组在 Kafka 中是一个重要的概念,对于了解和应用 Kafka 来说十分重要。同时,Kafka 还提供了一些工具和 API,用于治理和监控消费者组的状态和偏移量信息,以确保 Kafka 消费者组的可靠性和高效性。
- 创立主题
bin/kafka-topics.sh --create --zookeeper 192.168.11.247:2181 --replication-factor 3 --partitions 5 --topic my_topic
- 启动一个消费者组并订阅该主题 my_topic
bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.247:9092 --topic my_topic --group test_group
这将启动一个名为“test_group”的消费者组,并应用“my_topic”主题进行订阅。每个新音讯都将被发送到所有已连贯的消费者。
- 在另一个终端窗口中应用生产者来向主题发送数据
bin/kafka-console-producer.sh --broker-list 192.168.11.247:9092 --topic my_topic
这将启动一个生产者,它将期待输出要发送到“my_topic”主题的音讯。能够随时发送一些测试音讯来测试消费者组是否按预期工作。
- 增加另一个消费者到同一组中,能够关上另一个终端窗口并应用雷同的命令启动消费者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.247:9092 --topic my_topic --group test_group
这将启动另一个消费者,并将其增加到名为“test_group”的消费者组中。当初,每个新音讯都将被发送到这两个消费者,它们将独特解决工作负载。
“
请留神,您能够在生产者和消费者之间轻松切换,并尝试不同的组合以测试您的 Kafka 集群。此外,Kafka 消费者组具备更高级的性能,如手动调配分区,从新均衡等,这些性能能够应用 Kafka API 进行实现。
”
kafka 主题有多个分区的发送和读取机制
在 Kafka 主题中有多个分区的状况下,如果在发送音讯时未指定分区,则 Kafka 会依据生产者的默认分区策略来确定将音讯发送到哪个分区。如果在消费者端应用 kafka-console-consumer.sh 命令行工具来读取音讯,并且未指定消费者要读取的分区,则 Kafka 将采纳默认的分区调配策略,该策略会依据消费者组和主题的分区数来调配分区。Kafka 提供了几种调配策略,包含轮询、范畴、散列等。默认状况下,应用轮询策略。
例如,如果您有一个主题,该主题有三个分区,并且有两个消费者退出同一消费者组并订阅该主题,则每个消费者将被调配到一个分区,并开始生产该分区中的音讯。如果有第三个消费者退出消费者组,则该消费者将始终处于闲暇状态,因为曾经有两个消费者解决了所有的分区。
如果您在消费者端应用 kafka-console-consumer.sh 命令行工具来读取音讯,并且想要指定要读取的分区,则能够应用 –partition 参数来指定要读取的分区。例如,应用以下命令来读取名为 test-topic 的主题的第 0 个分区中的音讯:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.11.247:9092 --topic test-topic --partition 0
“
总之,倡议在 Kafka 主题中应用分区时,始终显式指定要发送和读取的分区,以确保音讯在各个分区之间均匀分布,并防止消费者之间的负载不均衡。
”
Kafka Go 客户端库
罕用的 Kafka Go 客户端库
- Sarama:Sarama 是一个应用 Go 编写的 Kafka 客户端库,提供了一系列 API 以简化与 Kafka 的交互。它反对各种 Kafka 性能,如生产者,消费者,管理员等。它还提供了一些高级性能,例如事务,压缩和 TLS 反对。
- Confluent-kafka-go:Confluent-kafka-go 是一个由 Confluent 公司保护的 Kafka 客户端库。它提供了一系列 API 以连贯 Kafka 集群并进行生产者和消费者操作。它反对 TLS,SASL 和 Kerberos 身份验证。
- Shopify/sarama:Shopify/sarama 是一个简略易用的 Kafka 客户端库,反对 Kafka 0.8.2 及以上版本。它反对高吞吐量和低提早,具备高度可配置性。它还提供了一些高级性能,例如 Kafka 音讯的压缩和批处理。
- Segmentio/kafka-go:Segmentio/kafka-go 是一个基于 Go 语言的 Kafka 客户端库,反对 Kafka 0.8 版本及以上。它提供了高性能的生产者和消费者 API,并反对 TLS 和 SASL 身份验证。
- Shopbrain/kafkawire:Shopbrain/kafkawire 是一个轻量级的 Kafka 客户端库,它应用 HTTP/ 2 协定连贯 Kafka 集群。它反对生产者和消费者 API,提供简略易用的 API,实用于解决大量数据的场景。
“
这些库都提供了一系列 API 以与 Kafka 交互,并具备不同的个性和用例,您能够依据本人的需要抉择适宜本人的库。
”
开始写代码
- 装置 Kafka Go 客户端库
go get github.com/Shopify/sarama
- 应用以下代码创立一个名为“test_topic”的主题,该主题有 3 个正本和 6 个分区
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
// 创立 Kafka 管理员客户端
admin, err := sarama.NewClusterAdmin([]string{"192.168.11.247:9092"}, sarama.NewConfig())
if err != nil {panic(err)
}
defer admin.Close()
// 要创立的主题的名称、分区数和正本数
topic := "test_topic"
partitions := int32(6)
replicationFactor := int16(3)
// 创立主题配置
topicConfig := &sarama.TopicDetail{
NumPartitions: partitions,
ReplicationFactor: replicationFactor,
ConfigEntries: map[string]*string{},}
// 应用管理员客户端的 CreateTopic 函数创立主题
err = admin.CreateTopic(topic, topicConfig, false)
if err != nil {panic(err)
}
fmt.Println("Topic created successfully")
}
- 创立一个生产者并向 ”test_topic” 主题发送音讯
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
// 创立 Kafka 生产者配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
// 创立 Kafka 生产者
producer, err := sarama.NewSyncProducer([]string{"192.168.11.247:9092"}, config)
if err != nil {panic(err)
}
defer producer.Close()
// 要发送的音讯
message := &sarama.ProducerMessage{
Topic: "test_topic",
Value: sarama.StringEncoder("hello, kafka"),
}
// 应用生产者的 SendMessage 函数发送音讯
partition, offset, err := producer.SendMessage(message)
if err != nil {panic(err)
}
fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
}
- 创立一个消费者,从“test_topic”主题中生产音讯
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"github.com/Shopify/sarama"
)
func main() {
// 创立 Kafka 消费者配置
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
// 创立 Kafka 消费者
consumer, err := sarama.NewConsumer([]string{"192.168.11.247:9092"}, config)
if err != nil {panic(err)
}
defer consumer.Close()
// 生产的主题和分区
topic := "test_topic"
partition := int32(0)
// 应用消费者的 Consume 函数生产音讯
partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
if err != nil {panic(err)
}
defer partitionConsumer.Close()
// 解决生产的音讯
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
for {
select {case msg := <-partitionConsumer.Messages():
fmt.Printf("Received message: Topic=%s, Partition=%d, Offset=%d, Key=%s, Value=%s\n",
msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
case err := <-partitionConsumer.Errors():
fmt.Println("Error while consuming partition:", err)
case <-signals:
fmt.Println("Interrupt signal received, shutting down consumer...")
return
}
}
}
留神:在下面的例子中,生产者没有指定往哪个分区发消息,消费者也没有指定从哪个分区读取音讯,那么机制是怎么?
Kafka 的生产者在发送音讯时能够不指定分区,这种状况下,Kafka 会应用默认的分区策略来为音讯抉择一个分区。默认的分区策略是基于音讯的 key 值进行哈希计算,从而确定音讯应该被发送到哪个分区中。
如果音讯没有 key 值,那么 Kafka 会应用轮询的形式将音讯顺次发送到每个可用的分区中,以实现负载平衡。
对于消费者来说,当不指定分区时,Kafka 会将消费者调配给所有可用分区的某些分区,以使消费者可能生产所有调配给它的分区的音讯。这个过程叫做分区调配。消费者能够通过指定消费者组来协调多个消费者之间的分区调配。如果消费者组中有多个消费者,则 Kafka 会将所有分区平均地调配给每个消费者,以实现负载平衡。当消费者退出或来到消费者组时,Kafka 会重新分配分区以确保负载平衡。
总的来说,Kafka 的生产者和消费者通过默认的分区策略和分区分配机制来实现主动负载平衡,同时又可能保证数据的可靠性和有序性。
本文转载于 WX 公众号:不背锅运维(喜爱的盆友关注咱们):https://mp.weixin.qq.com/s/SPLs4wv6XHWRIoJVjb8qZg