決定從這篇文章開始,開一個讀原始碼系列,不限制平臺語言或工具,任何自己感興趣的都會寫,前幾天碰到一個小問題又讀了一遍ConcurrentQueue的原始碼,那就拿C#中比較常用的并發佇列ConcurrentQueue作為開篇來聊一聊它的實作原理,
話不多說,直奔主題,
要提前說明下的是,本文決議的原始碼是基于.NET Framework 4.8版本,地址是:https://referencesource.microsoft.com/#mscorlib/system/Collections/Concurrent/ConcurrentQueue.cs
本來是打算用.NET Core版本的,但是找了一下竟然沒找到:https://github.com/dotnet/runtime/tree/master/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent
不知道是我找錯位置了還是咋回事,有知道的大佬告知一下,不過我覺得實作原理應該類似吧,后面找到了我對比一下,不同的話再寫一篇來分析,
帶著問題出發
如果是自己實作一個簡單的佇列功能,我們該如何設計它的存盤結構呢?一般來說有這兩種方式:陣列或者鏈表,先來簡單分析下,
我們都知道,陣列是固定空間的集合,意味著初始化的時候要指定陣列大小,但是佇列的長度是隨時變化的,超出陣列大小了怎么辦?這時候就必須要對陣列進行擴容,問題又來了,擴容要擴多少呢,少了不夠用多了浪費記憶體空間,與之相反的,鏈表是動態空間型別的資料結構,元素之間通過指標相連,不需要提前分配空間,需要多少分配多少,但隨之而來的問題是,大量的出隊入隊操作伴隨著大量物件的創建銷毀,GC的壓力又變得非常大,
事實上,在C#的普通佇列Queue型別中選擇使用陣列進行實作,它實作了一套擴容機制,這里不再詳細描述,有興趣的直接看原始碼,比較簡單,
回到主題,要實作一個高性能的執行緒安全佇列,我們試著回答以下問題:
- 存盤結構是怎樣的
- 如何初始化(初始容量給多少比較好?)
- 常用操作(入隊出隊)如何實作
- 執行緒安全是如何保證的
存盤結構
通過原始碼可以看到ConcurrentQueue采用了陣列+鏈表的組合模式,充分吸收了2種結構的優點,
具體來說,它的總體結構是一個鏈表,鏈表的每個節點是一個包含陣列的特殊物件,我們稱之為Segment(段或節,原話是a queue is a linked list of small arrays, each node is called a segment.),它里面的陣列是存盤真實資料的地方,容量固定大小是32,每一個Segment有指向下一個Segment的的指標,以此形成鏈表結構,而佇列中維護了2個特殊的指標,他們分別指向佇列的首段(head segment)和尾段(tail segment),他們對入隊和出隊有著重要的作用,用一張圖來解釋佇列的內部結構:

