在以下程式中,我使用 Tokio 的 mpsc 頻道。發送者被移動到一個名為的任務input_message,接收者被移動到另一個名為 的任務printer。這兩個任務都tokio::spawn()在 main 函式中進行了編輯。任務是讀取用戶的input_message輸入并通過 Channel 發送。通道上獲取用戶輸入并將其簡單地列印到標準輸出的printer任務:recv()
use std::error::Error;
use tokio::sync::mpsc;
use std::io::{BufRead, Write};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
let printer = tokio::spawn(async move {
loop {
let res = rx.recv().await; // (11) Comment this ..
// let res = rx.try_recv(); // (12) Uncomment this ,,
if let Some(m) = res { // .. and this
// if let Ok(m) = res { // ,, and this
if m.trim() == "q".to_string() {
break;
}
println!("Received: {}", m.trim());
}
}
println!("Printer exited");
});
let input_message = tokio::spawn(async move {
let stdin = std::io::stdin();
let mut bufr = std::io::BufReader::new(stdin);
let mut buf = String::new();
loop {
// Let the printer thread print the string before asking the user's input.
std::thread::sleep(std::time::Duration::from_millis(1));
print!("Enter input: ");
std::io::stdout().flush().unwrap();
bufr.read_line(&mut buf).unwrap();
if buf.trim() == "q".to_string() {
tx.send(buf).unwrap();
break;
}
tx.send(buf).unwrap();
buf = String::new();
}
println!("InputMessage exited");
});
tokio::join!(input_message, printer);
Ok(())
}
該程式的預期行為是:
- 詢問用戶一個隨機輸入(q 退出)
- 將相同的輸入列印到標準輸出
使用rx.recv().await第 11-13 行,程式似乎緩沖了代表用戶輸入的字串:printer任務不接收各種輸入,因此不會將字串列印到標準輸出。一旦退出訊息(即 q)被發送,input_message任務退出并且訊息似乎被沖出通道并且接收器一次處理它們,因此printer任務一次列印所有輸入。這是錯誤輸出的示例:
Enter input: Hello
Enter input: World
Enter input: q
InputMessage exited
Received: Hello
Received: World
Printer exited
我的問題是,通道如何緩沖訊息并僅在發送執行緒退出時一次性處理它們,而不是在發送它們時接收它們?
我試圖做的是使用第try_recv()12-14 行中的函式,它確實解決了問題。輸出正確列印,這是一個示例:
Enter input: Hello
Received: Hello
Enter input: World
Received: World
Enter input: q
InputMessage exited
Printer exited
鑒于此,我感到困惑。recv().await我得到了函式和函式之間的區別,try_recv()但我認為在這種情況下我忽略了更多的東西,這使得后者作業而前者不起作用。有沒有人能夠闡明并詳細說明這一點?為什么try_recv()作業recv().await而不作業,為什么不應該recv().await在這種情況下作業?就效率而言,是在回圈try_recv()不良還是“不良做法”?
uj5u.com熱心網友回復:
這里有幾件事要指出,但首先,您正在等待std::io::stdin()阻塞執行緒的行,直到有一行到達該流。當執行緒等待輸入時,不能在該執行緒上執行其他未來,如果您想深入了解為什么不應該這樣做,這篇博文是一個很好的資源。
Tokio 的io模塊為 提供了一個異步句柄stdin(),您可以將其用作快速修復,盡管檔案明確提到您應該為互動式用戶輸入啟動一個專用(非異步)執行緒,而不是使用異步句柄。
交換還需要將標準庫替換std::io::stdin()為包裝了一個而不是.tokio::io::stdin()BufReaderR: AsyncReadR: Read
為了防止輸入任務和輸出任務之間的交錯寫入,您可以使用回應器通道,該通道在輸出已列印時向輸入任務發出信號。String您可以發送Message帶有以下欄位的 a,而不是通過通道發送:
struct Message {
payload: String,
done_tx: oneshot::Sender<()>,
}
讀取輸入行后,Message通過通道將其發送到列印機任務。列印機任務列印String并通過done_tx表示輸入任務可以列印輸入提示并等待換行。
將所有這些與其他一些更改(例如等待訊息的 while 回圈)放在一起,您最終會得到如下結果:
use std::error::Error;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::sync::{mpsc, oneshot};
#[derive(Debug)]
struct Message {
done_tx: oneshot::Sender<()>,
message: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let (tx, mut rx) = mpsc::unbounded_channel::<Message>();
let printer = tokio::spawn(async move {
while let Some(Message {
message: m,
done_tx,
}) = rx.recv().await
{
if m.trim() == "q".to_string() {
break;
}
println!("Received: {}", m.trim());
done_tx.send(()).unwrap();
}
println!("Printer exited");
});
let input_message = tokio::spawn(async move {
let stdin = tokio::io::stdin();
let mut stdout = tokio::io::stdout();
let mut bufr = tokio::io::BufReader::new(stdin);
let mut buf = String::new();
loop {
// Let the printer thread print the string before asking the user's input.
stdout.write(b"Enter input: ").await.unwrap();
stdout.flush().await.unwrap();
bufr.read_line(&mut buf).await.unwrap();
let end = buf.trim() == "q";
let (done_tx, done) = oneshot::channel();
let message = Message {
message: buf,
done_tx,
};
tx.send(message).unwrap();
if end {
break;
}
done.await.unwrap();
buf = String::new();
}
println!("InputMessage exited");
});
tokio::join!(input_message, printer);
Ok(())
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/419767.html
標籤:
上一篇:為什么在Java中擴展執行緒類以創建引數化建構式很重要
下一篇:C 執行緒池的低性能
