# Flask application that starts the RocketMQ consumer loop in a background
# thread as soon as the application object is constructed.
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# NOTE(review): `jsonify` is imported here as a top-level module; Flask's
# helper of the same name lives at `flask.jsonify` -- confirm which is meant.
import jsonify
import requests
from celery import Celery
from flask import Flask, request
from flask_redis import FlaskRedis
from rocketmq.client import PushConsumer, ConsumeStatus

import all_mq

#executor = ThreadPoolExecutor(2)

# Single consumer wrapper shared by the whole module; built at import time.
mq = all_mq.RocketMQ()


class FlaskApp(Flask):
    """Flask subclass that launches the message-consumer thread on creation."""

    def __init__(self, *args, **kwargs):
        """Initialise Flask with the given arguments, then start the consumer."""
        super(FlaskApp, self).__init__(*args, **kwargs)
        self._activate_background_job()

    def _activate_background_job(self):
        """Run ``mq.onMessage()`` on a separate thread and return immediately."""
        def run_job():
            mq.onMessage()

        # `threading` was previously used without being imported, which made
        # this line raise NameError the moment the app was instantiated.
        # NOTE(review): the thread is non-daemon, so it is expected to keep the
        # process alive for as long as mq.onMessage() runs -- confirm intended.
        consumer_thread = threading.Thread(target=run_job)
        consumer_thread.start()


app = FlaskApp(__name__)
import json
import logging
import time
import traceback

from rocketmq.client import PushConsumer, dll, ConsumeStatus


class RocketMQ():
    """Wrapper around a RocketMQ ``PushConsumer`` bound to one topic.

    The constructor configures logging, creates the consumer and sets its
    name-server address; ``onMessage`` subscribes and blocks while messages
    are delivered to ``callback``.
    """

    def __init__(self):
        """Configure logging and create the (not yet started) consumer."""
        logging.basicConfig(level=logging.CRITICAL, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        self.logger = logging.getLogger(__name__)
        self.consumer = PushConsumer("itxxxconsumer")
        self.consumer.set_name_server_address("rxx-xxx.com:9876")
        self.topic_name = "itxx"
        # Reduce log output.
        # NOTE(review): the first argument is an encoded address string, and it
        # differs from the name-server address set above -- confirm against the
        # rocketmq client's expected argument for this call.
        dll.SetPushConsumerLogLevel("rexxx.com:9876".encode('utf-8'), 1)

    def callback(self, msg):
        """Print the UTF-8 decoded message body and id, then acknowledge it.

        Returns:
            ``ConsumeStatus.CONSUME_SUCCESS`` for every message that decodes.
        """
        body_text = msg.body.decode('utf-8')
        print(body_text, msg.id)
        return ConsumeStatus.CONSUME_SUCCESS

    def onMessage(self):
        """Subscribe to ``self.topic_name``, start the consumer and block.

        The call never returns normally; it sleeps in a loop so the calling
        thread stays alive while ``callback`` is invoked for each message.
        """
        self.consumer.subscribe(self.topic_name, self.callback)
        self.consumer.start()
        try:
            while True:
                time.sleep(2)
        finally:
            # Previously placed after the infinite loop and therefore
            # unreachable; now runs when the loop is interrupted by an
            # exception such as KeyboardInterrupt.
            self.consumer.shutdown()

    @staticmethod
    def my_func(test_body):
        """Print ``test_body``.

        Declared static because the function takes no ``self``; without the
        decorator, calling it on an instance with one argument raised
        TypeError.
        """
        print(test_body)