谁再不提前告诉我,就间接复制我的文章而后发表,全家生孩子没屁眼。原文链接:https://segmentfault.com/a/11...
作者:SyntaxError
1.配置
from App.tasks.DatabaseTask import send_ding_test # 我的工作函数from flask_apscheduler.auth import HTTPBasicAuthfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStoreclass Config(object): JOBS = [ # interval定时执行(从start_date到end_date,距离20s,蕴含首尾) # func也能够写字符串模式,例如:'App.tasks.DatabaseTask:send_ding_test' { 'id': 'job2', 'func': send_ding_test, 'trigger': 'interval', 'start_date': '2021-01-27 13:31:00', 'end_date': '2021-01-27 13:33:00', 'seconds': 20, 'replace_existing': True # 从新执行程序时,会将jobStore中的工作替换掉 }, # date一次执行 { 'id': 'job1', 'func': send_ding_test, 'trigger': 'date', 'run_date': '2021-01-30 11:22:00', 'replace_existing': True }, # cron式定时调度,相似linux的crontab { 'id': 'job3', 'func': send_ding_test, 'trigger': 'cron', 'day_of_week': '0-6', 'month': '*', 'hour': '6', 'minute': '0', 'second': '0', 'replace_existing': True } ] # 存储定时工作(默认是存储在内存中) SCHEDULER_JOBSTORES = {'default':SQLAlchemyJobStore(url='mysql+pymysql://xxx/xx')} # 设置时区,时区不统一会导致定时工作的工夫谬误 SCHEDULER_TIMEZONE = 'Asia/Shanghai' # 肯定要开启API性能,这样才能够用api的形式去查看和批改定时工作 SCHEDULER_API_ENABLED = True # api前缀(默认是/scheduler) SCHEDULER_API_PREFIX = '/scheduler' # 配置容许执行定时工作的主机名 SCHEDULER_ALLOWED_HOSTS = ['*'] # auth验证。默认是敞开的, SCHEDULER_AUTH = HTTPBasicAuth() # 设置定时工作的执行器(默认是最大执行数量为10的线程池) SCHEDULER_EXECUTORS = {'default': {'type': 'threadpool', 'max_workers': 10}} # 另外flask-apscheduler内有日志记录器。name为apscheduler.scheduler和apscheduler.executors.default。如果须要保留日志,则须要对此日志记录器进行配置
2.初始化app
import loggingfrom flask import Flaskfrom config import Config # 上边的配置文件from flask_apscheduler.scheduler import APSchedulerfrom logging.handlers import RotatingFileHandler# flask-apscheduler内置有日志器,为了让外部的日志器打印的内容输入,我这里做了个配置# 创立日志记录器,指明日志保留门路,每个日志大小,保留日志文件个数下限file_log_handler = RotatingFileHandler('logs/runserver.log', maxBytes=1024*1024*100, backupCount=5)# 创立日志的记录格局,],日志等级,记录时间,报错地位,行数,日志信息formatter = logging.Formatter( fmt="%(asctime)s - %(levelname)s - %(threadName)s:%(thread)s - %(filename)s - %(funcName)s - %(message)s", datefmt='%Y-%m-%d %H:%M:%S %a')# 为刚创立的日志记录器设置日志记录格局file_log_handler.setFormatter(formatter)# 为全局日志对象增加日志记录器logger = logging.getLogger("apscheduler")logger.addHandler(file_log_handler)logger.setLevel(logging.INFO)def create_app(): app = Flask(__name__) app.config.from_object(Config) scheduler = APScheduler() scheduler.init_app(app) scheduler.start() # 配置api权限验证的回调函数 @scheduler.authenticate def authenticate(auth): return auth['username'] == 'guest' and auth['password'] == 'guest' app = create_app()app.run(host='127.0.0.1', port='7788')
3.API调用
Flask-APScheduler内置了丰盛的api接口,能够让开发者动静的查看和更改定时工作,十分不便。这里我找到外部的一部分源码,能够看到所有的api接口的调用形式。
def _load_api(self): """ Add the routes for the scheduler API. """ # 获取定时工作信息 self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET') # 增加工作 self._add_url_route('add_job', '/jobs', api.add_job, 'POST') # 获取工作 self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET') self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET') self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE') self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH') self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST') self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST') # 立刻执行一次定时工作 self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')
调用形式我这里用postman演示。首先是查看定时工作状态:
查看所有的定时工作:
立即执行一次定时工作:
还有就是,这些api能够带权限验证的,还记得我在配置那里配置了一项参数是:
SCHEDULER_AUTH = HTTPBasicAuth()
减少这项配置后,所有的api调用都必须携带AUTHORIZATION。我这里说下怎么应用。在init_app那里我曾经设置了用户名明码(guest,guest),在申请接口时,须要携带加密后的用户名明码,加密形式如下:
import base64# 用户名明码用:分隔string = "guest:guest".encode('utf-8')a = base64.b64encode(string)# print(a)# b'Z3Vlc3Q6Z3Vlc3Q='print(b"basic " + a) # 最初必须拼接上basic# b'basic Z3Vlc3Q6Z3Vlc3Q='
而后将这个字符串放在申请头中,申请头的键就是AUTHORIZATION:
大家有什么问题,能够在文章下方留言,我看到后会立刻回复
其余很常见的问题,比方定时反复执行的问题,网上曾经有很多解决方案了,这里不再论述