點關注,不迷路!如果本文對你有幫助的話不要忘記點贊支持哦!
1. Condition 定義
Condition是JUC里面提供于控制執行緒釋放鎖, 然后進行等待其他獲取鎖的執行緒發送 signal 信號來進行喚醒的工具類.
主要特點:
- Condition內部主要是由一個裝載執行緒節點 Node 的 Condition Queue 實作
- 對 Condition 的方法(await, signal等) 的呼叫必需是在本執行緒獲取了獨占鎖的前提下
- 因為 操作Condition的方法的前提是獲取獨占鎖, 所以 Condition Queue 內部是一條不支持并發安全的單向 queue (這是相對于 AQS 里面的 Sync Queue)
先看一下一個常見的 demo
import org.apache.log4j.Logger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 此demo用于測驗 condition
* Created by xujiankang on 2017/2/8.
*/
public class ConditionTest {
private static final Logger logger = Logger.getLogger(ConditionTest.class);
static final Lock lock = new ReentrantLock();
static final Condition condition = lock.newCondition();
public static void main(String[] args) throws Exception{
final Thread thread1 = new Thread("Thread 1 "){
@Override
public void run() {
lock.lock(); // 執行緒 1獲取 lock
logger.info(Thread.currentThread().getName() + " 正在運行 .....");
try {
Thread.sleep(2 * 1000);
logger.info(Thread.currentThread().getName() + " 停止運行, 等待一個 signal ");
condition.await(); // 呼叫 condition.await 進行釋放鎖, 將當前節點封裝成一個 Node 放入 Condition Queue 里面, 等待喚醒
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info(Thread.currentThread().getName() + " 獲取一個 signal, 繼續執行 ");
lock.unlock(); // 釋放鎖
}
};
thread1.start(); // 執行緒 1 線運行
Thread.sleep(1 * 1000);
Thread thread2 = new Thread("Thread 2 "){
@Override
public void run() {
lock.lock(); // 執行緒 2獲取lock
logger.info(Thread.currentThread().getName() + " 正在運行.....");
thread1.interrupt(); // 對執行緒1 進行中斷 看看中斷后會怎么樣? 結果 執行緒 1還是獲取lock, 并且最后還進行 lock.unlock()操作
try {
Thread.sleep(2 * 1000);
}catch (Exception e){
}
condition.signal(); // 發送喚醒信號 從 AQS 的 Condition Queue 里面轉移 Node 到 Sync Queue
logger.info(Thread.currentThread().getName() + " 發送一個 signal ");
logger.info(Thread.currentThread().getName() + " 發送 signal 結束");
lock.unlock(); // 執行緒 2 釋放鎖
}
};
thread2.start();
}
}
整個執行步驟
- 執行緒 1 開始執行, 獲取 lock, 然后開始睡眠 2秒
- 當執行緒1睡眠到 1秒時, 執行緒2開始執行, 但是lock被執行緒1獲取, 所以 等待
- 執行緒 1 睡足2秒 呼叫 condition.await() 進行鎖的釋放, 并且將 執行緒1封裝成一個 node 放到 condition 的 Condition Queue里面, 等待其他獲取鎖的執行緒給他 signal, 或對其進行中斷(中斷后可以到 Sync Queue里面進而獲取 鎖)
- 執行緒 2 獲取鎖成功, 中斷 執行緒1, 執行緒被中斷后, node 從 Condition Queue 轉移到 Sync Queue 里面, 但是 lock 還是被 執行緒2獲取者, 所以 node呆在 Sync Queue 里面等待獲取 lock
- 執行緒 2睡了 2秒, 開始 用signal喚醒 Condition Queue 里面的節點(此時代表 執行緒1的node已經到 Sync Queue 里面)
- 執行緒 2釋放lock, 并且在 Sync Queue 里面進行喚醒等待獲取鎖的節點 node
- 執行緒1 得到喚醒, 獲取鎖
- 執行緒1 釋放鎖
執行結果
[2017-02-08 22:43:09,557] INFO Thread 1 (ConditionTest.java:26) - Thread 1 正在運行 .....
[2017-02-08 22:43:11,565] INFO Thread 1 (ConditionTest.java:30) - Thread 1 停止運行, 等待一個 signal
[2017-02-08 22:43:11,565] INFO Thread 2 (ConditionTest.java:48) - Thread 2 正在運行.....
java.lang.InterruptedException
[2017-02-08 22:43:13,566] INFO Thread 2 (ConditionTest.java:57) - Thread 2 發送一個 signal
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
[2017-02-08 22:43:13,566] INFO Thread 2 (ConditionTest.java:58) - Thread 2 發送 signal 結束
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
[2017-02-08 22:43:13,567] INFO Thread 1 (ConditionTest.java:35) - Thread 1 獲取一個 signal, 繼續執行
at com.lami.tuomatuo.search.base.concurrent.aqs.ConditionTest$1.run(ConditionTest.java:31)
2. Condition 建構式級基本屬性
主要是Condition Queue 的頭尾節點(這里頭尾節點不需要進行初始化)
/** First node of condition queue */
/** Condition Queue 里面的頭節點 */
private transient Node firstWaiter;
/** Last node of condition queue */
/** Condition Queue 里面的尾節點 */
private transient Node lastWaiter;
/** Creates a new {@code ConditionObject} instance */
/** 建構式 */
public ConditionObject(){}
3. Condition Queue enqueue節點方法 addConditionWaiter
addConditionWaiter方法主要用于呼叫 Condition.await 時將當前節點封裝成 一個Node, 加入到 Condition Queue里面
大家可以注意下, 下面對 Condition Queue 的操作都沒考慮到 并發(Sync Queue 的佇列是支持并發操作的), 這是為什么呢? 因為在進行操作 Condition 是當前的執行緒已經獲取了AQS的獨占鎖, 所以不需要考慮并發的情況
/**
* Adds a new waiter to wait queue
* 將當前執行緒封裝成一個 Node 節點 放入大 Condition Queue 里面
* 大家可以注意到, 下面對 Condition Queue 的操作都沒考慮到 并發(Sync Queue 的佇列是支持并發操作的), 這是為什么呢? 因為在進行操作 Condition 是當前的執行緒已經獲取了AQS的獨占鎖, 所以不需要考慮并發的情況
* @return
*/
private Node addConditionWaiter(){
Node t = lastWaiter; // 1. Condition queue 的尾節點
// If lastWaiter is cancelled, clean out // 2.尾節點已經Cancel, 直接進行清除,
// 這里有1個問題, 1 何時出現t.waitStatus != Node.CONDITION -> 在對執行緒進行中斷時 ConditionObject -> await -> checkInterruptWhileWaiting -> transferAfterCancelledWait "compareAndSetWaitStatus(node, Node.CONDITION, 0)" <- 導致這種情況一般是 執行緒中斷或 await 超時
// 一個注意點: 當Condition進行 awiat 超時或被中斷時, Condition里面的節點是沒有被洗掉掉的, 需要其他 await 在將執行緒加入 Condition Queue 時呼叫addConditionWaiter而進而洗掉, 或 await 操作差不多結束時, 呼叫 "node.nextWaiter != null" 進行判斷而洗掉 (PS: 通過 signal 進行喚醒時 node.nextWaiter 會被置空, 而中斷和超時時不會)
if(t != null && t.waitStatus != Node.CONDITION){
unlinkCancelledWaiters(); // 3. 呼叫 unlinkCancelledWaiters 對 "waitStatus != Node.CONDITION" 的節點進行洗掉(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 要么就是 0 (signal/timeout/interrupt))
t = lastWaiter; // 4. 獲取最新的 lastWaiter
}
Node node = new Node(Thread.currentThread(), Node.CONDITION); // 5. 將執行緒封裝成 node 準備放入 Condition Queue 里面
if(t == null){
firstWaiter = node; // 6 .Condition Queue 是空的
}else{
t.nextWaiter = node; // 7. 最加到 queue 尾部
}
lastWaiter = node; // 8. 重新賦值 lastWaiter
return node;
}
4. Condition 喚醒 first節點方法 doSignal
這里的喚醒指的是將節點從 Condition Queue 轉移到 Sync Queue 里面
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters
* @param first
*/
/**
* 喚醒 Condition Queue 里面的頭節點, 注意這里的喚醒只是將 Node 從 Condition Queue 轉到 Sync Queue 里面(這時的 Node 也還是能被 Interrupt)
*/
private void doSignal(Node first){
do{
if((firstWaiter = first.nextWaiter) == null){ // 1. 將 first.nextWaiter 賦值給 nextWaiter 為下次做準備
lastWaiter = null; // 2. 這時若 nextWaiter == null, 則說明 Condition 為空了, 所以直接置空 lastWaiter
}
first.nextWaiter = null; // 3. first.nextWaiter == null 是判斷 Node 從 Condition queue 轉移到 Sync Queue 里面是通過 signal 還是 timeout/interrupt
}while(!transferForSignal(first) && (first = firstWaiter) != null); // 4. 呼叫 transferForSignal將 first 轉移到 Sync Queue 里面, 回傳不成功的話, 將 firstWaiter 賦值給 first
}
5. Condition 喚醒 所有 節點方法 doSignalAll
/**
* Removes and transfers all nodes
* @param first (non-null) the first node on condition queue
*/
/**
* 喚醒 Condition Queue 里面的所有的節點
*/
private void doSignalAll(Node first){
lastWaiter = firstWaiter = null; // 1. 將 lastWaiter, firstWaiter 置空
do{
Node next = first.nextWaiter; // 2. 初始化下個換新的節點
first.nextWaiter = null; // 3. first.nextWaiter == null 是判斷 Node 從 Condition queue 轉移到 Sync Queue 里面是通過 signal 還是 timeout/interrupt
transferForSignal(first); // 4. 呼叫 transferForSignal將 first 轉移到 Sync Queue 里面
first = next; // 5. 開始換新 next 節點
}while(first != null);
}
6. Condition 洗掉取消節點的方法 unlinkCancelledWaiters
一般的節點都會被 signal 喚醒, 從 Condition Queue 轉移到 Sync Queue, 而若遇到 interrupt 或 等待超時, 則直接改變 node 的狀態(從 CONDITION 變成 0), 并直接放入 Sync 里面, 而不清理Condition Queue 里面的節點, 所以需要下面的函式
/**
* http://czj4451.iteye.com/blog/1483264
*
* Unlinks cancelled waiter nodes from condition queue
* Called only while holding lock. This is called when
* cancellation occured during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes intot play when
* timeouts or cancellations all nodes rather than stoppping at a
* particular target to unlink all pointers to garbege nodes
* without requiring many re-traversals during cancellation
* storms
*/
/**
* 在 呼叫 addConditionWaiter 將執行緒放入 Condition Queue 里面時 或 awiat 方法獲取 差不多結束時 進行清理 Condition queue 里面的因 timeout/interrupt 而還存在的節點
* 這個洗掉操作比較巧妙, 其中引入了 trail 節點, 可以理解為traverse整個 Condition Queue 時遇到的最后一個有效的節點
*/
private void unlinkCancelledWaiters(){
Node t = firstWaiter;
Node trail = null;
while(t != null){
Node next = t.nextWaiter; // 1. 先初始化 next 節點
if(t.waitStatus != Node.CONDITION){ // 2. 節點不有效, 在Condition Queue 里面 Node.waitStatus 只有可能是 CONDITION 或是 0(timeout/interrupt引起的)
t.nextWaiter = null; // 3. Node.nextWaiter 置空
if(trail == null){ // 4. 一次都沒有遇到有效的節點
firstWaiter = next; // 5. 將 next 賦值給 firstWaiter(此時 next 可能也是無效的, 這只是一個臨時處理)
}else{
trail.nextWaiter = next; // 6. next 賦值給 trail.nextWaiter, 這一步其實就是洗掉節點 t
}
if(next == null){ // 7. next == null 說明 已經 traverse 完了 Condition Queue
lastWaiter = trail;
}
}else{
trail = t; // 8. 將有效節點賦值給 trail
}
t = next;
}
}
毫無疑問, 這是一段非常精巧的queue節點洗掉, 主要還是在 節點 trail 上, trail 節點可以理解為traverse整個 Condition Queue 時遇到的最后一個有效的節點
7. Condition 喚醒首節點方法 signal
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock
*
* @throws IllegalMonitorStateException if{@link #isHeldExclusively()}
* returns {@code false}
*/
/**
* 將 Condition queue 的頭節點轉移到 Sync Queue 里面
* 在進行呼叫 signal 時, 當前的執行緒必須獲取了 獨占的鎖
*/
@Override
public void signal() {
if(!isHeldExclusively()){ // 1. 判斷當前的執行緒是否已經獲取 獨占鎖
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if(first != null){
doSignal(first); // 2. 呼叫 doSignal 進行轉移
}
}
上述面試題答案都整理成檔案筆記, 也還整理了一些面試資料&最新2020收集的一些大廠的面試真題(都整理成檔案,小部分截圖),有需要的可以 點擊進入暗號:csdn ,
8. Condition 喚醒所有節點方法 signalAll
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively()}
* return {@code false}
*/
/**
* 將 Condition Queue 里面的節點都轉移到 Sync Queue 里面
*/
public final void signalAll(){
if(!isHeldExclusively()){
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if(first != null){
doSignalAll(first);
}
}
9. Condition 釋放鎖進行等待方法 awaitUninterruptibly
awaitUninterruptibly 方法是一個不回應 中斷的方法
整個流程
- 將當前的執行緒封裝成 Node 加入到 Condition 里面
- 丟到當前執行緒所擁有的 獨占鎖
- 等待 其他獲取 獨占鎖的執行緒的喚醒, 喚醒從 Condition Queue 到 Sync Queue 里面, 進而獲取 獨占鎖
- 最后獲取 lock 之后, 在根據執行緒喚醒的方式(signal/interrupt) 進行處理
/**
* Implements uninterruptible condition wait
* <li>
* Save lock state return by {@link #getState()}
* </li>
*
* <li>
* Invoke {@link #release(int)} with saved state as argument,
* throwing IllegalMonitoringStateException if it fails
* Block until signalled
* Reacquire by invoking specified version of
* {@link #acquire(int)} with saved state as argument
* </li>
*/
/**
* 不回應執行緒中斷的方式進行 await
*/
public final void awaitUninterruptibly(){
Node node = addConditionWaiter(); // 1. 將當前執行緒封裝成一個 Node 放入 Condition Queue 里面
int savedState = fullyRelease(node); // 2. 釋放當前執行緒所獲取的所有的獨占鎖(PS: 獨占的鎖支持重入), 等等, 為什么要釋放呢? 以為你呼叫 awaitUninterruptibly 方法的前提就是你已經獲取了 獨占鎖
boolean interrupted = false; // 3. 執行緒中斷標識
while(!isOnSyncQueue(node)){ // 4. 這里是一個 while loop, 呼叫 isOnSyncQueue 判斷當前的 Node 是否已經被轉移到 Sync Queue 里面
LockSupport.park(this); // 5. 若當前 node 不在 sync queue 里面, 則先 block 一下等待其他執行緒呼叫 signal 進行喚醒; (這里若有其他執行緒對當前執行緒進行 中斷的換, 也能進行喚醒)
if(Thread.interrupted()){ // 6. 判斷這是喚醒是 signal 還是 interrupted(Thread.interrupted()會清楚執行緒的中斷標記, 但沒事, 我們有步驟7中的interrupted進行記錄)
interrupted = true; // 7. 說明這次喚醒是被中斷而喚醒的,這個標記若是true的話, 在 awiat 離開時還要 自己中斷一下(selfInterrupt), 其他的函式可能需要執行緒的中斷標識
}
}
if(acquireQueued(node, savedState) || interrupted){ // 8. acquireQueued 回傳 true 說明執行緒在 block 的程序中式被 inetrrupt 過(其實 acquireQueued 回傳 true 也有可能其中有一次喚醒是 通過 signal)
selfInterrupt(); // 9. 自我中斷, 外面的執行緒可以通過這個標識知道, 整個 awaitUninterruptibly 運行程序中 是否被中斷過
}
}
10. Condition 中斷標示
下面兩個是用于追蹤 呼叫 awaitXXX 方法時執行緒有沒有被中斷過
主要的區別是
1. REINTERRUPT: 代表執行緒是在 signal 后被中斷的 (REINTERRUPT = re-interrupt 再次中斷 最后會呼叫 selfInterrupt)
2. THROW_IE: 代表在接受 signal 前被中斷的, 則直接拋出例外 (Throw_IE = throw inner exception)
/**
* For interruptible waits, we need to track whether to throw
* InterruptedException, if interrupted while blocked on
* condition, versus reinterrupt current thread, if
* interrupted while blocked waiting to re-acquire
*/
/**
* 下面兩個是用于追蹤 呼叫 awaitXXX 方法時執行緒有沒有被中斷過
* 主要的區別是
* REINTERRUPT: 代表執行緒是在 signal 后被中斷的 (REINTERRUPT = re-interrupt 再次中斷 最后會呼叫 selfInterrupt)
* THROW_IE: 代表在接受 signal 前被中斷的, 則直接拋出例外 (Throw_IE = throw inner exception)
*/
/** Mode meaning to reinterrupt on exit from wait */
/** 在離開 awaitXX方法, 退出前再次 自我中斷 (呼叫 selfInterrupt)*/
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
/** 在離開 awaitXX方法, 退出前再次, 以為在 接受 signal 前被中斷, 所以需要拋出例外 */
private static final int THROW_IE = -1;
11. Condition 中斷處理方法
該方法主要是查 在 awaitXX 方法中的這次喚醒是否是中斷引起的, 若是中斷引起的, 就進行 Node 的轉移
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted
*/
/**
* 檢查 在 awaitXX 方法中的這次喚醒是否是中斷引起的
* 若是中斷引起的, 則將 Node 從 Condition Queue 轉移到 Sync Queue 里面
* 回傳值的區別:
* 0: 此次喚醒是通過 signal -> LockSupport.unpark
* THROW_IE: 此次的喚醒是通過 interrupt, 并且 在 接受 signal 之前
* REINTERRUPT: 執行緒的喚醒是 接受過 signal 而后又被中斷
*/
private int checkInterruptWhileWaiting(Node node){
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
12. Condition 中斷處理方法 reportInterruptAfterWait
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode
*/
/**
* 這個方法是在 awaitXX 方法離開前呼叫的, 主要是根據
* interrupMode 判斷是拋出例外, 還是自我再中斷一下
*/
private void reportInterruptAfterWait(int interrupMode) throws InterruptedException{
if(interrupMode == THROW_IE){
throw new InterruptedException();
}
else if(interrupMode == REINTERRUPT){
selfInterrupt();
}
}
13. Condition 釋放鎖 進行等待的方法 await
await 此方法回應中斷請求, 當接受到中斷請求后會將節點從 Condition Queue 轉移到 Sync Queue
/**
* Implements interruptible condition wait
*
* <li>
* If current thread is interrupted, throw InterruptedException
* Save lock state returned by {@link #getState()}
* Invoke {@link #release(int)} with saved state as argument,
* throwing IllegalMonitorStateException if it fails
* Blocking until signalled or interrupted
* Reacquire by invoking specifized version of
* {@link #acquire(int)} with saved state as argument.
* If interrupted while blocked in step 4, throw InterruptedException
* </li>
*
* @throws InterruptedException
*/
/**
* 支持 InterruptedException 的 await <- 注意這里即使是執行緒被中斷,
* 還是需要獲取了獨占的鎖后, 再 呼叫 lock.unlock 進行釋放鎖
*/
@Override
public final void await() throws InterruptedException {
if(Thread.interrupted()){ // 1. 判斷執行緒是否中斷
throw new InterruptedException();
}
Node node = addConditionWaiter(); // 2. 將執行緒封裝成一個 Node 放到 Condition Queue 里面, 其中可能有些清理作業
int savedState = fullyRelease(node); // 3. 釋放當前執行緒所獲取的所有的鎖 (PS: 呼叫 await 方法時, 當前執行緒是必須已經獲取了獨占的鎖)
int interruptMode = 0;
while(!isOnSyncQueue(node)){ // 4. 判斷當前執行緒是否在 Sync Queue 里面(這里 Node 從 Condtion Queue 里面轉移到 Sync Queue 里面有兩種可能 (1) 其他執行緒呼叫 signal 進行轉移 (2) 當前執行緒被中斷而進行Node的轉移(就在checkInterruptWhileWaiting里面進行轉移))
LockSupport.park(this); // 5. 當前執行緒沒在 Sync Queue 里面, 則進行 block
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){ // 6. 判斷此次執行緒的喚醒是否因為執行緒被中斷, 若是被中斷, 則會在checkInterruptWhileWaiting的transferAfterCancelledWait 進行節點的轉移; 回傳值 interruptMode != 0
break; // 說明此是通過執行緒中斷的方式進行喚醒, 并且已經進行了 node 的轉移, 轉移到 Sync Queue 里面
}
}
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ // 7. 呼叫 acquireQueued在 Sync Queue 里面進行 獨占鎖的獲取, 回傳值表明在獲取的程序中有沒有被中斷過
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){ // clean up if cancelled // 8. 通過 "node.nextWaiter != null" 判斷 執行緒的喚醒是中斷還是 signal, 因為通過中斷喚醒的話, 此刻代表執行緒的 Node 在 Condition Queue 與 Sync Queue 里面都會存在
unlinkCancelledWaiters(); // 9. 進行 cancelled 節點的清除
}
if(interruptMode != 0){ // 10. "interruptMode != 0" 代表通過中斷的方式喚醒執行緒
reportInterruptAfterWait(interruptMode); // 11. 根據 interruptMode 的型別決定是拋出例外, 還是自己再中斷一下
}
}
14. Condition 釋放鎖 進行等待的方法 awaitNanos
awaitNanos 具有超時功能, 與回應中斷的功能, 不管中斷還是超時都會 將節點從 Condition Queue 轉移到 Sync Queue
/**
* Impelemnts timed condition wait
*
* <li>
* If current thread is interrupted, throw InterruptedException
* Save lock state returned by {@link #getState()}
* Invoke {@link #release(int)} with saved state as argument,
* throwing IllegalMonitorStateException if it fails
* Block until aignalled, interrupted, or timed out
* Reacquire by invoking specified version of
* {@link #acquire(int)} with saved state as argument
* If interrupted while blocked in step 4, throw InterruptedException
* </li>
*/
/**
* 所有 awaitXX 方法其實就是
* 0. 將當前的執行緒封裝成 Node 加入到 Condition 里面
* 1. 丟到當前執行緒所擁有的 獨占鎖,
* 2. 等待 其他獲取 獨占鎖的執行緒的喚醒, 喚醒從 Condition Queue 到 Sync Queue 里面, 進而獲取 獨占鎖
* 3. 最后獲取 lock 之后, 在根據執行緒喚醒的方式(signal/interrupt) 進行處理
* 4. 最后還是需要呼叫 lock./unlock 進行釋放鎖
*/
@Override
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if(Thread.interrupted()){ // 1. 判斷執行緒是否中斷
throw new InterruptedException();
}
Node node = addConditionWaiter(); // 2. 將執行緒封裝成一個 Node 放到 Condition Queue 里面, 其中可能有些清理作業
int savedState = fullyRelease(node); // 3. 釋放當前執行緒所獲取的所有的鎖 (PS: 呼叫 await 方法時, 當前執行緒是必須已經獲取了獨占的鎖)
final long deadline = System.nanoTime() + nanosTimeout; // 4. 計算 wait 的截止時間
int interruptMode = 0;
while(!isOnSyncQueue(node)){ // 5. 判斷當前執行緒是否在 Sync Queue 里面(這里 Node 從 Condtion Queue 里面轉移到 Sync Queue 里面有兩種可能 (1) 其他執行緒呼叫 signal 進行轉移 (2) 當前執行緒被中斷而進行Node的轉移(就在checkInterruptWhileWaiting里面進行轉移))
if(nanosTimeout <= 0L){ // 6. 等待時間超時(這里的 nanosTimeout 是有可能 < 0),
transferAfterCancelledWait(node); // 7. 呼叫 transferAfterCancelledWait 將 Node 從 Condition 轉移到 Sync Queue 里面
break;
}
if(nanosTimeout >= spinForTimeoutThreshold){ // 8. 當剩余時間 < spinForTimeoutThreshold, 其實函式 spin 比用 LockSupport.parkNanos 更高效
LockSupport.parkNanos(this, nanosTimeout); // 9. 進行執行緒的 block
}
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){ // 10. 判斷此次執行緒的喚醒是否因為執行緒被中斷, 若是被中斷, 則會在checkInterruptWhileWaiting的transferAfterCancelledWait 進行節點的轉移; 回傳值 interruptMode != 0
break; // 說明此是通過執行緒中斷的方式進行喚醒, 并且已經進行了 node 的轉移, 轉移到 Sync Queue 里面
}
nanosTimeout = deadline - System.nanoTime(); // 11. 計算剩余時間
}
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ // 12. 呼叫 acquireQueued在 Sync Queue 里面進行 獨占鎖的獲取, 回傳值表明在獲取的程序中有沒有被中斷過
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){ // 13. 通過 "node.nextWaiter != null" 判斷 執行緒的喚醒是中斷還是 signal, 因為通過中斷喚醒的話, 此刻代表執行緒的 Node 在 Condition Queue 與 Sync Queue 里面都會存在
unlinkCancelledWaiters(); // 14. 進行 cancelled 節點的清除
}
if(interruptMode != 0){ // 15. "interruptMode != 0" 代表通過中斷的方式喚醒執行緒
reportInterruptAfterWait(interruptMode); // 16. 根據 interruptMode 的型別決定是拋出例外, 還是自己再中斷一下
}
return deadline - System.nanoTime(); // 17 這個回傳值代表是 通過 signal 還是 超時
}
15. Condition 釋放鎖 進行等待的方法 awaitUntil
/**
* Implements absolute timed condition wait
* <li>
* If current thread is interrupted, throw InterruptedException
* Save lock state returned by {@link #getState()}
* Invoke {@link #release(int)} with saved state as argument,
* throwing IllegalMonitorStateException if it fails
* Block until signalled, interrupted, or timed out
* Reacquire by invoking specialized version of
* {@link #acquire(int)} with saved state as argument
* if interrupted while blocked in step 4, throw InterruptedException
* If timeed out while blocked in step 4, return false, else true
* </li>
*/
/**
* 所有 awaitXX 方法其實就是
* 0. 將當前的執行緒封裝成 Node 加入到 Condition 里面
* 1. 丟到當前執行緒所擁有的 獨占鎖,
* 2. 等待 其他獲取 獨占鎖的執行緒的喚醒, 喚醒從 Condition Queue 到 Sync Queue 里面, 進而獲取 獨占鎖
* 3. 最后獲取 lock 之后, 在根據執行緒喚醒的方式(signal/interrupt) 進行處理
* 4. 最后還是需要呼叫 lock./unlock 進行釋放鎖
*
* awaitUntil 和 awaitNanos 差不多
*/
@Override
public boolean awaitUntil(Date deadline) throws InterruptedException {
long abstime = deadline.getTime(); // 1. 判斷執行緒是否中斷
if(Thread.interrupted()){
throw new InterruptedException();
}
Node node = addConditionWaiter(); // 2. 將執行緒封裝成一個 Node 放到 Condition Queue 里面, 其中可能有些清理作業
int savedState = fullyRelease(node); // 3. 釋放當前執行緒所獲取的所有的鎖 (PS: 呼叫 await 方法時, 當前執行緒是必須已經獲取了獨占的鎖)
boolean timeout = false;
int interruptMode = 0;
while(!isOnSyncQueue(node)){ // 4. 判斷當前執行緒是否在 Sync Queue 里面(這里 Node 從 Condtion Queue 里面轉移到 Sync Queue 里面有兩種可能 (1) 其他執行緒呼叫 signal 進行轉移 (2) 當前執行緒被中斷而進行Node的轉移(就在checkInterruptWhileWaiting里面進行轉移))
if(System.currentTimeMillis() > abstime){ // 5. 計算是否超時
timeout = transferAfterCancelledWait(node); // 6. 呼叫 transferAfterCancelledWait 將 Node 從 Condition 轉移到 Sync Queue 里面
break;
}
LockSupport.parkUntil(this, abstime); // 7. 進行 執行緒的阻塞
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){ // 8. 判斷此次執行緒的喚醒是否因為執行緒被中斷, 若是被中斷, 則會在checkInterruptWhileWaiting的transferAfterCancelledWait 進行節點的轉移; 回傳值 interruptMode != 0
break; // 說明此是通過執行緒中斷的方式進行喚醒, 并且已經進行了 node 的轉移, 轉移到 Sync Queue 里面
}
}
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ // 9. 呼叫 acquireQueued在 Sync Queue 里面進行 獨占鎖的獲取, 回傳值表明在獲取的程序中有沒有被中斷過
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){ // 10. 通過 "node.nextWaiter != null" 判斷 執行緒的喚醒是中斷還是 signal, 因為通過中斷喚醒的話, 此刻代表執行緒的 Node 在 Condition Queue 與 Sync Queue 里面都會存在
unlinkCancelledWaiters(); // 11. 進行 cancelled 節點的清除
}
if(interruptMode != 0){ // 12. "interruptMode != 0" 代表通過中斷的方式喚醒執行緒
reportInterruptAfterWait(interruptMode); // 13. 根據 interruptMode 的型別決定是拋出例外, 還是自己再中斷一下
}
return !timeout; // 13. 回傳是否通過 interrupt 進行執行緒的喚醒
}
16. Condition 的 instrumentation 方法
/**
* Returns true if this condition was created by the given
* synchronization object
*/
/**判斷當前 condition 是否獲取 獨占鎖 */
final boolean isOwnedBy(KAbstractOwnableSynchronizer sync){
return sync == KAbstractQueuedSynchronizer.this;
}
/**
* Quires whether any threads are waiting on this condition
* Implements {@link KAbstractOwnableSynchronizer#"hasWaiters(ConditionObject)}
*
* @return {@code true} if there are any waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively()}
* returns {@code false}
*/
/**
* 查看 Condition Queue 里面是否有 CONDITION 的節點
*/
protected final boolean hasWaiters(){
if(!isHeldExclusively()){
throw new IllegalMonitorStateException();
}
for(Node w = firstWaiter; w != null; w = w.nextWaiter ){
if(w.waitStatus == Node.CONDITION){
return true;
}
}
return false;
}
/**
* Returns an estimate of the number of threads waiting on
* this condition
* Implements {@link KAbstractOwnableSynchronizer#"getWaitQueueLength()}
*
* @return the estimated number of waiting threads
* @throws IllegalMonitorStateException if {@link #isHeldExclusively()}
* return {@code false}
*/
/**
* 獲取 Condition queue 里面的 CONDITION 的節點的個數
*/
protected final int getWaitQueueLength(){
if(!isHeldExclusively()){
throw new IllegalMonitorStateException();
}
int n = 0;
for(Node w = firstWaiter; w != null; w = w.nextWaiter){
if(w.waitStatus == Node.CONDITION){
++n;
}
}
return n;
}
/**
* Returns a collection containing those threads that may be
* waiting on this Condition
* Implements {@link KAbstractOwnableSynchronizer#'getWaitingThreads}
*
* @return the collection of thread
* @throws IllegalMonitorStateException if {@link #isHeldExclusively()}
* returns {@code false}
*/
/**
* 獲取 等待的執行緒
*/
protected final Collection<Thread> getWaitingThreads(){
if(!isHeldExclusively()){
throw new IllegalMonitorStateException();
}
ArrayList<Thread> list = new ArrayList<>();
for(Node w = firstWaiter; w != null; w = w.nextWaiter){
if(w.waitStatus == Node.CONDITION){
Thread t = w.thread;
if(t != null){
list.add(t);
}
}
}
return list;
}
到此這篇關于文章就結束了!
點關注,不迷路!如果本文對你有幫助的話不要忘記點贊支持哦!

上述面試題答案都整理成檔案筆記, 也還整理了一些面試資料&最新2020收集的一些大廠的面試真題(都整理成文檔,小部分截圖),有需要的可以 點擊進入暗號:csdn ,
希望對大家有所幫助,有用的話點贊給我支持!

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/50761.html
標籤:其他

