乐趣区

python3连接kafka模块pykafka生产者简单封装

1.1 安装模块

pip install pykafka

1.2 基本使用

  # -* coding:utf8 *-  
  from pykafka import KafkaClient  
  host = 'IP:9092, IP:9092, IP:9092'
  client = KafkaClient(hosts = host)  
  # 生产者  
  topicdocu = client.topics['my-topic']  
  producer = topicdocu.get_producer()  
  for i in range(100):  
      print i  
      producer.produce('test message' + str(i ** 2))  
  producer.stop()

1.3 简单封装

  class KafkaProduct():

      def __init__(self,hosts,topic):
          """
          初始化实例
          :param hosts: 连接地址
          :param topic:
          """
          self.__client = KafkaClient(hosts=hosts)
          self.__topic = self.__client.topics[topic.encode()]
  
      def __set_topic(self, topic):
          self.__topic = self.__client.topics[topic.encode()]
  
      def set_topic(self, topic):
          """
          设置 topic
          :param topic:
          :return:
          """
          self.__set_topic(topic)
  
      def get_topics(self):
          """
          获取当前所有 topic
          :return:
          """
          return self.__client.topics
  
      def get_topic(self):
          """
          获取当前 topic
          :return:
          """
          return self.__topic
  
      def Producer(self):
          """
          生产者对象
          :return:
          """
          with self.__topic.get_producer(delivery_reports=True) as producer:
              next_data = ''
              while True:
                  if next_data:
                      producer.produce(str(next_data).encode())
                  next_data = yield True
  
      def send_data(self,datas):
          """
          发送数据
          :param datas: 需要传入的可迭代对象
          :return:
          """
          c = self.Producer()
          next(c)
          for i in datas:
              c.send(i)

if __name__ == '__main__':

    hosts = "1.2.3.4:9999,2.3.4.5:9090" #连接 hosts
    topic = "test_523"
    K = KafkaProduct(hosts=hosts, topic=topic)  #
    #K.set_topic("test")  #切换设置新的 topic
    K.get_topic()  #获取当前设置的 topic
    #K.get_topics() #获取所有 topic
    data = range(10000)  #要发送的可迭代对象
    K.send_data(data)

1.4 引用来源

博客园:Python 测试 Kafka 集群 (pykafka)

知乎: 使用生成器把 Kafka 写入效率提高 1000 倍

退出移动版