AQS原始碼分析
AQS全稱AbstractQueuedSynchronizer(抽象佇列同步器)
AQS中維護了一個被volatile修飾的int型別的同步狀態state,以及CLH等待佇列,
state同步狀態用于維護同步資源被使用的情況,AQS本身并不關心state的值及其含義,完全由AQS的子類去定義以及維護,
CLH等待佇列是由一個雙向鏈表來實作的,存在head和tail指標分別指向鏈表中的頭節點以及尾節點,同時鏈表中的節點由AQS中的Node靜態內部類來表示,
ReentrantLock、ReentrantReadWriteLock、CountDownLatch、CyclicBarrier、Semaphore底層都是基于AQS來實作的,

AQS支持的模式
AQS支持兩種模式,一種是獨占模式,一種是共享模式,
獨占模式表示,同步資源在同一時刻只能被一個執行緒所持有,對應AQS的acquire()以及release()方法,
共享模式表示,同步資源在同一時刻可以被多個執行緒所持有,對應AQS的acquireShared()以及releaseShared()方法,
acquire()方法:獨占模式下獲取同步資源,
release()方法:獨占模式下釋放同步資源,
acquireShared()方法:共享模式下獲取同步資源,
releaseShared()方法:共享模式下釋放同步資源,
AQS使用了模板方法設計模式,在acquire()、release()、acquireShared()、releaseShared()方法中都會呼叫其對應的try方法,比如acquire()方法中會呼叫tryAcquire()方法,release()方法中會呼叫tryRelease()方法,AQS子類只需要重寫AQS提供的tryAcquire()、tryRelease()或tryAcquireShared()、tryReleaseShared()方法即可,只需要告訴AQS嘗試獲取同步資源或嘗試釋放同步資源是否成功,同時需要保證方法的實作是執行緒安全的,
tryAcquire()、tryRelease()、tryAcquireShared()、tryReleaseShared()方法都沒有使用abstract進行修飾,同時方法中都會直接拋出UnsupportedOperationException例外,好處是不需要強制子類同時實作獨占模式和共享模式中的方法,因為大多數AQS的子類都僅支持一種模式,用戶只需要根據實際情況進行選擇即可,
tryAcquire(int arg)方法:獨占模式下嘗試獲取同步資源,同時AQS規定,如果獲取同步資源成功則回傳true,否則回傳false,
tryRelease(int arg)方法:獨占模式下嘗試釋放同步資源,同時AQS規定,如果釋放同步資源成功則回傳true,否則回傳false,
tryAcquireShared(int arg)方法:共享模式下嘗試獲取同步資源,同時AQS規定,如果獲取同步資源失敗則回傳負數,否則回傳剩余的資源個數,
tryReleaseShared(int arg)方法:共享模式下嘗試釋放同步資源,同時AQS規定,如果釋放同步資源成功則回傳true,否則回傳false,
剖析AQS中的Node類

