本文作者:可樂可樂可,博主個人主頁:可樂可樂可的個人主頁
輕松理解AQS框架
本文需要以下知識鋪墊:Java、臨界區、信號量、鎖
AQS(AbstractQueuedSynchronizer,抽象佇列同步器)是Java中重入鎖ReentrantLock、讀寫鎖、信號量的實作基石,
學會、了解AQS框架對了解Java鎖有很大的幫助
說的比唱的好聽,AQS原始碼下來2k+行,這是人干的活嗎?
為了解決大家AQS不了解以及看了忘,忘了看的惡性回圈,下面將帶領大家從簡到繁,一步步的學會AQS框架,
本文中涉及的代碼以及做好了中文的注釋,帶伙可以訪問我的github倉庫拉下來看
github倉庫地址:Jirath-Liu
AQS是啥
各位Java開發者必然會了解一個類,叫ReentrantLock,
在早期使用ReentrantLock效率是遠遠超過synchronized關鍵字的,現在差距一步步縮小了,
不知道有沒有人點開過ReentrantLock的原始碼一探究竟?
ReentrantLock內部真正起作用的是Sync類,ReentrantLock所有跟鎖有關的方法都呼叫了Sync的方法來實際實作,
而Sync的父類正是我們的主角——AbstractQueuedSynchronizer

