Python中實作定時任務
在專案中,我們可能遇到有定時任務的需求,
- 其一:每隔一個時間段就執行任務,
比如:壓測中每隔45分鐘調整溫箱的溫度, - 其二:定時執行任務,
例如每天早上 8 點定時推送早報,
今天,我跟大家分享下 Python 定時任務的實作方法,
固定時間間隔執行任務
import time
import logging
logging.basicConfig(
level=logging.debug,
format="%(asctime)s.%(msecs)d | %(threadName)s | %(levelname)s - %(message)s"
)
def task():
logging.info("Task Start.")
time.sleep(1)
logging.info("Task Done.")
time.sleep
第一種辦法是最簡單又最暴力,
那就是在一個死回圈中,使用執行緒睡眠函式 sleep(),
while True:
task()
time.sleep(5)
上述的方法有幾個問題:
- 阻塞主行程,這個也好解決,可以放到執行緒中去執行
- 一次只有一個task,這個也有解決辦法,可以多啟幾個執行緒
threading.Timer
既然第一種方法暴力,那么有沒有比較優雅點的方法?
Python 標準庫 threading 中有個 Timer 類,
它會新啟動一個執行緒來執行定時任務,所以它是非阻塞函式,
原理:執行緒中預置一個finished的事件,通過finished.wait等待固定時間間隔,
超時則執行任務,如果需要取消任務,可以呼叫Timer.cancel來取消任務,
from threading import Timer
t = Timer(task, 5)
t.start()
優點:
不阻塞主行程,task在執行緒中執行
缺點:
一個Timer只能執行一次task就結束了,
如果想回圈,需要改造一下task函式,
from threading import Timer
def repeat_task():
t = Timer(5, repeat_task)
# 開始任務的位置決定了是任務之間等待固定間隔時間
# 還是每個任務的開始等待固定間隔時間
t.start()
task()
這樣可以回圈執行,但是仍然只能一個執行緒一個任務,
Q:如何跳出回圈
A:加入一個識別符號:
sleep
可以用一個布林值來控制Timer
可以用一個Event來控制
固定時間點執行任務
以上是用內置的方法中比較簡單的實作方式,簡單的功能可以實作,
前面執行的都是指定間隔時間的定時任務,那怎么執行指定時間點的任務呢?
上面的方法是可以做到的,有兩種思路,
- 前面我們指定了間隔時間,那指定時間點,就先計算當前時間到指定時間點的間隔時間
- 不管間隔時間,而是記錄任務時間點,然后實時去檢查是否到達指定時間點
思路有了,但是對于固定時間點執行任務的場景以及后面更復雜的場景,自己實作可能就變得復雜,
下面再介紹幾個進階的定時任務的實作方式,可以適應更復雜的業務場景,
sched
第三種方式是使用標準庫中sched
模塊,sched
是事件調度器,
它通過 scheduler
類來調度事件,從而達到定時執行任務的效果,
簡單示例如下:
import sched
schedule = sched.scheduler() # 初始化 sched 模塊的 scheduler 類
schedule.enter(10, 1, task) # 增加調度任務
schedule.run() # 開始調度任務
scheduler 提供了兩個添加調度任務的函式:
-
enter(delay, priority, action, argument=(), kwargs={})
該函式可以延遲一定時間執行任務,delay 表示延遲多長時間執行任務,單位是秒,
priority為優先級,越小優先級越大,兩個任務指定相同的延遲時間,優先級大的任務會向被執行,
action 即需要執行的函式,argument 和 kwargs 分別是函式的位置和關鍵字引數, -
scheduler.enterabs(time, priority, action, argument=(), kwargs={})
添加一項任務,但這個任務會在 time 這時刻執行,因此,time 是絕對時間,其他引數用法與 enter() 中的引數用法是一致,
優點:
- 執行時間間隔和時間點執行任務
- 可以添加不同的任務
- 任務可以設定優先級
缺點:
scheduler 中的每個調度任務只會作業一次,不會無限回圈被呼叫,如果想重復執行同一任務, 需要重復添加調度任務即可,
import sched
import threading
from functools import wraps
s = sched.scheduler()
STOP_FLAG = threading.Event()
def repeat(interval):
def wrapper(func):
@wraps(func)
def inner():
if not STOP_FLAG.is_set():
s.enter(interval, 0, inner)
func()
return inner
return wrapper
@repeat(5)
def new_task():
return task()
new_task()
t = threading.Thread(target=s.run)
t.start()
實作原理
當然我們僅僅學會怎么用還是不夠的,不能知其然而不知其所以然,
介紹了那么多方法,那么多庫,但是底層的實作邏輯都是差不多的,
一個完整的定時任務系統,有三個部分:
- 任務佇列(task queue)
根據執行時間和優先級進行排序
排序sorted(): 任務一多,整個佇列重排,性能堪憂
heapq.pop():堆排序,只獲取最高優先級的任務,不重復排序 - 調度器(scheduler)
- 執行器(executor)
從前到后,實作的內容越來越多,控制的精細度也就越來越高,實作的功能也就越來越豐富,
import sched
import time
import threading
from functools import wraps
from task import task
from typing import Callable
STOP_FLAG = threading.Event()
def timeloop(func: Callable, interval: int):
while True:
func()
time.sleep(interval)
def repeat_task(func: Callable, interval: int):
# 新建任務的前后,決定了是在任務結束后等待時間,還是固定間隔
if not STOP_FLAG.is_set():
t = threading.Timer(interval, repeat_task, args=(func, interval))
t.start()
func()
def timer_schedule(func: Callable, interval: int):
repeat_task(func, interval)
s = sched.scheduler()
def repeat(interval):
def wrapper(func):
@wraps(func)
def inner():
if not STOP_FLAG.is_set():
s.enter(interval, 0, inner)
func()
return inner
return wrapper
@repeat(5)
def new_task():
return task()
def sched_schedule(func: Callable, interval: int):
func()
t = threading.Thread(target=s.run)
t.start()
return t
def main(schedule):
schedule_thread = schedule(new_task, 5)
while True:
try:
time.sleep(1)
except KeyboardInterrupt:
STOP_FLAG.set()
schedule_thread.join()
break
if __name__ == "__main__":
# main(timeloop)
# main(timer_schedule)
main(sched_schedule)
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/557184.html
標籤:其他
上一篇:Python中實作定時任務
下一篇:返回列表