
上一節,你應該學到了ReentrantLock底層基于AQS的3個小組件 state、owner、queue,并且了解了下一個執行緒1進行加鎖修改owner和state的程序,還記得么?加鎖成功后,如下圖所示的狀態:

首次加鎖的時候,只使用到了owner和state這兩個小組件,并沒有涉及到等待佇列,所以這一節,我們繼續看一下,如果有下一個執行緒—執行緒2,這個哥們過來加鎖會是如何的?
直接從JDK原始碼層面理解AQS的另一個執行緒也來加鎖的入隊邏輯
直接從JDK原始碼層面理解AQS的另一個執行緒也來加鎖的入隊邏輯
當執行緒2這個哥們進行加鎖的時候,假設執行緒1還沒有釋放鎖,也就是基于上面的圖的狀態,執行緒2進行加鎖,同樣會走到如下lock方法的代碼:
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
如果執行緒2進行lock,當執行compareAndSetState(0,1)的時候,由于state此時已經是1了,肯定會CAS操作失敗,計入else邏輯,在NonFairSync的父類AQS中可以找到如下代碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 接著又會呼叫NonFairSync實作的tryAcquire:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
這個上面的tryAcquire方法實際呼叫了一個nonfairTryAcquire,從名字上看,叫做非公平獲取的一個方法,(后面講非公平鎖的會講到),
但是當你看過這個方法的脈絡你會發現,state是1,第一個if不滿足,owner是執行緒1,當前是執行緒2,第二個if也不滿足,結果直接回傳了false,
所以到這里你會發現執行緒2加鎖,截止到現在,會執行到如下圖所示步驟3所示:

接著由于tryAcquire回傳false,會進入&&后面的方法呼叫addWaiter(Node.EXCLUSIVE),
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter從名字上,你可以連蒙帶猜下,其實這個方法的意思就是添加到等待佇列的進行等待的意思,讓我們來看下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
首先傳入的是一個Node mode,就是Node.EXCLUSIVE,從名字上看就是一個獨占Node的意思,
你可以這個Node.EXCLUSIVE看下:
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
}
果然,你可以看到Node中有一堆靜態變數,通過null,空Node、1、1、-2、-3表示一些Node的角色型別,
接著往下看addWaitder方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//省略后面代碼
return node;
}
這個new Node又做了什么?可以看下Node的構造方法和成員變數:
volatile Node prev;
volatile Node next;
volatile int waitStatus;
volatile Thread thread;
Node nextWaiter;
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
除了next、prev、thread表示雙向鏈表的前后指標和對應的資料元素之外,還有兩個變數nextWaiter和waitStatus,可以從名字上猜出來,表示等待節點和等待狀態的意思,
這里傳入了thread=執行緒2,mode= EXCLUSIVE = null ,其實nextWaiter這里更像是個標記,表示獨占型別的Node,或者說是執行緒2正在等待的是一個獨占鎖,創建的node如下圖所示:

接著addwaiter創建完成節點node后,繼續執行代碼pred指標指向tail,但是默認tail是null,所以直接呼叫enq(node)方法,看樣子是要進行入隊,enqueue的意思, 代碼如下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
執行到這里就會得到如下結果:

AQS的本質:為啥叫做異步佇列同步器?
AQS的本質:為啥叫做異步佇列同步器?
接著我們需要分析下enq(node)這個入隊方法了:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
從脈絡上看,是一個經典for回圈+CAS 自旋操作,你可以跟著看下代碼執行的思路:
1)第一次for回圈
首先t指向tail,tail由于是null,t剛開始肯定是null,進入第一個if,
接著通過CAS操作compareAndSetHead,將head指向了新建的一個Node,成功后將tail指向了head,
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
所以會得到如下圖所示結果:

2)第二次for回圈
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
此時ReentrantLock的tail和head已經指向了空的new Node(),
接著還是t=tail, t此時不為空了,走到了else邏輯,使用入參node節點的prev指向了t所指向的空Node,
之后通過CAS操作compareAndSetTail,將tail指向到入參node節點,
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
最后通過t的next也指向了入參node節點,
也就是如下圖所示:

