我正在使用 FastAPI 在 python 中創建服務器,并且我想要一個與我的 API 無關的函式,每 5 分鐘在后臺運行一次(例如檢查來自 api 的內容并根據回應列印內容)
我試圖創建一個運行函式 start worker 的執行緒,但它不列印任何東西。
有誰知道該怎么做?
def start_worker():
print('[main]: starting worker...')
my_worker = worker.Worker()
my_worker.working_loop() # this function prints "hello" every 5 seconds
if __name__ == '__main__':
print('[main]: starting...')
uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
_worker_thread = Thread(target=start_worker, daemon=False)
_worker_thread.start()
uj5u.com熱心網友回復:
你應該在呼叫之前啟動你的執行緒uvicorn.run,就像uvicorn.run阻塞執行緒一樣。
PS:在您的問題中,您宣告您希望后臺任務每 5 分鐘運行一次,但在您的代碼中,您說每 5秒運行一次。下面的示例假定這是您想要的后者。如果您希望它每 5 分鐘執行一次,則將時間調整為60 * 5。
選項1
import time
import threading
from fastapi import FastAPI
import uvicorn
app = FastAPI()
class BackgroundTasks(threading.Thread):
def run(self,*args,**kwargs):
while True:
print('Hello')
time.sleep(5)
if __name__ == '__main__':
t = BackgroundTasks()
t.start()
uvicorn.run(app, host="0.0.0.0", port=8000)
你也可以使用 FastAPI 的啟動事件來啟動你的執行緒,只要在應用程式啟動之前運行是可以的。
@app.on_event("startup")
async def startup_event():
t = BackgroundTasks()
t.start()
選項 2
您可以改為對后臺任務使用重復事件調度程式,如下所示:
import sched, time
from threading import Thread
from fastapi import FastAPI
import uvicorn
app = FastAPI()
s = sched.scheduler(time.time, time.sleep)
def print_event(sc):
print("Hello")
sc.enter(5, 1, print_event, (sc,))
def start_scheduler():
s.enter(5, 1, print_event, (s,))
s.run()
@app.on_event("startup")
async def startup_event():
thread = Thread(target = start_scheduler)
thread.start()
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=8000)
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/422754.html
標籤:
