# -*- coding:utf-8 -*-
"""
Author:Yjx
Time:2019-8-17 14:22
"""
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from threading import Thread
import time
class kafkaConsumer(object):
    """Print every message received from one Kafka topic.

    Wraps a ``kafka.KafkaConsumer`` that is created eagerly in ``__init__``
    and exposed as ``self.consumer``.
    """

    def __init__(self, kafkatopic, kafkahost, kafkaport):
        """Create the underlying consumer and subscribe it to *kafkatopic*.

        Args:
            kafkatopic: Name of the topic to read from.
            kafkahost: Iterable of broker host names/addresses. A single
                host given as a plain string is accepted as well.
            kafkaport: Port shared by every broker in *kafkahost*.
        """
        self.kafkatopic = kafkatopic
        self.kafkahost = kafkahost
        self.kafkaport = kafkaport
        self.consumer = KafkaConsumer(
            self.kafkatopic,
            bootstrap_servers=self._bootstrap_servers(kafkahost, kafkaport),
            auto_offset_reset='latest',
            # Without this timeout, iterating the consumer would block
            # forever once the topic goes quiet.
            consumer_timeout_ms=20000,
        )

    @staticmethod
    def _bootstrap_servers(hosts, port):
        """Return the ``'host:port'`` address list for *hosts*."""
        # A bare string would otherwise be iterated character by character
        # and yield addresses such as '1:9092'.
        if isinstance(hosts, str):
            hosts = [hosts]
        return [
            '{kafka_host}:{kafka_port}'.format(kafka_host=host, kafka_port=port)
            for host in hosts
        ]

    def getMsg(self):
        """Print each message yielded by the consumer until iteration ends.

        A ``KafkaError`` raised while consuming is reported on stdout
        instead of being propagated. Returns ``None``.
        """
        try:
            for msg in self.consumer:
                print('get_key_>>>>', msg.key, 'get_message_>>>>', msg.value)
                print('get_message_>>>>', msg)
        except KafkaError as e:
            print('KafkaError:', e)
if __name__ == '__main__':
    # Demo run: connect to the brokers below and print whatever arrives on
    # "topic-1" until the consumer stops iterating.
    para_host = ['10.20.15.242', '10.20.15.239', '10.20.15.247']
    para_port = 9092
    consumer = kafkaConsumer("topic-1", para_host, para_port)
    # getMsg() prints the messages itself; it has no return statement, so
    # `messages` is always None.
    messages = consumer.getMsg()