关于python:python操作docker-kafka

31次阅读

共计 3508 个字符,预计需要花费 9 分钟才能阅读完成。

本机 IP10.30.6.24,前面配置过程当中须要根据本人 IP 信息配置批改

kafka默认应用 127.0.0.1 拜访

配置 compose.yaml 文件如下

services:
  zookeeper:
    image: zookeeper
    container_name: demo-zookeeper
    ports:
      - "2181:2181"
    restart: always

  kafka:
    image: wurstmeister/kafka
    container_name: demo-kafka
    ports:
      - "9092:9092"
    ulimits:
      nofile:
        soft: 262144
        hard: 262144
    environment:
      DOCKER_API_VERSION: 1.41
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://10.30.6.24:9092"
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092"
      KAFKA_BROKER_ID: 1
      KAFKA_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LOG_DIRS: /kafka/kafka-logs-backend
    depends_on:
      - zookeeper
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - kafka-data:/kafka
    command: /bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"


volumes:
  kafka-data: {}

启动命令

$ docker compose up -d

配置参数解释

ulimits

  • 操作系统提供限度可应用资源量的形式
  • linux零碎默认是 1024 个,具体执行命令 ulimit -a 查看
  • 因为音讯队列文件读写频繁,须要调大该配置,批改 kafka 的默认最大关上文件数量
  • 限度能够是硬限度或软限度,但软限度不能超过硬限度

环境变量解释

  • DOCKER_API_VERSION: docker version命令的 API Version 输入信息
  • KAFKA_ADVERTISED_LISTENERS: 把 kafka 的地址端口注册给 zookeeper, 这个中央的数据目前是PLAINTEXT://10.30.6.24:9092,这个IP 地址须要根据具体机器 IP 进行批改,指明客户端通过哪个 IP 能够拜访到以后节点 ,如果网卡IP 有批改的话也须要批改这个中央的配置
  • KAFKA_LISTENERS:配置 kafka 的监听端口,指明 kafka 以后节点监听本机的哪个网卡,这个中央的 IP 地址能够填写为 0.0.0.0 示意监听所有网卡的信息
  • KAFKA_BROKER_ID: 一个 kafka节点 就是一个 broker,一个集群外面的 broker id 惟一
  • KAFKA_PORT: 配置 kafka 凋谢端口
  • KAFKA_ZOOKEEPER_CONNECT: 配置对应的 zookeeper 连贯信息,因为是在同一个 docker compose 当中,所以能够应用服务名称作为 host 连贯信息
  • KAFKA_LOG_DIRS: 保留日志数据的目录,默认是/tmp/kafka-logs

挂载卷解释

- /var/run/docker.sock:/var/run/docker.sock: 把 dockersock挂在进去

- kafka-data:/kafka: 把 kafka 日志信息挂载进去进行长久化,如果不须要进行数据长久化,能够去掉这一步挂载

启动命令

/bin/sh -c "rm -f /kafka/kafka-logs-backend/meta.properties && start-kafka.sh"

因为挂载数据的时候会把 kafka 的配置信息也挂载进去,并且保留在 meta.properties 文件当中

文件内容如下,会保留一个 cluster.id,当容器销毁重建时候,kafka 会从新创立一个 cluster.id,同时也会去查看meta.properties 的信息

#
#Mon Jun 27 06:38:03 GMT 2022
cluster.id=XMHTDGRvQ5yJnEfXKhuabg
version=0
broker.id=1

当容器启动中会产生报错如下,次要是 kafka 查看 cluster.id 不统一导致

kafka.common.InconsistentClusterIdException: The Cluster ID 2Z7pfznDRmWeLJNp3nZm8A doesn't match stored clusterId Some(XMHTDGRvQ5yJnEfXKhuabg) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.

所以须要配置在 kafka 启动之前删除长久化保留的 meta.properties 配置信息,这一步不影响长久化数据,次要是防止抵触报错

python客户端操作

装置依赖库

$ pip install kafka-python

生产者

producer.py

import json

from kafka import KafkaProducer

# 配置 value 序列化办法,抉择 kafka 节点信息
# 如果是近程 broker 须要把 127.0.0.1 批改为对应 IP 地址
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'),
    bootstrap_servers=['10.30.6.24:9092'])

# 发送操作默认是异步的
for _ in range(100):
    producer.send('my-topic', {'key': 'value'})

# 阻塞直到操作理论 send
producer.flush()

消费者

consumer.py

import json

from kafka import KafkaConsumer

# consumer 配置,topic 信息和生产者雷同
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         auto_offset_reset='earliest',
                         bootstrap_servers=['10.30.6.24:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('ascii')))
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

auto_offset_reset可选参数如下

  • earliest: 当各分区下有已提交的 offset 时,从提交的 offset 开始生产,无提交的 offset 时,从头开始生产
  • latest: 当各分区下有已提交的 offset 时,从提交的 offset 开始生产,无提交的 offset 时,生产新产生的该分区下的数据(默认选项是这个)
  • none: topic各分区都存在已提交的 offset 时,从 offset 后开始生产,只有有一个分区不存在已提交的offset,则抛出异样(不要应用这个)

开启两个终端别离执行

$ python producer.py
$ python consumer.py

拓展

如果 python 客户端也是在容器外面,能够批改 compose.yamlkafka容器的环境变量配置

KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"

python消费者和生产者能够应用 kafka:9092 拜访broker

如果程序是在容器里面,也能够配置批改 /etc/hosts 新增一行数据

127.0.0.1 kafka

最终实现根据 kafka 这个 host 参数进行拜访

浏览参考

kafka官网文档

kafka配置近程拜访

kafka python第三方库文档

kafka了解 listenersadvertised.listeners

正文完
 0