我正在撰寫一些用于異步 IO 函式的決議器代碼(使用 Trio)。決議器獲得一個匯出異步read()方法的物件,并在決議程序中呼叫該方法。
通常,此代碼將使用直接離開網路的資料運行,并使用 Trio 的網路功能。為此,顯然需要 Trio。但是,我還希望能夠呼叫決議器,并且已經有了完整的訊息。在這種情況下,網路代碼可以通過簡單的異步重新實作BytesIO或類似的方式有效地替換。
因為它await是實作的異步函式,所以決議器代碼也必須是異步的。read()在保證方法永遠不會阻塞的情況下,是否有一種簡單的方法可以從同步函式運行此異步代碼,而無需運行完整的事件回圈?
例如
async def parse(reader):
d = await reader.read(2)
# parse data
d2 = await reader.read(4)
# more parsing
return parsed_obj
您可以使用永不阻塞的異步方法創建一個物件read(),然后輕松地parse()從同步代碼呼叫,而不使用事件回圈嗎?
uj5u.com熱心網友回復:
你當然可以。
>>> async def x():
... return 42
...
>>> v=x()
>>> v.send(None)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration: 42
>>>
因此,
>>> def sync_call(p, *a, **kw):
... try:
... p(*a, **kw).send(None)
... except StopIteration as exc:
... return exc.value
... raise RuntimeError("Async function yielded")
...
>>> sync_call(x)
42
>>>
這確實適用于函式呼叫,假設您的呼叫鏈中沒有任何內容會產生(不存在的)主回圈:
>>> async def y(f):
... return (await f())/2
...
>>> sync_call(y,x)
21.0
uj5u.com熱心網友回復:
你提到有一個“整個訊息”的概念,但我希望網路上有兩個協議:一個框架協議(例如大小前綴或終止符)和訊息協議。像這樣的東西:
async def read_and_decode_message(reader, buffer):
assert isinstance(buffer, bytearray)
while True:
end_of_message = find_message_end_in_buffer(buffer)
if end_of_message is not None:
break
buffer.extend(await reader.read_some())
message_encoded = buffer[:end_of_message]
del buffer[:end_of_message]
return decode_message(message_encoded)
如果是這樣的話,你當然可以直接呼叫decode_message(),這應該是一個同步函式嗎?
我猜你的代碼不是這樣的,或者你肯定已經想到了,但也許可以這樣重構。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/513492.html
