本机IP10.30.6.24,前面配置过程当中须要根据本人IP信息配置批改

kafka默认应用127.0.0.1拜访

配置compose.yaml文件如下

services:  zookeeper:    image: zookeeper    container_name: demo-zookeeper    ports:      - "2181:2181"    restart: always  kafka:    image: wurstmeister/kafka    container_name: demo-kafka    ports:      - "9092:9092"    ulimits:      nofile:        soft: 262144        hard: 262144    environment:      DOCKER_API_VERSION: 1.41      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://10.30.6.24:9092"      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092"      KAFKA_BROKER_ID: 1      KAFKA_PORT: 9092      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181      KAFKA_LOG_DIRS: /kafka/kafka-logs-backend    depends_on:      - zookeeper    volumes:      - /var/run/docker.sock:/var/run/docker.sock      - kafka-data:/kafka    command: /bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"volumes:  kafka-data: {}

启动命令

$ docker compose up -d

配置参数解释

ulimits

  • 操作系统提供限度可应用资源量的形式
  • linux零碎默认是1024个,具体执行命令ulimit -a查看
  • 因为音讯队列文件读写频繁,须要调大该配置,批改kafka的默认最大关上文件数量
  • 限度能够是硬限度或软限度,但软限度不能超过硬限度

环境变量解释

  • DOCKER_API_VERSION: docker version命令的API Version输入信息
  • KAFKA_ADVERTISED_LISTENERS: 把kafka的地址端口注册给zookeeper,这个中央的数据目前是PLAINTEXT://10.30.6.24:9092,这个IP地址须要根据具体机器IP进行批改,指明客户端通过哪个 IP 能够拜访到以后节点,如果网卡IP有批改的话也须要批改这个中央的配置
  • KAFKA_LISTENERS: 配置kafka的监听端口,指明 kafka 以后节点监听本机的哪个网卡,这个中央的IP地址能够填写为0.0.0.0示意监听所有网卡的信息
  • KAFKA_BROKER_ID: 一个 kafka节点 就是一个 broker,一个集群外面的broker id惟一
  • KAFKA_PORT: 配置kafka凋谢端口
  • KAFKA_ZOOKEEPER_CONNECT: 配置对应的zookeeper连贯信息,因为是在同一个docker compose当中,所以能够应用服务名称作为host连贯信息
  • KAFKA_LOG_DIRS: 保留日志数据的目录,默认是/tmp/kafka-logs

挂载卷解释

- /var/run/docker.sock:/var/run/docker.sock: 把dockersock挂在进去

- kafka-data:/kafka: 把kafka日志信息挂载进去进行长久化,如果不须要进行数据长久化,能够去掉这一步挂载

启动命令

/bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"

因为挂载数据的时候会把kafka的配置信息也挂载进去,并且保留在meta.properties文件当中

文件内容如下,会保留一个cluster.id,当容器销毁重建时候,kafka会从新创立一个cluster.id,同时也会去查看meta.properties的信息

##Mon Jun 27 06:38:03 GMT 2022cluster.id=XMHTDGRvQ5yJnEfXKhuabgversion=0broker.id=1

当容器启动中会产生报错如下,次要是kafka查看cluster.id不统一导致

kafka.common.InconsistentClusterIdException: The Cluster ID 2Z7pfznDRmWeLJNp3nZm8A doesn't match stored clusterId Some(XMHTDGRvQ5yJnEfXKhuabg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.

所以须要配置在kafka启动之前删除长久化保留的meta.properties配置信息,这一步不影响长久化数据,次要是防止抵触报错

python客户端操作

装置依赖库

$ pip install kafka-python

生产者

producer.py

import jsonfrom kafka import KafkaProducer# 配置value序列化办法,抉择kafka节点信息# 如果是近程broker须要把127.0.0.1批改为对应IP地址producer = KafkaProducer(    value_serializer=lambda m: json.dumps(m).encode('ascii'),    bootstrap_servers=['10.30.6.24:9092'])# 发送操作默认是异步的for _ in range(100):    producer.send('my-topic', {'key': 'value'})# 阻塞直到操作理论sendproducer.flush()

消费者

consumer.py

import jsonfrom kafka import KafkaConsumer# consumer配置,topic信息和生产者雷同consumer = KafkaConsumer('my-topic',                         group_id='my-group',                         auto_offset_reset='earliest',                         bootstrap_servers=['10.30.6.24:9092'],                         value_deserializer=lambda m: json.loads(m.decode('ascii')))for message in consumer:    # message value and key are raw bytes -- decode if necessary!    # e.g., for unicode: `message.value.decode('utf-8')`    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,                                          message.offset, message.key,                                          message.value))

auto_offset_reset可选参数如下

  • earliest: 当各分区下有已提交的offset时,从提交的offset开始生产,无提交的offset时,从头开始生产
  • latest: 当各分区下有已提交的offset时,从提交的offset开始生产,无提交的offset时,生产新产生的该分区下的数据(默认选项是这个)
  • none: topic各分区都存在已提交的offset时,从offset后开始生产,只有有一个分区不存在已提交的offset,则抛出异样(不要应用这个)

开启两个终端别离执行

$ python producer.py
$ python consumer.py

拓展

如果python客户端也是在容器外面,能够批改compose.yamlkafka容器的环境变量配置

KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"

python消费者和生产者能够应用kafka:9092拜访broker

如果程序是在容器里面,也能够配置批改/etc/hosts新增一行数据

127.0.0.1 kafka

最终实现根据kafka这个host参数进行拜访

浏览参考

kafka官网文档

kafka配置近程拜访

kafka python第三方库文档

kafka了解listenersadvertised.listeners