# -*- coding: utf-8 -*-

"""
Author:Yjx
Time:2019-8-17 14:22
"""
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from threading import Thread
import time

class kafkaConsumer(object):
    """Small wrapper around ``kafka.KafkaConsumer`` that prints what it receives.

    The underlying consumer subscribes to one topic, is created with
    ``auto_offset_reset='latest'`` and ``consumer_timeout_ms=20000``, and is
    exposed as ``self.consumer``.
    """

    def __init__(self, kafkatopic, kafkahost, kafkaport):
        """Create a consumer for *kafkatopic* connected to the given brokers.

        Args:
            kafkatopic: Name of the topic to subscribe to.
            kafkahost: Iterable of broker host names / IP addresses. A single
                host given as a plain string is accepted as well.
            kafkaport: Port appended to every host to form ``host:port``.
        """
        self.kafkatopic = kafkatopic
        self.kafkahost = kafkahost
        self.kafkaport = kafkaport
        self.consumer = KafkaConsumer(
            self.kafkatopic,
            bootstrap_servers=self._bootstrap_servers(kafkahost, kafkaport),
            auto_offset_reset='latest',
            consumer_timeout_ms=20000,
        )

    @staticmethod
    def _bootstrap_servers(hosts, port):
        """Return the list of ``host:port`` strings for *hosts*.

        A bare string is treated as one host; iterating it directly would
        otherwise yield one bogus "server" per character.
        """
        if isinstance(hosts, str):
            hosts = [hosts]
        return [
            '{kafka_host}:{kafka_port}'.format(kafka_host=host, kafka_port=port)
            for host in hosts
        ]

    def getMsg(self):
        """Iterate over ``self.consumer`` and print every message.

        For each message the key and value are printed, followed by the
        whole message object. A ``KafkaError`` raised while iterating is
        printed rather than propagated.

        Returns:
            None. Messages are only printed, not collected.
        """
        try:
            for msg in self.consumer:
                print('get_key_>>>>', msg.key, 'get_message_>>>>', msg.value)
                print('get_message_>>>>', msg)
        except KafkaError as e:
            print('KafkaError:', e)

# Script entry point: consume "topic-1" from the three brokers below and
# print every message received.
if __name__ == '__main__':
    para_host = ['10.20.15.242', '10.20.15.239', '10.20.15.247']
    para_port = 9092
    consumer = kafkaConsumer("topic-1", para_host, para_port)
    # getMsg() prints the messages itself and returns None, so there is
    # no result worth binding to a name.
    consumer.getMsg()