如果 把ReentrantLock比做一個人的話,那么 AQS 就是他的靈魂,離開 AQS 談論鎖都是耍流氓
ReentrantLock and AQS
- 一.AQS使用方式和其中的設計模式
- 二.重要引數
- 三.了解其中的方法
- 1.模板方法:
- ???獨占式獲取
- ???共享式獲取
- ???獨占式釋放鎖
- ???共享式釋放鎖
- 2.需要子類覆寫的流程方法
- 3.同步狀態state:
- 三、原始碼
- 1.lock
- 實作類原始碼:ReentrantLock 為例
- AbstractQueuedSynchronizer 原始碼
- 如下為 addWaiter 方法 這一支的操作,
- 如下為acquireQueued 的操作
- 總結
- 2.unlock
- 四、ReentrantLock原始碼分析
- 1.構造器
- 2. 非公平鎖獲取鎖
- 3.公平鎖獲取鎖
- 4.釋放鎖
一.AQS使用方式和其中的設計模式
繼承,模板方法設計模式
二.重要引數
- private volatile int state; 記錄當前鎖是否有執行緒拿到鎖、一個執行緒進入鎖的重入數,如果是0,代表沒有任何執行緒進入鎖,如果是 n,n>0 那么代表有個執行緒重入了 n 次
- AbstractOwnableSynchronizer: private transient Thread exclusiveOwnerThread:當前拿鎖的執行緒
- volatile int waitStatus:競爭失敗的執行緒會打包成Node放到同步佇列,Node可能的狀態里:
- CANCELLED(cancelled 1):該節點的執行緒可能由于超時或被中斷而處于被取消(作廢)狀態,一旦處于這個狀態,節點狀態將一直處于CANCELLED(作廢),因此應該從佇列中移除.
- SIGNAL(signal -1):拿到鎖的節點(head)和未拿到鎖正常等待的節點 waitState 都是 signal ,當前節點為SIGNAL時,后繼節點會被掛起,因此在當前節點釋放鎖或被取消之后必須被喚醒(unparking)其后繼結點. 如:如果head 釋放鎖以后,就會判斷 head.next 是否是 signal,是的話喚醒,
- CONDITION(condition -2) :當前節點處于等待佇列
- PROPAGATE(propagate -3):共享,表示狀態要往后面的節點傳播
0,表示初始狀態(拿完鎖的狀態)
- private transient volatile Node head:等待佇列的頭
- private transient volatile Node tail:等待佇列的尾
三.了解其中的方法
1.模板方法:
???獨占式獲取
???accquire
???acquireInterruptibly
???tryAcquireNanos
???共享式獲取
???acquireShared
???acquireSharedInterruptibly
???tryAcquireSharedNanos
???獨占式釋放鎖
???release
???共享式釋放鎖
???releaseShared
2.需要子類覆寫的流程方法
???獨占式獲取 tryAcquire
???獨占式釋放 tryRelease
???共享式獲取 tryAcquireShared
???共享式釋放 tryReleaseShared
???這個同步器是否處于獨占模式 isHeldExclusively
3.同步狀態state:
getState:獲取當前的同步狀態
setState:設定當前同步狀態
compareAndSetState 使用CAS設定狀態,保證狀態設定的原子性
三、原始碼
1.lock
實作類原始碼:ReentrantLock 為例
final void lock() {
acquire(1);
}
AbstractQueuedSynchronizer 原始碼
//p1197
public final void acquire(int arg) {
//tryAcquire 為需要覆寫,需要子類實作的方法
//EXCLUSIVE表示正在一個獨占模式下等待
//如果一個執行緒執行 tryAcquire 恰好拿到了鎖,那么就不再執行acquireQueued了,也不會放到待執行的佇列了
if (!tryAcquire(arg) &&
//addWaiter 下面 p605
//acquireQueued 下面857
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//子類實作tryAcquire 方法示例
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
如下為 addWaiter 方法 這一支的操作,
把呼叫 lock 的執行緒包裝成 node 節點放到佇列中,并把這個 node 節點回傳
判斷當前佇列有沒有尾節點(佇列是否為空):
- 如果不為空那么就把當前節點掛在佇列末尾(進行雙鏈表的一些操作),設定當前節點為尾節點
- 如果為空,呼叫 enq 的方法,構建一個『空節點node ? 當前執行緒 node』這樣的雙鏈表,enq 中使用自旋 CAS 來添加,防止有其他執行緒更改
//p605
//mode == null
private Node addWaiter(Node mode) {
// 使用當前執行緒 構造一個 node,構造器如下 p505
Node node = new Node(Thread.currentThread(), mode);
//如果當前有尾節點(等待佇列不為空,非初始化)
Node pred = tail;
if (pred != null) {
// 當前節點的前驅節點地址賦值
node.prev = pred;
// 使用 CAS 把當添加進來的節點設定成尾節點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果當前沒有尾節點(等待佇列為空,初始化佇列的時候) p583
enq(node);
return node;
}
//p505
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
如果尾節點為空(那么head也是空的,整個佇列都是空的),才會執行enq,
所以此方法最后會形成一個『空節點node ? 當前執行緒 node』,這樣一個雙鏈表
//p583
private Node enq(final Node node) {
//自旋
for (;;) {
Node t = tail;
//佇列初始化
if (t == null) {
//cas 方式設定 空節點 到 head 中,且 tail 與 head 同步
if (compareAndSetHead(new Node()))
tail = head;
} else {
//當前節點 的 上一個節點指標指向 if 中 造出來的那個 空節點
node.prev = t;
//cas 設定當前節點為尾節點
if (compareAndSetTail(t, node)) {
//if 中 造出來的那個空節點的下一個節點指向當前節點
t.next = node;
return t;
}
}
}
}
如下為acquireQueued 的操作
head 節點為已經拿到鎖的執行緒
此方法基本思路
1.先拿到當前節點的上一個節點,如果上一個節點為head且當前節點嘗試拿了一下鎖拿到了,那么就把當前節點設定成head,上一個節點脫鉤,回傳 false
2.否則就會執行shouldParkAfterFailedAcquire、parkAndCheckInterrupt
3.感覺如果p == head && tryAcquire(arg)條件不滿足回圈將永遠無法結束,當然不會出現死回圈,奧秘在于parkAndCheckInterrupt會把當前執行緒掛起,從而阻塞住執行緒的呼叫堆疊(阻塞住的意思就是走到這個方法parkAndCheckInterrupt,就不動了,卡住了),
//p857
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果他的前驅節點是頭結點,且已經執行完畢(state 已經復位),執行tryAcquire后被我當前執行緒搶占了鎖,那么執行把當前節點設定為head的操作
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//下面的兩個方法主要是檢查執行緒的狀態,是否被中斷,且阻塞當前執行緒(parkAndCheck)
//shouldParkAfterFailedAcquire p795
//parkAndCheckInterrupt p835
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
//如果發生例外,會執行此段代碼,取消加入佇列
cancelAcquire(node);
}
}
#p795
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//This node has already set status asking a release to signal it, so it can safely park.
//規則1:該方法首先檢查前趨結點的waitStatus位,如果為SIGNAL,表示前趨結點會通知它,那么它可以放心大膽地掛起了,回傳 true 后呼叫acquireQueued方法的 parkAndCheckInterrupt)將導致執行緒阻塞
return true;
if (ws > 0) {
//Predecessor was cancelled. Skip over predecessors and indicate retry.
//規則2:如果前趨結點是一個被取消的結點怎么辦呢?那么就向前遍歷跳過被取消的結點,直到找到一個沒有被取消的結點為止,將找到的這個結點作為它的前趨結點,將找到的這個結點的waitStatus位設定為SIGNAL,回傳false表示執行緒不應該被掛起,然后進去acquireQueued方法回圈
//接下來acquireQueued回圈會出現兩種結果
//1.因為在這一步干掉了前一個被取消了的節點,把前前一個節點變成了前一個節點,恰巧前一個節點還是頭結點,所以當前節點嘗試tryAcquire可能會獲取到鎖,
//2.前前一個節點不是頭結點,那么就會再次進入到該方法,此次前一個節點肯定是SIGNAL狀態,所以當前節點被毫無疑問的掛起(阻塞)
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//如果前繼節點狀態為非SIGNAL、非CANCELLED,則設定前繼的狀態為SIGNAL,回傳false后進入acquireQueued的無限回圈
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
p835
//這個方法的主要任務就是阻斷執行緒
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
總結
- 子類實作 tryAcquire方法來具體操作拿鎖的操作,
- 用戶呼叫acquire 方法,acquire 首先呼叫tryAcquire看看是否能拿到鎖,如果拿到了那么回傳 true,程式也能順利往下執行,同時業務代碼也會被執行
- 如果拿不到,那么就把當前執行緒封裝成一個 Node,通過呼叫addWaiter放到一個雙鏈表當中,
- addWaiter還會把這個node 回傳,在acquireQueued方法會進行自旋,如果當前節點的前一個節點為頭結點,本節點會再一次呼叫tryAcquire嘗試進行拿鎖的操作,
- 如果拿不到接著執行,在shouldParkAfterFailedAcquire 中把前一個節點的 waitStatus 由默認的 0 改成 SIGNAL(-1),當前節點作為尾節點waitStatus狀態還是 0,
- 改完waitStatus狀態之后,執行parkAndCheckInterrupt 阻塞當前執行緒,注:如果shouldParkAfterFailedAcquire回傳false,&& 會短路,parkAndCheckInterrupt 阻塞當前執行緒,注:如果shouldParkAfterFailedAcquire就不執行了,但是acquireQueued是一個自旋,待執行shouldParkAfterFailedAcquire,waitStatus狀態都改成-1后,還是會執行 parkAndCheckInterrupt來阻塞當前執行緒
2.unlock
public final boolean release(int arg) {
//tryRelease aqs 的具體實作類重寫
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
//設定 waitstatus 為0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
//如果下個節點是null或者下個節點被cancelled,就找到佇列最開始的非cancelled的節點
if (s == null || s.waitStatus > 0) {
s = null;
//為什么要從后往前找??原因在addWaiter方法:
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null){
//釋放 一個被阻塞的執行緒
LockSupport.unpark(s.thread);
}
}
//我們從這里可以看到,節點入隊并不是原子操作,也就是說,node.prev = pred; compareAndSetTail(pred, node) 這兩個地方可以看作Tail入隊的原子操作
//但是此時pred.next = node;還沒執行,如果這個時候執行了unparkSuccessor方法,就沒辦法從前往后找了,所以需要從后往前找,
//還有一點原因,在產生CANCELLED狀態節點的時候,先斷開的是Next指標,Prev指標并未斷開,因此也是必須要從后往前遍歷才能夠遍歷完全部的Node,
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
呼叫 LockSupport.unpark(s.thread); 釋放當前執行緒阻塞,程式順序往下執行業務代碼,
四、ReentrantLock原始碼分析
1.構造器
引數為慷訓 false 是非公平鎖,為 true 是公平鎖,
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
2. 非公平鎖獲取鎖
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
//如果直接修改了 state ,那么就證明現在沒有其他資源在獲得鎖,本執行緒直接拿到鎖,(原來鎖就是操作了 一個 volitale 的 state)
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//實作 aqs 的 tryAcquire
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果是0,證明還沒有執行緒拿到這個鎖,直接 cas 設定 state
if (c == 0) {
if (compareAndSetState(0, acquires)) {
//記錄當前拿到鎖的執行緒是哪個
setExclusiveOwnerThread(current);
return true;
}
}
//鎖重入的時候
//如果當前拿鎖的執行緒是 已經拿到鎖的執行緒,那么增加重入次數
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//如果當前鎖已經被獲取,且拿鎖的執行緒不是當前執行緒,那么回傳 false,tryAcquire 失敗
return false;
}
3.公平鎖獲取鎖
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果目前沒有執行緒拿鎖
if (c == 0) {
//hasQueuedPredecessors 是 公平鎖的關鍵
//對下面的 if 換一種好理解的寫法
/**
if(!hasQueuedPredecessors()){
if(compareAndSetState(0, acquires)){
setExclusiveOwnerThread(current);
return true;
}
}
*/
//如果 hasQueuedPredecessors 回傳 false,那么證明等待佇列中沒有執行緒在排隊, 那么執行 compareAndSetState 拿鎖
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
//hasQueuedPredecessors是公平鎖加鎖時判斷等待佇列中是否存在有效節點的方法,
//如果回傳False,說明當前執行緒可以爭取共享資源;如果回傳True,說明佇列中存在有效節點,當前執行緒必須加入到等待佇列中,
//判斷佇列中是否有 有效節點,以真實執行緒節點 入隊為準,不以虛節點(new Node())為準
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
//head 的下一個元素
Node s;
//1.如果 h != t 為false (h == t 為 true) ,那么直接回傳 false,在tryAcquire 中執行 set state 操作
//2.(s = h.next) == null && h != t 是走到了 enq ①處,此時 tail == node,head == new Node(),
// head 的 next 還未指向 tail,但是 node 已入 tail,所以公平起見當前執行緒就不能再搶占鎖了, 回傳 true,使當前執行緒不會再設定 state
//3.如果第二次拿鎖的執行緒 和第一次拿執行緒的執行緒不是同一個執行緒,那么第二個執行緒應該讓渡第一個執行緒(如果是同一個執行緒,那么兩次拿鎖就不分先后)
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
//分析此段代碼需從 enq 方法看起
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
//①
t.next = node;
return t;
}
}
}
}
總結:
- NonfairSync 和 fairSync 的根本區別在于tryAcquire 中 當 state == 0 時(即當前鎖沒有被任何一個執行緒占有),當前執行緒是直接嘗試拿鎖(非公平)還是查看等待佇列是否有元素來決定去排隊還是嘗試拿鎖(當等待佇列有元素就排隊,沒有元素就拿鎖),
- 公平鎖直接的提現 方法是 hasQueuedPredecessors
- tryAcquire 有兩個地方呼叫,1)lock 的時候直接呼叫一次,2)加入佇列后再次嘗試呼叫,
final void lock() {
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
4.釋放鎖
protected final boolean tryRelease(int releases) {
//可重入鎖 state -1
int c = getState() - releases;
//如果解鎖執行緒 和 持鎖執行緒不是同一個 拋例外
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果當前執行緒的鎖全部解完,那么就代表 此執行緒已經釋放了鎖
if (c == 0) {
//釋放鎖成功
free = true;
//持鎖執行緒 釋放
setExclusiveOwnerThread(null);
}
//設定 state 數量
setState(c);
return free;
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/128605.html
標籤:其他
上一篇:JUC并發系列(五):CopyOnWriteArraySet解決HashSet非執行緒安全(代碼示例)
下一篇:工具類篇【一】String字串
