# -*- coding:utf-8 -*-
"""
Author:Yjx
Time:2019-8-17 14:22
"""
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from threading import Thread
import time
class kafkaConsumer(object):
    """Small wrapper around ``kafka.KafkaConsumer`` that prints what it receives.

    The wrapped consumer subscribes to one topic and is created with
    ``auto_offset_reset='latest'`` and ``consumer_timeout_ms=20000``.
    Per the kafka-python documentation, the latter makes iteration end
    once no message has arrived for that many milliseconds.
    """

    def __init__(self, kafkatopic, kafkahost, kafkaport):
        """Create the underlying ``KafkaConsumer``.

        Args:
            kafkatopic: Topic name to subscribe to.
            kafkahost: Iterable of broker host names/addresses. A single
                host given as a plain string is also accepted.
            kafkaport: Port shared by every broker in ``kafkahost``.
        """
        self.kafkatopic = kafkatopic
        self.kafkahost = kafkahost
        self.kafkaport = kafkaport
        self.consumer = KafkaConsumer(
            self.kafkatopic,
            bootstrap_servers=self._bootstrap_servers(kafkahost, kafkaport),
            auto_offset_reset='latest',
            consumer_timeout_ms=20000,
        )

    @staticmethod
    def _bootstrap_servers(kafkahost, kafkaport):
        """Return the ``'host:port'`` strings for every host in ``kafkahost``."""
        # A bare string would otherwise be iterated character by character,
        # yielding one bogus "server" per letter.
        if isinstance(kafkahost, str):
            kafkahost = [kafkahost]
        return [
            '{kafka_host}:{kafka_port}'.format(kafka_host=host, kafka_port=kafkaport)
            for host in kafkahost
        ]

    def getMsg(self):
        """Print every message yielded by the consumer until iteration ends.

        Each message is printed twice: first its key and value, then the
        whole record. A ``KafkaError`` raised while consuming is printed
        and swallowed. Nothing is returned.
        """
        try:
            for msg in self.consumer:
                print('get_key_>>>>', msg.key, 'get_message_>>>>', msg.value)
                print('get_message_>>>>', msg)
        except KafkaError as e:
            print('KafkaError:', e)

    def close(self):
        """Close the underlying consumer so its connections are released."""
        self.consumer.close()
if __name__ == '__main__':
    # Demo entry point: connect to the three brokers below on a shared port
    # and print whatever arrives on "topic-1".
    para_host = ['10.20.15.242', '10.20.15.239', '10.20.15.247']
    para_port = 9092
    consumer = kafkaConsumer("topic-1", para_host, para_port)
    # getMsg() prints the messages itself and returns nothing, so there is
    # no result worth binding to a name.
    consumer.getMsg()