kafkaconsumer

78次阅读

共计 971 个字符,预计需要花费 3 分钟才能阅读完成。

# -*- coding:utf-8 -*-

"""
Author:Yjx
Time:2019-8-17 14:22
"""
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
from threading import Thread
import time

class kafkaConsumer(object):
    """Small wrapper around ``kafka.KafkaConsumer`` that prints every message it receives."""

    def __init__(self, kafkatopic, kafkahost, kafkaport):
        """Create the underlying ``KafkaConsumer`` subscribed to *kafkatopic*.

        Args:
            kafkatopic: Topic name handed to ``KafkaConsumer``.
            kafkahost: Iterable of broker hosts, or a single host string.
            kafkaport: Port appended to every host to form ``host:port``.
        """
        self.kafkatopic = kafkatopic
        self.kafkahost = kafkahost
        self.kafkaport = kafkaport
        self.consumer = KafkaConsumer(
            self.kafkatopic,
            bootstrap_servers=self._bootstrap_servers(kafkahost, kafkaport),
            auto_offset_reset='latest',
            # Per kafka-python docs, iteration stops after this many ms
            # without a new message, so getMsg() eventually returns.
            consumer_timeout_ms=20000,
        )

    @staticmethod
    def _bootstrap_servers(hosts, port):
        """Return the ``host:port`` strings for *hosts*, all sharing *port*."""
        # A bare string would otherwise be iterated character by character.
        if isinstance(hosts, str):
            hosts = [hosts]
        return [
            '{kafka_host}:{kafka_port}'.format(kafka_host=host, kafka_port=port)
            for host in hosts
        ]

    def getMsg(self):
        """Iterate over ``self.consumer`` and print each message.

        For every message the key and value are printed, followed by the
        whole message object. A ``KafkaError`` raised during iteration is
        caught and printed instead of propagating. Returns ``None``.
        """
        try:
            for msg in self.consumer:
                print('get_key_>>>>', msg.key, 'get_message_>>>>', msg.value)
                print('get_message_>>>>', msg)
        except KafkaError as e:
            print('KafkaError:', e)

if __name__ == '__main__':
    # Broker hosts to connect to; every host is combined with the same port.
    para_host = ['10.20.15.242', '10.20.15.239', '10.20.15.247']
    para_port = 9092
    consumer = kafkaConsumer("topic-1", para_host, para_port)
    # getMsg() prints the messages itself and has no return value to keep.
    consumer.getMsg()

正文完
 0