乐趣区

关于后端:celery笔记五之消息队列的介绍

本文首发于公众号:Hunter 后端
原文链接:celery 笔记五之音讯队列的介绍

后面咱们介绍过 task 的解决形式,将 task 发送到队列 queue,而后 worker 从 queue 中一个个的获取 task 进行解决。

task 的队列 queue 能够是多个,解决 task 的 worker 也能够是多个,worker 能够解决任意 queue 的 task,也能够解决指定 queue 的 task,这个咱们在介绍 queue 的时候再做介绍。

这一篇咱们来介绍一下存储 task 的队列 queue。

  1. 默认队列 task_default_queue
  2. 定义队列
  3. 将 task 指定到队列 queue 生产

以下的操作都是在 Django 零碎的配置中应用。

1、默认队列 task_default_queue

当咱们运行一个最简略的延时工作比方 add.delay(1, 2) 时,并没有设置一个音讯队列,因为如果咱们没有指定,零碎会为咱们创立一个默认队列。

这个默认的队列被命名为 celery,值在 app.conf.task_default_queue,咱们能够查看一下:

from hunter.celery import app
app.conf.task_default_queue

# 输入为 'celery'

2、定义队列

咱们能够构想一下这个场景,咱们只有一个 worker 解决 task,每个 task 须要解决的工夫很长,因为 worker 被占用,这样在咱们的工作队列里就会积压很多的 task。

有一些须要即时解决的工作则会被推延解决,这样的状况下,咱们现实的设计是设置多个 worker,多个 worker 别离解决指定队列的 task。

对于 worker 的设置,比方增加多个 worker,给 worker 生产指定队列的 task,咱们在 worker 的笔记中再介绍,这里咱们介绍一下如何定义队列。

工作队列的定义如下:

# hunter/celery.py

from kombu import Queue

app.conf.task_queues = (Queue('blog_tasks',),
)

当咱们定义了工作队列之后,咱们能够将 task 指定输入到对应的 queue,假如 blog/tasks.py 下有这样一个 task:

# blog/tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

接下来咱们调用这个 task 的时候,须要指定队列:

from blog.tasks import add
add.apply_async((1, 2), queue='blog_tasks')

如果咱们就这样配置 celery,这个时候如果咱们间接再调用 delay() 函数,也就是不指定 queue 的话,会发现咱们收回的 task 是不能被 worker 解决的。

也就是说,上面的操作是不起作用的:

from blog.tasks import add
add.delay(1, 2)  # 此时,咱们的调用不会被队列接管到

如果须要在调用 task 的时候不指定队列,应用零碎默认的队列,这个时候咱们须要额定来指定一个 task_default_queue,celery 的配置如下:

# hunter/celery.py

app.conf.task_queues = (Queue('blog_tasks'),
    Queue('default_queue'),
)
app.conf.task_default_queue = 'default_queue'

这样,咱们在应用延时工作的时候,就不须要指定 queue 参数了,都会走咱们的默认 task 队列:

from blog.tasks import add
add.delay(1, 2)  # 队列会被 default_queue 接管到

而如果咱们想实现 add 的延时工作走的是 blog_tasks 这个队列,然而咱们在调用的时候不想那么麻烦每次都指定 queue 参数,这个就须要用到 task_routes 配置项了。

3、将 task 指定到队列 queue 生产

如果咱们想某些函数应用指定的 queue,咱们能够应用 task_routes 配置项来操作。

当初咱们有两个 application,blog 和 polls,这两个 application 下都有各自的 tasks,文件的内容如下:

# blog/tasks.py
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def minus(x, y):
    return x - y
# polls/tasks.py
from celery import shared_task

@shared_task
def multi(x, y):
    return x * y

咱们想要实现的最终的目标是在调用延时工作的时候,能够间接应用 delay() 的形式,不须要应用 apply_async(queue=’xx’)。

咱们想要实现的性能是,polls/tasks.py 下的所有的延时工作以及 blog/tasks.py 下的 add() 函数进入 queue_1 队列

blog 下的 minus() 函数进入 queue_2 队列

其余所有的 task 都走默认的队列,default_queue。

咱们能够如下配置:

app.conf.task_queues = (Queue('queue_1'),
    Queue('queue_2'),
    Queue('default_queue'),
)

app.conf.task_routes = {
    'polls.tasks.*': {'queue': 'queue_1',},
    'blog.tasks.add': {'queue': 'queue_1',},
    'blog.tasks.minus': {'queue': 'queue_2',},
}

app.conf.task_default_queue = 'default_queue'

如果想获取更多后端相干文章,可扫码关注浏览:

退出移动版