我正在處理一個計算時間很長的專案,我將有數百個節點運行它,作為我實作的一部分,我有一個狀態處理程式物件/結構,它與 API 對話并獲取所需的資訊,如引數、然后狀態處理程式呼叫主要的密集函式。
它為了密切關注計算密集型函式,我希望它以已完成百分比回傳狀態處理函式,以便狀態處理程式可以更新 API,然后允許密集函式繼續計算而不會丟失任何堆疊(例如它的變數和檔案句柄)
我研究了異步函式,但它們似乎只回傳一次。
先感謝您!
uj5u.com熱心網友回復:
您要的是generators,目前還不穩定。您可以在nightly上對它們進行試驗,或者手動創建與它們類似的東西并手動呼叫它(盡管它的語法不如生成器好)。例如這樣的事情:
enum Status<T, K> {
Updated(T),
Finished(K)
}
struct State { /* ... */ }
impl State {
pub fn new() -> Self {
Self { /* ... */ }
}
pub fn call(self) -> Status<Self, ()> {
// Do some update on State and return
// State::Updated(self). When you finised
// return State::Finished(()). Note that this
// method consumes self. You could make it take
// &mut self, but then you would have to worry
// about how to prevent it beeing called after
// it finished. If you want to return some intermidiate
// value in each step you can make Status::Updated
// contain a (state, value) instead.
todo!()
}
}
fn foo() {
let mut state = State::new();
let result = loop {
match state.call() {
Status::Updated(s) => state = s,
Status::Finished(result) => break result
}
};
}
uj5u.com熱心網友回復:
Async 實際上可以暫停和恢復,但它適用于 IO 系結程式,這些程式基本上一直在等待一些外部 IO。它不適用于計算繁重的任務。
關于如何解決這個問題,我想到了兩種方法:
- 執行緒和通道
- 回呼
解決方案 1:執行緒和通道
use std::{sync::mpsc, thread, time::Duration};
struct StatusUpdate {
task_id: i32,
percent: f32,
}
impl StatusUpdate {
pub fn new(task_id: i32, percent: f32) -> Self {
Self { task_id, percent }
}
}
fn expensive_computation(id: i32, status_update: mpsc::Sender<StatusUpdate>) {
status_update.send(StatusUpdate::new(id, 0.0)).unwrap();
thread::sleep(Duration::from_millis(1000));
status_update.send(StatusUpdate::new(id, 33.3)).unwrap();
thread::sleep(Duration::from_millis(1000));
status_update.send(StatusUpdate::new(id, 66.6)).unwrap();
thread::sleep(Duration::from_millis(1000));
status_update.send(StatusUpdate::new(id, 100.0)).unwrap();
}
fn main() {
let (status_sender_1, status_receiver) = mpsc::channel();
let status_sender_2 = status_sender_1.clone();
thread::spawn(move || expensive_computation(1, status_sender_1));
thread::spawn(move || expensive_computation(2, status_sender_2));
for status_update in status_receiver {
println!(
"Task {} is done {} %",
status_update.task_id, status_update.percent
);
}
}
Task 1 is done 0 %
Task 2 is done 0 %
Task 1 is done 33.3 %
Task 2 is done 33.3 %
Task 1 is done 66.6 %
Task 2 is done 66.6 %
Task 2 is done 100 %
Task 1 is done 100 %
解決方案 2:回呼
use std::{thread, time::Duration};
struct StatusUpdate {
task_id: i32,
percent: f32,
}
impl StatusUpdate {
pub fn new(task_id: i32, percent: f32) -> Self {
Self { task_id, percent }
}
}
fn expensive_computation<F: FnMut(StatusUpdate)>(id: i32, mut update_status: F) {
update_status(StatusUpdate::new(id, 0.0));
thread::sleep(Duration::from_millis(1000));
update_status(StatusUpdate::new(id, 33.3));
thread::sleep(Duration::from_millis(1000));
update_status(StatusUpdate::new(id, 66.6));
thread::sleep(Duration::from_millis(1000));
update_status(StatusUpdate::new(id, 100.0));
}
fn main() {
expensive_computation(1, |status_update| {
println!(
"Task {} is done {} %",
status_update.task_id, status_update.percent
);
});
}
Task 1 is done 0 %
Task 1 is done 33.3 %
Task 1 is done 66.6 %
Task 1 is done 100 %
請注意,使用通道解決方案,一次處理不同執行緒上的多個計算要容易得多。使用回呼,執行緒之間的通信是困難的/不可能的。
我是否能夠在使用該資料執行某些操作時暫停執行昂貴的功能,然后讓它恢復?
不,這不是執行緒可以完成的事情。一般來說。
您可以以低于主執行緒的優先級運行它們,這意味著它們不會被積極地調度,從而減少了主執行緒中的延遲。但總的來說,作業系統是搶占式的,并且能夠在執行緒之間來回切換,因此您不必擔心“暫停”。
uj5u.com熱心網友回復:
這就是我想出使用 tokio
use tokio::task;
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
tokio::spawn(expensive_computation(tx));
while let Some(message) = rx.recv().await {
write_data(message).await;
}
}
async fn expensive_computation(tx: mpsc::Sender<String>) {
for i in 0..100 {
if i % 10 == 0 {
tx.send(format!("{}", i)).await.unwrap();
}
println!("expensive_computation {}", i);
}
}
async fn write_data(i: String) {
println!("Writing data {}", i);
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/513477.html
標籤:异步锈
