关于后端:celery笔记六之worker介绍

47次阅读

共计 3987 个字符,预计需要花费 10 分钟才能阅读完成。

本文首发于公众号:Hunter 后端
原文链接:celery 笔记六之 worker 介绍

后面咱们介绍过 celery 的现实的设计形式是几个 worker 解决特定的工作队列的数据,这样能够防止工作在队列中的积压。

这一篇笔记咱们介绍一下如何应用 worker 进步零碎中工作的解决效率。

  1. worker 启动
  2. worker 与队列
  3. worker 检测
  4. 其余 worker 命令

1、worker 启动

后面介绍过 worker 的启动形式,在 celery 配置文件的上一级目录运行上面的命令:

celery -A hunter worker -l INFO

其中,-l 示意日志等级,相当于是 –loglevel=INFO

celery -A hunter worker --loglevel=INFO

指定 worker 的 hostname

celery -A hunter worker -l INFO -n worker1@%h

其中,%h 示意主机名,蕴含域名在内,%n 示意仅蕴含主机名,%d 示意仅蕴含域名。

以下是示例:

变量 示例 后果
%h worker1@%h worker1@george.example.com
%n worker1@%n worker1@george
%d worker1@%d worker1@example.com

指定日志文件地址

logfile 参数能够指定日志文件地址:

celery -A hunter worker --loglevel=INFO --logfile=/Users/hunter/python/celery_log/celery.log

杀死 worker 过程

咱们能够通过获取 worker 的过程 id 来杀死这些过程:

ps aux | grep 'celery -A hunter' | awk '{print $2}' |xargs sudo kill -9

并发解决

一般来说,当咱们间接启动 worker 的时候,会默认同时起好几个 worker 过程。

如果不指定 worker 的数量,worker 的过程会默认是所在机器的 CPU 的数量。

咱们也能够通过 concurrency 参数来指定启动 worker 的过程数。

比如说,咱们想启动三个 worker 的过程,能够如下指定:

celery -A hunter worker --concurrency=3 -l INFO

–concurrency 也能够简写成 -c:

celery -A hunter worker -c 3 -l INFO

这样,咱们在启动的命令行里输出上面的参数就能够看到启动了三个 worker 的过程:

ps aux |grep 'celery -A hunter'

这里有一个对于 worker 过程数启动多少的问题,是不是咱们的 worker 启动的越多,咱们的定时工作和延时工作就会执行得越快呢?

并不是,有试验证实 worker 的数量启动得越多,对于 task 解决的性能有可能还会起到一个反向作用,这里不作展开讨论,咱们能够设置 CPU 的数量即可。

当然,你也能够依据 worker 解决工作的状况,基于 application,基于工作负载,工作运行工夫等试验出一个最佳的数量。

2、worker 与队列

生产指定队列的 task

咱们能够在运行 worker 的时候指定 worker 只生产特定队列的 task,这个特定队列,能够是一个,也能够是多个,用逗号分隔开。

指定的形式如下:

celery -A hunter worker -l INFO -Q queue_1,queue_2

列出所有沉闷的 queues

上面的命令能够列出所有零碎沉闷的队列信息:

celery -A hunter inspect active_queues

假如目前咱们相干配置如下:

app.conf.task_queues = (Queue('default_queue',),
    Queue('queue_1'),
    Queue('queue_2'),
)

app.conf.task_routes = {
    'blog.tasks.add': {'queue': 'queue_1',},
    'blog.tasks.minus': {'queue': 'queue_2',},
}

咱们这样启动 worker:

celery -A hunter worker -l INFO -c 3 -n worker1@%h

而后运行下面的查看队列命令:

celery -A hunter inspect active_queues

能够看到如下输入:

->  worker1@localhost: OK
    * {'name': 'default_queue', 'exchange': {...}, 'routing_key': 'default_queue', ...}
    * {'name': 'queue_1', 'exchange': {...}, 'routing_key': 'default_queue', ...}
    * {'name': 'queue_2', 'exchange': {...}, 'routing_key': 'default_queue', ...}

1 node online.

其中,输入后果最下面的 worker1@localhost 就是咱们启动 worker 通过 -n 指定的 hostnam,能够通过这个来指定 worker。

咱们能够指定 worker 输入对应的队列数据:

celery -A hunter inspect active_queues -d worker1@localhost

除了命令行,咱们也能够在交互界面来获取这些数据:

# 获取所有的队列信息
from hunter.celery import app
app.control.inspect().active_queues()

# 获取指定 worker 的队列信息
app.control.inspect(['worker1@localhost']).active_queues()

3、worker 的检测

app.control.inspect() 函数能够检测正在运行的 worker 信息,咱们能够用上面的命令来操作:

from hunter.celery import app

i = app.control.inspect()

这个操作是获取所有节点,咱们也能够指定单个或者多个节点检测:

# 输出数组参数,示意获取多个节点 worker 信息
i = app.control.inspect(['worker1@localhost', 'worker2@localhost'])

# 输出单个 worker 名,指定获取 worker 信息
i = app.control.inspect('worker1@localhost')

获取曾经注册的 task 列表

用到后面的 app.control.inspect() 函数和其下的 registered() 函数

i.registered()

# 输入后果为 worker 及其下的 task name 
# 输入示例为 {'worker1@localhost': ['blog.tasks.add', 'blog.tasks.minus', 'polls.tasks.multi']}

输入的格局是一个 dict,worker 的名称为 key,task 列表为 value

正在执行的 task

active() 用于获取正在执行的 task 函数

i.active()

# 输入 worker 正在执行的 task
# 输入示例为 {'worker1@localhost': [{'id': 'xxx', 'name': 'blog.tasks.add', 'args': [3, 4], 'hostname': 'worker1@localhost', 'time_start': 1659450162.58197, ..., 'worker_pid': 41167}

输入的后果也是一个 dict,每个 worker 下有 n 个正在 worker 中执行的 task 信息,这个 n 的最大数量取决于后面咱们启动 worker 时的 –concurrency 参数。

在其中的 task 信息里蕴含 task_id,task_name,和输出的参数,开始工夫,worker name 等。

行将运行的 task

比方咱们运行 add 延时工作,定时在 20s 之后运行:

add.apply_async((1, 1), countdown=20)

返回的后果每个 worker 下有一个工作列表,每个列表存有工作的信息:

i.scheduled()

# 输入信息如下
# {'worker1@localhost': [{'eta': '2022-08-02T22:56:49.503517+08:00', 'priority': 6, 'request': {'id': '23080c03-a906-4cc1-9ab1-f27890c58adb', 'name': 'blog.tasks.add', 'args': [1, 1], 'kwargs': {}, 'type': 'blog.tasks.add', 'hostname': 'worker1@localhost', 'time_start': None, 'acknowledged': False, 'delivery_info': {...}}]}

queue 队列中期待的 task

如果咱们有工作在 queue 中积压,咱们能够应用:

i.reserved()

来获取队列中期待的 task 列表

4、其余 worker 命令

ping-pong

检测 worker 还活着的 worker

应用 ping() 函数,能够失去 pong 字符串的回复表明该 worker 是存活的。

from hunter.celery import app

app.control.ping(timeout=0.5)

# [{'worker1@localhost': {'ok': 'pong'}}]

咱们也能够指定 worker 来操作:

app.control.ping(['worker1@localhost'])

如果你理解 redis 的存活检测操作的话,应该晓得在 redis-cli 里也能够执行这个 ping-pong 的一来一回的检测操作。

如果想获取更多后端相干文章,可扫码关注浏览:

正文完
 0