Preface
When kombu publishes a message, it supports several serialization formats:
- json
- yaml
- pickle
Today's topic: see what a message looks like after the same dict is serialized with each of these three formats (judged by how it appears in the RabbitMQ management UI).
Experiment
First, prepare a dict whose values cover several types: str, int, datetime, a nested dict, and so on.
data = {
    'name': 'jike',
    'age': 18,
    'birthday': get_utc_now_timestamp(),
    'score': {
        'math': 100,
        'science': 99.5,
        'english': 59
    }
}
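The datetime value is the interesting one in this dict, because plain JSON has no date type. As a rough illustration using only the standard library (this is not kombu's own encoder, `to_jsonable` is a hypothetical helper name, and the birthday is a fixed value chosen for reproducibility), the stock json module rejects a datetime unless it is given a fallback:

```python
import json
from datetime import datetime, timezone

data = {
    'name': 'jike',
    'age': 18,
    # Fixed value (an assumption) so the result is reproducible.
    'birthday': datetime(2000, 1, 1, tzinfo=timezone.utc),
    'score': {'math': 100, 'science': 99.5, 'english': 59},
}


def to_jsonable(value):
    """Hypothetical fallback: render a datetime as an ISO 8601 string."""
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f'cannot serialize {type(value).__name__}')


# Without a fallback, the standard json module raises TypeError on datetime.
try:
    json.dumps(data)
    plain_json_failed = False
except TypeError:
    plain_json_failed = True

# With the fallback, the datetime travels as text and comes back as a str.
text = json.dumps(data, default=to_jsonable)
decoded = json.loads(text)

print(plain_json_failed)
print(type(decoded['birthday']).__name__, decoded['birthday'])
```

kombu's JSON serializer has its own handling for datetime values, so the publish calls in this post work as written; how exactly a datetime is represented on the wire may differ between kombu versions.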
Let's first see what happens when no serializer is specified:
with Connection(amqp_uri) as conn:
    with conn.channel() as channel:
        started_at = time.time()
        message = Message(channel=channel, body=data)
        producer = Producer(
            channel,
            exchange=imdb_exchange
        )
        res = producer.publish(
            body=message.body,
            routing_key='to_refresh',
            headers=message.headers
        )
        ended_at = time.time()
        logger.debug(f'pay time {ended_at-started_at} s')
As you can see, because the message body is a dict, kombu serializes it as JSON even when no serializer is given.
How do we know it was serialized as JSON? From the content_type property shown on the message:
content_type: application/json
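The content_type value is also what lets a receiver decide how to decode the body. A minimal sketch of that idea, using only the standard library: `DECODERS` and `decode_body` are hypothetical names for illustration, and kombu keeps its own registry of decoders rather than this table. `application/x-python-serialize` is the content type kombu uses for pickle.

```python
import json
import pickle

# Hypothetical lookup table from content_type to a decoding function.
DECODERS = {
    'application/json': lambda body: json.loads(body.decode('utf-8')),
    'application/x-python-serialize': pickle.loads,
}


def decode_body(body: bytes, content_type: str):
    """Decode a raw message body according to its content_type."""
    try:
        decoder = DECODERS[content_type]
    except KeyError:
        raise ValueError(f'unsupported content_type: {content_type}') from None
    return decoder(body)


payload = {'name': 'jike', 'age': 18}

from_json = decode_body(json.dumps(payload).encode('utf-8'), 'application/json')
from_pickle = decode_body(pickle.dumps(payload), 'application/x-python-serialize')

print(from_json)
print(from_pickle)
```

Only unpickle bodies that come from a trusted producer, since loading a pickle can execute arbitrary code.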
What if we explicitly choose json?
with Connection(amqp_uri) as conn:
    with conn.channel() as channel:
        started_at = time.time()
        message = Message(channel=channel, body=data)
        producer = Producer(
            channel,
            exchange=imdb_exchange
        )
        res = producer.publish(
            body=message.body,
            routing_key='to_refresh',
            headers=message.headers,
            serializer='json'
        )
        ended_at = time.time()
        logger.debug(f'pay time {ended_at-started_at} s')
As you can see, there is no difference from the previous run: content_type is still application/json.
What if we choose yaml?
with Connection(amqp_uri) as conn:
    with conn.channel() as channel:
        started_at = time.time()
        message = Message(channel=channel, body=data)
        producer = Producer(
            channel,
            exchange=imdb_exchange
        )
        res = producer.publish(
            body=message.body,
            routing_key='to_refresh',
            headers=message.headers,
            serializer='yaml'
        )
        ended_at = time.time()
        logger.debug(f'pay time {ended_at-started_at} s')
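As you can see, the dict we passed in has become YAML text (the yaml serializer requires PyYAML to be installed).
Now let's look at pickle (the publishing code is the same, with serializer='pickle'; see the full code below):
As you can see, the body is no longer readable, because pickle is a binary serialization format.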
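To see why the pickle body is unreadable, here is a small sketch using the standard pickle module on its own, outside kombu. The birthday is a fixed value chosen for reproducibility, which is an assumption of this example and not part of the original experiment.

```python
import pickle
from datetime import datetime, timezone

data = {
    'name': 'jike',
    'age': 18,
    # Fixed value (an assumption) so the result is reproducible.
    'birthday': datetime(2000, 1, 1, tzinfo=timezone.utc),
    'score': {'math': 100, 'science': 99.5, 'english': 59},
}

# The serialized form is raw bytes, not text.
body = pickle.dumps(data)

# Unlike JSON, the round trip restores the original Python types.
restored = pickle.loads(body)

print(type(body).__name__, len(body))
print(type(restored['birthday']).__name__)
print(restored == data)
```

The trade-off is that pickle is specific to Python and should only be loaded from trusted sources, because unpickling can execute arbitrary code.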
Full code:
import time
from datetime import datetime, timedelta, timezone

from kombu import Connection, Exchange, Queue
from kombu.messaging import Producer
from kombu.transport.base import Message
from loguru import logger


def get_min_utc_timestamp() -> datetime:
    return (datetime(year=1970, month=1, day=1) + timedelta(seconds=1)).replace(tzinfo=timezone.utc)


def get_utc_now_timestamp() -> datetime:
    """https://blog.csdn.net/ball4022/article/details/101670024"""
    return datetime.utcnow().replace(tzinfo=timezone.utc)


amqp_uri = 'amqp://pon:pon@192.168.31.245:5672//'


def declare_exchange(exchange: Exchange):
    with Connection(amqp_uri) as conn:
        with conn.channel() as channel:
            exchange.declare(channel=channel)


def declare_queue(queue: Queue):
    with Connection(amqp_uri) as conn:
        with conn.channel() as channel:
            queue.declare(channel=channel)


# Declare the exchange and the queue bound to it before publishing.
imdb_exchange = Exchange('imdb', type='fanout')
declare_exchange(exchange=imdb_exchange)
imdb_queue = Queue('imdb_refresh', imdb_exchange,
                   routing_key='to_refresh', durable=True)
declare_queue(queue=imdb_queue)

data = {
    'name': 'jike',
    'age': 18,
    'birthday': get_utc_now_timestamp(),
    'score': {
        'math': 100,
        'science': 99.5,
        'english': 59
    }
}

with Connection(amqp_uri) as conn:
    with conn.channel() as channel:
        started_at = time.time()
        message = Message(channel=channel, body=data)
        producer = Producer(
            channel,
            exchange=imdb_exchange
        )
        # Change the serializer argument to 'json' or 'yaml' to repeat the other runs.
        res = producer.publish(
            body=message.body,
            routing_key='to_refresh',
            headers=message.headers,
            serializer='pickle'
        )
        ended_at = time.time()
        logger.debug(f'pay time {ended_at-started_at} s')
References:
kombu doc: serialization