深入理解Java并發框架AQS系列(一):執行緒
深入理解Java并發框架AQS系列(二):AQS框架簡介及鎖概念
深入理解Java并發框架AQS系列(三):獨占鎖(Exclusive Lock)
深入理解Java并發框架AQS系列(四):共享鎖(Shared Lock)
深入理解Java并發框架AQS系列(五):條件佇列(Condition)
一、前言
AQS中的條件佇列相比較前文中的“獨占鎖”、“共享鎖”等比較獨立,即便沒有條件佇列也絲毫不影響諸如ReentrantLock、Semaphore類的實作,那如此說來條件佇列是否就是一個可有可無的產物?答案是否定的,我們來看下直接或間接用到條件佇列的JDK并發類:
ReentrantLock獨占鎖經典類ReentrantReadWriteLock讀寫鎖ArrayBlockingQueue基于陣列的阻塞佇列CyclicBarrier回圈柵欄,解決執行緒同步問題DelayQueue延時佇列LinkedBlockingDeque雙向阻塞佇列PriorityBlockingQueue支持優先級的無界阻塞佇列ThreadPoolExecutor執行緒池構造器ScheduledThreadPoolExecutor可基于時間調度的執行緒池構造器StampedLock郵戳鎖,1.8后引入,更高效的讀寫鎖
如此豪華的陣容,可見Condition的地位不可小覷
我們簡單描述下條件佇列實作的功能:有3個執行緒A、B、C,分別呼叫wait/await方法后,執行緒進入阻塞,在沒有其他執行緒去喚醒的情況下,3個執行緒將永遠處于阻塞狀態,此時如果有另外執行緒呼叫notify/signal,那么A、B、C執行緒中的某一個將被激活(根據其進入條件佇列的順序而定),從而執行后續的邏輯;如果呼叫notifyAll/signalAll的話,那么3個執行緒都將被激活,這可能是我們對條件佇列的簡單認識,這樣的描述是否準確呢?可能不太嚴謹,我們引入JDK的條件佇列來做說明
統一話術:其實語法層面支持的wait/notify與AQS都屬于JDK的范疇,但為了區分兩者,我們定義如下:
- JDK條件佇列:語法層面提供支持的
wait/notify,即Object類中的wait()/notify()方法 - AQS條件佇列:AQS提供的條件佇列,即AQS內部的
ConditionObject類
二、JDK中的條件佇列(wait/notify)
眾所周知,在JDK中,wait/notify/notifyAll是根物件Object中內置的方法,且方法均被定義為native本地方法
// 等待
public final native void wait(long timeout) throws InterruptedException;
// 喚醒
public final native void notify();
// 喚醒所有等待執行緒
public final native void notifyAll();
2.1、wait
// 步驟1
synchronized (obj) {
// 步驟2
before();
// 步驟3
obj.wait();
// 步驟4
after();
}
相信大家對上述代碼并不陌生,我們將JDK的條件佇列抽象為4步,逐一闡述
- 步驟1:
synchronized (obj)- 在jdk中如果想呼叫
Object.wait()方法,必須首先獲取該物件的synchronized鎖,當前步驟,如果成功獲取到鎖,那么將進入“步驟2”,如果存在并發,當前執行緒將會進入阻塞(執行緒狀態為BLOCKED),知道獲取到鎖為止
- 在jdk中如果想呼叫
- 步驟2:
before()- 我們知道
synchronized是獨占鎖,所以在執行步驟2代碼時,程式是不存在并發的,即同一時刻,只有一個執行緒正在執行,此處也相對好理解
- 我們知道
- 步驟3:
obj.wait()- 此步驟是將當前執行緒放入條件佇列,同時釋放
obj的同步鎖,此處跟我們對synchronized的認知有悖,我們一般認為synchronized (obj) {......}在大括號中的代碼會一直持有鎖,而事實情況卻是,當程式執行wait()方法時,會釋放obj的同步鎖
- 此步驟是將當前執行緒放入條件佇列,同時釋放
- 步驟4:
after()- 此步驟是并發執行還是串行執行?假設我們現在有3個執行緒A、B、C都已經執行完畢
wait()方法,并進入了條件佇列,等待其他執行緒喚醒;此時另外一個執行緒執行了notifyAll()時,后續的激活流程是怎么樣的?- 錯誤觀點:有很多同學直觀感受是,執行緒A、B、C同時被激活,所以步驟4是并發執行的;就像是百米賽跑,所有同學都準備就緒(wait),一聲槍響后(notifyAll),所有人開始賽跑,并跑到終點(步驟4)
- 正確觀點:其實“步驟4”是串行執行的,大家再檢查下代碼后便可發現,“步驟4”處于
synchronized的大括號之間;還是拿上述賽跑舉例,如果認為從聽到槍響至跑到終點是“步驟4”的話,那真實的場景應該是這樣的:一聲槍響后,A起跑,B、C原地不動;A跑到終點后,B開始起跑,C原地不動;最后是C跑到終點
- 此步驟是并發執行還是串行執行?假設我們現在有3個執行緒A、B、C都已經執行完畢
由此我們斷定,obj.wait()雖然是native方法,但其內部經歷了釋放鎖、重新搶鎖的兩個大環節
2.2、notify
synchronized (obj) {
obj.notify();
// obj.notifyAll();
}
所有因obj.wait()阻塞的執行緒,都要通過notify來喚醒
notify()喚醒條件佇列中,隊首節點notifyAll()喚醒條件佇列中所有節點
三、AQS中的條件佇列(await/signal)
我們初看AQS中的條件佇列時,發現其提供了與JDK條件佇列幾乎一致的功能
| JDK | AQS |
|---|---|
wait |
await |
notify |
singal |
notifyAll |
singalAll |
用法上也及其相似:
await :
// 初始化
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
try {
lock.lock();
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
singal:
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
try {
lock.lock();
condition.signal();
} finally {
lock.unlock();
}
3.1、條件佇列
我們知道在AQS內部維護了一個阻塞佇列,資料結構如下:

上圖描述的是一個長度為 3 的FIFO阻塞佇列,因為頭結點常駐記憶體,所以不算在內;我們可以發現阻塞佇列中每個節點都包含了前、后參考
那AQS內部的另一個條件佇列又是什么樣的資料結構呢?

可見,條件佇列為單向串列,只有指向下一個節點的參考;沒有被喚醒的節點全部存盤在條件佇列上,上圖描述的是一個長度為 5 的條件佇列,即有5個執行緒執行了await()方法;與阻塞佇列不同,條件佇列沒有常駐記憶體的“head結點”,且一個處于正常狀態節點的waitStatus為 -2 ,當有新節點加入時,將會追加至佇列尾部
3.2、喚醒
當我們呼叫signal()方法時,會發生什么?我們還是拿長度為 5 的條件佇列舉例說明,在AQS內部會經歷佇列轉移,即由條件佇列轉移至阻塞佇列

而signalAll()執行時,具體執行流程與signal()類似,即會將條件佇列中的所有節點全部轉移至阻塞佇列(并發度為1,按順序依次激活)中,依靠阻塞佇列自身依次喚醒的機制,達到激活所有執行緒的目的
四、JDK vs AQS
經過上文的介紹,似乎AQS做了與wait/notify相同的功能,相比較而言,甚至JDK的寫法更簡潔;那他們在性能上的表現如何呢?讓我們來做個對比
4.1、對比
我們模擬這樣的一個場景:啟動10個執行緒,分別呼叫wait()方法,當所有執行緒都進入阻塞后,呼叫notifyAll(),10個執行緒均被喚醒并執行完畢后,方法結束, 上述方法執行10000次,對比JDK與AQS耗時
JDK測驗代碼:
public class ConditionCompareTest {
@Test
public void runTest() throws InterruptedException {
long begin = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
if (i % 1000 == 0) {
System.out.println(i);
}
jdkTest();
}
long cost = System.currentTimeMillis() - begin;
System.out.println("耗時: " + cost);
}
public void jdkTest() throws InterruptedException {
Object lock = new Object();
List<Thread> list = Lists.newArrayList();
// 步驟一:啟動10個執行緒,并進入wait等待
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
try {
synchronized (lock) {
lock.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();
list.add(thread);
}
// 步驟二:等待10個執行緒全部進入wait方法
while (true) {
boolean allWaiting = true;
for (Thread thread : list) {
if (thread.getState() != Thread.State.WAITING) {
allWaiting = false;
break;
}
}
if (allWaiting) {
break;
}
}
// 步驟三:喚醒10個執行緒
synchronized (lock) {
lock.notifyAll();
}
// 步驟四:等待10個執行緒全部執行完畢
for (Thread thread : list) {
thread.join();
}
}
}
AQS測驗代碼:
public class ConditionCompareTest {
private ReentrantLock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
@Test
public void runTest() throws InterruptedException {
long begin = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
if (i % 1000 == 0) {
System.out.println(i);
}
aqsTest();
}
long cost = System.currentTimeMillis() - begin;
System.out.println("耗時: " + cost);
}
@Test
public void aqsTest() throws InterruptedException {
AtomicInteger lockedNum = new AtomicInteger();
List<Thread> list = Lists.newArrayList();
// 步驟一:啟動10個執行緒,并進入wait等待
for (int i = 0; i < 10; i++) {
Thread thread = new Thread(() -> {
try {
lock.lock();
lockedNum.incrementAndGet();
condition.await();
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
thread.start();
list.add(thread);
}
// 步驟二:等待10個執行緒全部進入wait方法
while (true) {
if (lockedNum.get() != 10) {
continue;
}
boolean allWaiting = true;
for (Thread thread : list) {
if (thread.getState() != Thread.State.WAITING) {
allWaiting = false;
break;
}
}
if (allWaiting) {
break;
}
}
// 步驟三:喚醒10個執行緒
lock.lock();
condition.signalAll();
lock.unlock();
// 步驟四:等待10個執行緒全部執行完畢
for (Thread thread : list) {
thread.join();
}
}
}
| 條件佇列 | 耗時1 | 耗時2 | 耗時3 | 耗時4 | 耗時5 | 平均耗時(ms) |
|---|---|---|---|---|---|---|
JDK |
5000 | 5076 | 5054 | 5089 | 4942 | 5032 |
AQS |
5358 | 5440 | 5444 | 5473 | 5472 | 5437 |
4.2、基準測驗Q&A
基于以上的測驗我們還是有一些疑問的,不要小看這些疑問,通過這些疑問我們可以把之前的知識點全都串聯起來
- Q:AQS測驗中的“步驟二”,為什么在判斷“等待10個執行緒全部進入wait方法”時,要引入
lockedNum.get() != 10的判斷?直接通過判斷所有執行緒是否均為waiting方法不可以嗎? - A:如果真的洗掉
lockedNum.get() != 10的判斷,在多次并發測驗時,會有較小的概率出現程式死鎖的情況(作者電腦的環境是平均5萬次呼叫會出現一次),為什么會出現死鎖呢?我們追AQS原始碼就會發現,不管是呼叫lock()還是await,掛起執行緒使用的方法均為LockSupport.park()方法,此方法會將執行緒置為WAITING狀態,也就是執行緒狀態是WAITING狀態時,有可能執行緒剛進入lock()方法,從而導致await與thread.join()的死鎖 - Q:既然是這樣,為什么JDK的測驗沒有出現死鎖?
- A:我們看到JDK的加鎖是通過
synchronized關鍵字完成的,而當執行緒因為等待synchronized資源而阻塞時,執行緒狀態將變為BLOCKED,而進入wait()方法后,狀態才會變為WAITING - Q:那看來只有通過引入
AtomicInteger lockedNum變數才能解決死鎖問題了 - A:其實解決問題的方式有很多種,我們甚至可以簡單將
ReentrantLock lock置為公平鎖,也能解決上述死鎖問題;因為當前場景發生死鎖的情況是,singalAll()先于await()發生,而當所有執行緒都變成WAITING狀態后,公平鎖則確保了singalAll()一定是在所有執行緒都呼叫了await(),但因為synchronized本身是非公平鎖,故如果AQS使用公平鎖的話,性能偏差較大 - Q:那這樣看來,AQS中的阻塞佇列相對比JDK的沒有優勢可言啊,用法上沒有JDK簡潔,性能上還沒人家快
- A:的確,如果真是只是單純的使用阻塞、喚醒功能的話,還是建議使用JDK內置的方式;但AQS的優勢并不在此
五、再說AQS條件佇列
AQS的優勢在于,其提供了豐富的api可以查詢條件佇列的狀態;例如當我們想看一下在條件佇列中等待節點的個數時,使用JDK的wait/notify時,是無法做的;AQS提供的api如下:
boolean hasWaiters()阻塞佇列中是否有等待節點int getWaitQueueLength()獲取阻塞佇列長度Collection<Thread> getWaitingThreads()獲取阻塞佇列中執行緒物件
這些api為程式提供了更靈活的控制,條件佇列對于javaer已不是黑盒;當然使用AQS的條件佇列必然要引入獨占鎖,例如ReentrantLock,自然地我們還可以通過它查看條件佇列外圍的一些指標,例如:
Interrupted回應中斷,借助獨占鎖,提供回應中斷能力;wait/notify不提供,因為雖然wait方法回應中斷,但是synchronized關鍵字是會一直阻塞的boolean tryLock()嘗試獲取鎖;wait/notify不提供int getHoldCount()獲取阻塞執行緒的數量boolean isLocked()是否持有鎖fair/nonFair提供公平/非公平鎖...
可見整個AQS體系相比較Object的wait/notify方法是相當靈活的,提供了很多監控條件佇列、阻塞佇列的指標
六、致謝
這里要特別感謝一下神策資料的架構師金滿倉,同時也是我私下的摯友,他功力深厚,對程式有著自己獨到的見地,在整個AQS撰寫期間,不厭其煩地給我提供了很多理論及資料上的支持,幫我拓寬視野,再次感謝!
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/281180.html
標籤:架構設計
