我正在嘗試HashMap使用該Arc<Mutex<T>>模式作為受Rust 食譜啟發的網站抓取練習的一部分來寫信。
第一部分使用tokio運行時。我無法通過正在完成的任務并回傳,HashMap因為它只是掛起。
type Db = Arc<Mutex<HashMap<String, bool>>>;
pub async fn handle_async_tasks(db: Db) -> BoxResult<HashMap<String, bool>> {
let links = NodeUrl::new("https://www.inverness-courier.co.uk/")
.await
.unwrap();
let arc = db.clone();
let mut handles = Vec::new();
for link in links.links_with_paths {
let x = arc.clone();
handles.push(tokio::spawn(async move {
process(x, link).await;
}));
}
// for handle in handles {
// handle.await.expect("Task panicked!");
// } < I tried this as well>
futures::future::join_all(handles).await;
let readables = arc.lock().await;
for (key, value) in readables.clone().into_iter() {
println!("Checking db: k, v ==>{} / {}", key, value);
}
let clone_db = readables.clone();
return Ok(clone_db);
}
async fn process(db: Db, url: Url) {
let mut db = db.lock().await;
println!("checking {}", url);
if check_link(&url).await.is_ok() {
db.insert(url.to_string(), true);
} else {
db.insert(url.to_string(), false);
}
}
async fn check_link(url: &Url) -> BoxResult<bool> {
let res = reqwest::get(url.as_ref()).await?;
Ok(res.status() != StatusCode::NOT_FOUND)
}
pub struct NodeUrl {
domain: String,
pub links_with_paths: Vec<Url>,
}
#[tokio::main]
async fn main() {
let db: Db = Arc::new(Mutex::new(HashMap::new()));
let db = futures::executor::block_on(task::handle_async_tasks(db));
}
我想回傳HashMap到main()在主執行緒被阻塞。如何等待所有異步執行緒行程完成并回傳HashMap?
uj5u.com熱心網友回復:
let links = NodeUrl::new("https://www.some-site.com/.co.uk/").await.unwrap();
這對我來說似乎不是一個有效的 URL。
async fn process(db: Db, url: Url) {
let mut db = db.lock().await;
println!("checking {}", url);
if check_link(&url).await.is_ok() {
db.insert(url.to_string(), true);
} else {
db.insert(url.to_string(), false);
}
}
這是非常成問題的。在整個請求期間,您持有資料庫的排他鎖。這使您的應用程式有效地串行。默認超時時間reqwest為 30 秒。因此,如果服務器沒有回應并且您有很多鏈接要通過該程式,則該程式可能看起來只是“掛起”。
您應該只獲得盡可能短的資料庫鎖 - 只是為了進行插入:
async fn process(db: Db, url: Url) {
println!("checking {}", url);
if check_link(&url).await.is_ok() {
let mut db = db.lock().await;
db.insert(url.to_string(), true);
} else {
let mut db = db.lock().await;
db.insert(url.to_string(), false);
}
}
或者甚至更好,消除無用的,如果:
async fn process(db: Db, url: Url) {
println!("checking {}", url);
let valid = check_link(&url).await.is_ok();
db.lock().await.insert(url.to_string(), valid);
}
最后你沒有展示你的main函式,你呼叫handle_async_tasks或運行其他東西的方式可能有問題。
uj5u.com熱心網友回復:
我的主要問題是如何處理MutexGuard- 最后我通過使用clone和回傳內部值來完成。
不需要使用futures::executor in main:因為我們在 tokio 運行時內,呼叫.await就足以同步訪問最終值。
克隆Arc一次就足夠了;在將它傳遞到多執行緒背景關系之前,我已經克隆了它兩次。
感謝@orlp 指出與check_link函式相關的錯誤邏輯。
pub async fn handle_async_tasks() -> BoxResult<HashMap<String, bool>> {
let get_links = NodeUrl::new("https://www.invernesscourier.co.uk/")
.await
.unwrap();
let db: Db = Arc::new(Mutex::new(HashMap::new()));
let mut handles = Vec::new();
for link in get_links.links_with_paths {
let x = db.clone();
handles.push(tokio::spawn(async move {
process(x, link).await;
}));
}
futures::future::join_all(handles).await;
let guard = db.lock().await;
let cloned = guard.clone();
Ok(cloned)
}
#[tokio::main]
async fn main() {
let db = task::handle_async_tasks().await.unwrap();
for (key, value) in db.into_iter() {
println!("Checking db: {} / {}", key, value);
}
}
這絕不是最好的 Rust 代碼,但我想分享我最終是如何解決問題的。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/371838.html
