乐趣区

Celery实际使用与内存泄漏问题面试

1. 实际使用

监控 task 的执行结果:任务 id,结果,traceback,children,任务状态

​ 配置 backend='redis://127.0.0.1:6379/5'给 Celery 的 app 对象,直接在 redis 中查看

​ 还可以

健壮 celerycelery -A proj worker -l info

☁  proj  tree
├── __init__.py 
├── celery.py | app=Clery('proj',include=['proj.tasks'])
                app.config_from_object('proj.config')
                if __name__==__main__:app.start()
├── config.py |  CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/6'
                BROKER_URL = 'redis://127.0.0.1:6379/5'      
└── tasks.py  |  @app.task            # 注意这个文件名必须是 tasks.py
                def add(x, y): return x + y             

​ tasks 可以有多个在 celery.py 中添加一行代码加载任务函数

app.autodiscover_tasks(['proj.sms', 'proj.email'])

Scheduler 计划定时任务:celery -A proj worker -B -l info

#config.py
CELERY_TIMEZONE = 'Asia/Shanghai' # 指定时区
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
         'task': 'proj.tasks.add', # 指定要执行的函数任务
         'schedule': timedelta(seconds=30), # 指定计划时间间隔 30s 执行一次 task
         'args': (16, 16)
    },
}

celery.schedules import crontab 定时周期任务:(比如每周一执行一次)

​ 只需要修改 'schedule': crontab(hour=7, minute=30, day_of_week=1),

2.celery 扩展使用

指定队列名:

​ 启动加上 - Q 参数 celery -A proj worker --loglevel=info -Q 'testq'

​ 跑任务时 add.delay(3,4,queue='testq')

指定开启的 worker 进程数:单个 Celery 进程每分钟就可以处理数百万个任务

​ 底层是调用的 Python 的 multiprocessing 模块中的 Pool 进程池思想来做

​ 启动加上 - c 参数 celery -A proj worker --loglevel=info -c 2 2 个 worker 进程来同时抢任务

图像化查看 broker 里面的数据,查看任务状态,以及任务的详细信息:flower 的 webUI

pip install flower 注意创建 celery 实例 app 时指定的 broker 设置的 redis/5

​ 任意目录执行 celery flower --port=5555 --broker=redis://localhost:6379/5

3.DJango-celery 模式(嵌入到大型 DJango 项目中)

应用: django 调用 celery 跑异步任务,常见场景有注册成功,发送邮件可以异步来防止网络 IO 阻塞,以及耗时间的任务,可以在 WEB 应用中使用这种异步方式

  1. 安装 django-celery==3.1.17celery==3.1.17 对应
  2. 创建 celery 必须的数据库表结构 python manage.py migrate
  3. django 项目的 settings.py 文件中追加如下内容:backend,任务执行结果超时时间,worker 并发数也就是 -c 指定的数据,指定任务周期存储在 orm 数据库中
  4. 在 django 的 app 应用目录下创建 tasks.py 任务文件@task def add(x,y):
  5. 开启 django 服务和 celery 服务,虽然耦合了,还要开python manage.py celery worker --loglevel=info

4. 内存泄漏问题

celery 内存泄露分析

celery 配置项如下

CELERYD_CONCURRENCY = 2      celery worker 并发数
CELERYD_MAX_TASKS_PER_CHILD = 5   每个 worker 最大执行任务数

 
执行 celery -A ansibleAPI.celery worker 启动 celery,通过 ps -ef | grep celery 可以看到两个 celery worker 进程(8226,8228)。

利用 celery worker 进行某个任务,当 worker 没有执行到最大任务时(即销毁重建),每执行一次任务占用内存必然有所增加,任务数为 9,10 时(celery 均匀调度,并发数 * 最大任务数),分别有原 8228 worker 被销毁,重新创建 9386 worker 及原 8226 worker 被销毁,重新创建 9564 worker,此时,运行第 9 次时,占用总内存有所下降,运行第 10 次时,总内存回到初如值,同样任务执行第 19、20 次情况类似。

celery 并发计算规则
celery 任务并发只与 celery 配置项 CELERYD_CONCURRENCY 有关,与 CELERYD_MAX_TASKS_PER_CHILD 没有关系,即 CELERYD_CONCURRENCY=2,只能并发 2 个 worker,此时任务处理较大的文件时,执行两次可以看到两个 task 任务并行执行,而执行第三个任务时,开始排队,直到两个 worker 执行完毕。

结论
celery 执行完任务不释放内存与原 worker 一直没有被销毁有关,因此 CELERYD_MAX_TASKS_PER_CHILD 可以适当配置小点,而任务并发数与 CELERYD_CONCURRENCY 配置项有关,每增加一个 worker 必然增加内存消耗,同时也影响到一个 worker 何时被销毁,因为 celery 是均匀调度任务至每个 worker,因此也不宜配置过大,适当配置。

退出移动版