关于python3.x:日夕如是寒暑不间基于Python3Tornado6APSchedulerCelery打造并发异步动态定时任务轮询服务

8次阅读

共计 10638 个字符,预计需要花费 27 分钟才能阅读完成。

原文转载自「刘悦的技术博客」https://v3u.cn/a_id_220

定时工作的典型落地场景在各行业中都很广泛,比方领取零碎中,领取过程中因为网络或者其余因素导致呈现掉单、卡单的状况,账单变成了“单边账”,这种状况对于领取用户来说,毫无疑问是劫难级别的体验,明明本人付了钱,扣了款,然而订单状态却未发生变化。所以,每一笔订单的领取工作流程中都须要一个定时轮询的备选计划,一旦领取中产生问题,定时轮询服务就能够及时发现和更正订单状态。

又比方,之前的一篇以寡治众各个击破,超大文件分片上传之构建基于 Vue.js3.0+Ant-desgin+Tornado6 纯异步 IO 高效写入服务,在超大型文件分片传输工作过程中,一旦分片上传或者分片合并环节出了问题,就有可能导致超大型文件无奈残缺的传输到服务器中,从而节约大量的零碎带宽资源,所以每一个分片传输工作执行过程中也须要一个对应的定时轮询来“盯”着,避免过程中呈现问题。

在理论业务场景中,定时服务根本都作为主利用的从属服务而存在,不同定时工作的调度工夫可能不一样,所以如果可能配合主服务并发异步调用定时工作,则能够单利用可能反对上万,甚至十万以上的定时工作,并且不同工作可能有独立的调度工夫,这里通过 Tornado 配合 APScheduler 和 Celery,别离展现不同的异步定时工作调用逻辑。

APScheduler

APScheduler(advanceded python scheduler)是一款及其优良的 Python3 定时工作框架,它不仅反对并发异步调用定时工作,还能够动静地对定时工作进行治理,同时也反对定时工作的长久化。

首先装置 APScheduler 以及 Tornado6:

pip3 install apscheduler
pip3 install tornado==6.1

随后导入基于 Tornado 的异步 APScheduler:

from datetime import datetime    
from tornado.ioloop import IOLoop, PeriodicCallback    
from tornado.web import RequestHandler, Application    
from apscheduler.schedulers.tornado import TornadoScheduler

这里 TornadoScheduler 实例就具备了 Tornado 的事件循环个性,随后申明异步定时工作:

async def task():    
    print('[APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')))

随后初始化定时工作对象:

scheduler = None  
  
# 初始化  
def init_scheduler():  
  
    global scheduler  
  
    scheduler = TornadoScheduler()  
  
    scheduler.start()  
  
    scheduler.add_job(task,"interval",seconds=3,id="job1",args=())  
  
    print("定时工作启动")

这里启动后就增加一个定时工作,每隔三秒执行一次。

接着 main 入口启动服务:

if __name__ == '__main__':  
  
    init_scheduler()

零碎返回:

C:\Users\liuyue\www\tornado6>python test_scheduler.py  
定时工作启动  
[APScheduler][Task]-2022-07-28 22:13:47.792582  
[APScheduler][Task]-2022-07-28 22:13:50.783016  
[APScheduler][Task]-2022-07-28 22:13:53.783362  
[APScheduler][Task]-2022-07-28 22:13:56.775059  
[APScheduler][Task]-2022-07-28 22:13:59.779563

随后创立 Tornado 控制器视图:

class SchedulerHandler(RequestHandler):  
    def get(self):  
        job_id = self.get_query_argument('job_id', None)  
        action = self.get_query_argument('action', None)  
        if job_id:  
            # 增加工作  
            if 'add' == action:  
                if job_id not in job_ids:  
                    job_ids.append(job_id)  
                    scheduler.add_job(task, 'interval', seconds=3, id=job_id, args=(job_id,))  
                    self.write('[TASK ADDED] - {}'.format(job_id))  
                else:  
                    self.write('[TASK EXISTS] - {}'.format(job_id))  
            # 删除工作  
            elif 'remove' == action:  
                if job_id in job_ids:  
                    scheduler.remove_job(job_id)  
                    self.write('[TASK REMOVED] - {}'.format(job_id))  
                else:  
                    self.write('[TASK NOT FOUND] - {}'.format(job_id))  
        else:  
            self.write('[INVALID PARAMS] INVALID job_id or action')

这里通过传参来动静的删减异步定时工作,对于实现轮询工作的定时工作,齐全能够物理删除,从而节约系统资源,随后增加路由并且启动 Tornado 服务:

