我有一個 Flask 服務器,它接受來自客戶端的 HTTP 請求。此 HTTP 服務器需要使用 websocket 連接將作業委托給第三方服務器(出于性能原因)。
我發現很難理解如何創建一個可以為 HTTP 請求保持打開狀態的永久 websocket 連接。在一次性腳本中向 websocket 服務器發送請求作業正常,如下所示:
async def send(websocket, payload):
await websocket.send(json.dumps(payload).encode("utf-8"))
async def recv(websocket):
data = await websocket.recv()
return json.loads(data)
async def main(payload):
uri = f"wss://the-third-party-server.com/xyz"
async with websockets.connect(uri) as websocket:
future = send(websocket, payload)
future_r = recv(websocket)
_, output = await asyncio.gather(future, future_r)
return output
asyncio.get_event_loop().run_until_complete(main({...}))
在這里,main()
建立一個 WSS 連接并在完成后關閉它,但是我如何為傳入的 HTTP 請求保持該連接打開,以便我可以在不重新建立 WSS 連接的情況下為每個請求呼叫 main() ?
uj5u.com熱心網友回復:
主要的問題是,當你撰寫一個回應 http(s) 的 web 應用程式時,你的代碼有一個非常特殊的“生命周期”:通常你有一個“視圖”函式來獲取請求資料,執行所有收集回應資料并將其回傳所需的操作。
大多數 web 框架中的這個“視圖”函式必須獨立于系統的其余部分——它應該能夠不依賴其他資料或物件來執行它的職責,除了它在呼叫時獲得的東西——它們是請求資料和系統配置 - 使應用程式服務器(旨在將您的程式實際連接到互聯網的框架部分)可以選擇多種方式來為您的程式提供服務:它們可以在多個并行執行緒或多個并行行程中運行您的視圖函式,或者即使在各種容器或物理服務器中的不同行程中:您的應用程式也不需要關心這一點。
如果您想要一個在呼叫視圖函式時可用的資源,您需要打破這種范式。例如,框架通常希望創建一個資料庫連接池,以便同一行程上的視圖可以重用這些連接。這些資料庫連接通常由框架本身提供,它實作了一種機制,允許重復使用,并在需要時以透明的方式提供。如果您想保持 websocket 連接活動,您必須重新創建相同型別的機制。
以某種方式,您需要一個 Python 物件,它可以調解您的 websocket 資料,就像您的 web 視圖函式的“服務器”一樣。
這比聽起來更簡單 - 一個特殊的 Python 類設計為每個行程有一個實體,它保持連接,并且能夠發送和接收從并行呼叫接收的資料而無需修改它就足夠了。將確保此實體存在于當前行程中的可呼叫物件足以在配置為將您的應用程式提供給網路的任何策略下作業。
如果您使用的是不使用 asyncio 的 Flask,您會遇到更復雜的情況 - 您將失去視圖中的異步能力,他們將不得不等待 websocket 請求完成 - 這將是您的作業應用程式服務器讓您在不同的執行緒或行程中查看以確保可用性。而且,你的作業是讓你的 websocket 的異步回圈在一個單獨的執行緒中運行,以便它可以發出它需要的請求。
這是一些示例代碼。請注意,除了每個行程使用一個 websocket 之外,這沒有任何形式的失敗情況下的規定,但是,最重要的是:它沒有并行執行任何操作:所有的 send-recv 對都在阻塞,因為您沒有提供任何線索一種允許將每個傳出訊息與其回應配對的機制。
import asyncio
import threading
from queue import Queue
class AWebSocket:
instance = None
def __new__(cls, *args, **kw):
if cls.instance:
return cls.instance
return super().__new__(cls, *args, **kw)
def __init__(self, *args, **kw):
cls = self.__class__
if cls.instance:
# init will be called even if new finds the existing instance,
# so we have to check again
return
self.outgoing = Queue()
self.responses = Queue()
self.socket_thread = threading.Thread(target=self.start_socket)
self.socket_thread.start()
def start_socket():
# starts an async loop in a separate thread, and keep
# the web socket running, in this separate thread
asyncio.get_event_loop().run_until_complete(self.core())
def core(self):
self.socket = websockets.connect(uri)
async def _send(self, websocket, payload):
await websocket.send(json.dumps(payload).encode("utf-8"))
async def _recv(self, websocket):
data = await websocket.recv()
return json.loads(data)
async def core(self):
uri = f"wss://the-third-party-server.com/xyz"
async with websockets.connect(uri) as websocket:
self.websocket = websocket
while True:
# This code is as you wrote it:
# it essentially blocks until a message is sent
# and the answer is received back.
# You have to have a mechanism in your websocket
# messages allowing you to identify the corresponding
# answer to each request. On doing so, this is trivially
# paralellizable simply by calling asyncio.create_task
# instead of awaiting on asyncio.gather
payload = self.outgoing.get()
future = self._send(websocket, payload)
future_r = self._recv(websocket)
_, response = await asyncio.gather(future, future_r)
self.responses.put(response)
def send(self, payload):
# This is the method you call from your views
# simply do:
# `output = AWebSocket().send(payload)`
self.outgoing.put(payload)
return self.responses.get()
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/318804.html
上一篇:為什么每個檔案都成功加載后,我的網站會顯示加載圖示?
下一篇:如何在F#中簡化異步編程