多執行緒總結(四)萬字長文,一篇徹底看懂ReentrantLock,AQS的原始碼
一.前言
首先在聊ReentrantLock之前,我們需要知道整個JUC的并發同步的基石,currrent里面所有的共享變數都是由volatile修飾的,我們知道volatile的語意有2大特點,可見性以及防止重排序(記憶體屏障,volatie寫與volatile讀)
【1、當第二個操作為volatile寫操做時,不管第一個操作是什么(普通讀寫或者volatile讀寫),都不能進行重排序,這個規則確保volatile寫之前的所有操作都不會被重排序到volatile之后;
2、當第一個操作為volatile讀操作時,不管第二個操作是什么,都不能進行重排序,這個規則確保volatile讀之后的所有操作都不會被重排序到volatile之前;
3、當第一個操作是volatile寫操作時,第二個操作是volatile讀操作,不能進行重排序,】
而cas操作同時包含了volatile寫/讀語意,這二者的完美結合就組成了current的基石
二.ReentrantLock的基礎用法
1.
public class ReentrantLockText {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Thread t1 = new Thread(()->{
try {
lock.lock();
System.out.println("t1 start");
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
System.out.println("t1 end");
} catch (InterruptedException e) {
System.out.println("interrupted!");
} finally {
lock.unlock();
}
});
t1.start();
Thread t2 = new Thread(()->{
try {
//lock.lock();
lock.lockInterruptibly(); //可以對interrupt()方法做出回應
System.out.println("t2 start");
TimeUnit.SECONDS.sleep(5);
System.out.println("t2 end");
} catch (InterruptedException e) {
System.out.println("interrupted!");
} finally {
lock.unlock();
}
});
t2.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.interrupt(); //打斷執行緒2的等待
}
}
運行結果