if __name__ == '__main__':  
      
    routes = [url(r"/scheduler/",SchedulerHandler)]  
  
    init_scheduler()  
  
    # 申明 tornado 对象  
    application = Application(routes,debug=True)  
    application.listen(8888)  
    IOLoop.current().start()

APScheduler 定时工作长久化

所谓工作长久化,即工作存储在诸如文件或者数据库这样的长久化容器中,如果 APScheduler 定时工作服务过程中断,未执行的工作还会保留,当服务再次启动时,定时工作能够从数据库中读取进去再次被装载调用,这里以 redis 数据库为例子:

from apscheduler.jobstores.redis import RedisJobStore  
  
# 初始化  
def init_scheduler():  
  
    global scheduler  
  
    jobstores = {  
        'default': RedisJobStore(jobs_key='cron.jobs',run_times_key='cron.run_times',  
                     host='localhost', port=6379,)  
    }  
  
    scheduler = TornadoScheduler(jobstores=jobstores)  
  
    scheduler.start()  
  
    scheduler.add_job(task,"interval",seconds=3,id="job1",args=())  
  
    print("定时工作启动")

这里通过 jobstores 参数将 redis 装载到定时工作服务中,当创立工作时,数据库中会以 hash 的模式来存储工作明细:

127.0.0.1:6379> keys *  
1) "cron.run_times"  
2) "cron.jobs"  
127.0.0.1:6379> type cron.jobs  
hash  
127.0.0.1:6379> hgetall cron.jobs  
1) "job1"  
2) "\x80\x05\x95\x14\x02\x00\x00\x00\x00\x00\x00}\x94(\x8c\aversion\x94K\x01\x8c\x02id\x94\x8c\x04job1\x94\x8c\x04func\x94\x8c\x0e__main__:task1\x94\x8c\atrigger\x94\x8c\x1dapscheduler.triggers.interval\x94\x8c\x0fIntervalTrigger\x94\x93\x94)\x81\x94}\x94(h\x01K\x02\x8c\btimezone\x94\x8c\x1bpytz_deprecation_shim._impl\x94\x8c\twrap_zone\x94\x93\x94\x8c\bbuiltins\x94\x8c\agetattr\x94\x93\x94\x8c\bzoneinfo\x94\x8c\bZoneInfo\x94\x93\x94\x8c\t_unpickle\x94\x86\x94R\x94\x8c\x0cAsia/Irkutsk\x94K\x01\x86\x94R\x94h\x19\x86\x94R\x94\x8c\nstart_date\x94\x8c\bdatetime\x94\x8c\bdatetime\x94\x93\x94C\n\a\xe6\a\x1c\x16\x1e&\x0b\xc7\x8b\x94h\x1d\x86\x94R\x94\x8c\bend_date\x94N\x8c\binterval\x94h\x1f\x8c\ttimedelta\x94\x93\x94K\x00K\x03K\x00\x87\x94R\x94\x8c\x06jitter\x94Nub\x8c\bexecutor\x94\x8c\adefault\x94\x8c\x04args\x94)\x8c\x06kwargs\x94}\x94\x8c\x04name\x94\x8c\x05task1\x94\x8c\x12misfire_grace_time\x94K\x01\x8c\bcoalesce\x94\x88\x8c\rmax_instances\x94K\x01\x8c\rnext_run_time\x94h!C\n\a\xe6\a\x1c\x16\x1e,\x0b\xc7\x8b\x94h\x1d\x86\x94R\x94u."

而如果删除工作,redis 数据库中的工作也会同步删除。

至此,APScheduler 配合 Tornado 就实现了一个简略的并发异步定时工作服务。

Celery

celery 是一款在 Python 定时工作畛域“开风气之先”的框架,和 APScheduler 相比,celery 略显臃肿了一点,同时,celery 并不具备任何工作长久化的性能,也须要三方的容器进行反对。

首先装置 5.0 以上版本:

pip3 install celery==5.2.7

随后,初始化工作对象:

from celery import Celery  
from datetime import timedelta  
from redisbeat.scheduler import RedisScheduler  
  
app = Celery("tornado")  
  
  
app.conf["imports"] = ["celery_task"]  
  
# 定义 broker  
app.conf.broker_url = "redis://localhost:6379"  
  
# 工作后果  
app.conf.result_backend = "redis://localhost:6379"  
  
# 时区  
app.conf.timezone = "Asia/Shanghai"

这里工作代理(broker)和工作后果(result\_backend)也都存储在 redis 中。

紧接着申明异步工作办法:

from celery import shared_task  
import asyncio  
  
async def consume():  
  
    return 'test'  
  
@shared_task  
def async_job():  
  
    return asyncio.run(consume())

这里通过 asyncio 库间接调用异步办法。

而后增加定时工作的配置:

from datetime import timedelta   
  
