問題陳述
我想在異步 Rust 中實作一個有向無環計算圖框架,即計算“節點”的互連圖,每個節點都從前任節點獲取輸入并為后繼節點產生輸出。我計劃通過生成一組Futures 來實作這一點,每個計算節點一個,同時允許期貨之間的依賴關系。然而,在使用實作這個框架時,async我已經完全迷失在編譯器錯誤中。
最小的例子
這是我想做的一個最小示例的嘗試。有一個 floats 輸入串列,values任務是創建一個新串列outputwhere output[i] = values[i] output[i - 2]。這是我嘗試過的:
use std::sync;
fn some_complicated_expensive_fn(val1: f32, val2: f32) -> f32 {
val1 val2
}
fn example_async(values: &Vec<f32>) -> Vec<f32> {
let runtime = tokio::runtime::Runtime::new().unwrap();
let join_handles = sync::Arc::new(sync::Mutex::new(Vec::<tokio::task::JoinHandle<f32>>::new()));
for (i, value) in values.iter().enumerate() {
let future = {
let join_handles = join_handles.clone();
async move {
if i < 2 {
*value
} else {
let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
some_complicated_expensive_fn(*value, prev_value)
}
}
};
join_handles.lock().unwrap().push(runtime.spawn(future));
}
join_handles
.lock()
.unwrap()
.iter_mut()
.map(|join_handle| runtime.block_on(join_handle).unwrap())
.collect()
}
#[cfg(test)]
mod tests {
#[test]
fn test_example() {
let values = vec![1., 2., 3., 4., 5., 6.];
println!("{:?}", super::example_async(&values));
}
}
Mutex我收到關于未鎖定的錯誤Send:
error: future cannot be sent between threads safely
--> sim/src/compsim/runtime.rs:23:51
|
23 | join_handles.lock().unwrap().push(runtime.spawn(future));
| ^^^^^ future created by async block is not `Send`
|
= help: within `impl Future`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, Vec<tokio::task::JoinHandle<f32>>>`
note: future is not `Send` as this value is used across an await
--> sim/src/compsim/runtime.rs:18:38
|
18 | let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ first, await occurs here, with `join_handles.lock().unwrap()` maybe used later...
note: `join_handles.lock().unwrap()` is later dropped here
--> sim/src/compsim/runtime.rs:18:88
|
18 | let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
| ---------------------------- ^
| |
| has type `std::sync::MutexGuard<'_, Vec<tokio::task::JoinHandle<f32>>>` which is not `Send`
help: consider moving this into a `let` binding to create a shorter lived borrow
--> sim/src/compsim/runtime.rs:18:38
|
18 | let prev_value = join_handles.lock().unwrap()[i - 2].await.unwrap();
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
這是有道理的,我在Tokio 檔案中看到您可以使用 atokio::task::Mutex來代替,但是 a)我不確定如何,并且 b)我想知道是否有更好的整體方法我錯過了。非常感謝幫助!謝謝。
uj5u.com熱心網友回復:
編譯器抱怨你不能在join_handle被鎖定的情況下越過等待點,這是因為任務可能會在 之后被不同的執行緒拾取.await,并且必須在同一個執行緒中鎖定和解鎖鎖。您可以通過縮短鎖的壽命來解決此問題,例如,將每個句柄保持在 中Option,在等待之前獲取它。但是隨后您遇到了等待 aJoinHandle 消耗它的問題 - 您收到任務回傳的值,并且您丟失了句柄,因此您無法將其回傳給向量。(這是 Rust 值具有單一所有者的結果,因此一旦句柄將值傳遞給您,它就不再擁有它并且變得無用。)
句柄基本上就像生成任務結果的一次性通道一樣作業。由于您需要將結果放在另一個位置,因此您可以單獨創建一個包含一次性通道的向量,該向量保留另一個結果副本,需要它們的任務可以等待該副本。
pub fn example_async(values: &[f32]) -> Vec<f32> {
let runtime = tokio::runtime::Runtime::new().unwrap();
let (txs, rxs): (Vec<_>, Vec<_>) = (0..values.len())
.map(|_| {
let (tx, rx) = tokio::sync::oneshot::channel();
(Mutex::new(Some(tx)), Mutex::new(Some(rx)))
})
.unzip();
let txs = Arc::new(txs);
let rxs = Arc::new(rxs);
let mut join_handles = vec![];
for (i, value) in values.iter().copied().enumerate() {
let txs = Arc::clone(&txs);
let rxs = Arc::clone(&rxs);
let future = async move {
let result = if i < 2 {
value
} else {
let prev_rx = rxs[i - 2].lock().unwrap().take().unwrap();
let prev_value = prev_rx.await.unwrap();
some_complicated_expensive_fn(value, prev_value)
};
let tx = txs[i].lock().unwrap().take().unwrap();
tx.send(result).unwrap(); // here you'd use result.clone() for non-Copy result
result
};
join_handles.push(runtime.spawn(future));
}
join_handles
.into_iter()
.map(|handle| runtime.block_on(handle).unwrap())
.collect()
}
操場
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/410859.html
標籤:
