# -*- coding:utf-8 -*-

"""
Author:Yjx
Time:2019-8-17 16:26
"""
import time
from threading import Thread

from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError

class kafkaProducer(object):
    """Small wrapper around ``KafkaProducer`` that publishes ten demo messages.

    The constructor stores ``kafkatopic``, ``kafkahost`` and ``kafkaport`` on
    the instance and creates the underlying producer as ``self.producer``.
    """

    def __init__(self, kafkatopic, kafkahost, kafkaport):
        """Create a producer connected to every host in *kafkahost*.

        Args:
            kafkatopic: Topic name that ``sendMsg`` publishes to.
            kafkahost: Iterable of broker host names (a single string is
                treated as one host).
            kafkaport: Port shared by all brokers.
        """
        self.kafkatopic = kafkatopic
        self.kafkahost = kafkahost
        self.kafkaport = kafkaport
        self.producer = KafkaProducer(
            bootstrap_servers=self._bootstrap_servers(kafkahost, kafkaport))

    @staticmethod
    def _bootstrap_servers(hosts, port):
        """Return one ``host:port`` string per entry of *hosts*."""
        # A bare string would otherwise be iterated character by character.
        if isinstance(hosts, str):
            hosts = [hosts]
        return ['{kafka_host}:{kafka_port}'.format(kafka_host=host,
                                                   kafka_port=port)
                for host in hosts]

    def sendMsg(self):
        """Send ``send_message_1`` .. ``send_message_10`` and close the producer.

        Each message is UTF-8 encoded, sent to ``self.kafkatopic`` and flushed
        before its text is printed. The producer is closed afterwards even if
        sending fails, so the instance cannot be reused for a second call.
        A ``KafkaError`` is reported on stdout instead of being propagated.
        """
        try:
            try:
                for index in range(1, 11):
                    message = 'send_message_' + str(index)
                    self.producer.send(self.kafkatopic,
                                       message.encode('utf-8'))
                    # Flush per message so the line below is only printed
                    # once the flush call for that message has returned.
                    self.producer.flush()
                    print(message)
            finally:
                # Release the producer on failure as well as on success.
                self.producer.close()
            print('****over****')
        except KafkaError as e:
            print('KafkaError:', e)

class kafkaConsumer(object):
    """Small wrapper around ``KafkaConsumer`` that prints received messages.

    The constructor stores ``kafkatopic``, ``kafkahost`` and ``kafkaport`` on
    the instance and creates the underlying consumer as ``self.consumer``.
    """

    def __init__(self, kafkatopic, kafkahost, kafkaport):
        """Create a consumer subscribed to *kafkatopic*.

        Args:
            kafkatopic: Topic name to consume from.
            kafkahost: Iterable of broker host names (a single string is
                treated as one host).
            kafkaport: Port shared by all brokers.
        """
        self.kafkatopic = kafkatopic
        self.kafkahost = kafkahost
        self.kafkaport = kafkaport
        self.consumer = KafkaConsumer(
            self.kafkatopic,
            bootstrap_servers=self._bootstrap_servers(kafkahost, kafkaport))

    @staticmethod
    def _bootstrap_servers(hosts, port):
        """Return one ``host:port`` string per entry of *hosts*."""
        # A bare string would otherwise be iterated character by character.
        if isinstance(hosts, str):
            hosts = [hosts]
        return ['{kafka_host}:{kafka_port}'.format(kafka_host=host,
                                                   kafka_port=port)
                for host in hosts]

    def getMsg(self):
        """Print every message yielded by the consumer, numbered from 1.

        Prints ``****done****`` once iteration over ``self.consumer`` ends.
        A ``KafkaError`` is reported on stdout instead of being propagated.

        NOTE(review): with the consumer's default settings the iteration
        presumably blocks waiting for new messages, so the "done" line may
        never be reached -- confirm against the kafka-python documentation.
        """
        try:
            for count, msg in enumerate(self.consumer, 1):
                print('****get_message_%d****' % count, msg)
            print('****done****')
        except KafkaError as e:
            print('KafkaError:', e)

def runGetMsg(para_host, para_port, topic="topic-1"):
    """Build a ``kafkaConsumer`` and print the messages it receives.

    Args:
        para_host: Iterable of broker host names.
        para_port: Port shared by all brokers.
        topic: Topic to consume from; defaults to ``"topic-1"``.
    """
    consumer = kafkaConsumer(topic, para_host, para_port)
    consumer.getMsg()

def runSendMsg(para_host, para_port, topic="topic-1"):
    """Build a ``kafkaProducer`` and send its demo messages.

    Args:
        para_host: Iterable of broker host names.
        para_port: Port shared by all brokers.
        topic: Topic to publish to; defaults to ``"topic-1"``.
    """
    producer = kafkaProducer(topic, para_host, para_port)
    producer.sendMsg()

if __name__ == '__main__':
    # Demo entry point: run the producer and the consumer concurrently,
    # each in its own thread, against the same brokers.
    # NOTE(review): para_host is empty, so no bootstrap servers are built
    # from it -- fill in the broker host names before running.
    para_host = []
    para_port = 9092

    thread_1 = Thread(target=runSendMsg, args=(para_host, para_port))
    thread_1.start()

    thread_2 = Thread(target=runGetMsg, args=(para_host, para_port))
    thread_2.start()