public void lock() {
sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
sync.lockInterruptibly();
}
public boolean tryLock() {
return sync.tryLock();
}
....
關于ReentrantLock這里就不在啰嗦了,如果有想了解的可以留言,給大伙安排上
我們現在關心的,是里面的這個Sync——AQS的實作類
Sync,或者說AQS的實作類,究竟做了什么,達到了加鎖的目的?
加鎖大家應該都知道是什么概念吧,信號想必比各位也應該了解(不清楚的先去搜搜
加鎖的實質就是信號量,若有執行緒占用了某個資源,就在信號量進行標記,其他執行緒就了解這段臨界區是被占用的,
我們AQS的原理其實就是信號量機制,Sync的機制如下圖

這其中的程序都是由AQS來實作的,Sync撰寫了一些核心的判斷來定制,

上圖為Sync的原始碼,acquire是AQS的方法,
扯了這么半天,想必對AQS有個模糊的認識了:
一個實作了信號量,等待,搶奪鎖的輕量級框架,
AbstractQueuedSynchronizer
提供一個框架來實作依賴于先進先出(FIFO)等待佇列的阻塞鎖和相關的同步器(信號量,事件等),
此類旨在為大多數依賴單個原子int值表示狀態的同步器提供有用的基礎,
子類必須定義更改此狀態的受保護方法,并定義該狀態對于獲取或釋放此物件而言意味著什么,
如何使用AQS來構建自己的鎖?
我們先學會用AQS,再探知AQS的原理,總得先會跑,再想怎么跑步省力氣吧
AQS框架的大佬給我們提供了四個需要我們實作的介面:

這幾個方法都默認直接拋出例外:UnsupportedOperationException,需要子類繼承來重寫,
這四個方法都是干啥的?
AQS使用了模板方法的設計模式,這四個方法除了撰寫后直接使用外,更會被框架的其他方法呼叫,只要我們按照規矩老老實實的撰寫好這四個方法,就能得到一個出自自己之手的高效的輕量的鎖,
這四個方法的功能就在下面了
// Main exported methods
/**
* 嘗試以獨占模式進行獲取,
*
* 此方法應查詢物件的狀態是否允許以獨占模式獲取它,如果允許則獲取它,
* 此方法始終由執行獲取的執行緒呼叫,
* 如果此方法報告失敗,則acquire方法可以將執行緒排隊(如果尚未排隊),
* 直到其他某個執行緒釋放該信號為止,
*
* 這可以用來實作方法Lock.tryLock() ,
* 默認實作拋出UnsupportedOperationException ,
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* 嘗試設定狀態以反映排他模式下的發布,
* 始終由執行釋放的執行緒呼叫此方法,
* 默認實作拋出UnsupportedOperationException ,
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* 嘗試以共享模式進行獲取,
* 此方法應查詢物件的狀態是否允許以共享模式獲取物件,如果允許則獲取物件,
* 此方法始終由執行獲取的執行緒呼叫,
* 如果此方法報告失敗,則acquire方法可以將執行緒排隊(如果尚未排隊),直到其他某個執行緒釋放該信號為止,
* 默認實作拋出UnsupportedOperationException ,
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 嘗試設定狀態以反映共享模式下的發布,
* 始終由執行釋放的執行緒呼叫此方法,
* 默認實作拋出UnsupportedOperationException ,
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 如果僅相對于當前(呼叫)執行緒保持同步,則回傳true ,
* 每次呼叫非等待的AbstractQueuedSynchronizer.ConditionObject方法時,都會呼叫此方法,
* (等待方法改為呼叫release ,)
* 默認實作拋出UnsupportedOperationException ,
* 此方法僅在AbstractQueuedSynchronizer.ConditionObject方法內部內部呼叫,
* 因此如果不使用條件,則無需定義,
*
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
當然,只拿了這四個方法,肯定是一臉懵逼的,AQS框架提供了很多的方法供子類使用,這些方法都是模板方法,final型別


大致有這些方法:
- 獲取獨占鎖,以及各種姿勢來獲取(超時,回應中斷,嘗試等等),這些方法命名為acquire
- 獲取共享鎖,以及各種姿勢來獲取(超時,回應中斷,嘗試等等),這些方法命名為acquireShared
- 釋放鎖,包括釋放獨占鎖和釋放共享鎖,釋放鎖是不處理執行緒爭奪問題的
- 對等待(Wait)的操作
- 對AQS的感知,
- 是否有排隊,目標執行緒是不是在排隊
- 設定與獲取當前的執行緒(在父類AbstractOwnableSynchronizer中實作),
- CAS設定信號量(compareAndSetState,本人認為這里稱為信號量更合適),獲取信號量,
總結下來AQS給用戶提供了CAS獲取鎖,修改信號量,對AQS內部感知,鎖操作的方法
這些方法一次堆上來就會眼花繚亂,我們可以從ReentrantLock中獲取如何使用這些方法,
ReentrantLock中如何使用AQS
ReentrantLock中分為公平和非公平鎖,這里的公平意思是在獲取鎖的時候,非公平的鎖會直接嘗試進行獲取,而公平的鎖會先看看自己會不會排在第一個,這意味著一個執行緒釋放鎖后再次獲取鎖,成功的幾率會較其他執行緒高些,
我們先看公平鎖的實作,來理解如何使用AQS

abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
final boolean tryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);
return true;
}
//重入操作
} else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
abstract boolean initialTryLock();
final void lock() {
if (!initialTryLock())
acquire(1);
}
final void lockInterruptibly() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!initialTryLock())
acquireInterruptibly(1);
}
final boolean tryLockNanos(long nanos) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return initialTryLock() || tryAcquireNanos(1, nanos);
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);
return true;
}
//重入
} else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && !hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
通過上文,我們得知使用AQS,需要重寫三個方法,第四個方法若需要Condition也需要重寫,我們在ReentrantLock中找找這四個方法,因為ReentrantLock是獨占鎖,所以他沒有重寫acquireShare方法
ReentrantLock重寫的方法如下
從這幾個方法中,我們能看出,ReentrantLock為使用AQS所做的作業
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//只有擁有者才能進行釋放
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
//若free為0,表示鎖解開,釋放執行緒標記,釋放信號量
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
//因為此時還擁有鎖,所以不用考慮搶奪、并發問題
setState(c);
return free;
}
//直接判斷是不是自己就行了
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
}
static final class FairSync extends Sync {
//公平
protected final boolean tryAcquire(int acquires) {
/**
* 嘗試進行搶奪鎖,只有這幾個條件滿足時,才算成功,并且順序很重要
* 1. 信號量為0
* 2. 前面沒有排隊的執行緒(公平鎖才有的判斷
* 3. 嘗試使用CAS搶奪并成功
*/
if (getState() == 0 && !hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
接著,我們注意看三個方法tryLock,lock,initialTryLock這些是ReentrantLock使用的入口方法,從這些方法我們可以了解AQS是怎么使用的
final boolean tryLock() {
Thread current = Thread.currentThread();
int c = getState();
//若c不為0,表示有人占用,若為自己占有,就走重入邏輯
if (c == 0) {
//嘗試搶奪鎖
if (compareAndSetState(0, 1)) {
//cas修改成功,就表示獲得了鎖,標記自己就行了
setExclusiveOwnerThread(current);
return true;
}
//重入操作
} else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
abstract boolean initialTryLock();
final void lock() {
//先嘗試輕量加鎖,使用CAS嘗試搶奪或重入
if (!initialTryLock())
//直接獲取鎖,不成功就排隊等待
//1是信號量增加的值
acquire(1);
}
//在FairSync中
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//公平鎖需要判斷hasQueuedThreads()
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);
return true;
}
//重入
} else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
總結一下ReentrantLock對AQS的使用
AQS中,鎖的占用是使用信號量來標記的,AQS實作了鎖何時去爭奪,以及排隊等邏輯,
說到底AQS還是一個框架,而核心爭奪鎖的部分是由我們撰寫的,
AQS提供了各種爭奪鎖會使用的工具類,我們只需要撰寫爭奪一次的代碼,AQS會幫我們進行重試,阻塞的操作,
AQS原理是什么,AQS做了什么
下面就是原始碼模式,為了幫助各位看懂這2k行的代碼,我先做了幾個圖,我們看圖來了解詳細的程序,
AQS獨占鎖整體的思維是:
- 使用CAS修改信號量的方式爭奪鎖
- 若沒搶到,就進入排隊
- 佇列內的節點若發現前面的節點狀態為Signal,就會進入阻塞狀態
- 佇列的第一個節點會不停的查詢鎖狀態嘗試進行爭奪
- 鎖的釋放將修改自己狀態為0(防止后續節點阻塞),喚醒后續節點進行爭奪
AQS在使用時的核心是兩個方法:acquire、release
這兩個方法的核心就是AQS的核心,可能有點難以接受,但還是建議看一看這個流程圖,心中有個大概,


