本文首发于公众号:Hunter 后端
原文链接:celery 笔记三之 task 和 task 的调用
这一篇笔记介绍 task 和 task 的调用。
以下是本篇笔记目录:
- 根底的 task 定义形式
- 日志解决
- 工作重试
- 疏忽工作运行后果
- task 的调用
1、根底的 task 定义形式
后面两篇笔记中介绍了最简略的定义形式,应用 @app.task 作为装璜器:
@app.task
def add(x, y):
return x + y
如果是在 Django 零碎中应用 celery,须要定义一个延时工作或者周期定时工作,能够应用 @shared_task 来润饰
from celery import shared_task
@shared_task
def add(x, y):
return x + y
在 Django 零碎中应用 celery 的形式会在接下来的几篇笔记中介绍道。
多个装璜器
如果是 celery 的工作和其余装璜器一起联用,记得将 celery 的装璜器放在最初应用,也就是列表的最后面:
@app.task
@decorator1
@decorator2
def add(x, y):
return x + y
task 名称
每个 task 都有一个惟一的名称用来标识这个 task,如果咱们在定义的时候不指定,零碎会为咱们默认一个名称,这些名称会在 celery 的 worker 启动的时候被零碎扫描而后输入一个列表展现。
还是上一篇笔记中咱们定义的两个 task,咱们给其中一个指定 name:
#tasks1.py
from .celery import app
@app.task(name="tasks1.add")
def add(x, y):
return x + y
能够察看在 celery 的 worker 启动的时候,会有一个输入:
[tasks]
. proj.tasks2.mul
. tasks1.add
能够看到这个中央,零碎就会应用咱们定义的 name 了。
2、日志解决
咱们能够在启动 worker 的时候指定日志的输入,定义格局如下:
celery -A proj worker -l INFO --logfile=/Users/hunter/python/celery_log/celery.log
在 task 中的定义能够应用 celery 中办法:
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
也能够间接应用 logging 模块:
import logging
logger1 = logging.getLogger(__name__)
间接在 task 中输入:
@app.task(name="tasks1.add")
def add(x, y):
logger.info("this is from logger")
return x + y
而后在 worker 启动时指定的日志文件就会有咱们打印出的日志内容:
[2022-07-24 16:28:33,210: INFO/ForkPoolWorker-7] tasks1.add[4db4b0fc-c6ca-472a-8847-ae42e0a7959a]: this is from logger
[2022-07-24 16:28:33,224: INFO/ForkPoolWorker-7] Task tasks1.add[4db4b0fc-c6ca-472a-8847-ae42e0a7959a] succeeded in 0.016244667931459844s: 3
3、工作重试
对于一个 task,咱们能够对其设置 retry 参数来指定其在工作执行失败后会重试几次,以及隔多长时间重试。
比方对于上面的 div() 函数,咱们来输出除数为 0 的状况查看重试的性能。
当然,这里咱们是成心输出参数谬误,在理论的我的项目中可能会是其余的起因造成工作失败,比方数据库连贯失败等
工作重试的参数也都在 @app.task() 中定义:
# tasks1.py
@app.task(autoretry_for=(Exception,), default_retry_delay=10, retry_kwargs={'max_retries': 5})
def div(x, y):
return x / y
在这里,autoretry_for 示意的是某种报错状况下重试,咱们定义的 Exception 示意任何谬误都重试。
如果只是想在某种特定的 exception 状况下重试,将那种 exception 的值替换 Exception 即可。
default_retry_delay 示意重试距离时长,默认值是 3 * 60s,即三分钟,是以秒为单位,这里咱们设置的是 10s。
retry_kwargs 是一个 dict,其中有一个 max_retries 参数,示意的是最大重试次数,咱们定为 5
而后能够尝试调用这个延时工作:
from proj.tasks1 import div
div.delay(1, 0)
而后能够看到在日志文件会有如下输入:
[2022-07-24 16:59:35,653: INFO/ForkPoolWorker-7] Task proj.tasks1.div[1f65c410-1b2a-4127-9d83-a84b1ad9dd2c] retry: Retry in 10s: ZeroDivisionError('division by zero',)
且每隔 10s 执行一次,一共执行 5 次,5 次之后还是不胜利则会报错。
retry_backoff 和 retry_backoff_max
还有一个 retry_backoff 和 retry_backoff_max 参数,这两个参数是用于这种状况:如果你的 task 依赖另一个 service 服务,比方会调用其余零碎的 API,而后这两个参数能够用于防止申请过多的占用服务。
retry_backoff 参数能够设置成一个 布尔型数据,为 True 的话,主动重试的工夫距离会成倍的增长
第一次重试是 1 s 后
第二次是 2s 后
第三次是 4s 后
第四次是 8s 后
…
如果 retry_backoff 参数是一个数字,比方是 3,那么后续的间隔时间则是 3 的倍数增长
第一次重试 3s 后
第二次是 6s 后
第三次是 12s 后
第四次是 24s 后
retry_backoff_max 是重试的最大的间隔时间,比方重试次数设置的很大,retry_backoff 的间隔时间反复达到了这个值之后就不再增大了。
这个值默认是 600s,也就是 10 分钟。
咱们看一下上面这个例子:
# tasks1.py
@app.task(autoretry_for=(Exception,), retry_backoff=2, retry_backoff_max=40, retry_kwargs={'max_retries': 8})
def div(x, y):
return x / y
对于重试的机制,实践上应该是依照咱们后面列出来的重试工夫距离进行重试,然而如果咱们这样间接运行 div.delay(),得出的间隔时间是不定的,是在 0 到 最大值之间得出的一个随机值。
这样产生的起因是因为还有一个 retry_jitter 参数,这个参数默认是 True,所以工夫距离会是一个随机值。
如果须要工作延时的距离值是依照 retry_backoff 和 retry_backoff_max 两个设定值来运行,那么则须要将 retry_jitter 值设为 False。
# tasks1.py
@app.task(autoretry_for=(Exception,), retry_backoff=2, retry_backoff_max=40, retry_jitter=False, retry_kwargs={'max_retries': 8})
def div(x, y):
return x / y
而后运行 div 的延时工作,就能够看到延时工作依照法则的间隔时间重试了,以下是日志:
[2022-07-24 19:00:38,588: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 2s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:40,662: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:40,664: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 4s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:44,744: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:44,746: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 8s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:52,870: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:52,872: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 16s: ZeroDivisionError('division by zero',)
[2022-07-24 19:01:09,338: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:01:09,340: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 32s: ZeroDivisionError('division by zero',)
[2022-07-24 19:01:41,843: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:01:41,845: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)
[2022-07-24 19:02:21,923: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:02:21,925: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)
[2022-07-24 19:03:02,001: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:03:02,003: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)
因为咱们设置的重试间隔时间最大为 40s,所以这个中央延时间隔时间到了 40 之后,就不再往上持续增长了。
4、疏忽工作运行后果
有时候延时工作的后果咱们并不想保留,然而咱们配置了 result_backend 参数,这个时候咱们有三种形式不保留运行后果。
1.ignore_result=True 不保留工作运行的后果
@app.task(ignore_result=True)
def add(x, y):
return x + y
2.app.conf 配置
也能够通过 app.conf 的配置来禁用后果的保留:
app.conf.update(task_ignore_result=True)
3. 执行单个工作的时候禁用
from proj.tasks1 import add
add.apply_async((1, 2), ignore_result=True)
apply_async() 函数的作用相当于是带参数的 delay(),或者 delay() 是简化版的 apply_async(),这个咱们上面会介绍。
5、task 的调用
后面简略两个简略的调用办法,一个是 apply_async(),一个是 delay()。
简略来说就是 delay() 是不带参数执行的 apply_async()。
以下用 add() 函数为例介绍一下他们的用法:
delay()
纯正的延时工作,只能如下操作:
add.delay(1, 2)
apply_async()
带参数的用法,add() 函数的参数用 () 包起来:
add.apply_async((1, 2))
也能够带其余参数,比方下面介绍的不保留运行后果:
add.apply_async((1, 2), ignore_result=True)
这个函数还能够指定延时的工夫:
countdown 参数
当初开始 10s 后开始运行:
add.apply_async((1, 2), countdown=10)
eta 参数
也能够用 eta 参数来指定 10s 后运行:
from datetime import datetime, timedelta
now = datetime.now()
add.apply_async((1, 2), eta=now + timedelta(seconds=10))
expires 参数
这个是用来设置过期的参数:
add.apply_async((1, 2), countdown=60, expires=120)
下面的参数示意,距当初 60 秒后开始执行,两分钟后过期
如果想获取更多后端相干文章,可扫码关注浏览:
如果想获取更多后端相干文章,可扫码关注浏览: