import threading
import time
from concurrent.futures import ThreadPoolExecutor

# NOTE(review): `jsonify` is normally imported from `flask`; confirm that a
# standalone top-level `jsonify` module is really what is intended here.
import jsonify
import requests
from celery import Celery
from flask import Flask, request
from flask_redis import FlaskRedis
from rocketmq.client import ConsumeStatus, PushConsumer

import all_mq

#executor = ThreadPoolExecutor(2)

# Module-level RocketMQ wrapper whose consume loop FlaskApp runs in a thread.
mq = all_mq.RocketMQ()


class FlaskApp(Flask):
    """Flask application that launches the RocketMQ consumer when created."""

    def __init__(self, *args, **kwargs):
        """Initialise Flask as usual, then start the background consumer."""
        super().__init__(*args, **kwargs)
        self._activate_background_job()

    def _activate_background_job(self):
        """Run ``mq.onMessage()`` on a separate thread.

        The thread is a daemon so that it does not keep the interpreter
        alive once the main thread has finished.
        """

        def run_job():
            mq.onMessage()

        worker = threading.Thread(target=run_job, daemon=True)
        worker.start()


app = FlaskApp(__name__)
import json
import logging
import time
import traceback

from rocketmq.client import ConsumeStatus, PushConsumer, dll


class RocketMQ():
    """Wrapper around a RocketMQ ``PushConsumer`` subscribed to one topic."""

    def __init__(self):
        """Configure logging and create the consumer with its name server."""
        logging.basicConfig(level=logging.CRITICAL, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)
        self.consumer = PushConsumer("itxxxconsumer")
        self.consumer.set_name_server_address("rxx-xxx.com:9876")
        self.topic_name = "itxx"
        # Reduce log output.
        # NOTE(review): the first argument is an encoded address string whose
        # host differs from the name server set above; confirm what the native
        # function expects as its first argument.
        dll.SetPushConsumerLogLevel("rexxx.com:9876".encode('utf-8'), 1)

    def callback(self, msg):
        """Print the UTF-8 decoded message body and id, then report success.

        Returns:
            ``ConsumeStatus.CONSUME_SUCCESS`` for every message.
        """
        body_text = msg.body.decode('utf-8')
        print(body_text, msg.id)
        return ConsumeStatus.CONSUME_SUCCESS

    def onMessage(self):
        """Subscribe to the topic, start the consumer and block indefinitely.

        The calling thread sleeps in a loop while ``callback`` is registered
        as the message handler. The consumer is shut down when the loop is
        left through an exception such as ``KeyboardInterrupt``.
        """
        self.consumer.subscribe(self.topic_name, self.callback)
        self.consumer.start()
        try:
            while True:
                time.sleep(2)
        finally:
            # Previously placed after the infinite loop and never reached.
            self.consumer.shutdown()


def my_func(test_body):
    """Print ``test_body`` to standard output."""
    print(test_body)