将 delivery_mode 设为 NON_PERSISTENT 就好了(NON_PERSISTENT 对应 AMQP 的 delivery_mode=1,即消息不持久化;PERSISTENT 对应 2)
import time

from loguru import logger
from nameko.constants import NON_PERSISTENT
from nameko.standalone.events import event_dispatcher

import settings

# Benchmark script: dispatch 100000 nameko events through a standalone
# event dispatcher with non-persistent delivery.

# Nameko configuration; the AMQP URI is assembled from the RabbitMQ
# settings object (username, password, host, port, vhost).
config = {
    'AMQP_URI': f'amqp://{settings.RABBITMQ_CONFIG.username}:'
                f'{settings.RABBITMQ_CONFIG.password}@{settings.RABBITMQ_CONFIG.host}:'
                f'{settings.RABBITMQ_CONFIG.port}/{settings.RABBITMQ_CONFIG.vhost}'
}

# delivery_mode=NON_PERSISTENT asks the dispatcher to publish messages as
# non-persistent; swap in PERSISTENT to compare throughput.
dispatch = event_dispatcher(config, delivery_mode=NON_PERSISTENT)

# Each call passes (service name, event type, payload).
for _ in range(100000):
    dispatch(
        'test_publishe',
        'to_publish',
        '1234567890'
    )
消息(message)不做持久化时,push 速率可以达到 2.5k/s
上图是我在 MacBook 上通过 Wi-Fi push 到服务器的结果,网络延迟如下:
PING 192.168.31.245 (192.168.31.245): 56 data bytes
64 bytes from 192.168.31.245: icmp_seq=0 ttl=64 time=4.563 ms
64 bytes from 192.168.31.245: icmp_seq=1 ttl=64 time=4.206 ms
64 bytes from 192.168.31.245: icmp_seq=2 ttl=64 time=3.787 ms
64 bytes from 192.168.31.245: icmp_seq=3 ttl=64 time=3.741 ms
64 bytes from 192.168.31.245: icmp_seq=4 ttl=64 time=4.791 ms
64 bytes from 192.168.31.245: icmp_seq=5 ttl=64 time=4.327 ms
64 bytes from 192.168.31.245: icmp_seq=6 ttl=64 time=3.905 ms
64 bytes from 192.168.31.245: icmp_seq=7 ttl=64 time=4.072 ms
64 bytes from 192.168.31.245: icmp_seq=8 ttl=64 time=4.170 ms
64 bytes from 192.168.31.245: icmp_seq=9 ttl=64 time=4.190 ms
64 bytes from 192.168.31.245: icmp_seq=10 ttl=64 time=10.588 ms
64 bytes from 192.168.31.245: icmp_seq=11 ttl=64 time=4.108 ms
^C
--- 192.168.31.245 ping statistics ---
12 packets transmitted, 12 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 3.741/4.704/10.588/1.797 ms
那如果开启消息持久化呢?速率可以到多少?
使用 delivery_mode=PERSISTENT
可以看到,速率只有 200/s 出头
服务器的硬盘是 SSD 三星 pm981
但是我很奇怪,难道 RabbitMQ 就这么一点吞吐量?
然后我直接用 kombu 手写了一个生产者
import time

from kombu import Connection
from kombu import Exchange, Queue
from kombu.messaging import Producer
from kombu.transport.base import Message
from loguru import logger
from vine.promises import promise

# Benchmark script: publish 1000000 messages with a bare kombu Producer
# and log the elapsed time.

amqp_uri = 'amqp://pon:pon@192.168.31.245:5672//'


def declare_exchange(exchange: Exchange) -> None:
    """Declare *exchange* on the broker over a short-lived connection."""
    with Connection(amqp_uri) as conn:
        with conn.channel() as channel:
            exchange.declare(channel=channel)


def declare_queue(queue: Queue) -> None:
    """Declare *queue* on the broker over a short-lived connection."""
    with Connection(amqp_uri) as conn:
        with conn.channel() as channel:
            queue.declare(channel=channel)


# Topology: a fanout exchange with one durable queue attached to it.
imdb_exchange = Exchange('imdb', type='fanout')
declare_exchange(exchange=imdb_exchange)

imdb_queue = Queue('imdb_refresh', imdb_exchange, routing_key='to_refresh', durable=True)
declare_queue(queue=imdb_queue)

with Connection(amqp_uri) as conn:
    with conn.channel() as channel:
        # perf_counter() is monotonic, so the measured interval cannot be
        # skewed by system clock adjustments during the run.
        started_at = time.perf_counter()

        message = Message(channel=channel, body='123456789')
        producer = Producer(
            channel,
            exchange=imdb_exchange
        )

        # Loop-invariant lookups hoisted out of the hot loop; the same body
        # and headers objects are sent on every iteration, as before.
        body = message.body
        headers = message.headers

        for _ in range(1000000):
            producer.publish(
                body=body,
                routing_key='to_refresh',
                headers=headers
            )

        ended_at = time.perf_counter()
        logger.debug(f'pay time {ended_at-started_at} s')
如图:
差距有点大,这都 3w/s(约 3 万条/秒)了!