從上圖,我們就可以看出來,執行緒2的node和空node連接起來,形成了一個雙向鏈表,之前學習LinkedList你應該已經知道,雙向鏈表也可以當做佇列使用,所以這里你可以當做將node進行了入隊操作,
這個其實就是AQS的本質,等待佇列組件的作用,
當執行緒2進行了入隊等待,這里你可以簡化一下流程圖,你可以得到如下的圖:

加鎖失敗的時候如何借助AQS異步入隊阻塞等待?
加鎖失敗的時候如何借助AQS異步入隊阻塞等待?
入隊后,接著就結束了么?不是,還需要修改下執行緒2的狀態,將他進行掛起,既然已經排上隊了,就不要占用CPU資源了,是不是?
我們看下是如何做的:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
之前我們執行完了addWaiter,回傳的節點是node,也就是執行緒2對應的等待節點,arg是1,接著進入了acquireQueued這個方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
目前等待佇列情況如下:

這個方法核心脈絡,是一個無限for回圈,當中有兩個if,
接著我們看下細節:
1)第一次For回圈:
首先上來使用一個輔助指標p,指向了node節點的前一個節點,node.predecessor其實就是p=node.prev,代碼如下:
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
由于head等于p,就還是嘗試獲取一次鎖,tryAcquire(arg),這里假設執行緒1還沒有釋放鎖,tryAcquire(arg)肯定還是會失敗回傳false,所以第一個if不成立,(如果獲取成功,這個if其實會將執行緒2移出佇列的)
接著執行第二個if判斷,先進行了shouldParkAfterFailedAcquire方法呼叫,第一個引數傳入p,就是空Node,第二引數傳入node,就是執行緒2對應的node,
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
第一個引數傳入p,就是空Node,第二引數傳入node,就是執行緒2對應的node,
它們兩個節點的waitStatus都是0,所以經過上面代碼,會執行到最后一個else,
會通過CAS操作,將空Node的waitStatus狀態(ws)從0改為Node.SIGNAL(-1),如下圖所示:

接著shouldParkAfterFailedAcquire就直接回傳false,第一個條件false,就會直接進行下一次for回圈了,
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2)第二次For回圈
假設執行緒1還是沒有釋放鎖,上面的for回圈還是會進入如下方法,但是其實的pred也就是空Node的watiStatus已經被改成SIGNAL(-1),所以之里會回傳true,
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
接著下面這個if第一個條件是ture會判斷第二條件parkAndCheckInterrupt
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
parkAndCheckInterrupt這個方法從名字看叫做掛起并且檢查執行緒是否被打斷,代碼如下
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
可以看到他核心呼叫了一個工具類LockSupport.park(this);
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
這個底層是通過UNSAFE的C++代碼實作的,我們就不去看了,你只要知道,這個park操作會將執行緒掛起,進入等待狀態就可以了,還記得之前將執行緒的狀態圖么?

park操作會將執行緒掛起,進入Waiting等待狀態,也就是說執行緒2加鎖失敗最終就是入隊并且等待,
今天這一節,到這里就把AQS中入隊的邏輯給大家講清楚了,執行緒獲取鎖失敗如何入隊?如何掛起的?相信你都很清楚了,你可以自己用第三個執行緒嘗試加鎖失敗徹底圖解AQS佇列等待機制試試,最后學完,如果你可以畫出這個圖,就說嘛你真正明白了AQS的基本原理了,

小結&思考
小結&思考
雖然這個入隊邏輯看著比較復雜,但其實大家可以抽象出這個佇列的設計是基于:CAS操作+Node狀態+執行緒標記控制就可以了,
可以多思考下關鍵思想和關鍵點,不用太糾結細節,比如多思考下為啥設計了狀態,是為了單獨使用Condition嗎?還是,,,,
這些思考才是最重要的!
下一節,我們看下如果執行緒1釋放了鎖,如何喚醒佇列中元素的,喚醒的時候如果有本地執行緒來加鎖,還能插隊!?所以下一節也會給大家介紹下什么是公平和非公平鎖,
本文由博客群發一文多發等運營工具平臺 OpenWrite 發布
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/341688.html
標籤:Java
上一篇:設計模式系列-單例模式
