簡要描述;簡介:
下面的代碼是 C Concurrency in Action 2nd Edition 中無鎖佇列的實作。佇列使用鏈表作為底層資料結構,采用拆分參考計數技術實作。
讓我困惑的是,compare_exchange_strong代碼中有多個使用acquire記憶體順序,但我不明白為什么使用這個順序。
例如,在對物件count成員的所有訪問中node(見下文),沒有帶有releaseorder 的存盤操作,但有很多compare_exchange_strong帶有acquireorder 的操作。
關于這個無鎖佇列的完整代碼,我對代碼的解釋,以及我的問題的詳細描述,請閱讀完整描述部分。
詳細描述:
我正在閱讀 Anthony Williams 的第二版 C Concurrency in Action。本書描述了如何使用拆分參考計數技術實作無鎖佇列。我將首先根據我的理解簡要解釋代碼的作業原理,以幫助您快速閱讀代碼。這個佇列的完整實作將在后面給出。
該實作使用單鏈表來實作佇列。佇列中保存著指向鏈表頭節點和尾節點的指標,在代碼中分別是head和,指向一個啞節點。tailtail
將一個元素推入佇列時,我們需要將元素值放入 指向的虛擬節點中tail,然后在該節點之后添加一個虛擬節點,讓tail指向新的虛擬節點。
當從佇列中彈出一個元素時,我們應該彈出 指向的元素head,然后設定head為head->next。當head和tail指向同一個節點(即一個虛擬節點)時,佇列為空。
該代碼使用參考計數來管理已洗掉節點的生命周期。與節點關聯的參考計數分為兩部分,外部參考計數和內部參考計數。外部計數加上內部計數是該節點的完整參考計數值。外部計數存盤在指向節點的指標中(即counted_node_ptr代碼中),而內部計數存盤在節點物件內部(即count代碼中)。
為了防止一個執行緒指向的物件在解參考指標之前被另一個執行緒洗掉,必須先將外部計數器遞增,以確保該節點不被洗掉。這是increase_external_count()在代碼中完成的。
當外部參考不再參考節點時,存盤在其中的外部計數必須添加到節點的內部計數中,這是由free_external_counter(). 內部計數存盤在count.internal_count節點物件中。并count.external_counters表示參考該節點的外部參考的數量。因為這些外部參考每個都有自己的外部計數,所以這些計數都必須考慮在內。只有當且僅當內部計數為 0 且外部計數器的編號也為 0 時,該節點才能被安全洗掉。
對于pop沒有成功指向被彈出節點的參考(通常是因為另一個執行緒已經彈出它),這些參考被簡單地丟棄,但必須減去它們的參考計數。這是由release_ref(). 此函式將節點的內部計數減一。每次遞減參考計數,或者內部計數和外部計數合并時,都會檢查是否滿足要洗掉的節點的條件。
下面是書中無鎖佇列的完整實作:
template<typename T>
class lock_free_queue
{
private:
struct node;
struct counted_node_ptr
{
int external_count;
node* ptr;
};
std::atomic<counted_node_ptr> head;
std::atomic<counted_node_ptr> tail;
struct node_counter
{
unsigned internal_count:30;
unsigned external_counters:2;
};
struct node
{
std::atomic<T*> data;
std::atomic<node_counter> count;
counted_node_ptr next;
node()
{
node_counter new_count;
new_count.internal_count=0;
new_count.external_counters=2;
count.store(new_count);
next.ptr=nullptr;
next.external_count=0;
data.store(nullptr);
}
void release_ref()
{
node_counter old_counter=
count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter=old_counter;
--new_counter.internal_count;
}
while(!count.compare_exchange_strong(
old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed));
if(!new_counter.internal_count &&
!new_counter.external_counters)
{
delete this;
}
}
};
static void increase_external_count(
std::atomic<counted_node_ptr>& counter,
counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter=old_counter;
new_counter.external_count;
}
while(!counter.compare_exchange_strong(
old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed));
old_counter.external_count=new_counter.external_count;
}
static void free_external_counter(counted_node_ptr &old_node_ptr)
{
node* const ptr=old_node_ptr.ptr;
int const count_increase=old_node_ptr.external_count-2;
node_counter old_counter=
ptr->count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter=old_counter;
--new_counter.external_counters;
new_counter.internal_count =count_increase;
}
while(!ptr->count.compare_exchange_strong(
old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed));
if(!new_counter.internal_count &&
!new_counter.external_counters)
{
delete ptr;
}
}
public:
lock_free_queue() {
counted_node_ptr h;
h.ptr = new node;
h.external_count = 1;
head.store(h);
tail.store(h);
}
lock_free_queue(const lock_free_queue&) = delete;
lock_free_queue& operator=(const lock_free_queue&) = delete;
~lock_free_queue() {
while (pop())
;
}
void push(T new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr=new node;
new_next.external_count=1;
counted_node_ptr old_tail=tail.load();
for(;;)
{
increase_external_count(tail,old_tail);
T* old_data=nullptr;
if(old_tail.ptr->data.compare_exchange_strong(
old_data,new_data.get()))
{
old_tail.ptr->next=new_next;
old_tail=tail.exchange(new_next);
free_external_counter(old_tail);
new_data.release();
break;
}
old_tail.ptr->release_ref();
}
}
std::unique_ptr<T> pop()
{
counted_node_ptr old_head=head.load(std::memory_order_relaxed);
for(;;)
{
increase_external_count(head,old_head);
node* const ptr=old_head.ptr;
if(ptr==tail.load().ptr)
{
ptr->release_ref();
return std::unique_ptr<T>();
}
if(head.compare_exchange_strong(old_head,ptr->next))
{
T* const res=ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr<T>(res);
}
ptr->release_ref();
}
}
};
我想我完全理解如何push和pop作業,并認為它們是正確的。但是我不明白的是為什么代碼中的一些原子操作使用那個記憶體順序,而書上并沒有給出原因。
物件的count成員node存盤節點的內部計數值,以及對節點的外部參考數。該count成員在代碼中僅使用了幾次:
- 在 的建構式中,
node有一個 seq-cststore:count
node()
{
node_counter new_count;
new_count.internal_count=0;
new_count.external_counters=2;
count.store(new_count); // seq-cst store
next.ptr=nullptr;
next.external_count=0;
}
- 在
release_ref, 有relaxedload和compare_exchange_strong對于count:
void release_ref()
{
node_counter old_counter=
count.load(std::memory_order_relaxed); // relaxed load
node_counter new_counter;
do
{
new_counter=old_counter;
--new_counter.internal_count;
}
while(!count.compare_exchange_strong(
old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed)); // acquire RMW operation
// ...
}
- 在
free_external_counter中,還有relaxedload和acquirecompare_exchange_strong:
static void free_external_counter(counted_node_ptr &old_node_ptr)
{
node* const ptr=old_node_ptr.ptr;
int const count_increase=old_node_ptr.external_count-2;
node_counter old_counter=
ptr->count.load(std::memory_order_relaxed); // relaxed load
node_counter new_counter;
do
{
new_counter=old_counter;
--new_counter.external_counters;
new_counter.internal_count =count_increase;
}
while(!ptr->count.compare_exchange_strong(
old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed)); // acquire RMW
// ...
}
可以看出,除了在 的建構式中執行了一個 seq-cst 存盤外node,其他操作都是帶relaxedor acquireorder 的加載操作,但沒有帶 order 的 store 操作release。
那么acquire這里使用記憶體順序的目的是什么?它應該與什么操作同步?
另外,increase_external_count還使用acquire了命令,這里的原因是什么?雖然其他原子操作使用默認的 seq-cst 排序,但其中一些操作似乎也可以替換為較弱的記憶體排序。
uj5u.com熱心網友回復:
接得好。這些是資料競賽。 ThreadSanitizer 同意這兩種情況。
在free_external_counter:
while(!ptr->count.compare_exchange_strong(
old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed));
if(!new_counter.internal_count &&
!new_counter.external_counters)
{
delete ptr;
}
我們在(to get )*ptr之前讀取物件。但由于未發布,理論上可以在它之后重新排序。如果是這樣,那么當讀取發生時,計數器已經遞減,同時其他一些執行緒可能會再次遞減它然后完成。compare_exchangeptr->countcompare_exchange*ptr*ptrdelete ptr
我不確定這在現實生活中是如何發生的,沒有時間機器,因為我們必須閱讀*ptr才能找到ptr->count我們應該寫入的物件。您需要一些神奇的機器,它可以推測性地使存盤全域可見,然后如果發現存盤了錯誤的地址或值,則回滾所有其他內核。但無論如何,C 記憶體模型并沒有禁止它。
有一個類似的錯誤release_ref,其中正在比賽的物件是*this。
我認為這些可以通過將比較交換升級到acq_rel. 這應該確保所有訪問都*ptr在洗掉之前發生。實際上,正如您所指出的,我們只需要在計數變為零的情況下進行獲取排序,因為只有delete ptr在compare_exchange. 你可以通過在 上release訂購compare_exchange,并在塊內放置一個獲取柵欄if,就在之前delete ptr;。不過,我還沒有花時間證明這是正確的。
另一方面,我也認為您可以將compare_exchangeinincrease_external_count弱化為relaxed. 當它被呼叫時,計數器必須已經是正數,否則訪問計數器本身是不安全的。因此,如果在 compare_exchange 之后的代碼在它之前重新排序,它將同樣受到很好的保護。(當然,這不是證據。)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/520386.html
標籤:C 多线程并发原子无锁
