乐趣区

关于python:django-celery-redis-实现定时任务

前言

前段时间,工作中有个要求,是将本来的定时工作,改成应用 celery 实现。本来我的项目中的定时工作是应用的 apscheduler。然而 django 与 celery 做定时的教程,网上比拟少,且版本对应不上,最初依据网上的教程 + 仔仔细细看了好多遍 celery 官网文档,终于跑通。

当初做一个疾速实现 django+celery 定时工作的教程 / 记录。
对于这三个组件的基础知识及装置就不赘述了。间接进入配置。

版本

Django==3.1.4
celery==5.0.5
redis==3.5.3

思路步骤

配置 celery 定时工作的思路和步骤次要为

  1. 创立 celery 实例
  2. 配置工作
  3. 编写工作函数
  4. 启动 woker 和 beat
  5. 存储后果

目录层级(供参考)

django_demo                    # 我的项目根目录
    ├── scheduler              # 这是一个 app
    │   ├── __init__.py
    │   ├── celery.py            # 实例化 celery 并指定 config
    │   ├── config.py           # celery 的配置文件
    │   └── tasks.py             # 工作函数
  1. celery.py

from __future__ import absolute_import
from celery import Celery

app = Celery("scheduler", broker="redis://:12345@localhost:6379/1", backend="redis://:12345@localhost:6379/2", include=["scheduler.tasks"])

app.config_from_object("scheduler.config")

此文件用于实例化 celery,并指定 broker 和 backend 为 redis(可写入配置文件)
include 是指向 task 文件
app.config_from_object() 指定 celery 的配置

  1. config.py

from __future__ import absolute_import

from datetime import timedelta
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = True

CELERYBEAT_SCHEDULE = {
    "test": {                                 #工作名,用于开发人员辨认
        "task": "scheduler.tasks.test",       #task 指向工作函数
        "schedule": timedelta(seconds=2),     #调度工夫 还能够应用 crontab
        "args": ()                            #没有参数能够不写},
}

此文件能够写一些 celery 的配置,以及通过 CELERYBEAT_SCHEDULE 将定时工作加载。
定时的工夫能够应用 crontab。
例如:crontab(hour=”*/24″) 示意每 24 小时执行一次
也能够指定工夫,详情参照 crontab 的应用办法

  1. tasks.py

from scheduler.celery import app

@app.task()
def test():
    print("hello")
    return None

import app 肯定是从实例化的 celery.py 中导入的。如果实例化文件不叫 celery.py, 或者对象不叫 app,须要对应扭转。
工作函数须要应用 @app.task()进行装璜。如果无需返回值,能够不写 return。

  1. 终端命令启动 worker 及 beat

此处须要开 2 个终端,或者在 liunx 中一起写入 shell 脚本中均可

celery -A scheduler.celery beat -l info  # 启动 beat
celery -A scheduler.celery worker -l info

scheduler.celery 是依据目录及文件所定的,请依据本人的我的项目进行扭转
-l info : 运行时输入日志

如果是在 windows 下进行启动则 worker 命令须要替换为

-P eventlet:是 windows 下启动 worker, 其余零碎删除
(如果提醒 eventlet 没有就应用 pip 装置下 eventlet)

celery -A scheduler.celery worker -l info -P eventlet  # windows 下启动 worker

以上是对于 celery 定时工作的简略配置




在工作我的项目中,代码须要尽可能的简洁,配置须要放在同一个总的配置文件中,并且我的项目分为开发和线上环境,所以最初我的 django 配置是这样的:

django_demo                        # 我的项目根目录
    ├── settings
    |    ├── base.py                #根底设置
    |    ├── develop.py          #开发环境设置          
    ├── scheduler              # 这是一个 app
    │   ├── __init__.py
    │   ├── celery.py            # 实例化 celery 并指定 config
    │   └── tasks.py             # 工作函数

我将上方的 config.py 中的配置别离写进了 django 中的 settings 文件中。

1. settings:

  • develop.py:
BROKER_URL = redis_url
CELERY_RESULT_BACKEND = redis_url

开发环境下,配置开发所用的 broker 和 backend,留神变量名不能写错 ,否则 celery 辨认不出
redis_url 是咱们的 redis 端口,我这里是写在了其余中央,不便对所有的组件进行治理。
也能够这样写:

BROKER_URL = "redis://:12345@localhost:6379/1"
CELERY_RESULT_BACKEND = "redis://:12345@localhost:6379/1"
  • base.py:
    此处我将工作的调度写入了我的项目的 base 配置。

    # celery 配置及工作配置
    CELERY_TIMEZONE = "Asia/Shanghai"
    CELERY_ENABLE_UTC = True
    
    CELERYBEAT_SCHEDULE = {
      "job1": {
          "task": "scheduler.tasks.job1",
          "schedule": crontab(minute=0, hour=3),
      },
      "job2": {
          "task": "scheduler.tasks.job2",
          "schedule": crontab(hour="*/24"),
      },
    }

2. celery.py

from __future__ import absolute_import

import os

from celery import Celery

profile = os.environ.setdefault("PROFILE", "production")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", f"demo.settings.{profile}")

app = Celery("scheduler", include=["scheduler.jobs"])
app.config_from_object("django.conf:settings")
  1. 这里为了不便开发与线上进行区别,首先进行了整体环境的设置。
  2. profile 以及 os.environ.setdefault 都是对我的项目环境进行的设置.
  3. celery 的主体还是 app 与 app.config_from_object.

能够发现,我这里的 Celery()中的变量少了,更简洁了,这是因为,我把这些元素都写入了 django 的 settings 中了,celery 实例化的时候,会依据我配置的 ”django.conf:settings” 进行查找对应的值。

3.jobs.py

from scheduler.celery import app

@app.task()
def job1():
    pass

def job2():
    pass

总结:

celery 实现定时工作还是比拟容易实现的。次要是 celery 的实例化,配置与工作配置,工作函数,三个局部组成。对于配置能够随机应变,写到任何中央都能够。要害是三个文件在援用的时候,不必弄混就行了。
一个是实例化 celery 的时候,app.config_from_object() 留神写对咱们的配置门路
一个是配置文件中的配置定时工作时的工作函数门路
一个是工作函数的装璜器 @app.task()肯定是实例化 celery 中的对象。

展现下定时工作正在执行的输入。
local 是启动的 worker
local2 是启动的 beat

beat 终端中,运行起来的时候,会始终打印发送工作胜利。
worker 会始终输入工作接管胜利,并返回 return 值及工作函数中的输入。

参考

  • celery 官网文档
  • win10 下实现 django+celery 定时工作 保姆级别教程
退出移动版