This is my initial code, which successfully requests the balances of multiple Ethereum addresses.
import requests
import time
import pandas as pd

start = time.time()
df = pd.read_csv('ethereumaddresses.csv')
Wallet_Address=(df.loc[:,'Address'])
results = []
start = time.time()
for address in Wallet_Address:
    url = f"https://blockscout.com/eth/mainnet/api?module=account&action=eth_get_balance&address={address}"
    response = requests.get(url)
    results = response.json()
    print(results)
end = time.time()
total_time = end - start
print(f"It took {total_time} to make {len(Wallet_Address)} API calls")
However, I am requesting 1000 Ethereum addresses, and I want to improve my code by making it asynchronous. This is my attempt. What am I doing wrong?
import asyncio
import aiohttp
import time
import pandas as pd

start = time.time()
df = pd.read_csv('Ethereum/ethereumaddresses.csv')
Wallet_Address=(df.loc[:,'Address'])
results = []

def get_tasks(session):
    tasks = []
    for address in Wallet_Address:
        url = f"https://blockscout.com/eth/mainnet/api?module=account&action=eth_get_balance&address={address}"
        tasks.append(session.get(url,ssl=False))
        print(address)
    return tasks

session_timeout = aiohttp.ClientTimeout(total=None)

async def get_balances():
    async with aiohttp.ClientSession(timeout=session_timeout) as session:
        tasks = get_tasks(session)
        responses = await asyncio.gather(*tasks)
        for response in responses:
            results.append(await response.json())

asyncio.run(get_balances())
end = time.time()
total_time = end - start
print(f"It took {total_time} seconds to make {len(Wallet_Address)} API calls")
It gives me an error:
RuntimeError: await wasn't used with future
_OverlappedFuture exception was never retrieved
future: <_OverlappedFuture finished exception=OSError(22, 'The I/O operation has been aborted because of either a thread exit or an application request', None, 995, None)>
Traceback (most recent call last):
File "AppData\Local\Programs\Python\Python310\lib\asyncio\windows_events.py", line 817, in _poll
value = callback(transferred, key, ov)
File "AppData\Local\Programs\Python\Python310\lib\asyncio\windows_events.py", line 604, in finish_connect
ov.getresult()
OSError: [WinError 995] The I/O operation has been aborted because of either a thread exit or an application request
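Answer:
Based on a quick search and a few tests, this error does not seem to be caused by your code itself. Here is a related post (it is about C#, but it concerns the same Windows error):
The I/O operation has been aborted because of either a thread exit or an application request
Since I can run your asynchronous code (the network-related part of it) without getting this error, I assume it has to do with either your operating system or the API you are accessing. You mentioned that you are trying to make 1000 concurrent requests. Rate limiting is a thing, so maybe your connections are being closed by the server. A definitive diagnosis is not possible with the information we have.
To test this further, I suggest you run the same script you already have, but with only two or three concurrent requests (i.e. a very small subset of your wallet addresses). If that works, you at least have a starting point.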
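As a rough illustration of that small-subset test, here is a minimal sketch. The names `try_small_subset` and `fake_fetch` are hypothetical and not part of the original script; `fake_fetch` is a stand-in that makes no network call, and you would pass your own request coroutine in its place.

```python
import asyncio
from collections.abc import Awaitable, Callable, Iterable
from typing import Any


async def try_small_subset(
    addresses: Iterable[str],
    fetch: Callable[[str], Awaitable[Any]],
    limit: int = 3,
) -> list[Any]:
    """Run `fetch` concurrently for only the first `limit` addresses."""
    subset = list(addresses)[:limit]
    # return_exceptions=True puts any raised exception into the result list
    # instead of aborting, so you can see which requests failed.
    return await asyncio.gather(
        *(fetch(address) for address in subset),
        return_exceptions=True,
    )


async def fake_fetch(address: str) -> dict[str, str]:
    """Hypothetical stand-in for the real HTTP request; makes no network call."""
    await asyncio.sleep(0)
    return {"address": address, "status": "ok"}


if __name__ == "__main__":
    sample = [f"0x{n:040x}" for n in range(1000)]  # made-up addresses
    print(asyncio.run(try_small_subset(sample, fake_fetch)))
```

If the real requests succeed for three addresses but fail for 1000, that points towards the number of concurrent connections rather than the code.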
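If this does turn out to be related to the number of requests being too large, you could consider gathering your coroutines in batches. Here is a simple implementation of a function you can use as a replacement for asyncio.gather: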
from asyncio import gather
from collections.abc import Awaitable
from typing import TypeVar

T = TypeVar("T")


async def gather_in_batches(
    *aws: Awaitable[T],
    batch_size: int,
    return_exceptions: bool = False,
) -> list[T]:
    results: list[T] = []
    # Await one slice of at most `batch_size` awaitables at a time.
    for idx in range(0, len(aws), batch_size):
        results.extend(
            await gather(
                *aws[idx:idx + batch_size],
                return_exceptions=return_exceptions,
            )
        )
    return results
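For example, if you have 1000 coroutines and set the batch size to 50, the gather function is awaited 20 times, one after another, in that loop. If needed, you can even introduce an artificial delay between iterations using asyncio.sleep. Maybe this will help.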
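The artificial delay mentioned above could look roughly like this. It is only a sketch: the `delay` parameter and the names `gather_in_batches_with_delay`, `fake_request` and `demo` are assumptions made for illustration, and the right delay (if any) depends on the server's rate limit, which is not known here.

```python
import asyncio
from collections.abc import Awaitable
from typing import TypeVar

T = TypeVar("T")


async def gather_in_batches_with_delay(
    *aws: Awaitable[T],
    batch_size: int,
    delay: float = 0.0,
) -> list[T]:
    """Await the awaitables batch by batch, sleeping `delay` seconds in between."""
    results: list[T] = []
    for idx in range(0, len(aws), batch_size):
        # Pause before every batch except the first one.
        if idx > 0 and delay > 0:
            await asyncio.sleep(delay)
        results.extend(await asyncio.gather(*aws[idx:idx + batch_size]))
    return results


async def fake_request(n: int) -> int:
    """Hypothetical stand-in for a real API call."""
    await asyncio.sleep(0)
    return n * 2


async def demo() -> list[int]:
    coroutines = (fake_request(n) for n in range(5))
    # Five coroutines in batches of two: three batches, two pauses.
    return await gather_in_batches_with_delay(*coroutines, batch_size=2, delay=0.01)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Results come back in the same order as the awaitables were passed in, because each batch preserves its order and the batches are processed sequentially.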
As for your script, I took the liberty of refactoring it and cleaning it up a bit. Here is a complete version that should work as intended:
import json
import sys
from asyncio import gather, run
from collections.abc import Awaitable, Iterable
from time import time
from typing import Any, TypeVar

import pandas as pd
from aiohttp import ClientSession, ClientTimeout

T = TypeVar("T")

BASE_URL = "https://blockscout.com/eth/mainnet/api?module=account&action=eth_get_balance&address="
DEFAULT_TIMEOUT = ClientTimeout(total=None)


def load_wallet_addresses(csv_path: str) -> pd.Series:  # type: ignore[type-arg]
    df = pd.read_csv(csv_path)
    return df.loc[:, "Address"]


async def gather_in_batches(
    *aws: Awaitable[T],
    batch_size: int,
    return_exceptions: bool = False,
) -> list[T]:
    results: list[T] = []
    # Await one slice of at most `batch_size` awaitables at a time.
    for idx in range(0, len(aws), batch_size):
        results.extend(
            await gather(
                *aws[idx:idx + batch_size],
                return_exceptions=return_exceptions,
            )
        )
    return results


async def get_balance(session: ClientSession, address: str) -> dict[str, Any]:
    # The response is read and released inside the context manager.
    async with session.get(BASE_URL + address, ssl=False) as response:
        return await response.json()  # type: ignore[no-any-return]


async def get_balances_in_batches(
    addresses: Iterable[str],
    batch_size: int = 1,
) -> list[dict[str, Any]]:
    async with ClientSession(timeout=DEFAULT_TIMEOUT) as session:
        coroutines = (get_balance(session, address) for address in addresses)
        return await gather_in_batches(*coroutines, batch_size=batch_size)


async def main() -> None:
    csv_path = sys.argv[1]
    try:
        batch_size = int(sys.argv[2])
    except IndexError:
        # No batch size given on the command line.
        batch_size = 1
    addresses = load_wallet_addresses(csv_path)
    time_start = time()
    results = await get_balances_in_batches(addresses, batch_size)
    time_total = time() - time_start
    print(json.dumps(results, indent=4))
    num_total = len(addresses)
    # Number of full batches, plus one if there is a partial last batch.
    num_batches = num_total // batch_size + (num_total % batch_size > 0)
    print(
        f"It took {time_total:.3} to make {num_total} API calls in "
        f"{num_batches} batches of up to {batch_size} concurrent calls each."
    )


if __name__ == "__main__":
    run(main())
You call it like this:
python path/to/script.py path/to/wallets.csv 32
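The last argument is the batch size, and it is optional. If you omit it, the batch size defaults to 1, which means the requests are all made sequentially.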
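To make the effect of the batch size concrete, the batch count reported by the script can be computed on its own. This is a small sketch using the 1000 addresses from the question and the batch size of 32 from the command above; `count_batches` is a hypothetical helper name, not something the script defines.

```python
def count_batches(num_total: int, batch_size: int) -> int:
    """Return how many batches are needed for `num_total` items."""
    # Full batches, plus one extra batch if there is a remainder.
    return num_total // batch_size + (num_total % batch_size > 0)


if __name__ == "__main__":
    # 31 full batches of 32 addresses, plus one last batch of 8.
    print(count_batches(1000, 32))
    # With the default batch size of 1, every request is its own batch.
    print(count_batches(1000, 1))
```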
Hope this helps.
