此代碼運行完成并列印出my_data如果我取消注釋函式中的sleep行的do_work值。如果我把它注釋掉,我的可執行檔案每次都會掛起。
為什么 Condvar 不喚醒最后一個執行緒?提到收集句柄并等待它們加入主執行緒,但這應該由人造絲作用域處理,對嗎?
sleep如果沒有 in 中的陳述句,我如何才能完成此代碼do_work()?
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, Barrier, Condvar, Mutex,
},
thread,
time::Duration,
};
fn do_work(
mtx: Arc<Mutex<bool>>,
cond_var: Arc<Condvar>,
barrier: Arc<Barrier>,
quitting: &AtomicBool,
range: &mut [f32],
) {
while !quitting.load(Ordering::SeqCst) {
barrier.wait();
//thread::sleep(Duration::from_micros(1));
let mut started = mtx.lock().unwrap();
while !*started && !quitting.load(Ordering::SeqCst) {
started = cond_var.wait(started).unwrap();
}
if quitting.load(Ordering::SeqCst) {
break;
} else {
range.iter_mut().for_each(|i| *i = 1.0);
}
}
println!("{:?} Joining", thread::current().id());
}
fn start_work(mtx: Arc<Mutex<bool>>, cond_var: Arc<Condvar>) {
let mut started = mtx.lock().unwrap();
*started = true;
cond_var.notify_all();
}
fn reset_work(mtx: Arc<Mutex<bool>>) {
let mut started = mtx.lock().unwrap();
*started = false;
}
fn main() {
let num_threads = 4;
let test_barrier = Arc::new(Barrier::new(num_threads 1));
let test_mutex = Arc::new(Mutex::new(false));
let test_cond_var = Arc::new(Condvar::new());
let mut my_data = vec![0.0; 1024];
my_data
.iter_mut()
.enumerate()
.for_each(|(i, iter)| *iter = i as f32);
let chunk_size = my_data.len() / num_threads;
let quitting = AtomicBool::new(false);
rayon::scope(|s| {
for chunk in my_data.chunks_mut(chunk_size) {
let thread_mtx = test_mutex.clone();
let thread_cond_var = test_cond_var.clone();
let thread_barrier = Arc::clone(&test_barrier);
let temp = &quitting;
s.spawn(move |_| do_work(thread_mtx, thread_cond_var, thread_barrier, &temp, chunk));
}
test_barrier.wait();
let _upper_bound = 1024 / num_threads;
for _i in 0..10 {
start_work(test_mutex.clone(), test_cond_var.clone());
test_barrier.wait();
reset_work(test_mutex.clone());
}
quitting.store(true, Ordering::SeqCst);
});
println!("my_data is: {:?}", my_data);
}
Cargo.toml 依賴項:
rayon = "*"
這是對do_work稍后將進行的更復雜數學的測驗,但我試圖獲得一系列成功修改較大的Vec.
uj5u.com熱心網友回復:
我讓預期的行為按預期作業。這似乎特別令人費解,并且應該有更好的方法。我很樂意接受一個比我所擁有的答案更復雜的答案,但它至少會產生所需的行為,一個帶有預旋轉執行緒的執行緒池,只要它從主執行緒接收到正確的信號并且可以確定性地關閉,就會產生突發作業大大地。額外的開始和結束執行緒提供了一個握手機制,以確保沒有競爭條件出現在障礙上。
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Barrier,
},
thread,
};
fn do_work(
start_barrier: &Barrier,
finish_barrier: &Barrier,
quitting: &AtomicBool,
starting: &AtomicBool,
finishing: &AtomicBool,
range: &mut [f32],
) {
while !quitting.load(Ordering::SeqCst) {
start_barrier.wait();
while !starting.load(Ordering::SeqCst) && !quitting.load(Ordering::SeqCst) {}
{
// let mut started = mtx.lock().unwrap();
// while !*started && !quitting.load(Ordering::SeqCst) {
// started = cond_var.wait(started).unwrap();
// }
}
if quitting.load(Ordering::SeqCst) {
break;
} else {
range.iter_mut().for_each(|i| *i = 1.0);
}
finish_barrier.wait();
while !finishing.load(Ordering::SeqCst) && !quitting.load(Ordering::SeqCst) {}
}
println!("{:?} Joining", thread::current().id());
}
fn main() {
let num_threads = 4;
let start_barrier = Barrier::new(num_threads 1);
let finish_barrier = Barrier::new(num_threads 1);
let mut my_data = vec![0.0; 1024];
my_data
.iter_mut()
.enumerate()
.for_each(|(i, iter)| *iter = i as f32);
let chunk_size = my_data.len() / num_threads;
let quitting = AtomicBool::new(false);
let starting = AtomicBool::new(false);
let finishing = AtomicBool::new(false);
rayon::scope(|s| {
for chunk in my_data.chunks_mut(chunk_size) {
let thread_start_barrier = &start_barrier;
let thread_finish_barrier = &finish_barrier;
let thread_quitting = &quitting;
let thread_starting = &starting;
let thread_finishing = &finishing;
s.spawn(move |_| do_work( thread_start_barrier,
thread_finish_barrier,
thread_quitting,
thread_starting,
thread_finishing,
chunk));
}
let num_rounds = 10;
for i in 0..num_rounds {
let start = std::time::Instant::now();
start_barrier.wait();
finishing.store(false, Ordering::SeqCst);
starting.store(true, Ordering::SeqCst);
finish_barrier.wait();
if i == num_rounds-1 {
quitting.store(true, Ordering::SeqCst);
}
finishing.store(true, Ordering::SeqCst);
starting.store(false, Ordering::SeqCst);
println!("Round {} took: {:?}", i, std::time::Instant::now() - start);
}
});
println!("my_data is: {:?}", my_data);
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/430573.html
