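This is my first time working with a webserver. I have used sockets and parallelism before, but that was very different and simpler, and it did not use async for parallelism.
My goal is simple: I have a server and a client. In the client, I want to create a separate thread that receives the messages the server sends, while the original thread carries on with other work, as in this code sample (client.py):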
from typing import Dict
import websockets
import asyncio
import json

URL = "my localhost webserver"
connection = None

async def listen() -> None:
    global connection
    input("Press enter to connect.")
    async with websockets.connect(URL) as ws:
        connection = ws
        # get_dict() is not shown in the question
        msg_initial: Dict[str,str] = get_dict()
        await ws.send(json.dumps(msg_initial))
        ## This i want to be in another thread
        await receive_msg()
        print("I`m at listener`s thread")
        # do some stuffs

async def receive_msg() -> None:
    while True:
        msg = await connection.recv()
        print(f"Server: {msg}")

asyncio.get_event_loop().run_until_complete(listen())
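To receive a message I need to await recv(), but I do not know how to create a separate thread for that. I have already tried creating a separate thread with threading, but it did not work.
Does anyone know how to do this, and whether it is possible at all?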
Answer from a uj5u.com user:
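It is not clear that what you want can be done in exactly the way you propose. In the following example, I connect to an echo server. The most straightforward way of implementing what you are suggesting is to create a new thread and pass the connection to it. But this does not quite work: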
import websockets
import asyncio
from threading import Thread

URL = "ws://localhost:4000"

async def listen() -> None:
    async with websockets.connect(URL) as ws:
        # pass connection:
        t = Thread(target=receiver_thread, args=(ws,))
        t.start()
        # Generate some messages to be echoed back:
        await ws.send('msg1')
        await ws.send('msg2')
        await ws.send('msg3')
        await ws.send('msg4')
        await ws.send('msg5')

def receiver_thread(connection):
    print("I`m at listener`s thread")
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(receive_msg(connection))

async def receive_msg(connection) -> None:
    while True:
        msg = await connection.recv()
        print(f"Server: {msg}")

asyncio.get_event_loop().run_until_complete(listen())
Prints:
I`m at listener`s thread
Server: msg1
Server: msg2
Server: msg3
Server: msg4
Server: msg5
Exception in thread Thread-1:
Traceback (most recent call last):
  File "C:\Program Files\Python38\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\Program Files\Python38\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Ron\test\test.py", line 22, in receiver_thread
    loop.run_until_complete(receive_msg(connection))
  File "C:\Program Files\Python38\lib\asyncio\base_events.py", line 616, in run_until_complete
    return future.result()
  File "C:\Ron\test\test.py", line 29, in receive_msg
    msg = await connection.recv()
  File "C:\Program Files\Python38\lib\site-packages\websockets\legacy\protocol.py", line 404, in recv
    await asyncio.wait(
  File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 424, in wait
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
  File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 424, in <setcomp>
    fs = {ensure_future(f, loop=loop) for f in set(fs)}
  File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 667, in ensure_future
    raise ValueError('The future belongs to a different loop than '
ValueError: The future belongs to a different loop than the one specified as the loop argument
The messages are received correctly, but the problem occurs in function receiver_thread at the statement:
    loop.run_until_complete(receive_msg(connection))
By necessity, the started thread has no running event loop and cannot use the event loop being used by function listen, so it must create a new event loop. That would be fine if this thread and its event loop were not using any resources (i.e. the connection) from a different event loop:
import websockets
import asyncio
from threading import Thread

URL = "ws://localhost:4000"

async def listen() -> None:
    async with websockets.connect(URL) as ws:
        t = Thread(target=receiver_thread)
        t.start()

def receiver_thread():
    print("I`m at listener`s thread")
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(receive_msg())

async def receive_msg() -> None:
    await asyncio.sleep(2)
    print('I just slept for 2 seconds')

asyncio.get_event_loop().run_until_complete(listen())
Prints:
I`m at listener`s thread
I just slept for 2 seconds
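If a worker thread really does have to wait on something owned by the main event loop, one option is asyncio.run_coroutine_threadsafe, which submits a coroutine to the owning loop from another thread instead of starting a second loop. This is not what the examples above do, so treat it as a sketch under assumptions: an asyncio.Queue stands in for the connection, and fake_recv and worker are hypothetical names.

```python
import asyncio
import threading


async def fake_recv(queue: asyncio.Queue) -> str:
    # Stand-in for connection.recv(); it runs on the loop that owns the queue.
    return await queue.get()


def worker(loop: asyncio.AbstractEventLoop, queue: asyncio.Queue, out: list) -> None:
    # Runs in a separate thread. Rather than creating a new event loop,
    # submit the coroutine to the owning loop and block on its result.
    for _ in range(3):
        future = asyncio.run_coroutine_threadsafe(fake_recv(queue), loop)
        out.append(future.result(timeout=5))


async def main() -> list:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()
    received: list = []
    t = threading.Thread(target=worker, args=(loop, queue, received))
    t.start()
    for i in range(1, 4):
        await queue.put(f"msg{i}")
    # Join the thread in the default executor so the event loop keeps running.
    await loop.run_in_executor(None, t.join)
    return received


result = asyncio.run(main())
print(result)
```

The thread never touches the queue directly; every await happens on the loop that created it, which is what avoids the "different loop" error.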
Based on the minimal code you have shown, I can see no real need to run anything in threads. But assuming you omitted some processing of the received message for which asyncio alone is not sufficient, then perhaps all you need to do is receive the messages in the currently running loop (in function listen) and use threading just for processing each message:
from typing import Dict
import websockets
import asyncio
import json
from threading import Thread

URL = "my localhost webserver"

# get_dict and process_msg are not shown here; they are assumed to be defined elsewhere.

async def listen() -> None:
    input("Press enter to connect.")
    async with websockets.connect(URL) as ws:
        msg_initial: Dict[str,str] = get_dict()
        await ws.send(json.dumps(msg_initial))
        while True:
            msg = await ws.recv()
            print(f"Server: {msg}")
            # Non-daemon threads so program will not end until these threads terminate:
            t = Thread(target=process_msg, args=(msg,))
            t.start()

asyncio.get_event_loop().run_until_complete(listen())
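A variation on the same idea, offered only as a sketch: instead of starting one Thread per message, the blocking work can be handed to a thread pool with loop.run_in_executor, which returns an awaitable so the results can be collected on the event loop. Here fake_messages is a hypothetical stand-in for repeated ws.recv() calls, and process_msg is an assumed placeholder for whatever blocking processing is needed.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def process_msg(msg: str) -> str:
    # Hypothetical blocking work; stands in for the real processing.
    time.sleep(0.1)
    return msg.upper()


async def fake_messages():
    # Stand-in for repeated ws.recv() calls on a real connection.
    for i in range(1, 4):
        await asyncio.sleep(0)
        yield f"msg{i}"


async def listen() -> list:
    loop = asyncio.get_running_loop()
    pending = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        async for msg in fake_messages():
            # Hand the blocking work to the pool; the loop keeps receiving.
            pending.append(loop.run_in_executor(pool, process_msg, msg))
        # Collect the results in the order the messages arrived.
        return await asyncio.gather(*pending)


results = asyncio.run(listen())
print(results)
```

The pool caps the number of threads, which a thread-per-message loop does not, and gather returns the results in submission order regardless of which worker finishes first.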
Update
Based on your last comment to my answer concerning creating a chat program, you should either implement this using pure multithreading or pure asyncio. Here is a rough outline using asyncio:
import websockets
import asyncio
import aioconsole

URL = "my localhost webserver"

async def receiver(connection):
    while True:
        msg = await connection.recv()
        print(f"\nServer: {msg}")

async def sender(connection):
    while True:
        msg = await aioconsole.ainput('\nEnter msg: ')
        await connection.send(msg)

async def chat() -> None:
    async with websockets.connect(URL) as ws:
        await asyncio.gather(
            receiver(ws),
            sender(ws)
        )

asyncio.get_event_loop().run_until_complete(chat())
However, you may be limited in the type of user input you can do with asyncio. I would think, therefore, that multithreading might be a better approach.
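For comparison, here is a rough outline of the pure multithreading shape, with one thread receiving while the main thread sends. It is only a sketch: EchoConnection is a hypothetical blocking stand-in built on queue.Queue, not a real WebSocket client, and the outgoing lines are hard-coded where a real client would call input(). Recent versions of websockets also ship a blocking client in websockets.sync.client, which might slot in here, though that is untested.

```python
import queue
import threading


class EchoConnection:
    """Hypothetical blocking connection that echoes back whatever is sent."""

    def __init__(self) -> None:
        self._inbox: queue.Queue = queue.Queue()

    def send(self, msg: str) -> None:
        self._inbox.put(msg)

    def recv(self) -> str:
        # Blocks until a message is available.
        return self._inbox.get()


def receiver(connection: EchoConnection, received: list, expected: int) -> None:
    # A real chat client would loop until the connection closes.
    for _ in range(expected):
        msg = connection.recv()
        received.append(msg)
        print(f"Server: {msg}")


def sender(connection: EchoConnection, lines: list) -> None:
    # A real chat client would read each line with input() here.
    for line in lines:
        connection.send(line)


connection = EchoConnection()
received: list = []
outgoing = ["hello", "how are you?", "bye"]

t = threading.Thread(target=receiver, args=(connection, received, len(outgoing)))
t.start()
sender(connection, outgoing)
t.join()
```

Because both sides block in ordinary function calls, the main thread is free to use input() or any other blocking console handling without a library such as aioconsole.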