前言
在Java多執行緒中的wait/notify通信模式結尾就已經介紹過,Java執行緒之間有兩種種等待/通知模式,在那篇博文中是利用Object監視器的方法(wait(),notify()、notifyAll())實作的,然而在實際生產環境中不推薦使用此方法,建議使用condition的等待通知模式,JUC包中很多核心實作也確實證實了這點,所以這必然是學習JUC包原始碼的基礎,
如果之前閱讀過前不久介紹同步佇列的博文學習JUC原始碼(1)——AQS同步佇列(原始碼分析結合圖文理解),就能更好理解Condition等待佇列了,都是基于AQS.Node實作的佇列,兩者是同步器實作的核心所在!
主要參考資料《Java并發編程藝術》(有需要的小伙伴可以找我,我這里只有電子PDF)同時結合ReentranLock、AQS、ArrayBlockingQueue等原始碼,
一、Condition等待佇列介紹
1、對比Object監視器方法與Condition方法
以下對比圖來源于《Java并發編程藝術》,可以清楚看到Condition比Object監視器更加靈活,支持中斷回應等,

2、Condition方法使用介紹
我們先看下阻塞佇列ArrayBlockingQueue中關于condition的經典應用,這里使用就是condition的等待通知模式實作有界阻塞佇列,即簡單總結:當佇列滿時,阻塞插入執行緒,佇列空時,獲取元素的執行緒等待,
/** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 佇列滿時等待 while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 佇列空時進入Condition等待佇列等待 while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
從中我們看出兩點:
- Condition要結合Lock物件實作,兩者是同時存在的,準確來說先有Lock物件,然后再創建Condition物件
- 執行緒呼叫這些方法時候,需要提前獲取到Condition物件關聯的鎖,Condition物件是由Lock物件(Lock.newConditoin())創建出來的,也就是說Condition是依賴Lock物件的,
同樣地,我們可以從中抽取出等待-通知模式,然后撰寫Demo如下,其中標紅的可以理解為通用的等待/通知模式
public class ConditionDemo { ReentrantLock lock = new ReentrantLock(); // 一般Condition都是作為成員變數 Condition condition = lock.newCondition(); public void waitCondition() throws InterruptedException { lock.lock(); try { // 當前執行緒等待 // 呼叫signal方法后回傳 System.out.println(Thread.currentThread()+" is waiting..., now: "+new Date()); Thread.sleep(2000); condition.await(); // 注意這里是await,而不是wait System.out.println(Thread.currentThread()+" return..., now: "+new Date()); } finally { lock.unlock(); } } public void signalCondition() throws InterruptedException { lock.lock(); try { // 喚醒獲得condition上的等待鎖 System.out.println(Thread.currentThread()+" is signaling..., now: "+new Date()); condition.signal(); } finally { lock.unlock(); } } public static void main(String[] args) { ConditionDemo condition = new ConditionDemo(); new Thread(()->{ try { condition.waitCondition(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(()->{ try { condition.signalCondition(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
運行Demo,可以看到等待執行緒確實等待了2s之后從await()方法回傳,
Thread[Thread-0,5,main] is waiting..., now: Wed Dec 23 21:35:53 CST 2020 Thread[Thread-1,5,main] is signaling..., now: Wed Dec 23 21:35:55 CST 2020 Thread[Thread-0,5,main] return..., now: Wed Dec 23 21:35:55 CST 2020
接下來就是深入原始碼理解Condition等待佇列是如何實作的
二、Condition等待佇列的實作分析(原始碼分析)
1、Condition等待佇列介紹
(1)概念認識
先引出簡單的認識,其實對比同步佇列來說,很好理解,實際上更加簡單
- 等待佇列是一個單向FIFO佇列,佇列每個節點都包含了一個執行緒參考,該執行緒是在Condition物件上等待的執行緒;
- 實際上這里的等待佇列和AQS中的同步佇列,都是采用AQS.Node靜態內部類;
- 一個ConditionObject擁有首節點(fisrtWaiter)和尾節點(lastWaiter);
- 如果一個執行緒呼叫了Condition.await()方法,那么該執行緒將會釋放鎖(從同步佇列中移除),構造成節點加入等待佇列,等待被喚醒;
- 如果一個執行緒呼叫了Condition.signal()方法,那么該執行緒將會被喚醒(從等待佇列中移除),構造成節點加入同步佇列,嘗試重新獲取同步狀態;
(2)等待佇列結構圖
實際上,Condition的實作是在AQS中內部類ConditionObject實作Condition具體實作的:

等待佇列的結構圖如下圖,相比較同步佇列而言:
- 等待佇列來說更加簡單,是單向FIFO佇列;
- Condition擁有首尾節點參考,新增節點直接nextWaiter指向即可,這個程序不需要CAS保證,因為呼叫Condition.await()方法肯定是獲取了鎖的執行緒,也就是說該程序是來保證執行緒安全的,

實際上,AQS同步器只擁有一個同步佇列,但卻有多個Condition等待佇列,如下圖,
2、await()方法實作決議
當呼叫await()方法時,相當于同步佇列的首節點(獲取了鎖的節點)移動到了Condition的等待佇列中,

更具體來說是首先呼叫await()方法之前肯定是能獲取到同步狀態的執行緒,也就是同步佇列中首節點,之后呼叫await()方法由將釋放鎖,進入等待佇列,
分析原始碼(重點部分都已經注釋,結合圖應該更好理解):
1)呼叫await()方法,通過addConditionWaiter()方法加入等待執行緒,然后釋放全部同步狀態
2)進入while回圈,判斷是否已經移動到同步佇列中,如果已經被移動到同步佇列中則說明執行緒已經被喚醒(signal);
3)接下來嘗試獲取競爭同步狀態,即呼叫acquireQueue方法
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // addConditionWaiter()方法當前執行緒加入等待佇列 Node node = addConditionWaiter(); // 呼叫await()方法后釋放鎖(同步狀態) int savedState = fullyRelease(node); int interruptMode = 0; // 檢查node是否在同步佇列中,不是的話說明已經獲取到鎖 // LockSupport.unpark喚醒執行緒后,從這里回傳,此時已經在SyncQueue同步佇列中,退出回圈 // 從這里也可以看出,也是經典的等待/通知模式 while (!isOnSyncQueue(node)) { // 阻塞當前執行緒 LockSupport.park(this); // 在呼叫signal前拋出中斷例外,或者呼叫之后中斷,都退出回圈 // THROW_IE if interrupted before signalled, REINTERRUPT if after signalled if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 被喚醒后的執行緒重新嘗試競爭獲取同步狀態 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
結構流程如下圖:

3、signal()方法實作決議
呼叫signal方法將會喚醒等待佇列中等待時間最長的節點(首節點),在喚醒節點之前,會將節點移動到同步佇列中

具體來說當前執行緒獲取到了鎖,接著獲取等待佇列的首節點,將其移動到同步佇列中,并且喚醒節點中的執行緒,
分析原始碼(重點部分都已經注釋,結合圖應該更好理解):
1)進入signal()方法,呼叫doSignal(Node node)方法移動到同步佇列中,并喚醒節點中執行緒
public final void signal() { // 判斷當前執行緒是否持有獲得鎖 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) // 移動到同步佇列中,并且喚醒節點中的執行緒 doSignal(first); } private void doSignal(Node first) { do { // 如果等待佇列中只有一個節點(即首節點),則喚醒首節點后lastWaiter置空 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 否則獲取等待佇列中的首節點,即next域斷開置空 first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
2)doSignal(Node node)方法中呼叫transferForSignal(Node node),通過呼叫enq(Node node)方法(這里其實就是同步佇列的入隊enq(Node node)方法),等待佇列中的頭結點執行緒安全地移動到同步佇列,當節點移動到同步佇列后,當前執行緒將會被喚醒(LockSupport.unpark(node.thread)),
final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ // 如果沒有正確設定等待狀態為初始狀態準備加入同步佇列中,則回傳,當前節點狀態為Node.CONDITION if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 將等待佇列中的頭結點移動到同步佇列中, 回傳已經加入的當前node在同步佇列中前節點 Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 喚醒當前node執行緒,回傳while(isOnSynQueue(Node node))處,退出回圈 LockSupport.unpark(node.thread); return true; }
流程結構如下圖:

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/239432.html
標籤:其他
上一篇:面向物件三大特征
