关于kafka:Kafka-Python的生产者和消费者

52次阅读

共计 2759 个字符,预计需要花费 7 分钟才能阅读完成。

Kafka Python 的生产者和消费者

在本教程中,咱们将应用 Python 构建 Kafka Producer 和 Consumer。除此之外,咱们还将学习如何在 Kafka 中设置配置以及如何应用组和偏移量概念。

建设

对于本教程,咱们应该在计算机上安装 python。另外,咱们须要拜访在咱们的设施或某些服务器上运行的 Apache Kafka。您能够查看如何在 Windows 上装置 Apache Kafka。除此之外,咱们须要 python 的_kafka_ 库来运行咱们的代码。要解决此问题,请在零碎上运行以下命令

pip install kafka

卡夫卡生产者

===

让咱们开始创立本人的 Kafka Producer。咱们必须从 kafka 库导入 KafkaProducer。咱们还须要将 Kafka 服务器的代理列表提供给 Producer,以便它能够连贯到 Kafka 服务器。咱们还须要提供要向其公布音讯的主题名称。这是创立生产者所需的最小配置。

from kafka import KafkaProducer

bootstrap_servers = ['localhost:9092']
topicName = 'myTopic'

producer = KafkaProducer(bootstrap_servers = bootstrap_servers)
producer = KafkaProducer()

咱们能够应用以下代码开始向该主题发送音讯。

ack = producer.send(topicName, b'Hello World!!!!!!!!')

metadata = ack.get()
print(metadata.topic)
print(metadata.partition)

下面的代码将音讯发送到 Kafka 服务器中名为“myTopic”的主题。然而,如果该主题尚未呈现在 Kafka 服务器中怎么办?在这种状况下,Kafka 会应用该名称创立一个新主题并向其公布音讯。不便吗?然而您应该记住要查看主题名称中是否存在拼写错误。

如果要为 Producer 设置更多属性或更改其序列化格局,则能够应用以下代码行。

producer = KafkaProducer(bootstrap_servers = bootstrap_servers, retries = 5,value_serializer=lambda m: json.dumps(m).encode('ascii'))

卡夫卡消费者

实现创立 Producer 的工作后,当初让咱们开始应用 python 构建 Consumer,看看这是否同样容易。导入 KafkaConsumer 后,咱们须要设置提供疏导服务器 ID 和主题名称,以与 Kafka 服务器建设连贯。

from kafka import KafkaConsumer
import sys

bootstrap_servers = ['localhost:9092']
topicName = 'myTopic'

consumer = KafkaConsumer (topicName, group_id = 'group1',bootstrap_servers = bootstrap_servers,
auto_offset_reset = 'earliest')

如咱们所见,咱们须要设置哪个组消费者属于。另外,咱们须要指定偏移量,此使用者应该从该偏移量读取主题中的音讯。在上述情况下,咱们最早指定了 auto_offset_reset,这意味着此使用者将从主题的结尾开始读取音讯。

之后,咱们能够开始浏览主题中的音讯。与每条音讯一起,咱们还取得了一些其余信息,例如音讯所属的分区,在该分区中的偏移量和键。

try:
    for message in consumer:
        print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

except KeyboardInterrupt:
    sys.exit()

这将以以下格局打印输出。

就是这个。咱们曾经在 python 中创立了第一个 Kafka 使用者。咱们能够看到该使用者曾经浏览了该主题的音讯并将其打印在管制台上。

Docker 运行 Kafka

应用的是 zerocode 提供的 docker-compose 配置文件。

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    # -----------------------------------------------------------------------------
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # -----------------------------------------------------------------------------
    image: confluentinc/cp-kafka:5.0.1
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

留神这里的 PLAINTEXT_HOST://localhost:9092 应用的是 localhost, 所以在容器内部拜访是没有问题,如果须要的是容器之间的拜访,即生产才和消费者也在容器里运行,则须要改成 hostname(如 kafka).

论断

咱们曾经学习了如何在 python 中创立 Kafka 生产者和消费者。

正文完
 0