Node類提供的核心屬性
// 節點封裝的執行緒
volatile Thread thread;
// 指向前驅節點的指標
volatile Node prev;
// 指向后繼節點的指標
volatile Node next;
// 節點的等待狀態(默認為0)(默認為0)(默認為0)
volatile int waitStatus;
// 下一個正在等待的節點
Node nextWaiter;
// 共享模式下的標識節點
static final Node SHARED = new Node();
// 獨占模式下的標識節點
static final Node EXCLUSIVE = null;
同時Node類中維護了一系列節點的等待狀態值
// CANCELLED狀態,表示執行緒已超時等等,處于CANCELLED狀態的節點會從等待佇列中剔除,不會參與到同步資源的競爭當中
static final int CANCELLED = 1;
// SIGNAL狀態,如果節點的等待狀態為SIGNAL,那么當它釋放同步資源時,將會喚醒離它最近的同時等待狀態不為CANCELLED的后繼節點(同時也能說明節點存在后繼節點)
static final int SIGNAL = -1;
// 表示執行緒在指定的條件下進行等待
static final int CONDITION = -2;
// PROPAGATE狀態,表示實際存在可用資源,需要再往下傳播(喚醒)
static final int PROPAGATE = -3;
因此每個Node節點中都會包含節點封裝的執行緒、分別指向前驅和后繼節點的指標、節點的等待狀態、指向下一個正在等待的節點的指標,
自定義AQS獨占模式下的同步器來實作獨享鎖
/**
* 自定義AQS獨占模式下的同步器來實作獨享鎖
*/
public class Mutex implements Lock, java.io.Serializable {
/**
* 自定義AQS獨占模式下的同步器
* 使用state為0表示當前鎖沒有被執行緒所持有
* 使用state為1表示當前鎖已經被執行緒所持有
*/
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 判斷鎖是否被當前執行緒所持有
*/
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
/**
* 嘗試獲取鎖
* 判斷鎖是否存在,如果鎖不存在則獲取鎖(通過CAS控制)
*/
public boolean tryAcquire(int acquires) {
assert acquires == 1; // 值必須是1(獨享鎖只有一把鎖嘛)
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread()); // 將當前執行緒設定為獨占模式下擁有同步資源的執行緒
return true;
}
return false;
}
/**
* 嘗試釋放鎖(要求被誰加的鎖只能被誰釋放)
* 判斷當前擁有同步資源的執行緒是否為當前執行緒,如果不是則拋出例外,否則釋放鎖
* 這里有三種呼叫情況,鎖空閑的狀態下呼叫、鎖已經被執行緒所持有但被并非擁有鎖的執行緒呼叫、鎖已經被執行緒所持有并被擁有鎖的執行緒呼叫,只有第三種情況才能夠解鎖成功
*/
protected boolean tryRelease(int releases) {
assert releases == 1; // 值必須是1(獨享鎖只有一把鎖嘛)
if (Thread.currentThread() != getExclusiveOwnerThread()) // 要求被誰加的鎖只能被誰釋放
throw new IllegalMonitorStateException();
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null); // 將獨占模式中擁有同步資源的執行緒置為NULL
setState(0);
return true;
}
/**
* 提供一個Condition實體
*/
Condition newCondition() {
return new ConditionObject();
}
/**
* 判斷鎖是否被執行緒所持有
*/
final boolean isLocked() {
return getState() == 1;
}
}
/**
* 同步器
*/
private final Sync sync = new Sync();
/**
* 加鎖
*/
public void lock() {
sync.acquire(1);
}
/**
* 嘗試獲取鎖
*/
public boolean tryLock() {
return sync.tryAcquire(1);
}
/**
* 解鎖
* 解鎖只能呼叫同步器的release(),不能呼叫tryRelease()方法,因為tryRelease()方法只是簡單的修改一下同步狀態的值而已,并沒有去喚醒等待佇列中的執行緒,正常是需要喚醒等待佇列中離頭節點最近的同時等待狀態不為CANCELLED的節點
*/
public void unlock() {
sync.release(1);
}
/**
* 回傳與此Mutex系結的Condition實體
*/
public Condition newCondition() {
return sync.newCondition();
}
/**
* 判斷鎖是否被執行緒所持有
*/
public boolean isLocked() {
return sync.isLocked();
}
/**
* 判斷是否有執行緒在等待獲取鎖
*/
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* 可能拋出InterruptedException的加鎖(如果執行緒被設定了中斷標識那么直接拋出例外)
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* 在指定的時間內嘗試獲取鎖
*/
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
AQS子類(同步器)一般都是通過內部類實作,然后作為內部組件來使用,
public class Main {
static class MyRunnable implements Runnable {
private Mutex mutex = new Mutex();
@Override
public void run() {
System.out.println(String.format("%s Running", Thread.currentThread().getName()));
mutex.lock();
System.out.println(String.format("%s加鎖", Thread.currentThread().getName()));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
mutex.unlock();
System.out.println(String.format("%s解鎖", Thread.currentThread().getName()));
}
}
public static void main(String[] args) {
Runnable runnable = new MyRunnable();
Thread threadA = new Thread(runnable, "執行緒A");
Thread threadB = new Thread(runnable, "執行緒B");
Thread threadC = new Thread(runnable, "執行緒C");
threadA.start();
threadB.start();
threadC.start();
}
}

