我的 Rust 代碼用于RwLock在多個執行緒中處理資料。每個執行緒在使用read鎖時填充一個公共存盤(例如填充資料庫,但我的情況有點不同)。最終,公共存盤將填滿。我需要暫停所有處理,重新分配存盤空間(例如從云中分配更多磁盤空間),然后繼續。
// psudo-code
fn thread_worker(tasks) {
let lock = rwlock.read().unwrap();
for task in tasks {
// please ignore out_of_space check race condition
// it's here just to explain the question
if out_of_space {
drop(lock);
let write_lock = rwlock.write().unwrap();
// get more storage
drop(write_lock);
lock = rwlock.read().unwrap();
}
// handle task WITHOUT getting a read lock on every pass
// getting a lock is far costlier than actual task processing
}
drop(lock);
}
由于所有執行緒會在大約同一時間迅速耗盡空間,因此它們都可以釋放read鎖,并獲得write. 獲得write鎖的第一個執行緒將解決存盤問題。但是現在我有一個可能的臨時死鎖情況——所有其他執行緒也在等待write鎖,即使它們不再需要它。
所以這種情況是有可能發生的:給定 3 個執行緒都在等待write,第一個執行緒獲取write,修復問題,釋放write,然后等待read。第二個進入write但很快跳過,因為問題已經解決并發布。第一個和第二個執行緒將進入read并繼續處理,但第三個執行緒仍在等待write并將等待很長時間,直到前兩個執行緒用完空間或完成所有作業。
鑒于所有執行緒正在等待write,如何在第一個執行緒完成作業后“中止”所有其他執行緒的等待,但在釋放write它已經獲得的鎖之前?
我看到有一個poisoning功能,但它是為恐慌而設計的,在生產中重用它似乎是錯誤的,而且很難正確完成。Rust 開發人員也在考慮洗掉它。
PS 每一次回圈迭代本質上就是一個data[index] = value賦值,其中data是一個被多個執行緒共享的巨大的memmap。它index在所有執行緒中都在緩慢增長,因此最終所有執行緒都用完了 memmap 大小。當這種情況發生時,memmap 被銷毀,檔案重新分配,并創建一個新的 memmap。因此,不可能在每次回圈迭代中獲得讀鎖。
uj5u.com熱心網友回復:
你可以有一個AtomicBool作為寫作的看門人:一次只有一個執行緒可以訪問write()。其他需要 write 的執行緒一旦發現已經有一個 writer 就會放棄,然后他們就干脆退回到 reading。因此,您不是“中止”其他撰寫器,而是阻止他們啟動 awrite()開始。
例如,假設一個基本實作看起來像這樣 ( playground ):
use parking_lot::RwLock;
pub struct Data<T> {
store: RwLock<T>,
}
impl<T: Default> Data<T> {
pub fn process(&self, needed_size: usize, f: impl FnOnce(&T)) {
let mut store = self.store.read();
if Self::needs_resize(&store, needed_size) {
drop(store);
let mut wstore = self.store.write();
Self::resize(&mut wstore, needed_size);
drop(wstore);
store = self.store.read();
}
f(&store)
}
fn needs_resize(_store: &T, _needed_size: usize) -> bool {
unimplemented!()
}
fn resize(store: &mut T, to_size: usize) {
if !Self::needs_resize(store, to_size) {
return; // someone else reserved enough for us
}
unimplemented!()
}
}
避免不必要寫入的實作可能如下所示:
use parking_lot::RwLock;
use std::sync::atomic::{AtomicBool, Ordering};
pub struct Data<T> {
store: RwLock<T>,
has_writer: AtomicBool,
}
impl<T: Default> Data<T> {
pub fn process(&self, needed_size: usize, f: impl FnOnce(&T)) {
loop {
let mut store = self.store.read();
if Self::needs_resize(&store, needed_size) {
drop(store);
if self.has_writer.swap(true, Ordering::SeqCst) {
continue; // someone else is writing, go back to read
}
let mut wstore = self.store.write();
Self::resize(&mut wstore, needed_size);
drop(wstore);
self.has_writer.store(false, Ordering::SeqCst);
let mut wstore = self.store.write();
Self::resize(&mut wstore, needed_size);
drop(wstore);
store = self.store.read();
}
break f(&store);
}
}
// needs_resize() and resize() the same stubs as before
...
}
操場
uj5u.com熱心網友回復:
假設任務是獨立的并且比較短,解決這個問題最簡單的方法是不為整批作業持有讀鎖,而是為每個任務解鎖:
// psudo-code
fn thread_worker(tasks) {
for task in tasks {
if out_of_space {
let write_lock = rwlock.write().unwrap();
// get more storage
...
}
// process task
let lock = rwlock.read().unwrap();
...
}
}
請注意這如何避免顯式丟棄,這通常是一種很好的做法(RAII)。在使用塊結束時丟棄是自動的。
這仍然有這個問題,但只是很短的一段時間,特別是如果任務數量遠大于工人數量。
為了使其更加健壯,您可能會從std::sync::Barrier 中獲得靈感,并使用 CondVar 和計數器實作類似的功能。
PS 考慮一種替代設計,其中不是作業人員處理重新分配,而是由單獨的專用管理任務執行。當檢測到 out_of_space 條件時,您向該空間管理器發送一個信號(使用CondVar或mpsc通道),并在作業人員睡眠時執行重新分配作業并定期檢查 out_of_space。
PSS 目前尚不清楚 read() 鎖對您的描述有何保護(我假設您以某種方式需要它)。如果您可以同時運行處理和請求存盤,那么普通互斥鎖也可以完成這項作業:
// psudo-code
fn thread_worker(tasks) {
for task in tasks {
if out_of_space {
let write_lock = mutex.lock().unwrap();
// get more storage
...
}
// process task
...
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/362901.html
