摘要:當你使用java實作一個執行緒同步的物件時,一定會包含一個問題:你該如何保證多個執行緒訪問該物件時,正確地進行阻塞等待,正確地被喚醒?
本文分享自華為云社區《JUC中的AQS底層詳細超詳解,剖析AQS設計中所需要考慮的各種問題!》,作者: breakDawn ,
java中AQS究竟是做什么的?
當你使用java實作一個執行緒同步的物件時,一定會包含一個問題:
你該如何保證多個執行緒訪問該物件時,正確地進行阻塞等待,正確地被喚醒?
關于這個問題,java的設計者認為應該是一套通用的機制
因此將一套執行緒阻塞等待以及被喚醒時鎖分配的機制稱之為AQS
全稱 AbstractQuenedSynchronizer
中文名即抽象的佇列式同步器 ,
基于AQS,實作了例如ReentenLock之類的經典JUC類,
AQS簡要步驟
- 執行緒訪問資源,如果資源足夠,則把執行緒封裝成一個Node,設定為活躍執行緒進入CLH佇列,并扣去資源
- 資源不足,則變成等待執行緒Node,也進入CLH佇列
- CLH是一個雙向鏈式佇列, head節點是實際占用鎖的執行緒,后面的節點則都是等待執行緒所對應對應的節點
AQS的資源state
state定義
AQS中的資源是一個int值,而且是volatile的,并提供了3個方法給子類使用:
private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } // cas方法 compareAndSetState(int oldState, int newState);
如果state上限只有1,那么就是獨占模式Exclusive,例如 ReentrantLock
如果state上限大于1,那就是共享模式Share,例如 Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier
已經有CAS方法了,為什么資源state還要定義成volatile的?
對外暴露的getter/setter方法,是走不了CAS的,而且setter/getter沒有被synchronized修飾,所以必須要volatile,保證可見性
這樣基于AQS的實作可以直接通過getter/setter操作state變數,并且保證可見性,也避免重排序帶來的影響,比如CountDownLatch,ReentrantReadWriteLock,Semaphore都有體現(各種getState、setState)
對資源的操作什么時候用CAS,什么使用setState?
volatile的state成員有一個問題,就是如果是復合操作的話不能保證復合操作的原子性
因此涉及 state增減的情況,采用CAS
如果是state設定成某個固定值,則使用setState
AQS的CLH佇列
為什么需要一個CLH佇列
這個佇列的目的是為了公平鎖的實作
即為了保證先到先得,要求每個執行緒封裝后的Node按順序拼接起來,
CLH本質?是一個Queue容器嗎
不是的,本質上是一個鏈表式的佇列
因此核心在于鏈表節點Node的定義
除了比較容易想到的prev和next指標外
還包含了該節點內的執行緒
以及 waitStatus 等待狀態
4種等待狀態如下:
- CANCELLED(1): 因為超時或者中斷,節點會被設定為取消狀態,被取消的節點時不會參與到競爭中的,他會一直保持取消狀態不會轉變為其他狀態;
- SIGNAL(-1):后繼節點的執行緒處于等待狀態,而當前節點的執行緒如果釋放了同步狀態或者被取消,將會通知后繼節點,使后繼節點的執行緒得以運行
- CONDITION(-2) : 點在等待佇列中,節點執行緒等待在Condition上,當其他執行緒對Condition呼叫了signal()后,改節點將會從等待佇列中轉移到同步佇列中,加入到同步狀態的獲取中
- PROPAGATE(-3) : 表示下一次共享式同步狀態獲取將會無條件地傳播下去
- INIT( 0):
入隊是怎么保證安全的?
入隊程序可能引發沖突
因此會用CAS保障入隊安全,
private Node enq(final Node node) { //多次嘗試,直到成功為止 for (;;) { Node t = tail; //tail不存在,設定為首節點 if (t == null) { if (compareAndSetHead(new Node())) tail = head; } else { //設定為尾節點 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
出隊程序會發生什么?
一旦有節點出隊,說明有執行緒釋放資源了,隊頭的等待執行緒可以開始嘗試獲取了,
于是首節點的執行緒釋放同步狀態后,將會喚醒它的后繼節點(next)
而后繼節點將會在獲取同步狀態成功時將自己設定為首節點
注意在這個程序是不需要使用CAS來保證的,因為只有一個執行緒能夠成功獲取到同步狀態
AQS詳細資源獲取流程
1. tryAcquire嘗試獲取資源
AQS使用的設計模式是模板方法模式,
具體代碼如下:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 發現中斷過,則觸發中斷例外 selfInterrupt(); }
即AQS抽象基類AbstractQueuedSynchronizer給外部呼叫時,都是調的acquire(int arg)方法,這個方法的內容是寫死的,
而acquire中,需要呼叫tryAcquire(arg), 這個方法是需要子類實作的,作用是判斷資源是否足夠獲取arg個
(下面部分代碼注釋選自: (2條訊息) AQS子類的tryAcquire和tryRelease的實作_Mutou_ren的博客-CSDN博客_aqs tryacquire )
ReentrantLock中的tryAcquire實作
這里暫時只談論一種容易理解的tryAcuire實作,其他附加特性的tryAcquire先不提,
里面主要就做這幾件事:
- 獲取當前鎖的資源數
- 資源數為0,說明可以搶, 確認是前置節點是頭節點,進行CAS試圖爭搶,搶成功就回傳true,并設定當前執行緒
- 沒搶成功,回傳false
- 如果是重入的,則直接set設定增加后的狀態值,狀態值此時不一定為0和1了
protected final boolean tryAcquire(int acquires){ final Thread current = Thread.currentThread(); int c = getState(); // state==0代表當前沒有鎖,可以進行獲取 if (c == 0) { // 非公平才有的判斷,會判斷是否還有前驅節點,直接自己為頭節點了或者同步佇列空了才會繼續后面的鎖的獲取操作 if (!hasQueuedPredecessors() //CAS設定state為acquires,成功后標記exclusiveOwnerThread為當前執行緒 && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 當前占有執行緒等于自己,代表重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 出現負數,說明溢位了 if (nextc < 0) // throw new Error("Maximum lock count exceeded"); // 因為是重入操作,可以直接進行state的增加,所以不需要CAS setState(nextc); return true; } return false; }
2.addWaiter 添加到等待佇列
當獲取資源失敗,會進行addWaiter(Node.EXCLUSIVE), arg),
目的是創建一個等待節點Node,并添加到等待佇列
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; // 通過CAS競爭隊尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 競爭隊尾失敗,于是進行CAS頻繁回圈競爭隊尾 enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
3. acquireQueued回圈阻塞-競爭
并在 "處于頭節點時嘗試獲取資源->睡眠->喚醒“中回圈,
當已經跑完任務的執行緒釋放資源時,會喚醒之前阻塞的執行緒,
當被喚醒后,就會檢查自己是不是頭節點,如果不是,且認為可以阻塞,那就繼續睡覺去了
(下面代碼注釋部分選自AQS(acquireQueued(Node, int) 3)–佇列同步器 - 小窩蝸 - 博客園 (http://cnblogs.com) )
final boolean acquireQueued(final Node node, int arg) { // 標識是否獲取資源失敗 boolean failed = true; try { // 標識當前執行緒是否被中斷過 boolean interrupted = false; // 自旋操作 for (;;) { // 獲取當前節點的前繼節點 final Node p = node.predecessor(); // 如果前繼節點為頭結點,說明排隊馬上排到自己了,可以嘗試獲取資源,若獲取資源成功,則執行下述操作 if (p == head && tryAcquire(arg)) { // 將當前節點設定為頭結點 setHead(node); // 說明前繼節點已經釋放掉資源了,將其next置空,好讓虛擬機提前回收掉前繼節點 p.next = null; // help GC // 獲取資源成功,修改標記位 failed = false; // 回傳中斷標記 return interrupted; } // 若前繼節點不是頭結點,或者獲取資源失敗, // 則需要判斷是否需要阻塞該節點持有的執行緒 // 若可以阻塞,則繼續執行parkAndCheckInterrupt()函式, // 將該執行緒阻塞直至被喚醒 // 喚醒后會檢查是否已經被中斷,若回傳true,則將interrupted標志置于true if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 最侄訓取資源失敗,則當前節點放棄獲取資源 if (failed) cancelAcquire(node); } }
4.shouldParkAfterFailedAcquire 檢查是否可以阻塞
該方法不會直接阻塞執行緒,因為一旦執行緒掛起,后續就只能通過喚醒機制,中間還發生了內核態用戶態切換,消耗很大,
因此會先不斷確認前繼節點的實際狀態,在只能阻塞的情況下才會去阻塞,
并且會過濾掉cancel的執行緒節點
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 獲取前繼節點的等待狀態 int ws = pred.waitStatus; // 如果等待狀態為Node.SIGNAL(-1),則直接回傳true即可以阻塞 // 因為這說明前繼節點完成資源的釋放或者中斷后,會主動喚醒后繼節點的(這也即是signal信號的含義),因此方法外面不用再反復CAS了,直接阻塞吧 if (ws == Node.SIGNAL) return true; // 如果前繼節點的等待值大于0即CANCELLED(1),說明前繼節點的執行緒發生過cancel動作 // 那就繼續往前遍歷,直到當前節點的前繼節點的狀態不為cancel if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 前繼節點的等待狀態不為SIGNAL(-1),也不為Cancel(1) // 那么只能是PROPAGATE(-3)或者CONDITION(-2)或者INITIAL(0) // 直接設定成SIGNAL,下一次還沒CAS成功,就直接睡覺了 // 因此在前面所有節點沒辯護的情況下, 最多一次之后就會回傳true讓外面阻塞 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
5.parkAndCheckInterrupt() 阻塞執行緒
使用LockSupport.park來阻塞當前這個物件所在的執行緒
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 確認是否是中斷導致的park結束,并清除中斷標記 return Thread.interrupted(); } public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); }
lockSupport.park()和普通的wait|notify都有啥區別?
- 面向的主體不一樣,LockSuport主要是針對Thread進進行阻塞處理,可以指定阻塞佇列的目標物件,每次可以指定具體的執行緒喚醒,Object.wait()是以物件為緯度,阻塞當前的執行緒和喚醒單個(隨機)或者所有執行緒,
- 實作機制不同,雖然LockSuport可以指定monitor的object物件,但和object.wait(),兩者的阻塞佇列并不交叉,可以看下測驗例子,object.notifyAll()不能喚醒LockSupport的阻塞Thread.
如果還要深挖底層實作原理,可以詳細見該鏈接
簡而言之,是用mutex和condition保護了一個_counter的變數,當park時,這個變數置為了0,當unpark時,這個變數置為1,
底層用的C語言的pthread_mutex_unlock、pthread_cond_wait 、pthread_cond_signal ,但是針對了mutex和_cond兩個變數進行加鎖,
6.總體流程圖
代碼中頻繁出現的interruptd中斷標記是做什么用的?
對執行緒呼叫 t1.interrupt();時
會導致 LockSupport.park() 阻塞的執行緒重新被喚醒
即有兩種喚醒情況: 被前置節點喚醒,或者被外部中斷喚醒
這時候要根據呼叫的acuire型別決定是否在中斷發生時結束鎖的獲取,
上面介紹的是不可中斷鎖,
在parkAndCheckInterrupt中,當park結束阻塞時時,使用的是 Thread.interrupted() 而不是 .isInterrupted() 來回傳中斷狀態
因為前者會回傳執行緒當前的中斷標記狀態同時清除中斷標志位(置為false)
外層CAS回圈時, 就不會讓執行緒受中斷標記影響,只是記錄一下是否發生過中斷
當獲取鎖成功后,如果發現有過執行緒中斷,則會觸發中斷例外,
之后便由獲取鎖的呼叫者自己決定是否要處理執行緒中斷,像下面這樣:
reentrantLock.lock(); try { System.out.println("t1"); TimeUnit.SECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } finally { reentrantLock.unlock(); }
那么另一種情況就是可中斷鎖了,
ReentranLock有一個lockInterruptibly()方法就是這種情況
執行緒被喚醒時,如果發現自己被中斷過,就會直接拋例外而不是繼續獲取鎖
因此如果你的執行緒對中斷很敏感,那么就是用可中斷鎖,及時回應,
如果不敏感,也要注意處理中斷例外,
AQS的詳細資源釋放流程
首先AQS提供的模板方法為release方法,
核心邏輯就是對資源進行嘗試性釋放
如果成功,就喚醒等待佇列中的第一個頭節點
public final boolean release(int arg) { // 是否釋放成功,tryRelease是子類要實作的方法 if (tryRelease(arg)) { Node h = head; // 判斷頭節點是否正在阻塞中,是的話喚醒 if (h != null && h.waitStatus != 0) // 喚醒頭節點 unparkSuccessor(h); return true; } return false; }
看一下ReteenLock中的tryRelease實作
就是減一下資源值,
當資源值清零,則說明可以解除了對當前點的占用
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; // 設定當前占用執行緒為null setExclusiveOwnerThread(null); } // 不需要CAS,因為只有持有鎖的人才能做釋放,不擔心競爭 setState(c); return free; }
AQS如何實作公平和非公平?
以ReteenLock為例,它內部tryAcquire有兩種同步器的實作
- 非公平同步器NonfairSync
- 公平同步器FairSync
公平同步器和非公平同步器都是ReentrantLock中定義的一個static內部類
ReentrantLock根據配置的不同,使用這2個同步器做資源的獲取和同步操作
他們二者的提供的lock操作,本質上就是AQS的acquire(1)
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); }
二者在公平和非公平的實作區別上,就是喚醒執行緒后,只有等待佇列的隊頭節點才會嘗試競爭,
而非公平鎖是只要喚醒了就可以嘗試競爭,
因此核心區別在于hasQueuedPredecessors方法!
公平和非公平鎖的優點和缺點
- 饑餓問題
非公平鎖可能引發“饑餓”,即一個執行緒反復搶占獲取,而其他執行緒一直拿不到,
而公平鎖不存在饑餓,只要排上隊了就一定能拿到
- 性能問題
非公平鎖的平均性能比公平鎖要高, 因為非公平鎖中所有人都可以CAS搶占,如果同步塊的時間非常短,那么可能所有人都不需要阻塞,減少CPU喚醒執行緒的開銷,整體的吞吐效率會高點,CPU也不必取喚醒所有執行緒,會減少喚起執行緒的數量,
性能測驗中公平鎖的耗時是非公平鎖的94.3倍, 總切換次數是133倍
Lock類是默認公平還是非公平?
默認是非公平的,原因就是勺ò訃慮的性能差距過大問題, 因此公平鎖只能用于特定對性能要求不高且饑餓發生概率不大的場景中,
獨占模式和共享模式的AQS區別
- 名字上, 共享模式都會帶一個shard
- 回傳值上,獨占模式相關acuire方法放回的是boolean型別, 而共享模式回傳的是int值
- 核心概念上, 區別在于同一時刻能否有多個執行緒可以獲取到其同步狀態
- 釋放時,共享模式需要用CAS進行釋放, 而獨占模式的release方法則不需要,直接setState即可,
- 共享模式應用:信號量、讀寫鎖
共享模式信號量Semaphore的Sync同步器
先實作了一個靜態內部類Sync
和上面的RLock類一個區別在于需要state初始化值,不一定為1
Sync(int permits) { setState(permits); }
再繼承實作了FairSync和NoFairSync
使用CAS實作值的增加或者減少
公平/非公平的區別同樣是hasQueuedPredecessors的判斷
protected int tryAcquireShared(int acquires) { for (;;) { // 隊頭判斷,公平鎖核心 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; // 信號量不足,直接回傳負數 if (remaining < 0 || // 能搶成功,回傳修改后的值,搶失敗則for回圈繼續 compareAndSetState(available, remaining)) return remaining; } }
AQS如何處理重入
通過current == getExclusiveOwnerThread()來判斷并進行非CAS的setState操作
if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; // 出現負數,說明溢位了 if (nextc < 0) // throw new Error("Maximum lock count exceeded"); // 因為是重入操作,可以直接進行state的增加,所以不需要CAS setState(nextc); return true; }
注意處理重入問題時,如果是獨占鎖,是可以直接setState而不需要CAS的,因為不會競爭式地重入!
ReentrantLock釋放時,也會處理重入,關鍵點就是對getState() - release后的處理,是否回傳true或者false
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { // 只有資源數為0才會解鎖 // 才算釋放成功,否則這鎖還是占住了 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
AQS如何回應超時
AQS提供的方法中帶有Nanos后綴的方法就是支持超時中斷的方法,
核心邏輯就是每次阻塞前,確認nanosTimeout是否已經超時了,
每次喚醒時,將nanosTimeout減去阻塞所花的時間,重新確認,并修改lastTime
關鍵部分見下圖
spinForTimeoutThreshold是什么?
首先這個值是寫死的1000L即1000納秒
1000納秒是個非常小的數字,而小于等于1000納秒的超時等待,無法做到十分的精確,那么就不要使用這么短的一個超時時間去影響超時計算的精確性,所以這時執行緒不做超時等待,直接做自旋就好了,
點擊關注,第一時間了解華為云新鮮技術~
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/519261.html
標籤:其他
