我正在嘗試在 Flask 應用程式的后臺運行一個函式。該應用程式當前嘗試從 MIDI 鍵盤讀取輸入并將資訊發送到網站。我已經研究過執行緒,但到目前為止我的發現/嘗試沒有成功。
本質上,JavaScript/JQuery 每秒都會從“/get_notes”應用程式路由中請求資料。理論上,如果get_notes()在這些秒之間運行,它可以發送一個 json 格式的串列,其中包含在該特定時間段內按下的所有鍵。
我已經釋義并包含了下面 app.py 中的相關代碼。任何建議,將不勝感激!
from Flask import Flask
import mido
app = Flask(__name__)
# List to store pressed keys
notes_pressed = []
@app.route('/get_notes', methods=['GET'])
def return_pressed_notes():
return json.dumps(notes_pressed)
# Function to translate midi key numbers to note letters
def translate_key(key_num):
...
# Function that returns recently played notes within time period
def get_notes():
# Open port to listen for key presses
with mido.open_input() as inport:
for msg in inport:
# If key press is valid
# - note is pressed
# - velocity!=0 (prevents ghost notes)
if (msg.type=='note_on' and msg.velocity!=0 and msg.channel==0):
# Add new note to list
notes_pressed.append(translate_key(msg.note - lowest_key))
if __name__ == '__main__':
app.run()
# Somehow run get_notes() in background?
uj5u.com熱心網友回復:
您可以使用APScheduler
例子:
from Flask import Flask
from flask_apscheduler import APScheduler
app = Flask(__name__)
scheduler = APScheduler()
scheduler.init_app(app)
@scheduler.task('cron', id='get_note', hour=12) # every day at noon
def get_notes:
return 'blablabla'
if __name__ == '__main__':
scheduler.start()
app.run()
你覺得這個方法怎么樣?
uj5u.com熱心網友回復:
我在使用 Flask 構建一些在線賭場游戲時遇到了類似的問題。
我找到的解決方案是使用無限回圈運行 celery 任務并保存任務的當前狀態,以便燒瓶可以隨時訪問它。
Celery 是一種用于運行后臺任務的生產級解決方案,適用于 Flask。
我不會在這里深入研究芹菜,因為它是一個復雜的話題,但這些指南非常適合用燒瓶實作芹菜:https : //blog.miguelgrinberg.com/post/using-celery-with-flask
在你的情況下,我不太確定 mido 庫是如何作業的,但根據你的問題,它看起來像“with mido.open_input() as inport:”已經打開了一個 I/O 通道并永久維護它。在這種情況下,您甚至不需要重新格式化函式,只需將輸出串列添加為任務的狀態即可。
@celery.task(bind=True)
def get_notes(self):
notes_pressed = []
# Open port to listen for key presses
with mido.open_input() as inport:
for msg in inport:
# If key press is valid
# - note is pressed
# - velocity!=0 (prevents ghost notes)
if (msg.type=='note_on' and msg.velocity!=0 and msg.channel==0):
# Add new note to list
notes_pressed.append(translate_key(msg.note - lowest_key))
self.update_state(state="PROGRESS",
meta={data: notes_pressed})
然后您可以在您的 Flask 應用程式中創建兩個不同的端點。一個初始化 celery 任務,另一個讀取其狀態:
@app.route('/longtask', methods=['POST'])
def longtask():
task = get_notes.apply_async()
return jsonify({}), 202, {'Location': url_for('taskstatus',
task_id=task.id)}
@app.route('/status/<task_id>')
def taskstatus(task_id):
task = get_notes.AsyncResult(task_id)
notes_pressed = task.info.get("data")
#do whatever you want with it
return json.dumps(notes_pressed)
請告訴我這是否適合您,以及將 mido 與 celery 一起使用是否有任何問題。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/366040.html
標籤:javascript Python html 查询 烧瓶
上一篇:添加一個將類添加到div的計時器,然后洗掉該類并將其添加到另一個div
下一篇:獲取所有特定div的“類”名稱
