前言
可以通過分析PriorityBlockingQueue來了解JUC中的執行緒安全的佇列實作的一些套路,這些套路會在JUC中其他資料結構實作上反復出現,從而可以更合理的了解那些實作機制背后通用的部分,
BlockingQueue
A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
阻塞佇列,這個介面就非常重要,它是定義了阻塞佇列需要實作的介面能力,它的子類有ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue等,
作為佇列的擴展,擴展的核心能力是阻塞能力,這個阻塞能力表示:當佇列空了的時候,獲取元素的操作需要阻塞,等待佇列有存盤新的元素進入,這里容易產生一個誤解:認為BlockingQueue也包含了當佇列滿的時候,放入操作阻塞的能力,這一點并不是BlockingQueue的能力要求,這和子類實作的是有界或無界佇列有關,
PriorityBlockingQueue
PriorityBlockingQueue的資料結構的實作是和PriorityQueue是一致的,完全可以參考前一篇的文章,這里重點是了解清楚通過鎖來保證執行緒安全的佇列的實作方式,知道了這些知識點,再要理解JUC中其他的佇列的實作簡直輕而易舉,
實作執行緒安全的關鍵兩個屬性:
/**
* Lock used for all public operations
*/
private final ReentrantLock lock;
/**
* Condition for blocking when empty
*/
private final Condition notEmpty;
各個操作的時候都執行lock.lock();鎖住,操作結束執行lock.unlock();解鎖,
比如簡單的查看元素數量的方法:
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
有了鎖,實作執行緒安全的操作變得簡單而不易出錯,下面我們看一些關鍵的操作方法的實作,
offer
offer方法將一個元素放入佇列,對于一個無界限佇列來說,需要處理擴容情況,而作為阻塞佇列,當放入元素意味著此時佇列不是空佇列,那么就需要通知那些來獲取佇列元素因為空佇列而阻塞的執行緒,繼續執行獲取元素的操作,
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
// 擴容邏輯
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
// 喚醒執行緒
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
對于一個元素的位置擺放和優先級佇列是一致的,完全可以參考前面一篇PriorityQueue,
take
take方法嘗試獲取佇列頭節點的元素,如果為空,就阻塞執行緒等待,直到佇列有元素再喚醒繼續執行獲取動作,這個Condition notEmpty內部機制可以參考前面的文章
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
// 等待喚醒
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
通過notEmpty,實作了阻塞佇列的核心能力,那就是當獲取元素的時候佇列空了阻塞執行緒和當佇列有元素的時候進行喚醒動作,這一點在其他阻塞佇列中也這樣實作,另外,因為這是個無界佇列,并不會發生佇列滿的情況,所以就沒有在放入元素的時候處理阻塞的邏輯,而那些有界佇列就需要處理這種情況,當然,處理起來也非常簡單,再來一個標記佇列滿了的Condition就可以了,
擴容
這種佇列需要擴容,經過前面的一篇,我們已經不再陌生,而對于一個執行緒安全的佇列來說,正在擴容的時候只要確保持有鎖,擋住外界的讀寫操作,就不會有問題,如果你也是這么想的,那么從下面的實作代碼中就可以學習到一些優化細節和思路,
private void tryGrow(Object[] array, int oldCap) {
// 解鎖操作,這里需要清楚呼叫這個方法默認是需要保證獲得主鎖的
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 對allocationSpinLock進行cas更新,呼叫的地方是用while包住的,所以沒有進入這個if的話還會自旋
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 擴大的容量計算
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8
// oldCap最大就能到MAX_ARRAY_SIZE,如果計算出來的newCap大于MAX_ARRAY_SIZE,那么就判斷一下oldCap+1是不是已經超過了MAX_ARRAY_SIZE,如果超過就拋出OutOfMemoryError例外,也就是說最后一次擴容最大只能到MAX_ARRAY_SIZE的容量,下一次就不行了
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
// 新陣列
newArray = new Object[newCap];
} finally {
// 設定allocationSpinLock為0 放開擴容操作限制
allocationSpinLock = 0;
}
}
// 這個條件成立意味著cas失敗或者allocationSpinLock狀態為1,表示有執行緒正在擴容
if (newArray == null) // back off if another thread is allocating
// 讓出CPU
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
擴容操作并沒有像其他方法一樣上來就搶鎖,而是進行了lock.unlock()操作,再看下去發現它是使用原子cas更新allocationSpinLock來保證沒有其他執行緒可以并發執行擴容的邏輯代碼,這樣就優化了在擴容時對其他操作的性能影響,
總結
在清楚了PriorityQueue資料結構后對于理解PriorityBlockingQueue的實作機制就很簡單了,主要需要理解到ReentrantLock和Condition的作用,因為前面已經詳細進入過AQS系列的世界,現在看來從基礎的開始看起,一些花里胡哨的東西融會貫通也是容易的,
后面和PriorityBlockingQueue有點關聯的是DelayedWorkQueue和ScheduledThreadPoolExecutor,路線圖大概是這樣的:
UNSAFE->AQS->ReentrantLock+PriorityQueue->PriorityBlockingQueue->DelayedWorkQueue->ScheduledThreadPoolExecutor
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/540512.html
標籤:其他
