音讯队列是什么
很多人首次据说音讯队列的时候可能会感觉这个词有点高级,肯定充斥了简单的知识点,对,其实没错,生产中应用使确实很简单,但在学习时,咱们能够将其了解的很简略,怎么了解呢,拆开来看,音讯(Message)+ 队列(Queue):
- 音讯就很好了解了,微信音讯,短信音讯,小道消息,内幕消息等等,音讯在日常生活中几乎无所不在,这里的音讯也并无差别,不过我还是简略用“黑话”形容一下,一段承载生产实体传递到生产实体通信内容的结构化数据,通常是序列化的字节数组。
-
队列那就更简略了,数据结构学过吧,先进先出晓得吧,没了。
音讯队列其实是生产者 - 消费者模型的一种实现形式,如下图所示,把内存缓冲区换成音讯队列即可很清晰的表白其作用:作为生产者至消费者之间的一个音讯通道。
图片起源为什么要用音讯队列
这里我用一段文字做一个形象的比喻:
《若 》
一阵凉风吹过树林,几片枯黄的叶子在树枝上风雨飘摇,摩擦中还收回沙沙之声,好像都想把对方先推下去,殊不知无论早晚,终局都是一样。
风持续向前吹去,却撞在了一座巨大的修建上,不敢再向前。后面的修建像是一座住着吸血鬼的中世纪城堡,墙壁上雕刻着奇怪的图案,最上方的房檐上站立着几只彩色的石鹰,城堡两侧暗藏在浓密的迷雾中让人看不太清,但很显著肯定充斥了极致的对称美。
此时城堡的大厅内零零散散站着一些人,但却异样宁静,宁静到空气中充斥了肃杀之气,配合着这秋意的萧瑟使人不寒而栗。
这些人有同一个身份 – 赏金猎人,他们来此只有一个目标,那块悬浮在大厅上空的木牌。木牌看似没什么特别之处,但边角处暗红色的血迹和剑痕简直在明示其并不简略。
木牌有一个很斯文的名字,“见无”,意思高深莫测,在其上被见到的人马上就会在这个世界隐没。
“见无”上的工作多是由一些富商巨贾,寒门贵族公布,一是他们把握敌国的财产,请得动这些赏金猎人,二是总还得维持在江湖上那点伪善。虽说“见无”,但事件总有例外,高居工作榜首的天级、地级、玄级三个工作已在那里七百年之久,而明天让这些神龙见首不见尾的猎人们齐聚一厅,抬首张望的起因就是,玄级任务被实现了。
玄级任务不知由谁公布,但工作内容从公布至今却未曾变动:当代姑苏慕容家家主的项上人头。岁月更迭,慕容家主不知换了多少代,却未有一人是死于这个工作,反而因工作丢掉性命的赏金猎人那真是一茬又一茬,甚至慕容家曾经将此作为其震慑武林的伎俩。
对于玄级任务的发布者,坊间猜想颇多,有位于东北把控天下古玩的星家,虎踞辽北镇守内地的岳家,甚至还有猜想是隐世不出根植燕京的王家,每种猜想都有其原因,不用细说。
此时人们更关注的是,慕容家主被杀了?那可是慕容家,一门冷月剑法传承了千年,也震慑了武林千年的慕容家,有道是,“冷光浮照千万里,月下再无一丝声“。不过“见无”万千年来从未出过过错,猎人们不存在一丝对后果的质疑,至于为什么没有聚在一起探讨是谁实现了工作,是 因为所有人心中都是同一个答案,他叫“若”,没有姓氏没有出身,甚至没人见过他的脸,只晓得他的名字是“若”,一个仿若虚无的人。**
“若”的弱小没有人敢质疑,他曾。。
在下面的小故事中,工作榜单对应音讯队列,工作公布人是生产者,赏金猎人是消费者。
先来想像一下如果没有工作榜单,一个富商想杀死某人只能挨个给猎人们打电话,“喂,你有空不啦,我要杀 xxx,你要多少钱”,“没空”。发现问题了吗,这种形式效率是极其低下的并且想给人家打电话还得晓得电话号码,而采纳工作榜单就很优雅的解决了这个问题,有工作放上去就好,天然会有人解决。
这就是音讯队列的第一个益处,实现了逻辑上的 解耦 ,没有依赖,每个实体各司其职,岂不美哉。并且如果工作极多的话,通过榜单能够散发到很多赏金猎人,每个赏金猎人都能够满负荷干活,这就是 分布式 。
再思考一下,榜单上的工作其实并不是要马上实现,工作实现后猎人再通过榜单告诉发布者即可,这就是 异步 ,异步也是计算机世界利用十分宽泛的设计。
当然还能够利用音讯队列做更多事件,像下面故事中的工作分级等等,还能够设置不同的路由,某个工作只针对某些猎人可见。当工作过多时,只须要再招募猎人,实现了程度扩大。
总结一下,三大益处:解耦,分布式,异步。利用时可扩大出,不同路由策略、优先级、限流、程度扩大等等益处。
AMQP & rabbitMQ
维基百科:高级音讯队列协定 即Advanced Message Queuing Protocol(AMQP)是面向消息中间件提供的凋谢的应用层协定,其设计指标是对于 音讯 的排序、路由(包含点对点和订阅 - 公布)、放弃可靠性、保障安全性 [[1]](https://zh.wikipedia.org/wiki…。AMQP 标准了消息传递方和接管方的行为,以使音讯在不同的提供商之间实现互操作性,就像 SMTP,HTTP,FTP 等协定能够创立交互零碎一样。与先前的中间件规范(如 Java 音讯服务)不同的是,JMS 在特定的 API 接口层面和实现行为上进行了对立,而高级音讯队列协定则关注于各种音讯如何以字节流的模式进行传递。因而,应用了合乎协定实现的任意应用程序之间能够放弃对音讯的创立、传递。
官网:The Advanced Message Queuing Protocol (AMQP) is an open standard for passing business messages between applications or organizations. It connects systems, feeds business processes with the information they need and reliably transmits onward the instructions that achieve their goals.
概念性的形容看看就好,间接了解起来还是有些艰难的,从拆分组件的角度看就简略多了。
AMQP 中的组件:
Broker:rabbitMQ 服务就是 Broker,是一个比拟大的概念,形容的是整个应用服务。
交换机(exchange):用于接管来自生产者的音讯,并把音讯转发到音讯队列中。AMQP 中存在四种交换机,Direct exchange、Fanout exchange、Topic exchange、Headers exchange,区别
音讯队列(message queuq):上文说了。
binding:形容音讯队列和交换机的绑定关系,应用 routing key 形容。
图片起源
而 RabbitMQ 是应用基于 AMQP 来实现的开源音讯队列服务器,具备极高的稳定性和可靠性,自带一个监控平台,等下会用到。
本文配角 –celery
celery 概述
celery 是 python 实现的一个轻量级分布式框架零碎,应用 celery 能够很简略疾速的实现工作分布式下发。
celery 还提供了极其欠缺的文档,让开发者能够很疾速的上手和深刻学习。
celery 的利用场景就不说了,看了下面音讯队列的介绍应该很分明,什么场景能够用 celery。
broker,backend,worker 是什么
broker在上文也有提到,能够在此简略了解为用来传递 celery 工作音讯的中间件。最新的 celery5 中反对四种 broker:RabbitMQ、Redis、Amazon SQS、Zookeeper(试验性质)。个别最罕用的就是 redis 和 rabbitMQ。
backend是用来存储工作后果和中间状态的实体,backend 的抉择就很多了,redis/mongoDB/elsticsearch/rabbitMQ,甚至还能够本人申明。如果生产者或者其余服务须要关怀异步工作的后果则肯定要配置 backend。
worker,顾名思义,其作用就是执行工作,须要留神的是启动 worker 时个别须要设置其监听的队列和最大并发数。
一个简略的 demo
root_dir
├── celery_demo
│ ├── __init__.py
__init__.py
import time
from celery import Celery
from celery.exceptions import TimeoutError
from celery.result import AsyncResult
from kombu import Queue, Exchange
# celery 配置,4.0 之后引入了小写配置,这种大写配置在 6.0 之后将不再反对
# 能够参考此链接
# https://docs.celeryproject.org/en/stable/userguide/configuration.html?highlight=worker#std-setting-enable_utc
CONFIG = {
# 设置时区
'CELERY_TIMEZONE': 'Asia/Shanghai',
# 默认为 true,UTC 时区
'CELERY_ENABLE_UTC': False,
# broker,留神 rabbitMQ 的 VHOST 要给你应用的用户加权限
'BROKER_URL': 'amqp://root:root@192.168.1.5:5672/dev',
# backend 配置,留神指定 redis 数据库
'CELERY_RESULT_BACKEND': 'redis://192.168.1.5:30412/4',
# worker 最大并发数
'CELERYD_CONCURRENCY': 10,
# 如果不设置,默认是 celery 队列,此处应用默认的直连交换机,routing_key 完全一致才会调度到 celery_demo 队列
# 此处留神,元组中只有一个值的话,须要最初加逗号
'CELERY_QUEUES': (Queue("celery_demo", Exchange("celery_demo"), routing_key="celery_demo"),
)
}
app = Celery()
app.config_from_object(CONFIG)
@app.task(name='demo_task')
def demo_task(x, y):
print(f"这是一个 demo 工作,睡了 10 秒,并返回了 {x}+{y} 的后果。")
time.sleep(10)
return x + y
def call():
def get_result(task_id):
res = AsyncResult(task_id)
try:
# 拿到异步工作的后果,须要用 task_id 实例化 AsyncResult,再调用 get 办法,get 默认是阻塞办法,提供 timeout 参数,此处设置为 0.1 秒
res.get(0.1)
return res.get(0.1)
except TimeoutError:
return None
tasks = []
print("开始下发 11 个工作")
for _ in range(11):
tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo'))
print("期待 10 秒后查问后果")
time.sleep(10)
for index, task in enumerate(tasks):
task_result = get_result(task.id)
if task_result is not None:
print(f"工作 {index} 的返回值是:{task_result}")
else:
print(f"工作 {index} 还没执行完结")
print("再期待 10 秒")
time.sleep(10)
print(f"工作 10 的返回值是:{get_result(tasks[-1].id)}")
if __name__ == '__main__':
call()
pycharm 中启动 worker
启动后能够在 rabbitMQ 的监控看板上看到呈现了 celery_demo 队列。
因为设置了最大并发为 10,接下来下发 11 个工作看是什么后果。
能够看到在第一个 10 秒期待后,工作 10(第 11 个工作)并未完结,持续期待 10 秒后能力拿到后果,阐明最大并发数确实失效了。
罕用配置阐明
介绍一些罕用的配置
- CELERYD_PREFETCH_MULTIPLIER:预取音讯数量,默认会取 4 * 并发数,在下面的例子中则会最多预取 40 个音讯,如果设置为 1,则示意禁止预取。
- CELERY_ACKS_LATE:默认为 FALSE,这个参数的了解须要先理解下 rabbitMQ 的 ACK 机制,简略说就是如果设置 True 则会在工作执行实现后才会对音讯队列发送确认,表明音讯已被生产,如果工作执行中产生了异常情况,未发送确认音讯,则音讯队列会持续保留此音讯,直到下一个 worker 取走并胜利执行。此时衍生出了一个问题,须要保障此音讯是幂等的,也就是无论执行多少次后果都一样,否则可能会失去一些意料之外的后果。
- CELERYD_MAX_TASKS_PER_CHILD:worker 执行多少个工作会重启过程,默认为无限度,倡议设置此值,可防止内存透露。
- CELERY_ROUTES:能够在配置中指定每个工作的路由规定,下面例子应用的动静指定队列的形式,在调用时指定路由规定。
其余配置参考官网文档
信号(Signals)
celery 中的信号我了解为就是钩子函数,celery 提供了不同类型的钩子函数,别离对应不同组件,罕用的有三种:工作类型、worker 类型、日志类型。
上面附上代码示例,以演示信号的成果:
@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(info=info,))
@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
queue_name = '{0}.dq'.format(sender) # sender is the nodename of the worker
worker_logger.info(f"为 worker 新增一个监控队列:{queue_name}")
instance.app.amqp.queues.select_add(queue_name)
worker_logger.info(f"worker 以后监控队列:{','.join(instance.app.amqp.queues.keys())}")
@after_setup_logger.connect
def setup_logger(logger, loglevel, logfile, **kwargs):
worker_logger.info(f"worker 日志级别是:{loglevel}")
worker_logger.info(f"logger 中目前有 {len(logger.handlers)} 个 handler,别离是:{','.join(type(_).__name__ for _ in logger.handlers)}")
后果如下所示:
设置工作优先级
celery 应用 rabbitMQ 反对工作优先级非常简单,只须要在 Queue 的配置中加一个参数,并新增 CELERY_ACKS_LATE,CELERYD_PREFETCH_MULTIPLIER 配置,作用在上个章节有讲,如下
# 优先级范畴设置为 0 -9,最大设置为 255,数字越大优先级越高
'CELERY_ACKS_LATE': True,
'CELERYD_PREFETCH_MULTIPLIER': 1,
'CELERY_QUEUES': (Queue("celery_demo", Exchange("celery_demo"), routing_key="celery_demo", queue_arguments={'x-max-priority': 9}),
)
须要留神的是,x-max-priority 是 rabbitMQ3.5.0 版本后才反对的,不要用错版本哟
首先删除之前的 celery_demo 队列,再次启动 worker 后能够发现 celery_demo 队列多了 Pri 标识,表明曾经反对优先级。
接下来验证下优先级是否无效,数字越大,优先级越高
连贯工作类型信号 task_received,当工作被 worker 接管到时执行
@task_received.connect
def on_task_received(request, **kwargs):
# 函数的一个参数就是工作编号
worker_logger.info(f"工作 {request.args[0]} 已被 worker 接管,开始执行")
再革新一下 call 办法,因为 worker 的并发数是 10,所以先下发 10 个工作,让 worker 满并发,再以优先级由低到高下发 10 个工作,依照预期工作的执行程序应该 task19 到 task10 排列。
def priority_call():
tasks = []
print("同时下发 20 个工作")
for _ in range(10):
# apply_async 提供 priority 参数指定优先级
tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo', priority=0))
time.sleep(1)
for _ in range(10, 20):
# apply_async 提供 priority 参数指定优先级
tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo', priority=_ % 10))
后果如下,很显著能够看到,合乎预期
不晓得有没有人对这两个配置有疑难,看到网上有一些文章提到了肯定要配置这两个参数能力实现优先级,然而并没有说具体起因,简略说下我的了解。上文曾经阐明这两个配置的作用,不再赘述,只说下和优先级的关系。
'CELERY_ACKS_LATE': True,
'CELERYD_PREFETCH_MULTIPLIER': 1,
先假如如果不设置 CELERY_ACKS_LATE,celery 为进步性能会在工作真正执行前就会向队列发送确认音讯,这会导致只管一个 worker 设置了并发数是 10,但实际上在此 worker 上最多同时会有 20 个工作,其中 10 个正在运行,另外 10 个是还没发送确认音讯(ACK)的,这 10 个实际上并未开始运行,所以如果其优先级很高,然而却并未执行,也没有调配到其余 worker,反而可能低优先级却在其余 worker 开始执行了,显然不合乎优先级的预期。
CELERYD_PREFETCH_MULTIPLIER 的作用也是如此,你能够依照下面的剖析自行了解下,其目标都是为了保障每个 worker 的并发都只会调配一个工作。
工作流工作
官网文档写的十分好,有急躁还是去读文档比拟好。
咱们先来理解下 signature,翻译过去就是签名,这里和 java 的办法签名概念相似,java 中用办法名和参数类型组成了一个办法的签名,celery 中的 signature 同样是包装了 task 和指定的参数,不便能够能够对其进行传递,比方作为参数传递到某个函数。签名包装后还能够进行二次批改,比方新增或更新某个参数,当然还能够通过 immutable=True 将其设置为不可批改。
celery 通过继承 Signature 实现了几个易用的工作流工作类:
- chain:链式工作,串行执行,父工作的返回值会作为参数传递给子工作
- group:组工作,并行执行,应用 celery.result.GroupResult 获取后果
- chord:依赖一个 group 工作,group 工作完结后,将所有子工作的返回值作为参数传递给 chord 工作
-
chunks:个别用于将同一工作的极屡次执行分组下发,以升高音讯传输的老本
工作流工作通常在一组工作有执行程序的要求时才会用到,做过 DAG 任务调度工具的同学必定会容易了解,我举个小例子阐明下:
早上起床后的流程:穿衣服 -> 洗漱 -> 吃早餐,这就是一个串行执行的链式工作,可能吃早餐的时候还会看下新闻,吃早餐和看新闻就是一个并行执行的组工作
四个示例from celery import chain, group, chord, chunks @app.task(name='demo_task2') def demo_task2(x, y): return x * y @app.task(name='tsum') def tsum(nums): return sum(nums) def chain_call(): # 1 * 2 * 3 * 4 = 24 # .s()是.signature()的缩写 # 还可通过管道符调用 chain,具体参考文档 res = chain(*[demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3,), (4,)]])() print(res.id) print(f"chain 工作:1 * 2 * 3 * 4={res.get()}") def group_call(): res = group(*[demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3, 4), (5, 6)]])() print(res.id) print(f"chain 工作:1 * 2, 3 * 4, 5 * 6={res.get()}") def chord_call(): res = chord((demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3, 4), (5, 6)]), tsum.s().set(routing_key='celery_demo', queue='celery_demo') )() print(res.id) print(f"chord 工作:sum(1 * 2, 3 * 4, 5 * 6)={res.get()}") def chunk_call(): res = chunks(demo_task2.s(), [(1, 2), (3, 4), (5, 6)], 2).apply_async(routing_key='celery_demo', queue='celery_demo') print(res.id) print(f"chunk 工作:1 * 2, 3 * 4={res.get()[0]}, 5 * 6={res.get()[1]}")
全副代码,可间接执行
import time
from celery import Celery
from celery.exceptions import TimeoutError
from celery.result import AsyncResult, GroupResult
from kombu import Queue, Exchange
from celery.signals import after_task_publish, celeryd_after_setup, after_setup_logger, task_received, task_success
from celery.utils.log import get_logger, worker_logger
from celery import chain, group, chord, chunks
import logging
# celery 配置,4.0 之后引入了小写配置,这种大写配置在 6.0 之后将不再反对
# 能够参考此链接
# https://docs.celeryproject.org/en/stable/userguide/configuration.html?highlight=worker#std-setting-enable_utc
CONFIG = {
# 设置时区
'CELERY_TIMEZONE': 'Asia/Shanghai',
# 默认为 true,UTC 时区
'CELERY_ENABLE_UTC': False,
# broker,留神 rabbitMQ 的 VHOST 要给你应用的用户加权限
'BROKER_URL': 'amqp://root:root@192.168.1.5:5672/dev',
# backend 配置,留神指定 redis 数据库
'CELERY_RESULT_BACKEND': 'redis://192.168.1.5:30412/4',
# worker 最大并发数
'CELERYD_CONCURRENCY': 10,
# 如果不设置,默认是 celery 队列,此处应用默认的直连交换机,routing_key 完全一致才会调度到 celery_demo 队列
'CELERY_ACKS_LATE': True,
'CELERYD_PREFETCH_MULTIPLIER': 1,
# 此处留神,元组中只有一个值的话,须要最初加逗号
'CELERY_QUEUES': (Queue("celery_demo", Exchange("celery_demo"), routing_key="celery_demo", queue_arguments={'x-max-priority': 9}),
)
}
app = Celery()
app.config_from_object(CONFIG)
@app.task(name='demo_task')
def demo_task(x, y):
time.sleep(10)
return x + y
@app.task(name='demo_task2')
def demo_task2(x, y):
return x * y
@app.task(name='tsum')
def tsum(nums):
return sum(nums)
@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
queue_name = '{0}.dq'.format(sender) # sender is the nodename of the worker
worker_logger.info(f"为 worker 新增一个监控队列:{queue_name}")
instance.app.amqp.queues.select_add(queue_name)
worker_logger.info(f"worker 以后监控队列:{','.join(instance.app.amqp.queues.keys())}")
@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
# information about task are located in headers for task messages
# using the task protocol version 2.
info = headers if 'task' in headers else body
print('after_task_publish for task id {info[id]}'.format(info=info,))
@task_received.connect
def on_task_received(request, **kwargs):
worker_logger.info(f"工作 {request.args[0]} 已被 worker 接管,开始执行")
@after_setup_logger.connect
def setup_logger(logger, loglevel, logfile, **kwargs):
worker_logger.info(f"worker 日志级别是:{loglevel}")
worker_logger.info(f"logger 中目前有 {len(logger.handlers)} 个 handler,别离是:{','.join(type(_).__name__ for _ in logger.handlers)}")
def call():
def get_result(task_id):
res = AsyncResult(task_id)
try:
# 拿到异步工作的后果,须要用 task_id 实例化 AsyncResult,再调用 get 办法,get 默认是阻塞办法,提供 timeout 参数,此处设置为 0.1 秒
res.get(0.1)
return res.get(0.1)
except TimeoutError:
return None
tasks = []
print("开始下发 11 个工作")
for _ in range(11):
tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo'))
print("期待 10 秒后查问后果")
time.sleep(10)
for index, task in enumerate(tasks):
task_result = get_result(task.id)
if task_result is not None:
print(f"工作 {index} 的返回值是:{task_result}")
else:
print(f"工作 {index} 还没执行完结")
print("再期待 10 秒")
time.sleep(10)
print(f"工作 10 的返回值是:{get_result(tasks[-1].id)}")
def priority_call():
tasks = []
print("先下发 10 个工作,占满 worker 的并发")
for _ in range(10):
# apply_async 提供 priority 参数指定优先级
tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo', priority=0))
# 保险起见,sleep 1
time.sleep(1)
print("再以优先级由低到高的程序下发 10 个工作,预期工作将逆序执行")
for _ in range(10, 20):
# apply_async 提供 priority 参数指定优先级
tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo', priority=_ % 10))
def chain_call():
# 1 * 2 * 3 * 4 = 24
# .s()是.signature()的缩写
# 还可通过管道符调用 chain,具体参考文档
res = chain(*[demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3,), (4,)]])()
print(res.id)
print(f"chain 工作:1 * 2 * 3 * 4={res.get()}")
def group_call():
res = group(*[demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3, 4), (5, 6)]])()
print(res.id)
print(f"chain 工作:1 * 2, 3 * 4, 5 * 6={res.get()}")
def chord_call():
res = chord((demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3, 4), (5, 6)]),
tsum.s().set(routing_key='celery_demo', queue='celery_demo')
)()
print(res.id)
print(f"chord 工作:sum(1 * 2, 3 * 4, 5 * 6)={res.get()}")
def chunk_call():
res = chunks(demo_task2.s(), [(1, 2), (3, 4), (5, 6)], 2).apply_async(routing_key='celery_demo', queue='celery_demo')
print(res.id)
print(f"chunk 工作:1 * 2, 3 * 4={res.get()[0]}, 5 * 6={res.get()[1]}")
if __name__ == '__main__':
call()
priority_call()
chain_call()
group_call()
chord_call()
chunk_call()
斜体