本机 IP
是10.30.6.24
,前面配置过程当中须要根据本人 IP
信息配置批改
kafka
默认应用 127.0.0.1
拜访
配置 compose.yaml
文件如下
services:
zookeeper:
image: zookeeper
container_name: demo-zookeeper
ports:
- "2181:2181"
restart: always
kafka:
image: wurstmeister/kafka
container_name: demo-kafka
ports:
- "9092:9092"
ulimits:
nofile:
soft: 262144
hard: 262144
environment:
DOCKER_API_VERSION: 1.41
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://10.30.6.24:9092"
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092"
KAFKA_BROKER_ID: 1
KAFKA_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LOG_DIRS: /kafka/kafka-logs-backend
depends_on:
- zookeeper
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- kafka-data:/kafka
command: /bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"
volumes:
kafka-data: {}
启动命令
$ docker compose up -d
配置参数解释
ulimits
- 操作系统提供限度可应用资源量的形式
linux
零碎默认是 1024 个,具体执行命令ulimit -a
查看- 因为音讯队列文件读写频繁,须要调大该配置,批改
kafka
的默认最大关上文件数量 - 限度能够是硬限度或软限度,但软限度不能超过硬限度
环境变量解释
DOCKER_API_VERSION
:docker version
命令的API Version
输入信息KAFKA_ADVERTISED_LISTENERS
: 把kafka
的地址端口注册给zookeeper
, 这个中央的数据目前是PLAINTEXT://10.30.6.24:9092
,这个IP
地址须要根据具体机器IP
进行批改,指明客户端通过哪个IP
能够拜访到以后节点 ,如果网卡IP
有批改的话也须要批改这个中央的配置KAFKA_LISTENERS
:配置kafka
的监听端口,指明kafka
以后节点监听本机的哪个网卡,这个中央的IP
地址能够填写为0.0.0.0
示意监听所有网卡的信息KAFKA_BROKER_ID
: 一个kafka
节点 就是一个broker
,一个集群外面的broker id
惟一KAFKA_PORT
: 配置kafka
凋谢端口KAFKA_ZOOKEEPER_CONNECT
: 配置对应的zookeeper
连贯信息,因为是在同一个docker compose
当中,所以能够应用服务名称作为host
连贯信息KAFKA_LOG_DIRS
: 保留日志数据的目录,默认是/tmp/kafka-logs
挂载卷解释
- /var/run/docker.sock:/var/run/docker.sock
: 把 docker
的sock
挂在进去
- kafka-data:/kafka
: 把 kafka
日志信息挂载进去进行长久化,如果不须要进行数据长久化,能够去掉这一步挂载
启动命令
/bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"
因为挂载数据的时候会把 kafka
的配置信息也挂载进去,并且保留在 meta.properties
文件当中
文件内容如下,会保留一个 cluster.id
,当容器销毁重建时候,kafka
会从新创立一个 cluster.id
,同时也会去查看meta.properties
的信息
#
#Mon Jun 27 06:38:03 GMT 2022
cluster.id=XMHTDGRvQ5yJnEfXKhuabg
version=0
broker.id=1
当容器启动中会产生报错如下,次要是 kafka
查看 cluster.id
不统一导致
kafka.common.InconsistentClusterIdException: The Cluster ID 2Z7pfznDRmWeLJNp3nZm8A doesn't match stored clusterId Some(XMHTDGRvQ5yJnEfXKhuabg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
所以须要配置在 kafka
启动之前删除长久化保留的 meta.properties
配置信息,这一步不影响长久化数据,次要是防止抵触报错
python
客户端操作
装置依赖库
$ pip install kafka-python
生产者
producer.py
import json
from kafka import KafkaProducer
# 配置 value 序列化办法,抉择 kafka 节点信息
# 如果是近程 broker 须要把 127.0.0.1 批改为对应 IP 地址
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'),
bootstrap_servers=['10.30.6.24:9092'])
# 发送操作默认是异步的
for _ in range(100):
producer.send('my-topic', {'key': 'value'})
# 阻塞直到操作理论 send
producer.flush()
消费者
consumer.py
import json
from kafka import KafkaConsumer
# consumer 配置,topic 信息和生产者雷同
consumer = KafkaConsumer('my-topic',
group_id='my-group',
auto_offset_reset='earliest',
bootstrap_servers=['10.30.6.24:9092'],
value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
auto_offset_reset
可选参数如下
earliest
: 当各分区下有已提交的offset
时,从提交的offset
开始生产,无提交的offset
时,从头开始生产latest
: 当各分区下有已提交的offset
时,从提交的offset
开始生产,无提交的offset
时,生产新产生的该分区下的数据(默认选项是这个)none
:topic
各分区都存在已提交的offset
时,从offset
后开始生产,只有有一个分区不存在已提交的offset
,则抛出异样(不要应用这个)
开启两个终端别离执行
$ python producer.py
$ python consumer.py
拓展
如果 python
客户端也是在容器外面,能够批改 compose.yaml
的kafka
容器的环境变量配置
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
python
消费者和生产者能够应用 kafka:9092
拜访broker
如果程序是在容器里面,也能够配置批改 /etc/hosts
新增一行数据
127.0.0.1 kafka
最终实现根据 kafka
这个 host
参数进行拜访
浏览参考
kafka
官网文档
kafka
配置近程拜访
kafka python
第三方库文档
kafka
了解 listeners
和advertised.listeners