前言

kombu 投递 message 的时候,反对多种序列化形式:

  • json
  • yaml
  • pickle

明天的主题就是,看看一个 dict 别离被这三种序列化形式序列化后,message 会长成什么样子(以 rabbitmq 治理界面的样子为准)

试验

先筹备一个 dict ,value 的 type 有 str、int、datetime、sub dict 等多种类型

data = {    'name': 'jike',    'age': 18,    'birthday': get_utc_now_timestamp(),    'score': {        'math': 100,        'science': 99.5,        'english': 59    }}

咱们先来看看不指定 serializer 是一个什么后果:

with Connection(amqp_uri) as conn:    with conn.channel() as channel:        started_at = time.time()        message = Message(channel=channel, body=data)        producer = Producer(            channel,            exchange=imdb_exchange        )        res = producer.publish(            body=message.body,            routing_key='to_refresh',            headers=message.headers        )        ended_at = time.time()        logger.debug(f'pay time {ended_at-started_at} s')

能够看到,因为 message 的 body 是 dict,所以 kombu 即使在 serializer 缺失的状况下,也抉择依照 json 进行序列化

如何得出 『依照 json 进行序列化』的论断的?因为看 message header 外面的 『content_type』属性:content_type:application/json

那如果咱们抉择 json 呢?

with Connection(amqp_uri) as conn:    with conn.channel() as channel:        started_at = time.time()        message = Message(channel=channel, body=data)        producer = Producer(            channel,            exchange=imdb_exchange        )        res = producer.publish(            body=message.body,            routing_key='to_refresh',            headers=message.headers,            serializer='json'        )        ended_at = time.time()        logger.debug(f'pay time {ended_at-started_at} s')

能够看到,和下面的没有区别,都是 content_type: application/json

那如果咱们抉择 yaml 呢?

with Connection(amqp_uri) as conn:    with conn.channel() as channel:        started_at = time.time()        message = Message(channel=channel, body=data)        producer = Producer(            channel,            exchange=imdb_exchange        )        res = producer.publish(            body=message.body,            routing_key='to_refresh',            headers=message.headers,            serializer='yaml'        )        ended_at = time.time()        logger.debug(f'pay time {ended_at-started_at} s')

能够看到,咱们传递的 dict 对象,变成了 yaml 文本

再来看看 pickle:

能够看到,此时,咱们看不出 body 了,因为 pickle 是一种二进制序列化形式

残缺代码:

from kombu import Exchange, Queuefrom kombu import Connectionfrom kombu.messaging import Producerfrom kombu.transport.base import Messagefrom kombu import Exchange, Queuefrom loguru import loggerimport timefrom datetime import datetime, timedelta, timezonedef get_min_utc_timestamp() -> datetime:    return (datetime(year=1970, month=1, day=1) + timedelta(seconds=1)).replace(tzinfo=timezone.utc)def get_utc_now_timestamp() -> datetime:    """ https://blog.csdn.net/ball4022/article/details/101670024 """    return datetime.utcnow().replace(tzinfo=timezone.utc)amqp_uri = 'amqp://pon:pon@192.168.31.245:5672//'def declare_exchange(exchange: Exchange):    with Connection(amqp_uri) as conn:        with conn.channel() as channel:            exchange.declare(channel=channel)def declare_queue(queue: Queue):    with Connection(amqp_uri) as conn:        with conn.channel() as channel:            queue.declare(channel=channel)imdb_exchange = Exchange('imdb', type='fanout')declare_exchange(exchange=imdb_exchange)imdb_queue = Queue('imdb_refresh', imdb_exchange,                   routing_key='to_refresh', durable=True)declare_queue(queue=imdb_queue)data = {    'name': 'jike',    'age': 18,    'birthday': get_utc_now_timestamp(),    'score': {        'math': 100,        'science': 99.5,        'english': 59    }}with Connection(amqp_uri) as conn:    with conn.channel() as channel:        started_at = time.time()        message = Message(channel=channel, body=data)        producer = Producer(            channel,            exchange=imdb_exchange        )        res = producer.publish(            body=message.body,            routing_key='to_refresh',            headers=message.headers,            serializer='pickle'        )        ended_at = time.time()        logger.debug(f'pay time {ended_at-started_at} s')

参考资料:
kombu doc: serialization