本文首发于公众号:Hunter 后端
原文链接:celery 笔记六之 worker 介绍
后面咱们介绍过 celery 的现实的设计形式是几个 worker 解决特定的工作队列的数据,这样能够防止工作在队列中的积压。
这一篇笔记咱们介绍一下如何应用 worker 进步零碎中工作的解决效率。
- worker 启动
- worker 与队列
- worker 检测
- 其余 worker 命令
1、worker 启动
后面介绍过 worker 的启动形式,在 celery 配置文件的上一级目录运行上面的命令:
celery -A hunter worker -l INFO
其中,-l 示意日志等级,相当于是 –loglevel=INFO
celery -A hunter worker --loglevel=INFO
指定 worker 的 hostname
celery -A hunter worker -l INFO -n worker1@%h
其中,%h 示意主机名,蕴含域名在内,%n 示意仅蕴含主机名,%d 示意仅蕴含域名。
以下是示例:
变量 | 示例 | 后果 |
---|---|---|
%h | worker1@%h | worker1@george.example.com |
%n | worker1@%n | worker1@george |
%d | worker1@%d | worker1@example.com |
指定日志文件地址
logfile 参数能够指定日志文件地址:
celery -A hunter worker --loglevel=INFO --logfile=/Users/hunter/python/celery_log/celery.log
杀死 worker 过程
咱们能够通过获取 worker 的过程 id 来杀死这些过程:
ps aux | grep 'celery -A hunter' | awk '{print $2}' |xargs sudo kill -9
并发解决
一般来说,当咱们间接启动 worker 的时候,会默认同时起好几个 worker 过程。
如果不指定 worker 的数量,worker 的过程会默认是所在机器的 CPU 的数量。
咱们也能够通过 concurrency 参数来指定启动 worker 的过程数。
比如说,咱们想启动三个 worker 的过程,能够如下指定:
celery -A hunter worker --concurrency=3 -l INFO
–concurrency 也能够简写成 -c:
celery -A hunter worker -c 3 -l INFO
这样,咱们在启动的命令行里输出上面的参数就能够看到启动了三个 worker 的过程:
ps aux |grep 'celery -A hunter'
这里有一个对于 worker 过程数启动多少的问题,是不是咱们的 worker 启动的越多,咱们的定时工作和延时工作就会执行得越快呢?
并不是,有试验证实 worker 的数量启动得越多,对于 task 解决的性能有可能还会起到一个反向作用,这里不作展开讨论,咱们能够设置 CPU 的数量即可。
当然,你也能够依据 worker 解决工作的状况,基于 application,基于工作负载,工作运行工夫等试验出一个最佳的数量。
2、worker 与队列
生产指定队列的 task
咱们能够在运行 worker 的时候指定 worker 只生产特定队列的 task,这个特定队列,能够是一个,也能够是多个,用逗号分隔开。
指定的形式如下:
celery -A hunter worker -l INFO -Q queue_1,queue_2
列出所有沉闷的 queues
上面的命令能够列出所有零碎沉闷的队列信息:
celery -A hunter inspect active_queues
假如目前咱们相干配置如下:
app.conf.task_queues = (Queue('default_queue',),
Queue('queue_1'),
Queue('queue_2'),
)
app.conf.task_routes = {
'blog.tasks.add': {'queue': 'queue_1',},
'blog.tasks.minus': {'queue': 'queue_2',},
}
咱们这样启动 worker:
celery -A hunter worker -l INFO -c 3 -n worker1@%h
而后运行下面的查看队列命令:
celery -A hunter inspect active_queues
能够看到如下输入:
-> worker1@localhost: OK
* {'name': 'default_queue', 'exchange': {...}, 'routing_key': 'default_queue', ...}
* {'name': 'queue_1', 'exchange': {...}, 'routing_key': 'default_queue', ...}
* {'name': 'queue_2', 'exchange': {...}, 'routing_key': 'default_queue', ...}
1 node online.
其中,输入后果最下面的 worker1@localhost 就是咱们启动 worker 通过 -n 指定的 hostnam,能够通过这个来指定 worker。
咱们能够指定 worker 输入对应的队列数据:
celery -A hunter inspect active_queues -d worker1@localhost
除了命令行,咱们也能够在交互界面来获取这些数据:
# 获取所有的队列信息
from hunter.celery import app
app.control.inspect().active_queues()
# 获取指定 worker 的队列信息
app.control.inspect(['worker1@localhost']).active_queues()
3、worker 的检测
app.control.inspect() 函数能够检测正在运行的 worker 信息,咱们能够用上面的命令来操作:
from hunter.celery import app
i = app.control.inspect()
这个操作是获取所有节点,咱们也能够指定单个或者多个节点检测:
# 输出数组参数,示意获取多个节点 worker 信息
i = app.control.inspect(['worker1@localhost', 'worker2@localhost'])
# 输出单个 worker 名,指定获取 worker 信息
i = app.control.inspect('worker1@localhost')
获取曾经注册的 task 列表
用到后面的 app.control.inspect() 函数和其下的 registered() 函数
i.registered()
# 输入后果为 worker 及其下的 task name
# 输入示例为 {'worker1@localhost': ['blog.tasks.add', 'blog.tasks.minus', 'polls.tasks.multi']}
输入的格局是一个 dict,worker 的名称为 key,task 列表为 value
正在执行的 task
active() 用于获取正在执行的 task 函数
i.active()
# 输入 worker 正在执行的 task
# 输入示例为 {'worker1@localhost': [{'id': 'xxx', 'name': 'blog.tasks.add', 'args': [3, 4], 'hostname': 'worker1@localhost', 'time_start': 1659450162.58197, ..., 'worker_pid': 41167}
输入的后果也是一个 dict,每个 worker 下有 n 个正在 worker 中执行的 task 信息,这个 n 的最大数量取决于后面咱们启动 worker 时的 –concurrency 参数。
在其中的 task 信息里蕴含 task_id,task_name,和输出的参数,开始工夫,worker name 等。
行将运行的 task
比方咱们运行 add 延时工作,定时在 20s 之后运行:
add.apply_async((1, 1), countdown=20)
返回的后果每个 worker 下有一个工作列表,每个列表存有工作的信息:
i.scheduled()
# 输入信息如下
# {'worker1@localhost': [{'eta': '2022-08-02T22:56:49.503517+08:00', 'priority': 6, 'request': {'id': '23080c03-a906-4cc1-9ab1-f27890c58adb', 'name': 'blog.tasks.add', 'args': [1, 1], 'kwargs': {}, 'type': 'blog.tasks.add', 'hostname': 'worker1@localhost', 'time_start': None, 'acknowledged': False, 'delivery_info': {...}}]}
queue 队列中期待的 task
如果咱们有工作在 queue 中积压,咱们能够应用:
i.reserved()
来获取队列中期待的 task 列表
4、其余 worker 命令
ping-pong
检测 worker 还活着的 worker
应用 ping() 函数,能够失去 pong 字符串的回复表明该 worker 是存活的。
from hunter.celery import app
app.control.ping(timeout=0.5)
# [{'worker1@localhost': {'ok': 'pong'}}]
咱们也能够指定 worker 来操作:
app.control.ping(['worker1@localhost'])
如果你理解 redis 的存活检测操作的话,应该晓得在 redis-cli 里也能够执行这个 ping-pong 的一来一回的检测操作。
如果想获取更多后端相干文章,可扫码关注浏览: