ConditionObject內部類
條件變數 ConditionObject 實作了 Condition 介面
變數構造器定義
public class ConditionObject implements Condition, java.io.Serializable {
//第一個等待節點
private transient Node firstWaiter;
//最后一個等待節點
private transient Node lastWaiter;
// 重復中斷狀態位
private static final int REINTERRUPT = 1;
//發生例外狀態位
private static final int THROW_IE = -1;
public ConditionObject() { }
}
await 等待操作
當節點被添加到等待佇列后,需要等待條件成立,await相關方法,
//根據外部中斷
public final void await() throws InterruptedException {
if (Thread.interrupted())//是否執行緒中斷
throw new InterruptedException();//拋出.,
Node node = addConditionWaiter();//生成一個Node等待節點將其放入條件阻塞佇列
int savedState = fullyRelease(node);//fullyRelease 呼叫release,釋放AQS競爭佇列中當前節點后面的等待鎖的節點
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//節點未放入到AQS的競爭佇列之前一直阻塞
LockSupport.park(this);
//發生中斷時退出
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//重新獲取鎖
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//保證執行緒不可中斷的等待
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
//將執行緒節點從AQS競爭佇列中 釋放
int savedState = fullyRelease(node);
boolean interrupted = false;
//節點在等待佇列, 讓執行緒阻塞 往里放
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
下面這三個 和await差不多 多了 時間 ,時間型別
public final long awaitNanos(long nanosTimeout){}//超時等待 整體流程和 aiait一樣,只是park方法有等待時長
public final boolean awaitUntil(Date deadline){}//park 方法換成了 parkUntil
public final boolean await(long time, TimeUnit unit){}
addConditionWaiter原理
//添加等待節點
private Node addConditionWaiter() {
Node t = lastWaiter;
// 判斷最后一個等待節點是否位空,狀態CONDITION 不是unlinkCancelledWaiters方法
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//創建新的節點 插入到等待佇列 標準鏈表操作
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
fullyRelease原理
final int fullyRelease(Node node) {
boolean failed = true;
try {//當前執行緒的 state狀態
int savedState = getState();
if (release(savedState)) {//釋放 并喚醒后繼節點
failed = false;
return savedState;
} else {//失敗拋出例外
throw new IllegalMonitorStateException();
}
} finally {
if (failed)// 如果失敗,那么將節點狀態變為CANCELLED即可
node.waitStatus = Node.CANCELLED;
}
}
isOnSyncQueue原理
final boolean isOnSyncQueue(Node node) {
//節點狀態CONDITION 前節點不為空 說明不在競爭佇列上
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) //若節點的next節點不為空,那么一定在AQS 競爭佇列上
return true;
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;//從尾節點開始遍歷 找到該節點
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
reportInterruptAfterWait原理
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
//根據外部狀態 執行不同操作
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
checkInterruptWhileWaiting 原理
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ? //執行緒是否發生了中斷 ,正常情況下回傳0
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
unlinkCancelledWaiters原理
//從條件佇列中斷開已取消的侍者節點,
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {//狀態CONDITION 的不要
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
就是說先將節點放進等待佇列,然后釋放競爭佇列的,等條件達到要求、中斷或者超時,執行緒等待完成后 重新獲取鎖
signal 喚醒操作
//喚醒操作 呼叫 doSignal方法
public final void signal() {
if (!isHeldExclusively())//保證當前執行緒持有鎖的狀態下執行
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
//喚醒一個等待節點 呼叫transferForSignal 方法
private void doSignal(Node first) {
do {//設定下一個等待節點
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);//如果放到 競爭佇列失敗 繼續喚醒下一個節點
}
singall
//喚醒所有 呼叫 doSignalAll方法
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {//回圈遍歷 transferForSignal 方法喚醒節點
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
transferForSignal原理
final boolean transferForSignal(Node node) {
//CAS 將執行緒節點狀態修改為0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//enq 方法 將節點添加到AQS競爭佇列
Node p = enq(node);
int ws = p.waitStatus;
//如果前驅節點的狀態 >0 CAS設定前驅節點狀態SIGNAL 失敗 unpark方法喚醒當前執行緒
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
transferForSignal方法將節點轉移到AQS的佇列中,實際是呼叫了enq 方法,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/374735.html
標籤:java
