乐趣区

Celery中文翻译-Application

Celery 在使用前必须实例化,称为 application 或 app。app 是线程安全的,具有不同配置、组件、task 的多个 Celery 应用可以在同一个进程空间共存。
# 创建 Celery 应用
>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>
最后一行文本化显示了 Celery 应用:包含应用所属类的名称,当前主模块名,以及内存地址。唯一重要的信息是模块名称。
Main Name
在 Celery 中发送 task 消息时,该消息仅包含要执行的 task 的名称。每一个 worker 维护一个 task 名称和对应函数的映射,这称为 task registry。
当定义一个 task 时,该 task 将注册到本地:
>>> @app.task
… def add(x, y):
… return x + y

>>> add
<@task: __main__.add>

>>> add.name
__main__.add

>>> app.tasks[‘__main__.add’]
<@task: __main__.add>
当 Celery 无法检测 task 函数属于哪个模块时,使用 main 模块名生成初始 task 名称。
这种方式仅适用于以下两种场景:

定义 task 的模块作为程序运行
app 在 python shell 中创建

# tasks.py
from celery import Celery
app = Celery()

@app.task
def add(x, y): return x + y

if __name__ == ‘__main__’:
app.worker_main()
如果直接运行 tasks.py,task 名将以__main__为前缀,但如果 tasks.py 被其他程序导入,task 名将以 tasks 为前缀。如下:
>>> from tasks import add
>>> add.name
tasks.add
也可以直接指定主模块名:
>>> app = Celery(‘tasks’)
>>> app.main
‘tasks’

>>> @app.task
… def add(x, y):
… return x + y

>>> add.name
tasks.add
Configuration
可以通过直接设置,或使用专用配置模块对 Celery 进行配置。
通过 app.conf 属性查看或直接设置配置:
>>> app.conf.timezone
‘Europe/London’

>>> app.conf.enable_utc = True
或用 app.conf.update 方法一次更新多个配置:
>>> app.conf.update(
… enable_utc=True,
… timezone=’Europe/London’,
…)
config_from_object
app.config_from_object()方法从配置模块或对象中导入配置。需要注意的是:调用 config_from_object()方法将重置在这之前配置的任何设置。
使用模块名 app.config_from_object()方法接收 python 模块的完全限定名 (fully qualified name) 或具体到其中的某个属性名,例如 ”celeryconfig”, “myproj.config.celery”, 或 ”myproj.config:CeleryConfig”:
from celery import Celery

app = Celery()
app.config_from_object(‘celeryconfig’)
只要能够正常执行 import celeryconfig,app 就能正常配置。
使用模块对象也可以传入一个已导入的模块对象,但不建议这样做。
import celeryconfig

from celery import Celery

app = Celery()
app.config_from_object(celeryconfig)
更推荐使用模块名的方式,因为这样在使用 prefork pool 时不需要序列化该模块。如果在实际应用中出现配置问题或序列化错误,请尝试使用模块名的方式。
使用配置类或对象
from celery import Celery

app = Celery()

class Config:
enable_utc = True
timezone = ‘Europe/London’

app.config_from_object(Config)
config_from_envvar
app.config_from_envvar()方法从环境变量中接收配置模块名。
import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault(‘CELERY_CONFIG_MODULE’, ‘celeryconfig’)

app = Celery()
app.config_from_envvar(‘CELERY_CONFIG_MODULE’)
通过环境变量指定配置模块:
$ CELERY_CONFIG_MODULE=”celeryconfig.prod” celery worker -l info
Censored configuration
如果要显示 Celery 配置,可能需要过滤某些敏感信息如密码、密钥等。Celery 提供了几种用于帮助显示配置的实用方法。
humanize()该方法返回列表字符串形式的配置,默认只包含改动过的配置,如果要显示内置的默认配置,设置 with_defaults 参数为 True:
>>> app.conf.humanize(with_defaults=False, censored=True)
table()该方法返回字典形式的配置:
>>> app.conf.table(with_defaults=False, censored=True)
Celery 可能不会移除所有的敏感信息,因为它使用正则表达式匹配键并判断是否移除。如果用户添加了包含敏感信息的自定义配置,可以使用 Celery 可能标记为敏感配置的名称来命名(API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE)。
Laziness
应用实例是惰性的。
创建 Celery 实例只会执行以下操作:

创建用于 event 的 logical clock instance

创建 task registry

设置为当前应用(除非禁用了 set_as_current 参数)
调用 app.on_init()回调函数(默认不执行任何操作)

app.task()装饰器不会在 task 定义时立即创建 task,而是在 task 使用时或 finalized 应用后创建。
下例说明了在使用 task 或访问其属性前,都不会创建 task:
>>> @app.task
>>> def add(x, y):
… return x + y

>>> type(add)
<class ‘celery.local.PromiseProxy’>

>>> add.__evaluated__()
False

>>> add # <– causes repr(add) to happen
<@task: __main__.add>

>>> add.__evaluated__()
True
应用的 Finalization 指显式地调用 app.finalize()方法或隐式地访问 app.tasks 属性。
finalized 应用将会:

复制必须在应用间共享的 task。task 默认是共享的,但如果禁用了 task 装饰器的 shared 属性,将属于应用私有。
评估所有待处理的 task 装饰器
确保所有 task 绑定到当前应用。将 task 绑定到某个应用,以便可以从配置中读取默认值。

Breaking the chain
虽然可以依赖于当前应用,但最佳实践是将应用实例传递给任何需要它的对象,这个行为可以称为 app chain。
# 依赖于当前应用(bad)

from celery import current_app

class Scheduler(object):

def run(self):
app = current_app
# 传递应用实例(good)

class Scheduler(object):

def __init__(self, app):
self.app = app
在开发模式设置 CELERY_TRACE_APP 环境变量,可以在应用链断开时抛出异常:
$ CELERY_TRACE_APP=1 celery worker -l info
Abstract Tasks
使用 task()装饰器创建的 task 都继承自 celery.app.task 模块的 Task 基类。继承该类可以自定义 task 类:
from celery import Task
# 或者 from celery.app.task import Task

class DebugTask(Task):

def __call__(self, *args, **kwargs):
print(‘TASK STARTING: {0.name}[{0.request.id}]’.format(self))
return super(DebugTask, self).__call__(*args, **kwargs)
如果要重写__call__()方法,记得调用 super。这样在 task 直接调用时会执行基类的默认事件。
Task 基类是特殊的,因为它并未绑定到任何特定的应用。一旦 task 绑定到应用,它将读取配置以设置默认值等。

通过 base 参数指定基类
@app.task(base=DebugTask)
def add(x, y):
return x + y

通过 app.Task 属性指定基类
>>> from celery import Celery, Task

>>> app = Celery()

>>> class MyBaseTask(Task):
… queue = ‘hipri’

>>> app.Task = MyBaseTask
>>> app.Task
<unbound MyBaseTask>

>>> @app.task
… def add(x, y):
… return x + y

>>> add
<@task: __main__.add>

>>> add.__class__.mro()
[<class add of <Celery __main__:0x1012b4410>>,
<unbound MyBaseTask>,
<unbound Task>,
<type ‘object’>]

退出移动版