Celery 实现分布式定时工作并开启监控 (Celery-Beat、Celery-Once、flower)
原理:celery-beat 作为任务调度,当达到定时工夫时,beat 将工作 id 装载进 rabbitmq 队列中,worker 在队列的另一端取出工作 id,并匹配以后注册的工作。如果没有注册,那么会报错。除此之外,worker 还会通过 celery-once 来尝试从 redis 中获取分布式锁,只有获取到锁的 worker 才会执行这个工作。worker 执行胜利或者失败通过 flower 监控
1. 环境筹备
pip install celery
pip install flower
pip install celery_once
2. 代码构造
代码构造如下:
工作函数 sendDingTest.py(钉钉机器人):
from dingtalkchatbot.chatbot import DingtalkChatbot
from start import celery_app
from datetime import datetime
import socket
from celery_once import QueueOnce
WEB_HOOK_SPIDER = '在钉钉机器人页面获取 web_hook'
# 基于工作名及传递的参数值来确认是否是同一个工作
@celery_app.task(base=QueueOnce, once={'graceful': True})
def send_ding_test(arg1, arg2):
dingding = DingtalkChatbot(WEB_HOOK_SPIDER)
arg3 = arg1 + arg2
dingding.send_text(msg="城市数据定时工作测试 -{},{} 执行时刻:{}".format(arg3, socket.gethostname(), datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
配置文件 config.py:
from celery.schedules import crontab
class celeryConfig(object):
# accept_content = ['json'] # 能够是 set,list,tuple,pickle,yaml
# result_accept_content = ['json']
timezone = 'Asia/Shanghai' # 中国只有两个时区,一个上海,一个乌鲁木齐
broker_url = "amqp://user:password@ip:port/vhost"
backend = ""include = ['jobs.sendDingTest'] # worker 启动时要导入的工作模块,须要在这里增加,以便 worker 可能找到咱们的工作
beat_schedule = {
'add-every-monday-morning':
{
'task': 'jobs.sendDingTest.send_ding_test', # 这里要写全门路,否则 worker 找不到
'schedule': crontab(minute="*/2"),
'args': (16, 16),
},
}
# celery-once 配置
ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://ip:port/database',
'default_timeout': 60 * 60 # 分布式锁的默认超时工夫
}
}
启动函数 start.py:
from celery import Celery
import os
# windows 平台须要设置,os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
celery_app = Celery()
celery_app.config_from_object('config.celeryConfig')
3. 启动定时工作与监控
# 留神:beat,worker,flower 都能够不在同一台服务器上,散布开的话,须要在其余服务器上 copy 一份代码
# 开启 beat(start 是文件名)celery -A start.celery_app beat
# 开启 worker
celery -A start.celery_app worker -c 1 -l info
# 开启 flower(默认地址是 localhost:5555)celery -A start flower
以下是 flower 页面,能够查看 worker 数量、音讯队列(须要 rabbitmq 开启 rabbitmq_management)和工作执行后果等。
4. 文档参考
celery-once 文档
flower 文档
celery 官网文档