Java開發中,我們的應用程式經常會使用多執行緒提高程式的運行效率,多執行緒情況下訪問執行緒共享變數可能會帶來并發問題,此時就需要并發鎖解決并發問題,Java提供了兩種型別的并發控制機制:synchonrized關鍵字和AQS框架,二者各有優勢,不過在加鎖解鎖場景比較靈活的情況下,我們往往會采用AQS框架來解決并發問題,本文會對Java中的AQS框架的結構和原始碼進行簡單介紹,本文大多數內容參考了這篇博客
AQS結構
AQS的全稱是AbstractQueuedSynchronizer(抽象的佇列式的同步器),AQS定義了一套多執行緒訪問共享資源的同步器框架,許多同步類實作都依賴于它,如常用的ReentrantLock/Semaphore/CountDownLatch等,
如下圖所示,AQS主要包含兩部分內容:共享資源和等待佇列,AQS底層已經對這兩部分內容提供了很多方法,
- 共享資源:共享資源是一個volatile的int型別變數,
- 等待佇列:等待佇列是一個執行緒安全的佇列,當執行緒拿不到鎖時,會被park并放入佇列,
- 新執行緒:非公平情況下,新執行緒會先嘗試直接獲取資源,獲取不到才進入佇列,

核心思想
同步器的核心方法是acquire和release操作,其背后的思想也比較簡潔明確,
acquire操作是這樣的:
// acquire操作
while (當前同步器的狀態不允許獲取操作) {
如果當前執行緒不在佇列中,則將其插入佇列
阻塞當前執行緒
}
如果執行緒位于佇列中,則將其移出佇列
release操作是這樣的:
更新同步器的狀態
if (新的狀態允許某個被阻塞的執行緒獲取成功)
解除佇列中一個或多個執行緒的阻塞狀態
從這兩個操作中的思想中我們可以提取出三大關鍵操作:同步器的狀態變更、執行緒阻塞和釋放、插入和移出佇列,所以為了實作這兩個操作,需要協調三大關鍵操作引申出來的三個基本組件:
- 同步器狀態的原子性管理;
- 執行緒阻塞與解除阻塞;
- 佇列的管理;
同步器狀態的原子性管理
AQS類使用單個int(32位)來保存同步狀態,并暴露出getState、setState以及compareAndSet操作來讀取和更新這個同步狀態,其中屬性state被宣告為volatile,并且通過使用CAS指令來實作compareAndSetState,使得當且僅當同步狀態擁有一個一致的期望值的時候,才會被原子地設定成新值,這樣就達到了同步狀態的原子性管理,確保了同步狀態的原子性、可見性和有序性,
執行緒阻塞與解除阻塞
直到JSR166,阻塞執行緒和解除執行緒阻塞都是基于Java的內置管程,沒有其它非基于Java內置管程的API可以用來達到阻塞執行緒和解除執行緒阻塞,唯一可以選擇的是Thread.suspend和Thread.resume,但是它們都有無法解決的競態問題,所以也沒法用,目前該方法基本已被拋棄,具體不能用的原因可以官方給出的答復,
j.u.c.locks包提供了LockSupport類來解決這個問題,方法LockSupport.park阻塞當前執行緒直到有個LockSupport.unpark方法被呼叫,unpark的呼叫是沒有被計數的,因此在一個park呼叫前多次呼叫unpark方法只會解除一個park操作,另外,它們作用于每個執行緒而不是每個同步器,一個執行緒在一個新的同步器上呼叫park操作可能會立即回傳,因為在此之前可以有多余的unpark操作,但是,在缺少一個unpark操作時,下一次呼叫park就會阻塞,雖然可以顯式地取消多余的unpark呼叫,但并不值得這樣做,在需要的時候多次呼叫park會更高效,park方法同樣支持可選的相對或絕對的超時設定,以及與JVM的Thread.interrupt結合 ,可通過中斷來unpark一個執行緒,
佇列的管理
整個框架的核心就是如何管理執行緒阻塞佇列,該佇列是嚴格的FIFO佇列,因此不支持執行緒優先級的同步,同步佇列的最佳選擇是自身沒有使用底層鎖來構造的非阻塞資料結構,業界主要有兩種選擇,一種是MCS鎖,另一種是CLH鎖,其中CLH一般用于自旋,但是相比MCS,CLH更容易實作取消和超時,所以同步佇列選擇了CLH作為實作的基礎,
CLH佇列實際并不那么像佇列,它的出隊和入隊與實際的業務使用場景密切相關,它是一個鏈表佇列,通過AQS的兩個欄位head(頭節點)和tail(尾節點)來存取,這兩個欄位是volatile型別,初始化的時候都指向了一個空節點,
條件佇列
上一節的佇列其實是AQS的同步佇列,這一節的佇列是條件佇列,佇列的管理除了有同步佇列,還有條件佇列,AQS只有一個同步佇列,但是可以有多個條件佇列,AQS框架提供了一個ConditionObject類,給維護獨占同步的類以及實作Lock介面的類使用,
ConditionObject類實作了Condition介面,Condition介面提供了類似Object管程式的方法,如await、signal和signalAll操作,還擴展了帶有超時、檢測和監控的方法,ConditionObject類有效地將條件與其它同步操作結合到了一起,該類只支持Java風格的管程訪問規則,這些規則中,當且僅當當前執行緒持有鎖且要操作的條件(condition)屬于該鎖時,條件操作才是合法的,這樣,一個ConditionObject關聯到一個ReentrantLock上就表現的跟內置的管程(通過Object.wait等)一樣了,兩者的不同僅僅在于方法的名稱、額外的功能以及用戶可以為每個鎖宣告多個條件,
ConditionObject類和AQS共用了內部節點,有自己單獨的條件佇列,signal操作是通過將節點從條件佇列轉移到同步佇列中來實作的,沒有必要在需要喚醒的執行緒重新獲取到鎖之前將其喚醒,
原始碼分析
我們主要通過獨占式同步狀態的獲取和釋放、共享式同步狀態的獲取和釋放來看下AQS是如何實作的,
獨占式同步狀態的獲取
獨占式同步狀態呼叫的方法是acquire,代碼如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述代碼主要完成了同步狀態獲取、節點構造、加入同步佇列以及在同步佇列中自旋等待的相關作業,其主要邏輯是:首先呼叫子類實作的tryAcquire方法,該方法保證執行緒安全的獲取同步狀態,如果同步狀態獲取失敗,則構造獨占式同步節點(同一時刻只能有一個執行緒成功獲取同步狀態)并通過addWaiter方法將該節點加入到同步佇列的尾部,最后呼叫acquireQueued方法,使得該節點以自旋的方式獲取同步狀態,如果獲取不到則阻塞節點中的執行緒,而被阻塞執行緒的喚醒主要依靠前驅節點的出隊或阻塞執行緒被中斷來實作,
下面來首先來看下節點構造和加入同步佇列是如何實作的,代碼如下:
private Node addWaiter(Node mode) {
// 當前執行緒構造成Node節點
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 嘗試快速在尾節點后新增節點 提升演算法效率 先將尾節點指向pred
Node pred = tail;
if (pred != null) {
//尾節點不為空 當前執行緒節點的前驅節點指向尾節點
node.prev = pred;
//并發處理 尾節點有可能已經不是之前的節點 所以需要CAS更新
if (compareAndSetTail(pred, node)) {
//CAS更新成功 當前執行緒為尾節點 原先尾節點的后續節點就是當前節點
pred.next = node;
return node;
}
}
//第一個入隊的節點或者是尾節點后續節點新增失敗時進入enq
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//尾節點為空 第一次入隊 設定頭尾節點一致 同步佇列的初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
//所有的執行緒節點在構造完成第一個節點后 依次加入到同步佇列中
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
節點進入同步佇列之后,就進入了一個自旋的程序,每個執行緒節點都在自省地觀察,當條件滿足,獲取到了同步狀態,就可以從這個自旋程序中退出,否則依舊留在這個自旋程序中并會阻塞節點的執行緒,代碼如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取當前執行緒節點的前驅節點
final Node p = node.predecessor();
//前驅節點為頭節點且成功獲取同步狀態
if (p == head && tryAcquire(arg)) {
//設定當前節點為頭節點
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//是否阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
再來看看shouldParkAfterFailedAcquire和parkAndCheckInterrupt是怎么來阻塞當前執行緒的,代碼如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驅節點的狀態決定后續節點的行為
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*前驅節點為-1 后續節點可以被阻塞
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*前驅節點是初始或者共享狀態就設定為-1 使后續節點阻塞
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
//阻塞執行緒
LockSupport.park(this);
return Thread.interrupted();
}
獨占式同步獲取鎖示例圖

獨占式同步狀態的釋放
當同步狀態獲取成功之后,當前執行緒從acquire方法回傳,對于鎖這種并發組件而言,就意味著當前執行緒獲取了鎖,有獲取同步狀態的方法,就存在其對應的釋放方法,該方法為release,現在來看下這個方法的實作,代碼如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {//同步狀態釋放成功
Node h = head;
if (h != null && h.waitStatus != 0)
//直接釋放頭節點
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*尋找符合條件的后續節點
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//喚醒后續節點
LockSupport.unpark(s.thread);
}
獨占式釋放是非常簡單而且明確的,
總結下獨占式同步狀態的獲取和釋放:在獲取同步狀態時,同步器維護一個同步佇列,獲取狀態失敗的執行緒都會被加入到佇列中并在佇列中進行自旋;移出佇列的條件是前驅節點為頭節點且成功獲取了同步狀態,在釋放同步狀態時,同步器呼叫tryRelease方法釋放同步狀態,然后喚醒頭節點的后繼節點,
共享式同步狀態的獲取
共享式同步狀態呼叫的方法是acquireShared,代碼如下:
public final void acquireShared(int arg) {
//獲取同步狀態的回傳值大于等于0時表示可以獲取同步狀態
//小于0時表示可以獲取不到同步狀態 需要進入佇列等待
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
//和獨占式一樣的入隊操作
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
//自旋
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//前驅結點為頭節點且成功獲取同步狀態 可退出自旋
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
//退出自旋的節點變成首節點
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
與獨占式一樣,共享式獲取也需要釋放同步狀態,通過呼叫releaseShared方法可以釋放同步狀態,代碼如下:
public final boolean releaseShared(int arg) {
//釋放同步狀態
if (tryReleaseShared(arg)) {
//喚醒后續等待的節點
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
//自旋
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//喚醒后續節點
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
unparkSuccessor方法和獨占式是一樣的,
AQS的應用
AQS被大量的應用在了同步工具上,
ReentrantLock:ReentrantLock類使用AQS同步狀態來保存鎖重復持有的次數,當鎖被一個執行緒獲取時,ReentrantLock也會記錄下當前獲得鎖的執行緒標識,以便檢查是否是重復獲取,以及當錯誤的執行緒試圖進行解鎖操作時檢測是否存在非法狀態例外,ReentrantLock也使用了AQS提供的ConditionObject,還向外暴露了其它監控和監測相關的方法,
ReentrantReadWriteLock:ReentrantReadWriteLock類使用AQS同步狀態中的16位來保存寫鎖持有的次數,剩下的16位用來保存讀鎖的持有次數,WriteLock的構建方式同ReentrantLock,ReadLock則通過使用acquireShared方法來支持同時允許多個讀執行緒,
Semaphore:Semaphore類(信號量)使用AQS同步狀態來保存信號量的當前計數,它里面定義的acquireShared方法會減少計數,或當計數為非正值時阻塞執行緒;tryRelease方法會增加計數,在計數為正值時還要解除執行緒的阻塞,
CountDownLatch:CountDownLatch類使用AQS同步狀態來表示計數,當該計數為0時,所有的acquire操作(對應到CountDownLatch中就是await方法)才能通過,
FutureTask:FutureTask類使用AQS同步狀態來表示某個異步計算任務的運行狀態(初始化、運行中、被取消和完成),設定(FutureTask的set方法)或取消(FutureTask的cancel方法)一個FutureTask時會呼叫AQS的release操作,等待計算結果的執行緒的阻塞解除是通過AQS的acquire操作實作的,
SynchronousQueues:SynchronousQueues類使用了內部的等待節點,這些節點可以用于協調生產者和消費者,同時,它使用AQS同步狀態來控制當某個消費者消費當前一項時,允許一個生產者繼續生產,反之亦然,
除了這些j.u.c提供的工具,還可以基于AQS自定義符合自己需求的同步器,
我是御狐神,歡迎大家關注我的微信公眾號

本文最先發布至微信公眾號,著作權所有,禁止轉載!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/302652.html
標籤:其他
