Celery在使用前必须实例化,称为application或app。app是线程安全的,具有不同配置、组件、task的多个Celery应用可以在同一个进程空间共存。# 创建Celery应用>>> from celery import Celery>>> app = Celery()>>> app<Celery main:0x100469fd0>最后一行文本化显示了Celery应用:包含应用所属类的名称,当前主模块名,以及内存地址。唯一重要的信息是模块名称。Main Name在Celery中发送task消息时,该消息仅包含要执行的task的名称。每一个worker维护一个task名称和对应函数的映射,这称为task registry。当定义一个task时,该task将注册到本地:>>> @app.task… def add(x, y):… return x + y>>> add<@task: main.add>>>> add.name__main__.add>>> app.tasks[’main.add’]<@task: main.add>当Celery无法检测task函数属于哪个模块时,使用main模块名生成初始task名称。这种方式仅适用于以下两种场景:定义task的模块作为程序运行app在python shell中创建# tasks.pyfrom celery import Celeryapp = Celery()@app.taskdef add(x, y): return x + yif name == ‘main’: app.worker_main()如果直接运行tasks.py,task名将以__main__为前缀,但如果tasks.py被其他程序导入,task名将以tasks为前缀。如下:>>> from tasks import add>>> add.nametasks.add也可以直接指定主模块名:>>> app = Celery(’tasks’)>>> app.main’tasks’>>> @app.task… def add(x, y):… return x + y>>> add.nametasks.addConfiguration可以通过直接设置,或使用专用配置模块对Celery进行配置。通过app.conf属性查看或直接设置配置:>>> app.conf.timezone’Europe/London’>>> app.conf.enable_utc = True或用app.conf.update方法一次更新多个配置:>>> app.conf.update(… enable_utc=True,… timezone=‘Europe/London’,…)config_from_objectapp.config_from_object()方法从配置模块或对象中导入配置。需要注意的是:调用config_from_object()方法将重置在这之前配置的任何设置。使用模块名app.config_from_object()方法接收python模块的完全限定名(fully qualified name)或具体到其中的某个属性名,例如"celeryconfig", “myproj.config.celery”, 或"myproj.config:CeleryConfig":from celery import Celeryapp = Celery()app.config_from_object(‘celeryconfig’)只要能够正常执行import celeryconfig,app就能正常配置。使用模块对象也可以传入一个已导入的模块对象,但不建议这样做。import celeryconfigfrom celery import Celeryapp = Celery()app.config_from_object(celeryconfig)更推荐使用模块名的方式,因为这样在使用prefork pool时不需要序列化该模块。如果在实际应用中出现配置问题或序列化错误,请尝试使用模块名的方式。使用配置类或对象from celery import Celeryapp = Celery()class Config: enable_utc = True timezone = ‘Europe/London’app.config_from_object(Config)config_from_envvarapp.config_from_envvar()方法从环境变量中接收配置模块名。import osfrom celery import Celery#: Set default configuration module nameos.environ.setdefault(‘CELERY_CONFIG_MODULE’, ‘celeryconfig’)app = Celery()app.config_from_envvar(‘CELERY_CONFIG_MODULE’)通过环境变量指定配置模块:$ CELERY_CONFIG_MODULE=“celeryconfig.prod” celery worker -l infoCensored configuration如果要显示Celery配置,可能需要过滤某些敏感信息如密码、密钥等。Celery提供了几种用于帮助显示配置的实用方法。humanize()该方法返回列表字符串形式的配置,默认只包含改动过的配置,如果要显示内置的默认配置,设置with_defaults参数为True:>>> app.conf.humanize(with_defaults=False, censored=True)table()该方法返回字典形式的配置:>>> app.conf.table(with_defaults=False, censored=True)Celery可能不会移除所有的敏感信息,因为它使用正则表达式匹配键并判断是否移除。如果用户添加了包含敏感信息的自定义配置,可以使用Celery可能标记为敏感配置的名称来命名(API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE)。Laziness应用实例是惰性的。创建Celery实例只会执行以下操作:创建用于event的logical clock instance创建task registry设置为当前应用(除非禁用了set_as_current参数)调用app.on_init()回调函数(默认不执行任何操作)app.task()装饰器不会在task定义时立即创建task,而是在task使用时或finalized应用后创建。下例说明了在使用task或访问其属性前,都不会创建task:>>> @app.task>>> def add(x, y):… return x + y>>> type(add)<class ‘celery.local.PromiseProxy’>>>> add.evaluated()False>>> add # <– causes repr(add) to happen<@task: main.add>>>> add.evaluated()True应用的Finalization指显式地调用app.finalize()方法或隐式地访问app.tasks属性。finalized应用将会:复制必须在应用间共享的task。task默认是共享的,但如果禁用了task装饰器的shared属性,将属于应用私有。评估所有待处理的task装饰器确保所有task绑定到当前应用。将task绑定到某个应用,以便可以从配置中读取默认值。Breaking the chain虽然可以依赖于当前应用,但最佳实践是将应用实例传递给任何需要它的对象,这个行为可以称为app chain。# 依赖于当前应用(bad)from celery import current_appclass Scheduler(object): def run(self): app = current_app# 传递应用实例(good)class Scheduler(object): def init(self, app): self.app = app在开发模式设置CELERY_TRACE_APP环境变量,可以在应用链断开时抛出异常:$ CELERY_TRACE_APP=1 celery worker -l infoAbstract Tasks使用task()装饰器创建的task都继承自celery.app.task模块的Task基类。继承该类可以自定义task类:from celery import Task# 或者 from celery.app.task import Taskclass DebugTask(Task): def call(self, *args, **kwargs): print(‘TASK STARTING: {0.name}[{0.request.id}]’.format(self)) return super(DebugTask, self).call(*args, **kwargs)如果要重写__call__()方法,记得调用super。这样在task直接调用时会执行基类的默认事件。Task基类是特殊的,因为它并未绑定到任何特定的应用。一旦task绑定到应用,它将读取配置以设置默认值等。通过base参数指定基类@app.task(base=DebugTask)def add(x, y): return x + y通过app.Task属性指定基类>>> from celery import Celery, Task>>> app = Celery()>>> class MyBaseTask(Task):… queue = ‘hipri’>>> app.Task = MyBaseTask>>> app.Task<unbound MyBaseTask>>>> @app.task… def add(x, y):… return x + y>>> add<@task: main.add>>>> add.class.mro()[<class add of <Celery main:0x1012b4410>>, <unbound MyBaseTask>, <unbound Task>, <type ‘object’>]