单线程

对于只有一个过程一个线程一个 amqp 连贯的状况

咱们开了一个连贯用来工作生产,这个时候,咱们须要一个『后盾』来帮咱们保护 amqp 的心跳

怎么实现这个『后盾』呢?

抉择其实很多,比方:

  • 线程
  • eventlet 协程
  • gevent 协程

上面就以 『eventlet 协程』 举例子了

from kombu import Connection, Consumer, Queuefrom kombu import Exchange, Queuefrom loguru import loggerfrom kombu.transport.pyamqp import Messageimport timeimport eventleteventlet.monkey_patch()amqp_uri = 'amqp://pon:[email protected]:5672//'flag = 0def handle_message(message: Message):    logger.debug(message.body)    message.ack()refresh_exchange = Exchange('refresh', type='topic')imdb_queue = Queue('imdb', refresh_exchange,                   routing_key='to_imdb', durable=True)heartbeat_interval = 5def heartbeat_check_forever(heartbeat_interval: int | float = None):    while True:        conn.heartbeat_check()        logger.debug(f'心跳查看实现')        time.sleep(heartbeat_interval/2/2 if heartbeat_interval else 1)with Connection(amqp_uri, heartbeat=heartbeat_interval) as conn:    with conn.channel() as channel:        consumer = Consumer(channel, queues=[imdb_queue], prefetch_count=10)        consumer.on_message(handle_message)        with consumer:            eventlet.spawn_n(heartbeat_check_forever, heartbeat_interval)            while True:                conn.drain_events()