# 须要执行工作的配置  
app.conf.beat_schedule = {  
    "task1": {  
        "task": "celery_task.async_consume",  #执行的办法  
        "schedule": timedelta(seconds=3),   
        "args":()},  
}

随后启动 worker 服务:

celery -A module_name worker --pool=solo -l info

接着启动 beat 服务:

celery -A module_name beat -l info

异步定时工作会被装载执行,零碎返回:

C:\Users\liuyue\www\tornado6>celery -A test_celery worker --pool=solo -l info  
  
 -------------- celery@LIUYUE354D v5.2.7 (dawn-chorus)  
--- ***** -----  
-- ******* ---- Windows-10-10.0.22000-SP0 2022-07-28 22:55:00  
- *** --- * ---  
- ** ---------- [config]  
- ** ---------- .> app:         tornado:0x23769b40430  
- ** ---------- .> transport:   redis://localhost:6379//  
- ** ---------- .> results:     redis://localhost:6379/  
- *** --- * --- .> concurrency: 4 (solo)  
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)  
--- ***** -----  
 -------------- [queues]  
                .> celery           exchange=celery(direct) key=celery  
  
  
[tasks]  
  . celery_task.async_job  
  . celery_task.job  
  . test_celery.sub  
  
[2022-07-28 22:55:02,234: INFO/MainProcess] Connected to redis://localhost:6379//  
[2022-07-28 22:55:04,267: INFO/MainProcess] mingle: searching for neighbors  
[2022-07-28 22:55:11,552: INFO/MainProcess] mingle: all alone  
[2022-07-28 22:55:21,837: INFO/MainProcess] celery@LIUYUE354D ready.  
[2022-07-28 22:58:26,032: INFO/MainProcess] Task celery_task.job[b0337808-c90b-450b-98bc-fd577f7039d0] received  
[2022-07-28 22:58:28,086: INFO/MainProcess] Task celery_task.job[b0337808-c90b-450b-98bc-fd577f7039d0] succeeded in 2.062999999994645s: 'test'  
[2022-07-28 22:58:28,099: INFO/MainProcess] Task celery_task.job[f4aa4304-02c3-48ee-8625-fa1fe27b8e98] received  
[2022-07-28 22:58:28,099: INFO/MainProcess] Task celery_task.job[f4aa4304-02c3-48ee-8625-fa1fe27b8e98] succeeded in 0.0s: 'test'  
[2022-07-28 22:58:28,975: INFO/MainProcess] Task celery_task.job[bb33981d-0629-4173-8375-128ba84d1f0f] received  
[2022-07-28 22:58:28,975: INFO/MainProcess] Task celery_task.job[bb33981d-0629-4173-8375-128ba84d1f0f] succeeded in 0.0s: 'test'

同时,在 redis 数据库中会以列表和字符串的模式存储工作明细和后果:

127.0.0.1:6379> keys *  
1) "celery-task-meta-f4aa4304-02c3-48ee-8625-fa1fe27b8e98"  
2) "celery-task-meta-bb33981d-0629-4173-8375-128ba84d1f0f"  
3) "_kombu.binding.celery"  
4) "celery-task-meta-b0337808-c90b-450b-98bc-fd577f7039d0"  
5) "cron.run_times"  
6) "cron.jobs"  
7) "celery"

从调度层面上讲,celery 和 APScheduler 并无太大的不同,但从应用老本上看,celery 比 APScheduler 多保护一个服务,worker 和 beat 双服务的模式无形中也减少了系统监控资源的开销。

动静保护异步定时工作

从工作管理层面上看,celery 毫无疑问输的很彻底,因为原生 celery 压根就不反对动静地批改定时工作。但咱们能够通过三方库的模式来曲线救国:

pip3 install redisbeat

这里通过 redis 的定时工作服务来取代 celery 原生的 beat 服务。

建设 redisbeat 实例:

from celery import Celery  
from datetime import timedelta  
from redisbeat.scheduler import RedisScheduler  
  
app = Celery("tornado")  
  
  
app.conf["imports"] = ["celery_task"]  
  
# 定义 broker  
app.conf.broker_url = "redis://localhost:6379"  
  
# 工作后果  
app.conf.result_backend = "redis://localhost:6379"  
  
# 时区  
app.conf.timezone = "Asia/Shanghai"  
  
@app.task  
def sub():  
  
    return "test"  
  
  
schduler = RedisScheduler(app=app)  
schduler.add(**{  
        'name': 'job1',  
        'task': 'test_celery.sub',  
        'schedule': timedelta(seconds=3),  
        'args': ()})

通过 schduler.add 办法就能够动静地增加定时工作,随后以 redisbeat 的模式启动 celery 服务:

