Kafka集群部署指南

33次阅读

共计 4007 个字符,预计需要花费 11 分钟才能阅读完成。

一、前言

1、Kafka 简介

Kafka 是一个开源的分布式消息引擎 / 消息中间件,同时 Kafka 也是一个流处理平台。Kakfa 支持以发布 / 订阅的方式在应用间传递消息,同时并基于消息功能添加了 Kafka Connect、Kafka Streams 以支持连接其他系统的数据(Elasticsearch、Hadoop 等)

Kafka 最核心的最成熟的还是他的消息引擎,所以 Kafka 大部分应用场景还是用来作为消息队列削峰平谷。另外,Kafka 也是目前性能最好的消息中间件。

2、Kafka 架构

在 Kafka 集群 (Cluster) 中,一个 Kafka 节点就是一个 Broker,消息由 Topic 来承载,可以存储在 1 个或多个 Partition 中。发布消息的应用为 Producer、消费消息的应用为 Consumer,多个 Consumer 可以促成 Consumer Group 共同消费一个 Topic 中的消息。

概念 / 对象 简单说明
Broker Kafka 节点
Topic 主题,用来承载消息
Partition 分区,用于主题分片存储
Producer 生产者,向主题发布消息的应用
Consumer 消费者,从主题订阅消息的应用
Consumer Group 消费者组,由多个消费者组成

3、准备工作

1、Kafka 服务器

准备 3 台 CentOS 服务器,并配置好静态 IP、主机名

服务器名 IP 说明
kafka01 192.168.88.51 Kafka 节点 1
kafka02 192.168.88.52 Kafka 节点 2
kafka03 192.168.88.53 Kafka 节点 3

软件版本说明

说明
Linux Server CentOS 7
Kafka 2.3.0

2、ZooKeeper 集群

Kakfa 集群需要依赖 ZooKeeper 存储 Broker、Topic 等信息,这里我们部署三台 ZK

服务器名 IP 说明
zk01 192.168.88.21 ZooKeeper 节点
zk02 192.168.88.22 ZooKeeper 节点
zk03 192.168.88.23 ZooKeeper 节点

部署过程参考:https://ken.io/note/zookeeper…

二、部署过程

1、应用 & 数据目录

# 创建应用目录
mkdir /usr/kafka

#创建 Kafka 数据目录
mkdir /kafka
mkdir /kafka/logs
chmod 777 -R /kafka

2、下载 & 解压

Kafka 官方下载地址:https://kafka.apache.org/down…
这次我下载的是 2.3.0 版本

# 创建并进入下载目录
mkdir /home/downloads
cd /home/downloads

#下载安装包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz 

#解压到应用目录
tar -zvxf kafka_2.12-2.3.0.tgz -C /usr/kafka

kafka_2.12-2.3.0.tgz 其中 2.12 是 Scala 编译器的版本,2.3.0 才是 Kafka 的版本

3、Kafka 节点配置

# 进入应用目录
cd /usr/kafka/kafka_2.12-2.3.0/

#修改配置文件
vi config/server.properties

通用配置

配置日志目录、指定 ZooKeeper 服务器

# A comma separated list of directories under which to store log files
log.dirs=/kafka/logs

# root directory for all kafka znodes.
zookeeper.connect=192.168.88.21:2181,192.168.88.22:2181,192.168.88.23:2181

分节点配置

  • Kafka01
broker.id=0

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.51:9092
  • Kafka02
broker.id=1

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.52:9092
  • Kafka03
broker.id=2

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.53:9092

4、防火墙配置

# 开放端口
firewall-cmd --add-port=9092/tcp --permanent

#重新加载防火墙配置
firewall-cmd --reload

5、启动 Kafka

# 进入 kafka 根目录
cd /usr/kafka/kafka_2.12-2.3.0/
#启动
/bin/kafka-server-start.sh config/server.properties &

#启动成功输出示例(最后几行)
[2019-06-26 21:48:57,183] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,183] INFO Kafka startTimeMs: 1561531737175 (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,185] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

三、Kafka 测试

1、创建 Topic

在 kafka01(Broker)上创建测试 Tpoic:test-ken-io,这里我们指定了 3 个副本、1 个分区

bin/kafka-topics.sh --create --bootstrap-server 192.168.88.51:9092 --replication-factor 3 --partitions 1 --topic test-ken-io

Topic 在 kafka01 上创建后也会同步到集群中另外两个 Broker:kafka02、kafka03

2、查看 Topic

我们可以通过命令列出指定 Broker 的

bin/kafka-topics.sh --list --bootstrap-server 192.168.88.52:9092

3、发送消息

这里我们向 Broker(id=0)的 Topic=test-ken-io 发送消息

bin/kafka-console-producer.sh --broker-list  192.168.88.51:9092  --topic test-ken-io

#消息内容
> test by ken.io

4、消费消息

在 Kafka02 上消费 Broker03 的消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning

在 Kafka03 上消费 Broker02 的消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning

然后均能收到消息

test by ken.io

这是因为这两个消费消息的命令是建立了两个不同的 Consumer
如果我们启动 Consumer 指定 Consumer Group Id 就可以作为一个消费组协同工,1 个消息同时只会被一个 Consumer 消费到

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning --group testgroup_ken

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning --group testgroup_ken

四、备注

1、Kafka 常用配置项说明

Kafka 常用 Broker 配置说明:

配置项 默认值 / 示例值 说明
broker.id 0 Broker 唯一标识
listeners PLAINTEXT://192.168.88.53:9092 监听信息,PLAINTEXT 表示明文传输
log.dirs kafka/logs kafka 数据存放地址,可以填写多个。用 ”,” 间隔
message.max.bytes message.max.bytes 单个消息长度限制,单位是字节
num.partitions 1 默认分区数
log.flush.interval.messages Long.MaxValue 在数据被写入到硬盘和消费者可用前最大累积的消息的数量
log.flush.interval.ms Long.MaxValue 在数据被写入到硬盘前的最大时间
log.flush.scheduler.interval.ms Long.MaxValue 检查数据是否要写入到硬盘的时间间隔。
log.retention.hours 24 控制一个 log 保留时间,单位:小时
zookeeper.connect 192.168.88.21:2181 ZooKeeper 服务器地址,多台用 ”,” 间隔

2、附录

  • https://kafka.apache.org/
  • https://zh.wikipedia.org/zh-c…

本文首发于我的独立博客:https://ken.io/note/kafka-cluster-deploy-guide

正文完
 0