我的應用程式創建一個 Flask 應用程式以及一個后臺行程,該行程不時與我的 MySQL 資料庫(通過 SQLAlchemy)一起作業:
from task_manager import TaskManager
# Session is a sessionmaker created earlier
task_manager = TaskManager(timedelta(seconds = 1), timedelta(seconds = 1), Session)
threading.Thread(target=task_manager.scheduler_loop).start()
app.run(debug=True, host='0.0.0.0', port=5000)
每當這個行程找到一個可用的任務(這是在單獨執行緒中運行的 scheduler_loop 中),它會做一些作業:
with db_session(self.Session) as session:
task = session.query(Task).filter(or_(Task.date == None, Task.date <= datetime.now())).order_by(Task.priority).first()
if task is not None:
if task.type == "create_paper_task":
self.create_paper(session, task.paper_title)
elif task.type == "update_citations_task":
self.update_citations(session, task.paper)
session.delete(task)
...
def create_paper(self, session, paper_title):
...
# For the purposes of testing, I replaced a long API call with this sleep.
time.sleep(3)
paper = Paper(paper_title, year)
paper.citations.append(Citation(citations, datetime.now()))
session.add(paper)
如果我嘗試使用此代碼,SQLAlchemy 查詢會運行兩次。創建了兩個 Paper 物件,我收到此錯誤(可能是 Task 被洗掉兩次):
/app/task_manager.py:17: SAWarning: DELETE statement on table 'create_paper_task' expected to delete 1 row(s); 0 were matched. Please set confirm_deleted_rows=False within the mapper configuration to prevent this warning.
實際代碼本身不會運行兩次,并且肯定沒有多個調度程式執行緒在運行:我已經使用 print 陳述句對此進行了測驗。
現在,最奇怪的部分是這個問題只發生在
- 執行程序中有很長的等待時間。如果 time.sleep 被洗掉,沒有問題,并且
- The Flask app is running and the scheduler loop is running in a separate thread. If the Flask app isn't running, or the scheduler_loop is running in the main thread (so obviously the Flask app isn't running), then there's no problem.
Also, the Flask app isn't being used at all while I'm testing this, so that's not the issue.
uj5u.com熱心網友回復:
Flask的app.run功能會在你設定時運行你的初始化代碼兩次debug=True。這是 Flask 可以檢測代碼更改并根據需要動態重新啟動的方式的一部分。不利的一面是,這會導致您的執行緒運行兩次,從而在讀取和執行您的任務時產生競爭條件,這確實只會在任務花費足夠長的時間以使第二個執行緒開始作業時才會出現。
有關正在發生的事情的更多詳細資訊,請參閱此問題/答案:為什么運行 Flask 開發服務器會自行運行兩次?
為避免這種情況,您可以添加代碼以避免第二次執行,但這有一個限制,即修改后的代碼的自動重新加載功能將不再起作用。一般來說,最好使用 Celery 之類的東西來處理任務執行,而不是構建自己的解決方案。但是,如鏈接答案中所述,您可以使用類似
from werkzeug.serving import is_running_from_reloader
if is_running_from_reloader():
from task_manager import TaskManager
task_manager = TaskManager(timedelta(seconds = 1), timedelta(seconds = 1), Session)
threading.Thread(target=task_manager.scheduler_loop).start()
除非您處于第二個(重新加載)行程中,否則這將阻止您的執行緒被創建。請注意,如果您洗掉,這將阻止您的執行緒執行 debug=True。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/450272.html
標籤:python multithreading flask sqlalchemy
下一篇:網易互聯網筆試(3.27)