celery -A test_celery beat -S redisbeat.RedisScheduler -l INFO

此时通过革新的零碎承受动静工作调用而执行:

C:\Users\liuyue\www\tornado6>celery -A test_celery worker --pool=solo -l info  
  
 -------------- celery@LIUYUE354D v5.2.7 (dawn-chorus)  
--- ***** -----  
-- ******* ---- Windows-10-10.0.22000-SP0 2022-07-28 23:09:50  
- *** --- * ---  
- ** ---------- [config]  
- ** ---------- .> app:         tornado:0x19c1a1f0040  
- ** ---------- .> transport:   redis://localhost:6379//  
- ** ---------- .> results:     redis://localhost:6379/  
- *** --- * --- .> concurrency: 4 (solo)  
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)  
--- ***** -----  
 -------------- [queues]  
                .> celery           exchange=celery(direct) key=celery  
  
  
[tasks]  
  . celery_task.async_job  
  . celery_task.job  
  . test_celery.sub  
  
[2022-07-28 23:09:52,916: INFO/MainProcess] Connected to redis://localhost:6379//  
[2022-07-28 23:09:54,971: INFO/MainProcess] mingle: searching for neighbors  
[2022-07-28 23:10:02,140: INFO/MainProcess] mingle: all alone  
[2022-07-28 23:10:12,427: INFO/MainProcess] celery@LIUYUE354D ready.  
[2022-07-28 23:10:12,440: INFO/MainProcess] Task test_celery.sub[ade9c5ad-d551-44f2-84e7-a2824b2d022d] received  
[2022-07-28 23:10:14,518: INFO/MainProcess] Task test_celery.sub[ade9c5ad-d551-44f2-84e7-a2824b2d022d] succeeded in 2.0780000000013388s: 'test'  
[2022-07-28 23:10:14,518: INFO/MainProcess] Task test_celery.sub[11927889-8385-4c88-aff1-42179b559db0] received  
[2022-07-28 23:10:14,518: INFO/MainProcess] Task test_celery.sub[11927889-8385-4c88-aff1-42179b559db0] succeeded in 0.0s: 'test'  
[2022-07-28 23:10:14,533: INFO/MainProcess] Task test_celery.sub[442cd168-5a68-4ade-b4e7-6ae4a92a53ae] received  
[2022-07-28 23:10:14,533: INFO/MainProcess] Task test_celery.sub[442cd168-5a68-4ade-b4e7-6ae4a92a53ae] succeeded in 0.0s: 'test'  
[2022-07-28 23:10:17,087: INFO/MainProcess] Task test_celery.sub[e4850b5d-28e9-47c8-88e6-d9086e93db88] received  
[2022-07-28 23:10:17,087: INFO/MainProcess] Task test_celery.sub[e4850b5d-28e9-47c8-88e6-d9086e93db88] succeeded in 0.0s: 'test'

响应的,也能够通过 remove 办法和工作 id 进行删除操作:

schduler.remove('job1')

工作明细的存储模式上,也由列表降级成为了有序汇合,进步了效率:

127.0.0.1:6379> type celery:beat:order_tasks  
zset  
127.0.0.1:6379> zrange celery:beat:order_tasks 0 -1  
1) "{\"py/reduce\": [{\"py/type\": \"celery.beat.ScheduleEntry\"}, {\"py/tuple\": [\"job1\", \"test_celery.sub\", {\"__reduce__\": [{\"py/type\": \"datetime.datetime\"}, [\"B+YHHBcMDgfyGg==\", {\"py/reduce\": [{\"py/function\": \"pytz._p\"}, {\"py/tuple\": [\"Asia/Shanghai\", 28800, 0, \"CST\"]}]}]], \"py/object\": \"datetime.datetime\"}, 43, {\"py/reduce\": [{\"py/type\": \"celery.schedules.schedule\"}, {\"py/tuple\": [{\"py/reduce\": [{\"py/type\": \"datetime.timedelta\"}, {\"py/tuple\": [0, 3, 0]}]}, false, null]}]}, {\"py/tuple\": []}, {}, {}]}]}"

至此,celery 配合 tornado 打造异步定时工作就实现了。

结语

APScheduler 长于灵活机动并能够依附于 Tornado 事件循环体系中,Celery 则娴于调度和分布式的反对并绝对独立,二者不分轩轾,各擅胜场,适宜不同的业务利用场景,当然,在异步定时工作执行异样时的解决策略也有很多方面须要欠缺,比方因为实例夯死导致的过期触发问题、工作追赶和工作沉积问题、工作流场景下工作异样后是整体重试还是断点续传重试等,都须要具体问题具体分析。

原文转载自「刘悦的技术博客」https://v3u.cn/a_id_220

正文完
 0