–– coding:utf-8 ––
“””
Author:Yjx
Time:2019-8-17 16:26
“””
from kafka import KafkaProducer
from kafka import KafkaConsumer
from threading import Thread
import time
class kafkaProducer(object):
def __init__(self, kafkatopic, kafkahost, kafkaport):
self.kafkatopic = kafkatopic
self.kafkahost = kafkahost
self.kafkaport = kafkaport
bootstrap_server = []
for host in self.kafkahost:
bootstrap_server.append('{kafka_host}:{kafka_port}'.format(kafka_host=host, kafka_port=self.kafkaport))
self.producer = KafkaProducer(bootstrap_servers=bootstrap_server)
def sendMsg(self):
try:
for i in range(10):
parmas_message = 'send_message_' + str(i+1)
self.producer.send(self.kafkatopic, parmas_message.encode('utf-8'))
self.producer.flush()
print(parmas_message)
self.producer.close()
print('****over****')
except KafkaError as e:
print('KafkaError:', e)
class kafkaConsumer(object):
def __init__(self, kafkatopic, kafkahost, kafkaport):
self.kafkatopic = kafkatopic
self.kafkahost = kafkahost
self.kafkaport = kafkaport
bootstrap_server = []
for host in self.kafkahost:
bootstrap_server.append('{kafka_host}:{kafka_port}'.format(kafka_host=host, kafka_port=self.kafkaport))
self.consumer = KafkaConsumer(self.kafkatopic, bootstrap_servers=bootstrap_server)
def getMsg(self):
n = 1
# print(time.time())
try:
for msg in self.consumer:
print('****get_message_%d****' % n, msg)
n += 1
print('****done****')
except KafkaError as e:
print('KafkaError:', e)
def runGetMsg(para_host, para_port):
consumer = kafkaConsumer("topic-1", para_host, para_port)
consumer.getMsg()
def runSendMsg(para_host, para_port):
producer = kafkaProducer("topic-1", para_host, para_port)
producer.sendMsg()
if name == ‘__main__’:
para_host = []
para_port = 9092
thread_1 = Thread(target=runSendMsg, args=(para_host, para_port))
thread_1.start()
thread_2 = Thread(target=runGetMsg, args=(para_host, para_port))
thread_2.start()