這兩個圖如果你大概了解了,看看下面這個ABC三個執行緒的例子
非常推薦點開原始碼去看
- 執行緒A獲取鎖,進入執行狀態
- 執行緒B進來,搶奪失敗,進入排隊,發現Head為空,創建了一個Head(state為0),因為Head狀態為0,自己在第一個搶奪位,再次進行搶奪
- 執行緒B嘗試獲取鎖再次失敗,將Head標記為Signal,再次嘗試
- 執行緒B在第二次嘗試中依然失敗,檢查到Head為Signal,進入休眠
- 執行緒C進入,發現鎖搶奪失敗,將自己封裝為一個新節點掛在了執行緒C后,因為C不是第一個位置,所以不搶奪,檢測到執行緒B是0狀態,但是自己沒有獲得鎖,于是標記B為Signal,再次嘗試
- C再次嘗試,發現自己不是第一位,此時B為Signal,C進入休眠,
- 執行緒A釋放鎖,執行緒B被喚醒開始搶奪,成功后將Head更換為自己,并執行自己的流程
- 執行緒B釋放鎖,執行緒C被喚醒,將Head更換為C,執行自己的流程
如果上述的流程對你來說已經能夠摸清了,下面我們再逐步分析AQS中的方法
acquire()原始碼實作
首先,嘗試獲取鎖,失敗則加入佇列
//先嘗試獲取鎖,成功直接回傳,失敗則封裝執行緒加入佇列
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
//失敗則封裝執行緒加入佇列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
進入一個死回圈,使用CAS去爭奪鎖,每一輪回圈都要檢查自己是不是應該阻塞
在檢查的程序中可能修改前者的狀態
/**
* 以排他的不間斷模式獲取已在佇列中的執行緒,
* 用于條件等待方法以及獲取,
*
* 當一個執行緒執行完任務后,不會洗掉自己,而是保持在Head上
* 此時后續節點用cas進行爭奪,成功后更新head
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取之前的節點
final Node p = node.predecessor();
//如果p==Head表示自己在佇列的第一位,
// tryAcquire()為嘗試獲取鎖,為子類實作,若成功則表示搶到了鎖的權限(CAS操作)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 若需要阻塞,則進行阻塞操作,并設定interrupt為true,執行緒將在這里阻塞,
// 直到有鎖的釋放,才會喚醒第一個等待的,
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//若發生了例外,導致添加了佇列,但是例外彈出了
//這里需要洗掉創建的節點
if (failed)
cancelAcquire(node);
}
}
檢查自己是不是應該阻塞,Node信號在這里更新
這個方法有意思的地方是
- 要是前者狀態是取消,就會繼續向前訪問并不斷更新前置節點(這會造成GC回收這個取消的節點),最后斷掉這個廢掉的鏈(也可能是一個節點)
- 要是前者是0(默認狀態)就會修改前者為signal,然后回傳,再次嘗試,在下次訪問這里的時候再阻塞,
/**
*
* 判斷當前這個節點是不是應該阻塞起來
*
* 檢查并更新無法獲取的節點的狀態,
* 如果執行緒應阻塞,則回傳true,
* 這是所有采集回圈中的主要信號控制,
* 要求pred == node.prev,
*
* 前者為signal,阻塞
* 前者為cancel,更新前者
* 其他情況:更新前者為signal(更新后再一輪嘗試,還沒有鎖
* 就會再次訪問這里并阻塞了)
*
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 該節點已經設定了狀態,要求釋放以發出信號,以便可以安全地停放
*/
return true;
if (ws > 0) {
/*
* 若前者是取消的,
* 則繼續向前找并更新記錄中的前者
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//最后斷掉這個廢掉的鏈
pred.next = node;
} else {
/*
* waitStatus一定為0或PROPAGATE,
* 表示我們需要一個信號,但不要阻塞,
* 呼叫者將需要重試以確保在阻塞前無法獲取
*/
//嘗試修改狀態位signal,下次訪問時就會阻塞
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
release()原始碼實作
/**
* 解鎖,獨占鎖解鎖
* 以獨占模式發布,
* 呼叫子類實作的tryRelease,然后喚醒下一個等待的節點
* 如果{@link #tryRelease}回傳true,則通過解鎖一個或多個執行緒來實作,
* 此方法可用于實作方法{@link Lock#unlock}.
*/
public final boolean release(int arg) {
//嘗試解鎖,呼叫的是子類的實作,
//注意子類撰寫這里時,獨占鎖要判斷能不能解鎖
if (tryRelease(arg)) {
Node h = head;
//若有人在等待鎖,就喚醒第一個節點
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
喚醒后繼節點,
這里有兩個細節:
- 傳參是Head,在本方法中若Head不是0狀態就會更新到0狀態,更新到0狀態后,這個方法其他執行緒就不會訪問這個方法了,
- 查詢可用節點是從后向前查找的,因為前面的節點要是null,是沒有后置節點記錄的,
/**
* 喚醒節點的后繼者(如果存在),
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* 如果狀態是否定的(即可能需要信號),請嘗試清除以預期發出信號,
* 如果失敗或等待執行緒更改狀態,則可以,
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 釋放執行緒將保留在后續執行緒中,該執行緒通常只是下一個節點,
* 但是,如果已取消或明顯為空,請從尾部向后移動以找到實際的未取消后繼,
*/
Node s = node.next;
//waitStatus表示該執行緒已經取消(只有cancel是大于0的)
if (s == null || s.waitStatus > 0) {
//若后繼節點是空的,或者這個節點已經被取消了,那么需要嘗試尋找一個合適的節點
s = null;
//從后往前遍歷,不斷更新s(要喚醒的執行緒)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//喚醒一個執行緒(unpark方法是將執行緒從阻塞狀態喚醒)
LockSupport.unpark(s.thread);
}
以上,就是AQS中獨占鎖的原理,關于共享鎖,且聽下回分解
關注博主,不迷路:可樂可樂可的個人主頁
若文章對你有助,求賞一鍵三連

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/262968.html
標籤:其他
上一篇:孫叫獸:我所認為的領導力!