當等待佇列中的執行緒被喚醒,在嘗試獲取同步資源之前,其他執行緒可以呼叫acquire()或tryAcquire()方法來獲取同步資源,因此該鎖是非公平鎖,
獨占模式下獲取同步資源的原始碼分析
acquire()方法
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
總結:當執行緒要獲取同步資源時,可以呼叫acquire()或者tryAcquire()方法,acquire()方法中會呼叫AQS子類的tryAcquire()方法,嘗試獲取同步資源,如果獲取同步資源成功,則直接回傳,做自己的事情,否則將會執行addWaiter()方法,將當前執行緒封裝成Node節點然后加入到等待佇列當中,然后執行acquireQueued()方法,用于自旋獲取同步資源,如果所有條件都滿足那么最后將會執行selfInterrupt()方法,
addWaiter()方法
private Node addWaiter(Node mode) {
// 將當前執行緒封裝成Node節點,并且指定為獨占模式,獨占模式Node.EXCLUSIVE為NULL,也就是說節點的nextWaiter為NULL
Node node = new Node(Thread.currentThread(), mode);
// 將節點加入到隊尾當中
Node pred = tail;
if (pred != null) {
// 將當前節點的前驅指標指向尾節點
node.prev = pred;
// 通過CAS設定尾節點(如果pred指標所指向的尾節點就是當前的尾節點,也就是在這個程序當中沒有其他節點插入到隊尾,則將tail指標指向當前節點)
if (compareAndSetTail(pred, node)) {
// 將之前尾節點的后繼指標指向當前節點
pred.next = node;
return node;
}
}
// 如果不存在尾節點,也就是佇列為空,或者通過CAS設定尾節點失敗(也就是在這個程序當中有其他節點插入到隊尾),那么將會通過enq()方法死回圈進行設定,
enq(node);
// 無論怎么樣該方法最終都會回傳封裝了當前執行緒的節點,
return node;
}
總結:addWaiter()方法用于將當前執行緒封裝成Node節點然后加入到等待佇列當中,如果在這個程序中,等待佇列為慷訓者通過CAS設定尾節點失敗,那么將會通過enq()方法死回圈進行設定,
enq()方法
private Node enq(final Node node) {
// 死回圈
for (;;) {
Node t = tail;
// 如果尾節點為空則初始化佇列,創建一個空的節點,并且將head和tail指標都指向這個節點
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 將當前節點的前驅指標指向尾節點
node.prev = t;
// 通過CAS設定尾節點(如果t指標所指向的節點就是當前的尾節點,也就是在這個程序當中沒有其他節點插入到隊尾,則將tail指標指向當前節點)
if (compareAndSetTail(t, node)) {
// 將之前的尾節點的后繼指標指向當前節點
t.next = node;
return t;
}
}
}
}
總結:enq()方法中使用死回圈初始化佇列以及通過CAS設定尾節點,直到尾節點被設定成功,同時需要注意的是當佇列初始化后會有一個空的頭節點,該節點不包含任何的執行緒,然后再將當前節點加入到佇列當中,
acquireQueued()方法
final boolean acquireQueued(final Node node, int arg) {
// 失敗標識
boolean failed = true;
try {
// 中斷標識
boolean interrupted = false;
// 自旋
for (;;) {
// 獲取節點的前驅節點
final Node p = node.predecessor();
// 如果節點的前驅節點是頭節點那么嘗試獲取同步資源
// 強制要求佇列中的節點獲取同步資源的順序必須是從隊頭到隊尾,否則將會造成節點丟失,丟失了的節點中的執行緒將會永遠處于阻塞狀態,同時只有當執行緒獲取了同步資源后,它才能成為頭節點(佇列初始化后的頭節點除外),因此頭節點肯定是已經獲取過同步資源的(佇列初始化后的頭節點除外),因此為了遵循佇列中的節點獲取同步資源的順序必須是從隊頭到隊尾,所以永遠只有頭節點的后繼節點擁有嘗試獲取同步資源的權利,因此當在嘗試獲取同步資源之前,需要先判斷一下當前節點的前驅節點是否是頭節點,如果不是就不用獲取了
if (p == head && tryAcquire(arg)) {
// 當獲取同步資源成功,則將當前節點設定為頭節點
setHead(node);
// 將之前頭節點的后繼指標設定為null,幫助GC
p.next = null;
failed = false;
// 回傳中斷標識
return interrupted;
}
// 如果節點的前驅節點不是頭節點,或者嘗試獲取同步資源失敗,那么將會呼叫shouldParkAfterFailedAcquire()方法,判斷執行緒能否進行阻塞,當執行緒能夠被阻塞時,將會呼叫parkAndCheckInterrupt()方法阻塞執行緒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果在執行該方法的程序中,拋出了例外(執行緒超時等等),則failed標識為true,那么將會執行cancelAcquire()方法,將當前節點的等待狀態設定為CANCELLED,同時從等待佇列中剔除,
if (failed)
cancelAcquire(node);
}
}
總結:acquireQueued()方法用于自旋獲取同步資源,同時該方法的方法出口只有一個,也就是當節點的前驅節點是頭節點,同時嘗試獲取同步資源成功,那么就會將當前節點設定為頭節點,否則就會呼叫shouldParkAfterFailedAcquire()方法,判斷執行緒能否進行阻塞,當執行緒能夠被阻塞時,將會呼叫parkAndCheckInterrupt()方法阻塞執行緒,等待被喚醒,同時在執行acquireQueued()方法的程序中,如果拋出了例外,則failed標識為true,那么將會執行cancelAcquire()方法,將當前節點的等待狀態設定為CANCELLED,同時從等待佇列中剔除,
shouldParkAfterFailedAcquire()方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取節點的前驅節點的等待狀態
int ws = pred.waitStatus;
// 如果前驅節點的等待狀態為SIGNAL,那么當它釋放同步資源時,將會自動喚醒離它最近的同時等待狀態不為CANCELLED的后繼節點,因此當前節點就可以直接阻塞了,等待被喚醒時再去嘗試獲取同步資源
if (ws == Node.SIGNAL)
return true;
// 如果前驅節點的等待狀態為CANCELLED,那么通過回圈找到前一個不為CANCELLED狀態的節點,并且將當前節點的前驅指標指向該節點,將該節點的后繼指標指向當前節點
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 通過CAS將前驅節點的等待狀態設定為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
總結:shouldParkAfterFailedAcquire()方法用于判斷執行緒能否進行阻塞,以及剔除被設定為CANCELLED狀態的節點,
正常情況下,執行緒第一次進來shouldParkAfterFailedAcquire()方法時,會將前驅節點的等待狀態設定為SIGNAL,然后再次自旋進來該方法,判斷到前驅節點的等待狀態為SIGNAL,直接回傳,然后就進入待阻塞狀態,
當該節點的前驅節點被CANCELLED時,如果前驅節點的前驅節點是頭節點,那么將會喚醒當前節點,那么它會再次自旋進來該方法,判斷到前驅節點的等待狀態為CANCELLED,就會將當前節點的前驅指標指向前一個不為CANCELLED狀態的節點,也就是頭節點,然后再將頭節點的后繼指標指向當前節點,然后再次自旋進來該方法,判斷到前驅節點的等待狀態為SIGNAL,直接回傳,再次進入待阻塞狀態,
無論怎么樣通過shouldParkAfterFailedAcquire()方法的所有節點最終都會進入待阻塞狀態,也就是說等待佇列中除了頭節點以外的所有執行緒都會處于阻塞狀態,
parkAndCheckInterrupt()方法
private final boolean parkAndCheckInterrupt() {
// 阻塞當前執行緒,blocker物件使用當前物件
LockSupport.park(this);
// 當被喚醒時回傳執行緒的中斷標識
return Thread.interrupted();
}
總結:parkAndCheckInterrupt()方法用于阻塞執行緒,同時當執行緒被喚醒時會回傳執行緒的中斷標識,盡管如果執行緒被設定了中斷標識,但也不會影響執行緒繼續往下執行,只不過當它成功獲取到同步資源時,會呼叫一次selfInterrupt()方法,再次為執行緒設定中斷標識,
selfInterrupt()方法
static void selfInterrupt() {
// 為執行緒設定中斷標識
Thread.currentThread().interrupt();
}
總結:當獲取了同步資源的執行緒被設定了中斷標識,才會呼叫selfInterrupt()方法,再次為執行緒設定中斷標識,因為在parkAndCheckInterrupt()方法中已經呼叫過一次Thread.interrupted()方法,避免外部又再次呼叫Thread.interrupted()方法導致執行緒的中斷標識被清除,
cancelAcquire()方法
private void cancelAcquire(Node node) {
if (node == null)
return;
// 將當前節點封裝的執行緒設定為NULL
node.thread = null;
// 通過回圈獲取當前節點不為CANCELLED狀態的前驅節點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 獲取前驅節點的后繼節點(如果節點的前驅節點不是CANCELLED狀態,那么前驅節點的后繼節點就是它自己)
Node predNext = pred.next;
// 將節點的等待狀態設定為CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果當前節點是尾節點,則直接通過CAS將tail指標指向當前節點不為CANCELLED狀態的前驅節點,同時通過CAS將前驅節點的后繼指標設定為NULL
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果當前節點的前驅節點不是頭節點 同時 前驅節點的等待狀態為SIGNAL(如果不是SIGNAL那就設定為SIGNAL) 且 前驅節點封裝的執行緒不為NULL
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 獲取節點的后繼節點
Node next = node.next;
// 如果后繼節點的等待狀態不為CANCELLED,則通過CAS將前驅節點的后繼指標指向當前節點的后繼節點
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next); // 這里并沒有將當前節點的后繼節點的前驅指標指向前驅節點(不用設定,unparkSuccessor()方法會自動跳過)
} else {
// 如果當前節點的前驅節點是頭節點,則直接喚醒當前節點的后繼節點,讓它來剔除當前節點
unparkSuccessor(node);
}
node.next = node;
}
}
總結:如果執行緒在阻塞的程序當中拋出了例外,也就是直接中斷acquireQueued()方法,然后執行finally陳述句塊,由于failed標識為true,因此會執行cancelAcquire()方法,將當前節點的等待狀態設定為CANCELLED,如果當前節點是尾節點,則直接通過CAS將tail指標指向當前節點不為CANCELLED狀態的前驅節點,同時將該前驅節點的后繼指標設定為NULL,如果當前節點的前驅節點不是頭節點,則通過CAS將前驅節點的后繼指標指向當前節點的后繼節點,如果當前節點的前驅節點是頭節點,那么喚醒當前節點的后繼節點,讓它來剔除當前節點,
獨占模式下釋放同步資源的原始碼分析
release()方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 如果佇列不等于空,同時頭節點的等待狀態不為0,也就是頭節點存在后繼節點,那么呼叫unparkSuccessor()方法,喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點,
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
總結:當獲取了同步資源的執行緒釋放同步資源時(外部執行緒或者頭節點中的執行緒),將會呼叫release()方法,release()方法中會呼叫AQS子類的tryRelease()方法,嘗試釋放同步資源,如果釋放同步資源成功,同時佇列不為空以及頭節點的等待狀態不為0,也就是頭節點存在后繼節點,那么就會呼叫unparkSuccessor()方法,喚醒離頭節點最近的(也就是頭節點的后繼節點)同時等待狀態不為CANCELLED的后繼節點,那么該節點將會通過自旋嘗試獲取同步資源,
unparkSuccessor()方法
private void unparkSuccessor(Node node) {
// 獲取節點的等待狀態
int ws = node.waitStatus;
// 如果節點的等待狀態不為CANCELLED,則通過CAS將節點的等待狀態設定為0(恢復成佇列初始化后的狀態)
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 獲取節點的后繼節點
Node s = node.next;
// 如果節點的后繼指標為NULL(不能說明節點就沒有后繼節點)或者后繼節點為CANCELLED狀態,那么就從后往前尋找離當前節點最近的同時等待狀態不為CANCELLED的后繼節點
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒該后繼節點中的執行緒
LockSupport.unpark(s.thread);
}
總結:unparkSuccessor()方法用于喚醒離節點最近的同時等待狀態不為CANCELLED的后繼節點,如果節點的后繼指標為NULL,不能說明節點就沒有后繼節點,或者后繼節點的等待狀態為CANCELLED,則從后往前,尋找離節點最近的同時等待狀態不為CANCELLED的節點,最侄訓醒該節點中的執行緒,
獨占模式下原始碼分析后的總結
1.當執行緒要獲取同步資源時,可以呼叫acquire()或者tryAcquire()方法,acquire()方法中會呼叫AQS子類的tryAcquire()方法,嘗試獲取同步資源,如果獲取同步資源成功,則直接回傳,做自己的事情,否則將會執行addWaiter()方法,將當前執行緒封裝成Node節點然后加入到等待佇列當中,然后執行acquireQueued()方法,用于自旋獲取同步資源,如果所有條件都滿足那么最終將會執行selfInterrupt()方法,
2.addWaiter()方法用于將當前執行緒封裝成Node節點然后加入到等待佇列當中,如果在這個程序中,等待佇列為慷訓者通過CAS設定尾節點失敗(也就是當前指標所指向的尾節點并不是真正的尾節點,也就是在這個程序當中有其他節點插入到隊尾),那么將會通過enq()方法死回圈進行設定,
3.enq()方法中使用死回圈初始化佇列以及通過CAS設定尾節點,直到尾節點被設定成功,同時需要注意的是當佇列初始化后會有一個空的頭節點,該節點不包含任何的執行緒,然后再將當前節點加入到佇列當中,
4.acquireQueued()方法用于自旋獲取同步資源,同時該方法的方法出口只有一個,也就是當節點的前驅節點是頭節點,同時嘗試獲取同步資源成功,那么就會將當前節點設定為頭節點,否則就會呼叫shouldParkAfterFailedAcquire()方法,判斷執行緒能否進行阻塞,當執行緒能夠被阻塞時,將會呼叫parkAndCheckInterrupt()方法阻塞執行緒,等待被喚醒,同時在執行acquireQueued()方法的程序中,如果拋出了例外,則failed標識為true,那么將會執行cancelAcquire()方法,將當前節點的等待狀態設定為CANCELLED,同時從等待佇列中剔除,
5.shouldParkAfterFailedAcquire()方法用于判斷執行緒能否進行阻塞,以及剔除被設定為CANCELLED狀態的節點,正常情況下,執行緒第一次進來shouldParkAfterFailedAcquire()方法時,會將前驅節點的等待狀態設定為SIGNAL,然后再次自旋進來該方法,判斷到前驅節點的等待狀態為SIGNAL,直接回傳,然后就進入待阻塞狀態,當該節點的前驅節點被CANCELLED時,如果前驅節點的前驅節點是頭節點,那么將會喚醒當前節點,那么它會再次自旋進來該方法,判斷到前驅節點的等待狀態為CANCELLED,就會將當前節點的前驅指標指向前一個不為CANCELLED狀態的節點,也就是頭節點,然后再將頭節點的后繼指標指向當前節點,然后再次自旋進來該方法,判斷到前驅節點的等待狀態為SIGNAL,直接回傳,再次進入待阻塞狀態,無論怎么樣通過shouldParkAfterFailedAcquire()方法的所有節點最終都會進入待阻塞狀態,也就是說等待佇列中除了頭節點以外的所有執行緒都會處于阻塞狀態,
6.parkAndCheckInterrupt()方法用于阻塞執行緒,同時當執行緒被喚醒時會回傳執行緒的中斷標識,盡管如果執行緒被設定了中斷標識,但也不會影響執行緒繼續往下執行,只不過當它成功獲取到同步資源時,會呼叫一次selfInterrupt()方法,再次為執行緒設定中斷標識,因為在parkAndCheckInterrupt()方法中已經呼叫過一次Thread.interrupted()方法,避免外部又再次呼叫Thread.interrupted()方法導致執行緒的中斷標識被清除,
此時等待佇列中除了頭節點以外的所有執行緒都會處于阻塞狀態
1.如果執行緒在阻塞的程序當中拋出了例外,也就是直接中斷acquireQueued()方法,然后執行finally陳述句塊,由于failed標識為true,因此會執行cancelAcquire()方法,將當前節點的等待狀態設定為CANCELLED,如果當前節點是尾節點,則直接通過CAS將tail指標指向當前節點不為CANCELLED狀態的前驅節點,同時將該前驅節點的后繼指標設定為NULL,如果當前節點的前驅節點不是頭節點,則通過CAS將前驅節點的后繼指標指向當前節點的后繼節點,如果當前節點的前驅節點是頭節點,那么喚醒當前節點的后繼節點,讓它來剔除當前節點,
2.當獲取了同步資源的執行緒釋放同步資源時(外部執行緒或者頭節點中的執行緒),將會呼叫release()方法,release()方法中會呼叫AQS子類的tryRelease()方法,嘗試釋放同步資源,如果釋放同步資源成功,同時佇列不為空以及頭節點的等待狀態不為0,也就是頭節點存在后繼節點,那么就會呼叫unparkSuccessor()方法,喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點,那么該節點將會通過自旋嘗試獲取同步資源,
3.在呼叫unparkSuccessor()方法喚醒離節點最近的同時等待狀態不為CANCELLED的后繼節點時,如果節點的后繼指標為NULL,不能說明節點就沒有后繼節點,或者后繼節點的等待狀態為CANCELLED,則從后往前,尋找離節點最近的同時等待狀態不為CANCELLED的節點,最侄訓醒該節點中的執行緒,
獨占模式FAQ
為什么要用CAS設定尾節點?
如果在設定尾節點的這個程序當中,有其他節點插入到隊尾,然后將tail指標指向當前節點,當前節點的前驅指標指向之前的尾節點,之前的尾節點的后繼指標指向當前節點,那么中間插入的節點就會丟失,
在acquireQueued()方法中,為什么嘗試獲取同步資源之前,需要先判斷一下當前節點的前驅節點是否是頭節點?
強制要求等待佇列中的節點獲取同步資源的順序必須是從隊頭到隊尾,否則將會造成節點丟失,丟失了的節點中的執行緒將會永遠處于阻塞狀態(當同步資源被釋放時,還沒來得及喚醒離頭節點最近同時等待狀態不為CANCELLED的后繼節點時,等待佇列中一個排在很后的節點被喚醒,然后它將會通過自旋嘗試獲取同步資源,一旦它獲取了同步資源,那么它將成為頭節點,最終它與之前頭節點之間的所有節點中的執行緒將會永遠處于阻塞狀態),同時只有當執行緒獲取了同步資源后,它才能成為頭節點(佇列初始化后的頭節點除外),因此頭節點肯定是已經獲取過同步資源的(佇列初始化后的頭節點除外),因此為了遵循佇列中的節點獲取同步資源的順序必須是從隊頭到隊尾,所以永遠只有頭節點的后繼節點擁有嘗試獲取同步資源的權利,因此當在嘗試獲取同步資源之前,需要先判斷一下當前節點的前驅節點是否是頭節點,如果不是就不用獲取了,至于頭節點釋放同步資源后,能否被后繼節點獲取到同步資源另說,因為當同步資源被釋放時,被喚醒的后繼節點可能還沒來得獲取同步資源,此時就被外部執行緒直接獲取了,因此被喚醒的這個執行緒又只能再次進入阻塞狀態,
為什么在unparkSuccessor()方法中,如果節點的后繼指標為NULL,需要從后往前尋找離節點最近的同時等待狀態不為CANCELLED的后繼節點,而不從前往后進行尋找?
如果節點的后繼指標為NULL,不能說明節點就沒有后繼節點,因為無論是在addWaiter()方法還是enq()方法將節點加入到佇列,它總是先將當前節點的前驅指標指向尾節點,然后再通過CAS將tail指標指向當前節點,如果在將之前尾節點的后繼指標指向當前節點之前,需要喚醒尾節點的后繼節點,由于此時尾節點的后繼指標仍然為NULL,因此無法通過next指標從前往后尋找,只能通過pred指標從后往前尋找,
執行緒在什么情況會被喚醒?
執行緒被喚醒只有兩種情況
一種是外部執行緒或者頭節點釋放同步資源時,需要喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點,那么該節點就會通過自旋嘗試獲取同步資源,
一種是當節點的前驅節點被CANCELLED時,如果前驅節點的前驅節點是頭節點,那么將會喚醒當前節點,將當前節點的前驅指標指向前一個不為CANCELLED狀態的節點,也就是頭節點,然后再將頭節點的后繼指標指向當前節點,
等待佇列中處于CANCELLED狀態的節點什么時候被剔除?
cancelAcquire()和shouldParkAfterFailedAcquire()方法都可以剔除等待佇列中處于CANCELLED狀態的節點,
*在unparkSuccessor()中需要剔除處于CANCELLED狀態的節點是為了避免同步問題,可能存在一個處于CANCELLED狀態的節點未來得及被剔除,然后它又作為要喚醒的節點的后繼節點,
自定義AQS共享模式下的同步器來實作共享鎖
/**
* 自定義AQS共享模式下的同步器來實作共享鎖
*/
public class Share {
/**
* 自定義AQS共享模式下的同步器
*/
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 存盤執行緒獲取同步資源的情況
*/
private static ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
/**
* 初始化同步資源
*/
public Sync(int state) {
setState(state);
}
/**
* 嘗試獲取同步資源(需要保證是執行緒安全的)
*/
@Override
protected int tryAcquireShared(int arg) {
int state = getState();
int available = state - arg;
if (available >= 0 && compareAndSetState(state, available)) { // 通過CAS保證原子性
threadLocal.set(arg);
return available;
}
return -1;
}
/**
* 釋放同步資源(執行緒釋放同步資源的個數必須等于它獲取同步資源的個數)
*/
@Override
protected boolean tryReleaseShared(int arg) {
if (threadLocal.get() != arg)
throw new UnsupportedOperationException();
if (compareAndSetState(getState(), getState() + arg)) { // 通過CAS保證原子性
threadLocal.set(null);
return true;
}
return false;
}
}
/**
* 初始化同步器的同步資源
*/
public Share(int permits) {
sync = new Sync(permits);
}
public Sync sync;
/**
* 獲取許可
*/
public void acquire(int permits) {
sync.acquireShared(permits);
}
/**
* 嘗試獲取許可
*/
public boolean tryAcquire(int permits) {
return sync.tryAcquireShared(permits) >= 0;
}
/**
* 釋放許可
*/
public boolean release(int permits) {
return sync.releaseShared(permits);
}
}
public class Main {
static class MyRunnable implements Runnable {
private Share share;
private int permits;
@Override
public void run() {
System.out.println(String.format("%s Running", Thread.currentThread().getName()));
share.acquire(permits);
System.out.println(String.format("%s獲取了%s個許可", Thread.currentThread().getName(), permits));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
share.release(permits);
System.out.println(String.format("%s釋放了%s個許可", Thread.currentThread().getName(), permits));
}
public MyRunnable(Share share, int permits) {
this.share = share;
this.permits = permits;
}
}
public static void main(String[] args) {
Share share = new Share(10);
Thread threadA = new Thread(new MyRunnable(share,5),"執行緒A");
Thread threadB = new Thread(new MyRunnable(share,4),"執行緒B");
Thread threadC = new Thread(new MyRunnable(share,3),"執行緒C");
threadA.start();
threadB.start();
threadC.start();
}
}

