谁再不提前告诉我,就间接复制我的文章而后发表,全家生孩子没屁眼。
原文链接:https://segmentfault.com/a/11…
作者:SyntaxError
1. 配置
from App.tasks.DatabaseTask import send_ding_test # 我的工作函数
from flask_apscheduler.auth import HTTPBasicAuth
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
class Config(object):
JOBS = [
# interval 定时执行(从 start_date 到 end_date,距离 20s,蕴含首尾)# func 也能够写字符串模式,例如:'App.tasks.DatabaseTask:send_ding_test'
{
'id': 'job2',
'func': send_ding_test,
'trigger': 'interval',
'start_date': '2021-01-27 13:31:00',
'end_date': '2021-01-27 13:33:00',
'seconds': 20,
'replace_existing': True # 从新执行程序时,会将 jobStore 中的工作替换掉
},
# date 一次执行
{
'id': 'job1',
'func': send_ding_test,
'trigger': 'date',
'run_date': '2021-01-30 11:22:00',
'replace_existing': True
},
# cron 式定时调度,相似 linux 的 crontab
{
'id': 'job3',
'func': send_ding_test,
'trigger': 'cron',
'day_of_week': '0-6',
'month': '*',
'hour': '6',
'minute': '0',
'second': '0',
'replace_existing': True
}
]
# 存储定时工作(默认是存储在内存中)SCHEDULER_JOBSTORES = {'default':SQLAlchemyJobStore(url='mysql+pymysql://xxx/xx')}
# 设置时区,时区不统一会导致定时工作的工夫谬误
SCHEDULER_TIMEZONE = 'Asia/Shanghai'
# 肯定要开启 API 性能,这样才能够用 api 的形式去查看和批改定时工作
SCHEDULER_API_ENABLED = True
# api 前缀(默认是 /scheduler)SCHEDULER_API_PREFIX = '/scheduler'
# 配置容许执行定时工作的主机名
SCHEDULER_ALLOWED_HOSTS = ['*']
# auth 验证。默认是敞开的,SCHEDULER_AUTH = HTTPBasicAuth()
# 设置定时工作的执行器(默认是最大执行数量为 10 的线程池)SCHEDULER_EXECUTORS = {'default': {'type': 'threadpool', 'max_workers': 10}}
# 另外 flask-apscheduler 内有日志记录器。name 为 apscheduler.scheduler 和 apscheduler.executors.default。如果须要保留日志,则须要对此日志记录器进行配置
2. 初始化 app
import logging
from flask import Flask
from config import Config # 上边的配置文件
from flask_apscheduler.scheduler import APScheduler
from logging.handlers import RotatingFileHandler
# flask-apscheduler 内置有日志器,为了让外部的日志器打印的内容输入,我这里做了个配置
# 创立日志记录器,指明日志保留门路,每个日志大小,保留日志文件个数下限
file_log_handler = RotatingFileHandler('logs/runserver.log', maxBytes=1024*1024*100, backupCount=5)
# 创立日志的记录格局,],日志等级,记录时间,报错地位,行数,日志信息
formatter = logging.Formatter(fmt="%(asctime)s - %(levelname)s - %(threadName)s:%(thread)s - %(filename)s - %(funcName)s - %(message)s",
datefmt='%Y-%m-%d %H:%M:%S %a'
)
# 为刚创立的日志记录器设置日志记录格局
file_log_handler.setFormatter(formatter)
# 为全局日志对象增加日志记录器
logger = logging.getLogger("apscheduler")
logger.addHandler(file_log_handler)
logger.setLevel(logging.INFO)
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
# 配置 api 权限验证的回调函数
@scheduler.authenticate
def authenticate(auth):
return auth['username'] == 'guest' and auth['password'] == 'guest'
app = create_app()
app.run(host='127.0.0.1', port='7788')
3.API 调用
Flask-APScheduler 内置了丰盛的 api 接口,能够让开发者动静的查看和更改定时工作,十分不便。这里我找到外部的一部分源码,能够看到所有的 api 接口的调用形式。
def _load_api(self):
"""Add the routes for the scheduler API."""
# 获取定时工作信息
self._add_url_route('get_scheduler_info', '', api.get_scheduler_info,'GET')
# 增加工作
self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
# 获取工作
self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
# 立刻执行一次定时工作
self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')
调用形式我这里用 postman 演示。首先是查看定时工作状态:
查看所有的定时工作:
立即执行一次定时工作:
还有就是,这些 api 能够带权限验证的,还记得我在配置那里配置了一项参数是:
SCHEDULER_AUTH = HTTPBasicAuth()
减少这项配置后,所有的 api 调用都必须携带 AUTHORIZATION。我这里说下怎么应用。在 init_app 那里我曾经设置了用户名明码(guest,guest),在申请接口时,须要携带加密后的用户名明码,加密形式如下:
import base64
# 用户名明码用: 分隔
string = "guest:guest".encode('utf-8')
a = base64.b64encode(string)
# print(a)
# b'Z3Vlc3Q6Z3Vlc3Q='
print(b"basic" + a) # 最初必须拼接上 basic
# b'basic Z3Vlc3Q6Z3Vlc3Q='
而后将这个字符串放在申请头中,申请头的键就是 AUTHORIZATION:
大家有什么问题,能够在文章下方留言,我看到后会立刻回复
其余很常见的问题,比方定时反复执行的问题,网上曾经有很多解决方案了,这里不再论述