1.1安装模块

pip install pykafka

1.2基本使用

  # -*- coding: utf-8 -*-
# Minimal pykafka producer demo: publish 100 small test messages to one topic.
from pykafka import KafkaClient

# Broker list handed to KafkaClient; replace each "IP" with a real address.
host = 'IP:9092, IP:9092, IP:9092'
client = KafkaClient(hosts=host)

# Producer
topicdocu = client.topics['my-topic']

# The context manager stops the producer on exit, including when produce()
# raises part-way through the loop (the original only stopped it on success).
with topicdocu.get_producer() as producer:
    for i in range(100):
        # Function-call form works on both Python 2 and Python 3.
        print(i)
        # Payload is encoded because pykafka expects bytes on Python 3.
        producer.produce(('test message ' + str(i ** 2)).encode())

1.3简单封装

  # Thin convenience wrapper around a pykafka producer bound to one topic.
class KafkaProduct():
    """Send every item of an iterable to a single Kafka topic via pykafka.

    ``KafkaClient`` must already be in scope where this class is defined
    (``from pykafka import KafkaClient``).
    """

    def __init__(self, hosts, topic):
        """
        Create the client and select the initial topic.

        :param hosts: broker connection string, e.g. "1.2.3.4:9999,2.3.4.5:9090"
        :param topic: topic name as text; it is encoded before the lookup
        """
        self.__client = KafkaClient(hosts=hosts)
        self.__set_topic(topic)

    def __set_topic(self, topic):
        # The topic is looked up by the encoded (bytes) form of its name.
        self.__topic = self.__client.topics[topic.encode()]

    def set_topic(self, topic):
        """
        Switch to another topic.

        :param topic: topic name as text
        :return: None
        """
        self.__set_topic(topic)

    def get_topics(self):
        """
        Return the client's collection of topics.

        :return: ``topics`` attribute of the underlying client
        """
        return self.__client.topics

    def get_topic(self):
        """
        Return the currently selected topic object.

        :return: the topic chosen in ``__init__`` or ``set_topic``
        """
        return self.__topic

    def Producer(self):
        """
        Generator-based sender that keeps one producer open while it is alive.

        Prime it with ``next()``, then call ``send(item)`` for each payload.
        Each item is converted with ``str(item).encode()`` and produced.
        Closing the generator leaves the ``with`` block, which releases the
        producer.

        :return: generator that yields ``True`` each time it is ready for
                 the next item
        """
        with self.__topic.get_producer(delivery_reports=True) as producer:
            # NOTE(review): delivery reports are requested but never read in
            # this class; pykafka's docs say they should be fetched regularly
            # with get_delivery_report() -- confirm whether they are needed.
            while True:
                next_data = yield True
                # Only None (a bare next() call) is skipped. Falsy payloads
                # such as 0 or '' are real messages and must not be dropped.
                if next_data is not None:
                    producer.produce(str(next_data).encode())

    def send_data(self, datas):
        """
        Send every item of an iterable to the current topic.

        :param datas: iterable of items to send
        :return: None
        """
        c = self.Producer()
        next(c)  # advance to the first yield so send() can be used
        try:
            for i in datas:
                c.send(i)
        finally:
            # Close explicitly so the producer is released right away, even
            # if sending fails, instead of whenever the generator is collected.
            c.close()


if __name__ == '__main__':
    hosts = "1.2.3.4:9999,2.3.4.5:9090"  # broker connection string
    topic = "test_523"
    K = KafkaProduct(hosts=hosts, topic=topic)
    # K.set_topic("test")  # switch to another topic
    K.get_topic()  # fetch the currently selected topic
    # K.get_topics()  # fetch all topics
    data = range(10000)  # iterable of items to send
    K.send_data(data)

1.4引用来源

博客园:Python测试Kafka集群(pykafka)

知乎:使用生成器把Kafka写入效率提高1000倍