关于python:5种Python使用定时调度任务的方式

5次阅读

共计 3348 个字符,预计需要花费 9 分钟才能阅读完成。

摘要:Python 有几种办法能够定时调度一个工作,这就是咱们将在本文中学习的内容。

本文分享自华为云社区《Python 中应用定时调度工作(Schedule Jobs)的 5 种形式)》,作者:Regan Yue。

明天构建的大多数应用程序都须要某种形式的调度机制。轮询 API 或数据库、一直查看零碎健康状况、将日志存档等是常见的例子。Kubernetes 和 Apache Mesos 等应用主动伸缩扩容技术(Auto-scaling)的软件须要查看部署的应用程序的状态,为此它们应用定期运行的存活探针(Liveness Probe)。调度工作须要与业务逻辑解耦,因而咱们要应用解耦的执行队列,例如 Redis 队列。

Python 有几种办法能够定时调度一个工作,这就是咱们将在本文中学习的内容。我将应用以下形式探讨调度工作:

  1. 简略循环(Simple Loops)
  2. 简略循环然而应用了线程(Simple Loops but Threaded)
  3. 调度库(Schedule Library)
  4. Python Crontab
  5. RQ 调度器作为解耦队列(RQ Scheduler as decoupled queues)

简略循环 Simple loops

应用简略循环来实现调度工作这是毫不费力的。应用有限运行的 while 循环定期调用函数可用于调度作业,但这不是最好的办法,不过它是很无效的。能够应用内置 time 模块的 slleep() 来提早执行。不过这并不是大多数作业的调度形式,因为,它看起来很难看,而且与其余办法相比,它的可读性较差。

import time
​
def task():
    print("Job Completed!")
​
 while 1:
    task()
    time.sleep(10)

当波及到每天早上 9:00 或每周三早晨 7:45 等这些日程安排时,事件就变得比拟辣手了。

import datetime
​
def task():
    print("Job Completed!")
​
 while 1:
    now = datetime.datetime.now()
    # schedule at every wednesday,7:45 pm
    if now.weekday == 3 and now.strftime("%H:%m") == "19:45":
        task()
    # sleep for 6 days
    time.sleep(6 * 24 * 60 * 60)

这是我的第一工夫想到的解决办法,不用谢!这种办法的一个问题是这里的逻辑是阻塞的,即一旦在 python 我的项目中发现这段代码,它就会卡在 while 1 循环中,从而阻塞其余代码的执行。

简略循环然而应用了线程 Simple loops but threaded

线程是计算机科学中的一个概念。具备本人指令的小程序由过程执行并独立治理,这就能够解决咱们第一种办法的阻塞状况,让咱们看看怎么样。

import time
import threading
​
def task():
    print("Job Completed!")
​
def schedule():
    while 1:
        task()
        time.sleep(10)
​
# makes our logic non blocking
thread = threading.Thread(target=schedule)
thread.start()

线程启动后,其底层逻辑无奈被主线程批改,因而咱们可能须要增加资源,程序通过这些资源能够查看特定场景并依据它们执行逻辑。

定时调度库 Schedule Library

早些时候,我说应用 while 循环进行调度看起来很俊俏,调度库能够解决这个问题。

import schedule
import time
​
def task():
    print("Job Executing!")
​
# for every n minutes
schedule.every(10).minutes.do(task)
​
# every hour
schedule.every().hour.do(task)
​
# every daya at specific time
schedule.every().day.at("10:30").do(task)
​
# schedule by name of day
schedule.every().monday.do(task)
​
# name of day with time
schedule.every().wednesday.at("13:15").do(task)
​
while True:
    schedule.run_pending()
    time.sleep(1)

正如您所见,通过这样咱们能够毫不费力地创立多个调度打算。我特地喜爱创立作业的形式和办法链(Method Chaining),另一方面,这个片段有一个 while 循环,这意味着代码被阻塞,不过我置信你曾经晓得什么能够帮忙咱们解决这个问题。

Python Crontab

Liunx 中的 crontab 实用程序是一种易于应用且被宽泛承受的调度解决方案。Python 库 python-crontab 提供了一个 API 来应用 Python 中的 CLI 工具。在 crontab 中,一个定时调度应用 unix-cron 字符串格局( *)来形容,它是一组五个值的一条线,这表明当作业应该被执行时,python-crontab 将在文件中写入 crontab 的打算转换为写入编程办法。

from crontab import CronTab
​
cron = CronTab(user='root')
​
job = cron.new(command='my_script.sh')
​
job.hour.every(1)
cron.write()

python-crontab 不会主动保留打算,须要执行 write() 办法来保留打算。还有更多功能,我强烈建议您查看他们的文档。

RQ 调度器 RQ Scheduler

有些工作不能立刻执行,因而咱们须要依据 LIFO 或 FIFO 等队列零碎创立工作队列并弹出工作。python-rq 容许咱们做到这一点,应用 Redis 作为代理来排队作业。新作业的条目存储为带有信息的哈希映射,例如 created_at, enqueued_at, origin, data, description.

排队工作由名为 worker 的程序执行。workers 在 Redis 缓存中也有一个条目,负责将工作出列以及更新 Redis 中的工作状态。工作能够在须要时排队,但要安顿它们,咱们须要 rq-scheduler。

from rq_scheduler import Scheduler
​
queue = Queue('circle', connection=Redis())
scheduler = Scheduler(queue=queue)
​
scheduler.schedule(scheduled_time=datetime.utcnow(), # Time for first execution, in UTC timezone
    func=func,                     # Function to be queued
    args=[arg1, arg2],             # Arguments passed into function when executed
    kwargs={'foo': 'bar'},         # Keyword arguments passed into function when executed
    interval=60,                   # Time before the function is called again, in seconds
    repeat=None,                     # Repeat this number of times (None means repeat forever)
    meta={'foo': 'bar'}            # Arbitrary pickleable data on the job itself
)

RQ worker(RQ 工作器)必须在终端中独自启动或通过 python-rq 工作器启动。一旦工作被触发,就能够在工作终端中看到,在胜利和失败场景中都能够应用独自的函数回调。

总结 Conclusion

还有一些用于调度的库,但在这里,我曾经探讨了最常见的库。值得一提的是 Celery,celery 的另一个长处是用户能够在多个代理之间进行抉择。我很感谢你读到最初。也能够看看我的其余文章。干杯!

翻译起源:https://python.plainenglish.i…

点击关注,第一工夫理解华为云陈腐技术~

正文完
 0