Preface
When kombu publishes a message, it supports several serialization formats:
- json
- yaml
- pickle
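Today's topic: take one dict, serialize it with each of these three formats, and see what the resulting message looks like (as displayed in the RabbitMQ management UI).
Experiment
First, prepare a dict whose values cover several types: str, int, float, datetime, and a nested dict.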
data = {
    'name': 'jike',
    'age': 18,
    'birthday': get_utc_now_timestamp(),
    'score': {
        'math': 100,
        'science': 99.5,
        'english': 59
    }
}
Let's first see what happens when no serializer is specified:
with Connection(amqp_uri) as conn:
    with conn.channel() as channel:
        started_at = time.time()
        message = Message(channel=channel, body=data)
        producer = Producer(
            channel,
            exchange=imdb_exchange
        )
        res = producer.publish(
            body=message.body,
            routing_key='to_refresh',
            headers=message.headers
        )
        ended_at = time.time()
        logger.debug(f'pay time {ended_at-started_at} s')
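As you can see, because the message body is a dict, kombu falls back to JSON serialization even though no serializer was given.
How do we know it was serialized as JSON? From the content_type property that the management UI shows for the message: content_type: application/json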
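The fallback described above can be modelled in a few lines. This is only a simplified sketch of the selection rule, not kombu's actual code: `guess_content_type` is a hypothetical helper, and the content types for yaml and pickle, as well as the str and bytes cases, are assumptions based on kombu's usual behaviour rather than something this experiment observed.

```python
def guess_content_type(body, serializer=None):
    """Hypothetical helper: approximate the content_type a published body gets."""
    # Assumed mapping from serializer name to content type.
    known = {
        'json': 'application/json',
        'yaml': 'application/x-yaml',
        'pickle': 'application/x-python-serialize',
    }
    if serializer is not None:
        return known[serializer]
    # Assumption: without a serializer, raw bytes and str are sent unserialized.
    if isinstance(body, bytes):
        return 'application/data'
    if isinstance(body, str):
        return 'text/plain'
    # Anything else (such as our dict) goes through the default serializer, JSON.
    return known['json']


print(guess_content_type({'name': 'jike'}))
print(guess_content_type({'name': 'jike'}, serializer='yaml'))
```

Under this model a dict always ends up as application/json unless another serializer is named explicitly, which matches what the management UI showed.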
What if we explicitly choose json?
with Connection(amqp_uri) as conn:
    with conn.channel() as channel:
        started_at = time.time()
        message = Message(channel=channel, body=data)
        producer = Producer(
            channel,
            exchange=imdb_exchange
        )
        res = producer.publish(
            body=message.body,
            routing_key='to_refresh',
            headers=message.headers,
            serializer='json'
        )
        ended_at = time.time()
        logger.debug(f'pay time {ended_at-started_at} s')
As you can see, the result is identical to the previous one: content_type: application/json
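One detail worth noting is that the dict contains a datetime, which the standard library's json module cannot encode on its own; kombu's JSON serializer takes care of such values. The sketch below uses only the standard library to show roughly what a JSON body for this dict can look like. The `encode_default` helper and the fixed birthday value are assumptions made for illustration, and the exact datetime representation kombu emits may differ between versions.

```python
import json
from datetime import datetime, timezone


def encode_default(obj):
    """Fallback for values json cannot encode natively (only datetime here)."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f'cannot serialize {type(obj).__name__}')


data = {
    'name': 'jike',
    'age': 18,
    # Fixed timestamp instead of "now" so the example is reproducible.
    'birthday': datetime(2020, 1, 1, tzinfo=timezone.utc),
    'score': {'math': 100, 'science': 99.5, 'english': 59},
}

body = json.dumps(data, default=encode_default)
print(body)
```

After a round trip through JSON the birthday comes back as a plain string, so a consumer that needs a datetime has to parse it again.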
What if we choose yaml?
with Connection(amqp_uri) as conn:
    with conn.channel() as channel:
        started_at = time.time()
        message = Message(channel=channel, body=data)
        producer = Producer(
            channel,
            exchange=imdb_exchange
        )
        res = producer.publish(
            body=message.body,
            routing_key='to_refresh',
            headers=message.headers,
            serializer='yaml'
        )
        ended_at = time.time()
        logger.debug(f'pay time {ended_at-started_at} s')
As you can see, the dict we passed in has become YAML text.
Now let's look at pickle:
This time the body is no longer human-readable, because pickle is a binary serialization format.
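The difference between a text body and a binary body can be reproduced locally with the standard library alone. This is a minimal sketch, assuming kombu's pickle serializer ultimately relies on Python's pickle module; `is_readable_text` is a hypothetical helper, and the datetime field is left out to keep the example short.

```python
import json
import pickle

data = {'name': 'jike', 'age': 18, 'score': {'math': 100, 'science': 99.5}}

as_json = json.dumps(data).encode('utf-8')
as_pickle = pickle.dumps(data)


def is_readable_text(payload: bytes) -> bool:
    """True if the payload decodes as UTF-8 and holds only printable characters."""
    try:
        return payload.decode('utf-8').isprintable()
    except UnicodeDecodeError:
        return False


print(is_readable_text(as_json))       # JSON bytes are plain text
print(is_readable_text(as_pickle))     # pickle bytes are an opaque binary stream
print(pickle.loads(as_pickle) == data) # but they still round-trip to the same dict
```

Keep in mind that unpickling data from an untrusted source can execute arbitrary code, so pickle is only appropriate when both producer and consumer are trusted.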
Complete code:
from kombu import Exchange, Queue
from kombu import Connection
from kombu.messaging import Producer
from kombu.transport.base import Message
from loguru import logger
import time
from datetime import datetime, timedelta, timezone


def get_min_utc_timestamp() -> datetime:
    return (datetime(year=1970, month=1, day=1) + timedelta(seconds=1)).replace(tzinfo=timezone.utc)


def get_utc_now_timestamp() -> datetime:
    """ https://blog.csdn.net/ball4022/article/details/101670024 """
    return datetime.utcnow().replace(tzinfo=timezone.utc)


amqp_uri = 'amqp://pon:pon@192.168.31.245:5672//'


def declare_exchange(exchange: Exchange):
    with Connection(amqp_uri) as conn:
        with conn.channel() as channel:
            exchange.declare(channel=channel)


def declare_queue(queue: Queue):
    with Connection(amqp_uri) as conn:
        with conn.channel() as channel:
            queue.declare(channel=channel)


imdb_exchange = Exchange('imdb', type='fanout')
declare_exchange(exchange=imdb_exchange)

imdb_queue = Queue('imdb_refresh', imdb_exchange, routing_key='to_refresh', durable=True)
declare_queue(queue=imdb_queue)

data = {
    'name': 'jike',
    'age': 18,
    'birthday': get_utc_now_timestamp(),
    'score': {
        'math': 100,
        'science': 99.5,
        'english': 59
    }
}

with Connection(amqp_uri) as conn:
    with conn.channel() as channel:
        started_at = time.time()
        message = Message(channel=channel, body=data)
        producer = Producer(
            channel,
            exchange=imdb_exchange
        )
        res = producer.publish(
            body=message.body,
            routing_key='to_refresh',
            headers=message.headers,
            serializer='pickle'
        )
        ended_at = time.time()
        logger.debug(f'pay time {ended_at-started_at} s')
References:
kombu doc: serialization