嗯,畫圖畫到這里突然聯想到,搞成雙向鏈表的話是不是就神似B+樹的葉子節點?技術就是這么奇妙~
段的核心定義為:
/// <summary>
/// private class for ConcurrentQueue.
/// 鏈表節點(段)
/// </summary>
private class Segment
{
//實際存盤資料的容器
internal volatile T[] m_array;
//存盤對應位置資料的狀態,當資料的對應狀態位標記為true時該資料才是有效的
internal volatile VolatileBool[] m_state;
//下一段的指標
private volatile Segment m_next;
//當前段在佇列中的索引
internal readonly long m_index;
//兩個位置指標
private volatile int m_low;
private volatile int m_high;
//所屬的佇列實體
private volatile ConcurrentQueue<T> m_source;
}
佇列的核心定義為:
/// <summary>
/// 執行緒安全的先進先出集合,
/// </summary>
public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
{
//首段
[NonSerialized]
private volatile Segment m_head;
//尾段
[NonSerialized]
private volatile Segment m_tail;
//每一段的大小
private const int SEGMENT_SIZE = 32;
//截取快照的運算元量
[NonSerialized]
internal volatile int m_numSnapshotTakers = 0;
}
常規操作
先從初始化一個佇列開始看起,
創建佇列實體
與普通Queue不同的是,ConcurrentQueue不再支持初始化時指定佇列大小(capacity),僅僅提供一個無參建構式和一個IEnumerable<T>引數的建構式,
/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class.
/// </summary>
public ConcurrentQueue()
{
m_head = m_tail = new Segment(0, this);
}
無參建構式很簡單,創建了一個Segment實體并把首尾指標都指向它,此時佇列只包含一個Segment,它的索引是0,佇列容量是32,
繼續看一下Segment是如何被初始化的:
/// <summary>
/// Create and initialize a segment with the specified index.
/// </summary>
internal Segment(long index, ConcurrentQueue<T> source)
{
m_array = new T[SEGMENT_SIZE];
m_state = new VolatileBool[SEGMENT_SIZE]; //all initialized to false
m_high = -1;
Contract.Assert(index >= 0);
m_index = index;
m_source = source;
}
Segment只提供了一個建構式,接受的引數分別是佇列索引和佇列實體,它創建了一個長度為32的陣列,并創建了與之對應的狀態陣列,然后初始化了位置指標(m_low=0,m_high=-1,此時表示一個空的Segment),
到這里,一個并發佇列就創建好了,
使用集合創建佇列的程序和上面類似,只是多了兩個步驟:入隊和擴容,下面會重點描述這兩部分所以這里不再過多介紹,
元素入隊
先亮出原始碼:
/// <summary>
/// Adds an object to the end of the <see cref="ConcurrentQueue{T}"/>.
/// </summary>
/// <param name="item">The object to add to the end of the <see
/// cref="ConcurrentQueue{T}"/>. The value can be a null reference
/// (Nothing in Visual Basic) for reference types.
/// </param>
public void Enqueue(T item)
{
SpinWait spin = new SpinWait();
while (true)
{
Segment tail = m_tail;
if (tail.TryAppend(item))
return;
spin.SpinOnce();
}
}
通過原始碼可以看到,入隊操作是在隊尾(m_tail)進行的,它嘗試在最后一個Segment中追加指定的元素,如果成功了就直接回傳,失敗的話就自旋等待,直到成功為止,那什么情況下會失敗呢?這就要繼續看看是如何追加元素的:
internal bool TryAppend(T value)
{
//先判斷一下高位指標有沒有達到陣列邊界(也就是陣列是否裝滿了)
if (m_high >= SEGMENT_SIZE - 1)
{
return false;
}
int newhigh = SEGMENT_SIZE;
try
{ }
finally
{
//使用原子操作讓高位指標加1
newhigh = Interlocked.Increment(ref m_high);
//如果陣列還有空位
if (newhigh <= SEGMENT_SIZE - 1)
{
//把資料放到陣列中,同時更新狀態
m_array[newhigh] = value;
m_state[newhigh].m_value = https://www.cnblogs.com/hohoa/p/true;
}
//陣列滿了要觸發擴容
if (newhigh == SEGMENT_SIZE - 1)
{
Grow();
}
}
return newhigh <= SEGMENT_SIZE - 1;
}
所以,只有當尾段m_tail裝滿的情況下追加元素才會失敗,這時候必須要等待下一個段產生,也就是擴容(細細品一下Grow這個詞真的很妙),自旋就是在等擴容完成才能有地方放資料,而在保存資料的時候,通過原子自增操作保證了同一個位置只會有一個資料被寫入,從而實作了執行緒安全,
注意:這里的裝滿并不是指陣列每個位置都有資料,而是指最后一個位置已被使用,
繼續看一下擴容是怎么一個程序:
/// <summary>
/// Create a new segment and append to the current one
/// Update the m_tail pointer
/// This method is called when there is no contention
/// </summary>
internal void Grow()
{
//no CAS is needed, since there is no contention (other threads are blocked, busy waiting)
Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow
m_next = newSegment;
Contract.Assert(m_source.m_tail == this);
m_source.m_tail = m_next;
}
在普通佇列中,擴容是通過創建一個更大的陣列然后把資料拷貝過去實作擴容的,這個操作比較耗時,而在并發佇列中就非常簡單了,首先創建一個新Segment,然后把當前Segment的next指向它,最后掛到佇列的末尾去就可以了,全部是指標操作非常高效,而且從代碼注釋中可以看到,這里不會出現執行緒競爭的情況,因為其他執行緒都因為位置不夠被阻塞都在自旋等待中,
元素出隊
還是先亮出原始碼:
public bool TryDequeue(out T result)
{
while (!IsEmpty)
{
Segment head = m_head;
if (head.TryRemove(out result))
return true;
//since method IsEmpty spins, we don't need to spin in the while loop
}
result = default(T);
return false;
}
可以看到只有在佇列不為空(IsEmpty==false)的情況下才會嘗試出隊操作,而出隊是在首段上進行操作的,關于如何判斷佇列是否為空總結就一句話:當首段m_head不包含任何資料且沒有下一段的時候佇列才為空,詳細的判斷程序原始碼注釋中寫的很清楚,限于篇幅不詳細介紹,
出隊的本質是從首段中移除低位指標所指向的元素,看一下具體實作步驟:
internal bool TryRemove(out T result)
{
SpinWait spin = new SpinWait();
int lowLocal = Low, highLocal = High;
//判斷當前段是否為空
while (lowLocal <= highLocal)
{
//判斷低位指標位置是否可以移除
if (Interlocked.CompareExchange(ref m_low, lowLocal + 1, lowLocal) == lowLocal)
{
SpinWait spinLocal = new SpinWait();
//判斷元素是否有效
while (!m_state[lowLocal].m_value)
{
spinLocal.SpinOnce();
}
//取出元素
result = m_array[lowLocal];
//釋放參考關系
if (m_source.m_numSnapshotTakers <= 0)
{
m_array[lowLocal] = default(T);
}
//判斷當前段的元素是否全部被移除了,要丟棄它
if (lowLocal + 1 >= SEGMENT_SIZE)
{
spinLocal = new SpinWait();
while (m_next == null)
{
spinLocal.SpinOnce();
}
Contract.Assert(m_source.m_head == this);
m_source.m_head = m_next;
}
return true;
}
else
{
//執行緒競爭失敗,自旋等待并重置
spin.SpinOnce();
lowLocal = Low; highLocal = High;
}
}//end of while
result = default(T);
return false;
}
首先,只有當前Segment不為空的情況下才嘗試移除元素,否則就直接回傳false,然后通過一個原子操作Interlocked.CompareExchange判斷當前低位指標上是否有其他執行緒同時也在移除,如果有那就進入自旋等待,沒有的話就從這個位置取出元素并把低位指標往前推進一位,如果當前佇列沒有正在進行截取快照的操作,那取出元素后還要把這個位置給釋放掉,當這個Segment的所有元素都被移除掉了,這時候要把它丟棄,簡單來說就是讓佇列的首段指標指向它的下一段即可,丟棄的這一段等著GC來收拾它,
這里稍微提一下Interlocked.CompareExchange,它的意思是比較和交換,也就是更為大家所熟悉的CAS(Compare-and-Swap),它主要做了以下2件事情:
- 比較m_low和lowLocal的值是否相等
- 如果相等則m_low=lowLocal+1,如果不相等就什么都不做,不管是否相等,始侄訓傳m_low的原始值
整個操作是原子性的,對CPU而言就是一條指令,這樣就可以保證當前位置只有一個執行緒執行出隊操作,
還有一個
TryPeek()方法和出隊類似,它是從隊首獲取一個元素但是無需移除該元素,可以看做Dequeue的簡化版,不再詳細介紹,
獲取佇列中元素的數量
與普通Queue不同的是,ConcurrentQueue并沒有維護一個表示佇列中元素個數的計數器,那就意味著要得到這個數量必須實時去計算,我們看一下計算程序:
public int Count
{
get
{
Segment head, tail;
int headLow, tailHigh;
GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);
if (head == tail)
{
return tailHigh - headLow + 1;
}
int count = SEGMENT_SIZE - headLow;
count += SEGMENT_SIZE * ((int)(tail.m_index - head.m_index - 1));
count += tailHigh + 1;
return count;
}
}
大致思路是,先計算(GetHeadTailPositions)出首段的低位指標和尾段的高位指標,這中間的總長度就是我們要的數量,然后分成3節依次累加每一個Segment包含的元素個數得到最終的佇列長度,可以看到這是一個開銷比較大的操作,
正因為如此,微軟官方推薦使用IsEmpty屬性來判斷佇列是否為空,而不是使用佇列長度Count==0來判斷,使用ConcurrentStack也是一樣,
截取快照(take snapshot)
所謂的take snapshot就是指一些格式轉換的操作,例如ToArray()、ToList()、GetEnumerator()這種型別的方法,在前面佇列的核心定義中我們提到有一個m_numSnapshotTakers欄位,這時候就派上用場了,下面以比較典型的ToList()原始碼舉例說明:
private List<T> ToList()
{
// Increments the number of active snapshot takers. This increment must happen before the snapshot is
// taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it
// eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0.
Interlocked.Increment(ref m_numSnapshotTakers);
List<T> list = new List<T>();
try
{
Segment head, tail;
int headLow, tailHigh;
GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);
if (head == tail)
{
head.AddToList(list, headLow, tailHigh);
}
else
{
head.AddToList(list, headLow, SEGMENT_SIZE - 1);
Segment curr = head.Next;
while (curr != tail)
{
curr.AddToList(list, 0, SEGMENT_SIZE - 1);
curr = curr.Next;
}
tail.AddToList(list, 0, tailHigh);
}
}
finally
{
// This Decrement must happen after copying is over.
Interlocked.Decrement(ref m_numSnapshotTakers);
}
return list;
}
可以看到,ToList的邏輯和Count非常相似,都是先計算出兩個首尾位置指標,然后把佇列分為3節依次遍歷處理,最大的不同之處在于方法的開頭和結尾分別對m_numSnapshotTakers做了一個原子操作,
在方法的第一行,使用Interlocked.Increment做了一次遞增,這時候表示佇列正在進行一次截取快照操作,在處理完后又在finally中用Interlocked.Decrement做了一次遞減表示當前操作已完成,這樣確保了在進行快照時不被出隊影響,感覺這塊很難描述的特別好,所以保留了原始的英文注釋,大家慢慢體會,
到這里,基本把ConcurrentQueue的核心說清楚了,
總結一下
回到文章開頭提出的幾個問題,現在應該有了很清晰的答案:
- 存盤結構 -- 采用陣列和鏈表的組合形式
- 如何初始化 -- 創建固定大小的段,無需指定初始容量
- 常用操作如何實作 -- 尾段入隊,首段出隊
- 執行緒安全問題 -- 使用SpinWait自旋等待和原子操作實作
以上所述均是個人理解,如果有錯誤的地方還請不吝指正,以免誤導他人,
推薦相關閱讀,篇篇都是干貨:https://www.cnblogs.com/lucifer1982/category/126755.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/63635.html
標籤:其他
上一篇:資料結構(二)—堆疊