共享模式下獲取同步資源的原始碼分析
acquireShared()方法
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
總結:當執行緒要獲取同步資源時,可以呼叫acquireShared()或tryAcquireShared()方法,acquireShared()方法中會呼叫AQS子類的tryAcquireShared()方法,嘗試獲取同步資源,如果獲取同步資源成功,則直接回傳,做自己的事情,否則將會呼叫doAcquireShared()方法,
doAcquireShared()方法
private void doAcquireShared(int arg) {
// 將當前執行緒封裝成Node節點,然后加入到等待佇列當中
// 當前節點會被指定為共享模式,共享模式Node.SHARED為一個空的節點,也就是說節點的nextWaiter不為NULL(isShared()方法回傳true)
// 在呼叫addWaiter()方法的程序中,如果等待佇列為慷訓者通過CAS設定尾節點失敗,那么將會通過enq()方法死回圈進行設定
final Node node = addWaiter(Node.SHARED);
// 失敗標識
boolean failed = true;
try {
// 中斷標識
boolean interrupted = false;
// 自旋
for (;;) {
// 獲取節點的前驅節點
final Node p = node.predecessor();
// 如果節點的前驅節點是頭節點,則嘗試獲取同步資源
if (p == head) {
int r = tryAcquireShared(arg);
// 如果獲取同步資源成功,則呼叫setHeadAndPropagate()方法
if (r >= 0) {
setHeadAndPropagate(node, r);
// 將之前的頭節點的后繼指標設定為NULL,help gc
p.next = null;
// 如果獲取了同步資源的執行緒被設定了中斷標識,那么呼叫selfInterrupt()方法,再次為執行緒設定一個中斷標識
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 如果節點的前驅節點不是頭節點,或者嘗試獲取同步資源失敗,那么將會呼叫shouldParkAfterFailedAcquire()方法,判斷執行緒能否進行阻塞,當執行緒能夠被阻塞時,將會呼叫parkAndCheckInterrupt()方法阻塞執行緒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果在執行該方法的程序中,拋出了例外(執行緒超時等等),則failed標識為true,那么將會執行cancelAcquire()方法,將當前節點的等待狀態設定為CANCELLED,同時從等待佇列中剔除,
if (failed)
cancelAcquire(node);
}
}
總結:doAcquireShared()方法用于將當前執行緒封裝成Node節點然后加入到等待佇列當中,然后通過自旋獲取同步資源,同時該方法的方法出口只有一個,也就是當節點的前驅節點是頭節點,同時嘗試獲取同步資源成功,那么就會呼叫setHeadAndPropagate()方法,否則將會呼叫shouldParkAfterFailedAcquire()方法,判斷執行緒能否進行阻塞,當執行緒能夠被阻塞時,將會呼叫parkAndCheckInterrupt()方法阻塞執行緒,等待被喚醒,同時在執行doAcquireShared()方法的程序中,如果拋出了例外,則failed標識為true,那么將會執行cancelAcquire()方法,將當前節點的等待狀態設定為CANCELLED,同時從等待佇列中剔除,
setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
// 獲取頭節點
Node h = head;
// 將當前節點設定為頭節點
setHead(node);
//如果執行緒獲取了同步資源后,仍然有剩余的可用資源(正常情況),或沒有剩余的可用資源但舊的和新的頭節點的等待狀態為PROPAGATE時(說明實際存在可用資源),那么將會呼叫doReleaseShared()方法
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared()) // 排除等待佇列中不為共享模式的節點
doReleaseShared();
}
}
總結:setHeadAndPropagate()方法用于將當前節點設定為頭節點,同時如果當執行緒獲取了同步資源后,仍然有剩余的可用資源(正常情況),或沒有剩余的可用資源但舊的和新的頭節點的等待狀態為PROPAGATE時(說明實際存在可用資源),那么將會呼叫doReleaseShared()方法,
doReleaseShared()方法
private void doReleaseShared() {
// 使用死回圈來保證CAS操作最終肯定成功
for (;;) {
// 獲取頭節點
Node h = head;
// 如果head指標和tail指標不是指向同一個節點,說明頭節點肯定存在后繼節點(使用head != tail可以避免頭節點存在后繼節點但是頭節點的后繼指標又為NULL的情況)
if (h != null && h != tail) {
// 獲取頭節點的等待狀態,如果等待狀態為SIGNAL,則通過CAS將頭節點的等待狀態設定為0(重置),然后喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
// 如果頭節點的等待狀態為0,則通過CAS將頭節點的等待狀態設定為PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head) // 如果執行完以上步驟后,h指標指向的頭節點仍然為當前的頭節點,則退出回圈,完成釋放程序,然后做自己的事情
break;
}
}
總結:當等待佇列中的執行緒獲取了同步資源后,仍然有剩余的可用資源,或沒有剩余的可用資源但舊的和新的頭節點的等待狀態為PROPAGATE,或者當執行緒釋放同步資源這兩種情況,都會呼叫doReleaseShared()方法,該方法使用死回圈來保證CAS操作最終肯定成功,如果頭節點存在后繼節點,同時頭節點的等待狀態為SIGNAL時,那么將會通過CAS將頭節點的等待狀態設定為0(重置),然后喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點,如果判斷到頭節點的等待狀態為0,那么將會通過CAS將節點的等待狀態設定為PROPAGATE,表示需要傳播下去,
共享模式下釋放同步資源的原始碼分析
releaseShared()方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
總結:當獲取了同步資源的執行緒釋放同步資源時,將會呼叫releaseShared()方法,releaseShared()方法中會呼叫AQS子類的tryReleaseShared()方法,嘗試釋放同步資源,如果釋放同步資源成功,則會呼叫doReleaseShared()方法,喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點,
共享模式下原始碼分析后的總結
1.當執行緒要獲取同步資源時,可以呼叫acquireShared()或tryAcquireShared()方法,acquireShared()方法中會呼叫AQS子類的tryAcquireShared()方法,嘗試獲取同步資源,如果獲取同步資源成功,則直接回傳,做自己的事情,否則將會呼叫doAcquireShared()方法,
2.doAcquireShared()方法用于將當前執行緒封裝成Node節點然后加入到等待佇列當中,然后通過自旋獲取同步資源,同時該方法的方法出口只有一個,也就是當節點的前驅節點是頭節點,同時嘗試獲取同步資源成功,那么就會呼叫setHeadAndPropagate()方法,否則將會呼叫shouldParkAfterFailedAcquire()方法,判斷執行緒能否進行阻塞,當執行緒能夠被阻塞時,將會呼叫parkAndCheckInterrupt()方法阻塞執行緒,等待被喚醒,同時在執行doAcquireShared()方法的程序中,如果拋出了例外,則failed標識為true,那么將會執行cancelAcquire()方法,將當前節點的等待狀態設定為CANCELLED,同時從等待佇列中剔除,
3.setHeadAndPropagate()方法用于將當前節點設定為頭節點,同時如果當執行緒獲取了同步資源后,仍然有剩余的可用資源(正常情況),或沒有剩余的可用資源但舊的和新的頭節點的等待狀態為PROPAGATE時(說明實際存在可用資源),那么將會呼叫doReleaseShared()方法,
4.當等待佇列中的執行緒獲取了同步資源后,仍然有剩余的可用資源,或沒有剩余的可用資源但舊的和新的頭節點的等待狀態為PROPAGATE,或者當執行緒釋放同步資源這兩種情況,都會呼叫doReleaseShared()方法,該方法使用死回圈來保證CAS操作最終肯定成功,如果頭節點存在后繼節點,同時頭節點的等待狀態為SIGNAL時,那么將會通過CAS將頭節點的等待狀態設定為0(重置),然后喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點,如果判斷到頭節點的等待狀態為0(表示并發釋放同步資源),那么將會通過CAS將節點的等待狀態設定為PROPAGATE,表示需要傳播下去,
5.當獲取了同步資源的執行緒釋放同步資源時,將會呼叫releaseShared()方法,releaseShared()方法中會呼叫AQS子類的tryReleaseShared()方法,嘗試釋放同步資源,如果釋放同步資源成功,則會呼叫doReleaseShared()方法,喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點,
共享模式FAQ
有哪些場景會將節點的等待狀態設定為PROPAGATE,以及它的作用是什么?
1.當執行緒A釋放同步資源時,將當前的頭節點的等待狀態設定為0,然后喚醒離頭節點最近的同時等待狀態不為CANCELLED的后繼節點,如果被喚醒的節點獲取了同步資源,然后在呼叫setHeadAndPropagate()方法之前,執行緒B釋放了同步資源,此時判斷到頭節點的等待狀態為0,那么就會將頭節點的等待狀態設定為PROPAGATE,表示并發釋放了同步資源,目前還有可用的同步資源,然后被喚醒的節點在執行setHeadAndPropagate()方法時,如果沒有剩余的可用資源,但是判斷到舊的頭節點的等待狀態為PROPAGATE,說明實際存在可用資源,那么會再次呼叫doReleaseShared()方法,去喚醒后繼節點,嘗試獲取同步資源,
2.如果被喚醒的節點獲取了同步資源,在將當前節點設定為頭節點之后,執行緒A和B釋放了同步資源,那么就跟場景1一樣,執行緒B會將頭節點的等待狀態設定為PROPAGATE,然后被喚醒的節點在執行setHeadAndPropagate()方法時,如果沒有剩余的可用資源,除了判斷舊的頭節點的等待狀態是否為PROPAGATE以外,還需要判斷新的頭節點的等待狀態是否為PROPAGATE,
場景一和場景二的區別是獲取同步資源的執行緒在設定頭節點之前還是頭節點之后,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/1913.html
標籤:Java
上一篇:Java基礎概述
下一篇:明明的亂數