reentrantlock用于替代synchronized
- 需要注意的是,必須要必須要必須要手動釋放鎖(重要的事情說三遍)
- 使用syn鎖定的話如果遇到例外,jvm會自動釋放鎖,但是lock必須手動釋放鎖,因此經常在finally中進行鎖的釋放
- 使用reentrantlock可以進行“嘗試鎖定”tryLock,這樣無法鎖定,或者在指定時間內無法鎖定,執行緒可以決定是否繼續等待
- 使用ReentrantLock還可以呼叫lockInterruptibly方法,可以對執行緒interrupt方法做出回應,
- 在一個執行緒等待鎖的程序中,可以被打斷
2.ReentrantLock還有一個tryLock(time),可以指定時間,如果指定時間內沒有獲得鎖,則放棄,可以通過其回傳值來決定是否繼續等待
3.還有就是Condition了(我個人覺得這是最靈活的一個地方)
public class Lock_condition {
public static void main(String[] args) {
char[] aI = "1234567".toCharArray();
char[] aC = "ABCDEFG".toCharArray();
Lock lock = new ReentrantLock();
Condition conditionT1 = lock.newCondition();
Condition conditionT2 = lock.newCondition();
new Thread(()->{
try {
lock.lock();
for(char c : aI) {
System.out.print(c);
conditionT2.signal();
conditionT1.await();
}
conditionT2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t1").start();
new Thread(()->{
try {
lock.lock();
for(char c : aC) {
System.out.print(c);
conditionT1.signal();
conditionT2.await();
}
conditionT1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t2").start();
}
}

這是condition結合lock(獨占鎖)的用法
condition目前只實作了獨占鎖,關于condition的原始碼理解,后續也會繼續更新,暫時我們只需要知道類似object的wait與notifiy
三.原理+原始碼
我們現在知道了基本用法,那么我們就可以開始探究原始碼了

1.AQS
我們知道JUC里面的核心類就是AQS,那么AQS究竟是個啥東西呢?
1)先上內部類NODE原始碼
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node next;
volatile Node prev;
volatile Thread thread;
Node nextWaiter;
不知道大家在看到這個 Node next;Node prev;的時候是啥感覺,反正我當時是激動壞了,這不就是一個雙向鏈表嘛,
再看volatile Thread thread;這個屬性,這是一個管理執行緒的雙向鏈表,換句話說就是將執行緒打包成立節點放入AQS的鏈表中
基礎的結構清楚之后,
SHARED 與EXCLUSIVE 代表是獨占節點還是共享節點
2)再上AQS屬性原始碼
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
Node0 head與tail不用說,這是來管理節點的,
這里我們要核心介紹一個屬性是state,這也是AQS這個類的靈魂,
1.再獨占鎖中這個state是1或者0,(如果大于1則表示鎖重入,這個稍后會有原始碼分析)
2在共享鎖中代表還有多少共享鎖資源,
3.在讀寫鎖中,高16位代表寫鎖是否被占用,低16位代表有多少讀鎖,
4.在CountDownLatch中,通過構造引數代碼門閘剩余個數
5.在Semaphore中,同樣通過構造引數代表信號燈個數
二.ReentrantLock獲取鎖原始碼(獨占鎖)
首先公平鎖與非公平鎖,分別繼承與Sync
FairSync NoFairSync ,默認是非公平鎖,可以在構造方法上指定
ReentrantLock lock = new ReentrantLock(true);//ture則是公平鎖
公平鎖故名思意,在AQS中管理著一個執行緒佇列,如果這時候 有一個執行緒過來搶這把鎖,如果是公平鎖,那么會判斷佇列是不是存在不同與當前執行緒的等待佇列(FIFO),如果存在則去排隊,非公平鎖則是直接去排隊
(1.1.非公平鎖獲取鎖)
final void lock() {
if (compareAndSetState(0, 1))//cas原子操作嘗試去修改值,如果修改成功說明成功獲取到了鎖
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
首先cas原子操作嘗試去修改值,如果修改成功說明成功獲取到了鎖,進入setExclusiveOwnerThread()方法
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
將當前執行緒記錄,實作偏向鎖,一行代碼便完美實作了偏向鎖!!
如果失敗則,呼叫acquire();這個方法呼叫的實際上是子類的nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
首先,獲取state的值,判斷是否為0,如果為0,則說明鎖沒有被占有,(可能是剛剛被釋放)那么cas操作開始嘗試獲取鎖,
**(注意注意注意)**重要的事情說三遍,這里僅僅嘗試獲取一次,沒有自旋!!這是獨占鎖與共享鎖的區別之一,因為如果state>=0(對于共享鎖來說state代表剩余數量),那么共享鎖會不斷嘗試自旋獲取鎖,之道state<0,因為只要》0那么就可能共享到鎖
接下來的else if就是重入鎖的操作了,判斷當先執行緒是不是記錄的執行緒,如果是,每次重入state+1,如果不是就回傳flase,直接拜拜
(1.2公平鎖獲取鎖)
剛剛介紹了公平鎖的意義,所有直接上原始碼,公平鎖比非公平鎖多了一個公平判斷
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
我們可以看到hasQueuedPredecessors是用來判斷佇列是否有不同于當前執行緒的節點等待,這里重點討論一個情況,
h != t回傳true,(s = h.next) == null回傳true
首先可以知道佇列中有2個節點,但是頭節點沒有后繼結點,在這里列舉一種情況,有另一個執行緒已經執行到初始化佇列的操作了,介于compareAndSetHead(new Node())與tail = head之間,也就是之后說的enq自旋方法,請繼續往下看
2.1繼續如果非公平鎖沒有獲取到鎖,那么會呼叫acquireQueued和addwaiter方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先來看addwaiter方法
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
首先將,當前執行緒封裝成一個,Node,然后通過判斷尾結點是不是空的方式,判斷佇列是不是空的,如果存在尾結點,那么直接進先驅后繼的改造,放入雙向鏈表,完成鏈表結構,那么如果是空的呢?這么呼叫了一個enq的自旋方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
我們可以看到這個一個自旋方法,第一次回圈:t為null,那么cas就new出來一個新結點,頭尾都指向這個新結點,
第二次回圈,將傳進來的這個執行緒結點的前驅指向剛剛new的這個結點,然后cas操作進行,將這個執行緒結點替換為尾部結點,然后head后繼指向執行緒結點,回傳head
經過二次回圈,得到了一個由2個節點組成的佇列,head-》node,head是假節點(里面不包括執行緒為null),node才是真正的執行緒結點(addwrite封裝好的傳進來的執行緒結點)
問題1.為什么一定要用cas操作,因為防止別的執行緒修改了該佇列
好,現在我們繼續,看acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這是一個最最核心的方法,從佇列中取執行緒
首先這是一個自旋,判斷該節點的前驅節點是不是head,因為(FIFO)先進先出佇列,
如果不是直接拜拜進入shouldParkAfterFailedAcquire,繼續上原始碼
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
- CANCELLED:因為超時或者中斷,結點會被設定為取消狀態,被取消狀態的結點不應該去競爭鎖,只能保持取消狀態不變,不能轉換為其他狀態,處于這種狀態的結點會被踢出佇列,被GC回收;
- SIGNAL:表示這個結點的繼任結點被阻塞了,到時需要通知它;
- CONDITION:表示這個結點在條件佇列中,因為等待某個條件而被阻塞;
- PROPAGATE:使用在共享模式頭結點有可能牌處于這種狀態,表示鎖的下一次獲取可以無條件傳播;
- 0:None of the above,新結點會處于這種狀態,
首先說明一下waitStatus這個屬性,為什么之前不提呢,因為之前沒有對waitStatus進行操作,我們在new節點與封裝結點的時候沒有考慮這個屬性,所以我們現在當成一個新屬性,值為0來看待,
shouldParkAfterFailedAcquire這個代碼的邏輯意義是說明呢?
如果是SIGNAL那么,直接ture,
如果大于0,則是CANCELLED,被取消了,直接剔除佇列
如果都不是,那么將其前驅結點設為SIGNAL,也就是可以安心睡覺了,定好鬧鐘了,可以被等著喚醒了,
我們現在很明顯是第三種,因為之前啥都沒干,就是0,
所以本來狀態

進行shouldParkAfterFailedAcquire之后,
那么現在鏈表中,對執行緒1的前驅設鬧鐘,0變成-1

假設這時候又來了個執行緒2,那么同理,對執行緒而的先驅設鬧鐘0變成-1

在呼叫了shouldParkAfterFailedAcquire()之后,呼叫parkAndCheckInterrupt方法用于阻塞,
這里提一下,關于parkAndCheckInterrupt,lock里面用于阻塞都是基于lockSupper.park()與lockSupper.unpark()完成了,而lockSupper呼叫的又是unsafe這個類,我們知道java是基于jvm實作的,并不能和c++一樣直接對os進行操作,所以jvm給我們提供了一個梯子,這個梯子就是unsafe這個類,直接將執行緒的的交個作業系統阻塞
四,釋放鎖
終于到了釋放鎖,獨占鎖的釋放鎖的邏輯相對與共享鎖來說比較簡單,后續我也會繼續更新共享鎖的原始碼
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先我們來看tryRelease方法,
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
首先獲取,int c = getState() - releases;這里可能c是>0的,因為獨占鎖的重入鎖(上面以及說明了獨占鎖的重入的原始碼操作),所以有可能是需要進行多次解鎖的,繼續
判斷當前執行緒是不是獨占執行緒,如果不是則報IllegalMonitorStateException例外
一直解鎖到c=0的時候,那么執行緒已經解鎖,則設setExclusiveOwnerThread=null
設定當前獨占執行緒為null,然后設定state為0
繼續回到release,如果頭節點不是null而且h.waitStatus != 0,說明是-1,說明設定鬧鐘了,需要喚醒aqs佇列中的阻塞結點,呼叫的是unparkSuccessor方法,繼續看原始碼
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
這里首先明確,這里傳進來的node是啥?是頭節點!!!!,一定要明確這個,博主就是因為剛開始沒明確這個,半天沒明白,因為喚醒一定是喚醒頭節點之后的waitStatus不為1的結點
首先判斷頭節點,如果是-1則設定為0,這個中間狀態,表示有結點被喚醒了,
然后拿到,head的后繼節點,進行判斷,如何是null或者waitStatus >0,(就是1,CANCELLED,代表被取消了),這個時候從尾部開始遍歷,剔除waitStatus >0的節點,找到第一個waitStatus <=0的節點,用LockSupport.unpark(s.thread);將其喚醒,喚醒之后的執行緒回到acquireQueued方法中,
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//被阻塞的執行緒,被喚醒后在進行回圈,
//然后通過return interrupted;
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
首先,將該結點設定為head,然后將老head指向null,幫助gc回收,然后return回傳,至此執行緒自由
那么此時該佇列為

五,總結
寫了很久,如果有啥不對的地方,歡迎大家指正
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/272462.html
標籤:java
下一篇:Java 基礎入門訓練
