本文基于JDK-8u261原始碼分析
本篇文章為AQS系列文的第二篇,前文請看:[傳送門]
第一篇:AQS原始碼深入分析之獨占模式-ReentrantLock鎖特性詳解
1 Semaphore概覽
共享模式就是有多個執行緒可以同時拿到鎖資源,共享模式用Semaphore來舉例,其與ReentrantLock的結構類似,也有公平和非公平兩種模式:
1 public class Semaphore implements Serializable {
2 //...
3
4 private final Sync sync;
5
6 abstract static class Sync extends AbstractQueuedSynchronizer {
7 //...
8
9 Sync(int permits) {
10 setState(permits);
11 }
12
13 //...
14 }
15
16 static final class NonfairSync extends Sync {
17 //...
18
19 NonfairSync(int permits) {
20 super(permits);
21 }
22
23 //...
24 }
25
26 static final class FairSync extends Sync {
27 //...
28
29 FairSync(int permits) {
30 super(permits);
31 }
32
33 //...
34 }
35
36 public Semaphore(int permits) {
37 sync = new NonfairSync(permits);
38 }
39
40 public Semaphore(int permits, boolean fair) {
41 sync = fair ? new FairSync(permits) : new NonfairSync(permits);
42 }
43
44 //...
45 }
呼叫構造方法時需要傳入一個控制同時并發次數的引數permits,該值會賦值給AQS的state(注意:這里是可以賦值成小于等于0的引數的,如果acquire的引數沒有設定好的話,所有執行緒可能都會一直處于阻塞狀態而無法被喚醒),
2 非公平鎖
2.1 acquire方法
Semaphore的非公平鎖方式下的acquire方法:
1 /**
2 * Semaphore:
3 */
4 public void acquire() throws InterruptedException {
5 sync.acquireSharedInterruptibly(1);
6 }
7
8 /**
9 * AbstractQueuedSynchronizer:
10 */
11 public final void acquireSharedInterruptibly(int arg)
12 throws InterruptedException {
13 //arg = 1
14 //如果當前執行緒已經中斷了,直接拋出例外,因為被中斷了就沒有意義再去獲取鎖資源了
15 if (Thread.interrupted())
16 throw new InterruptedException();
17 //嘗試去獲取共享資源
18 if (tryAcquireShared(arg) < 0)
19 //獲取資源失敗的話,進CLH佇列進行排隊等待
20 doAcquireSharedInterruptibly(arg);
21}
22
23 /**
24 * Semaphore:
25 * 第18行代碼處:
26 */
27 protected int tryAcquireShared(int acquires) {
28 return nonfairTryAcquireShared(acquires);
29}
30
31 final int nonfairTryAcquireShared(int acquires) {
32 //acquires = 1
33 for (; ; ) {
34 int available = getState();
35 int remaining = available - acquires;
36 /*
37 如果剩余資源小于0或者CAS設定state-1成功了的話,退出死回圈
38 注意,這里不需要判斷溢位了,因為這里是在做state-1
39 */
40 if (remaining < 0 ||
41 compareAndSetState(available, remaining))
42 return remaining;
43 }
44 }
2.2 doAcquireSharedInterruptibly方法
doAcquireSharedInterruptibly方法和獨占模式的acquireQueued方法類似,但區別是共享模式在一個節點獲取鎖后,會通知后續的節點也來一起嘗試獲取:
1 /**
2 * AbstractQueuedSynchronizer:
3 * 和獨占模式下的acquireQueued方法的代碼類似,只不過這里是共享模式下的回應中斷模式
4 */
5 private void doAcquireSharedInterruptibly(int arg)
6 throws InterruptedException {
7 //CLH佇列尾加入一個新的共享節點
8 final Node node = addWaiter(Node.SHARED);
9 boolean failed = true;
10 try {
11 for (; ; ) {
12 //獲取當前節點的前一個節點
13 final Node p = node.predecessor();
14 if (p == head) {
15 /*
16 和獨占模式一樣,只有前一個節點是頭節點,也就是當前節點
17 是實際上的第一個等待著的節點的時候才嘗試獲取資源(FIFO)
18 */
19 int r = tryAcquireShared(arg);
20 if (r >= 0) {
21 /*
22 r大于等于0說明此時還有鎖資源(等于0說明鎖資源被當前執行緒拿走后就沒了),
23 設定頭節點,并且通知后面的節點也獲取鎖資源,獨占鎖和共享鎖的差異點就在于此,
24 共享鎖在前一個節點獲取資源后,會通知后續的節點也一起來獲取
25 */
26 setHeadAndPropagate(node, r);
27 p.next = null;
28 failed = false;
29 return;
30 }
31 }
32 /*
33 和獨占模式一樣,將CLH佇列中當前節點之前的一些CANCELLED狀態的節點剔除;前一個節點狀態如果
34 為SIGNAL時,就會阻塞當前執行緒,不同的是,這里會拋出例外,而不是獨占模式的會設定中斷位為true
35 即回應中斷模式,如果執行緒被中斷了會拋出InterruptedException
36 */
37 if (shouldParkAfterFailedAcquire(p, node) &&
38 parkAndCheckInterrupt())
39 throw new InterruptedException();
40 }
41 } finally {
42 if (failed)
43 //如果執行緒被中斷后喚醒,就會取消當前執行緒獲取鎖資源的請求
44 cancelAcquire(node);
45 }
46}
47
48 /**
49 * 第26行代碼處:
50 */
51 private void setHeadAndPropagate(Node node, int propagate) {
52 //記錄舊head節點
53 Node h = head;
54 //執行完setHead方法后,node節點成為新的head節點
55 setHead(node);
56 /*
57 <1>propagate>0表示還有剩余鎖資源;
58 <2>舊head節點的狀態<0(舊head節點是null這個條件是為了呼叫waitStatus時防止空指標例外);
59 <3>新head節點的狀態<0(新head節點是null這個條件是為了呼叫waitStatus時防止空指標例外)
60 這些條件滿足其一就會嘗試呼叫doReleaseShared方法來喚醒后面的節點
61 */
62 if (propagate > 0 || h == null || h.waitStatus < 0 ||
63 (h = head) == null || h.waitStatus < 0) {
64 Node s = node.next;
65 /*
66 具體是否會呼叫doReleaseShared方法還需要判斷node是最后一個節點或者node的下一個節點是
67 共享節點的時候才去喚醒(判斷s是否為null一方面也是為了后面判斷s是否是共享節點時不會拋
68 出空指標例外;但更重要的原因是因為如果node是CLH佇列中的最后一個節點的話,這個時候雖然
69 拿到的s是null,但如果此時有其他的執行緒在CLH佇列中新添加了一個節點后,此處并不能及時感
70 知到這個變化,于是此時也會走進doReleaseShared方法中去處理這種情況(當然,如果沒有發生
71 多執行緒插入節點的時候,多呼叫一次doReleaseShared方法也是無妨的,在該方法里面會過濾掉這
72 種情況),同時這里會特殊判斷共享節點是因為CLH佇列中可能會存在獨占節點和共享節點共存的
73 場景出現,也就是ReentrantReadWriteLock讀寫鎖的場景,這里會一直傳播喚醒共享節點直到遇
74 到一個獨占節點為止,后面的節點不管是獨占或共享狀態都不會再被喚醒了)
75 */
76 if (s == null || s.isShared())
77 doReleaseShared();
78 }
79}
80
81 /**
82 * 喚醒后續節點(加鎖和釋放鎖都會呼叫本方法)
83 */
84 private void doReleaseShared() {
85 for (; ; ) {
86 Node h = head;
87 //h != null && h != tail說明此時CLH佇列中至少有兩個節點(包括空節點),即至少含有一個真正在等待著的節點
88 if (h != null && h != tail) {
89 int ws = h.waitStatus;
90 if (ws == Node.SIGNAL) {
91 /*
92 因為下面要喚醒下一個節點,所以將頭節點的狀態SIGNAL改為0(因為SIGNAL表示的是下一個節點是阻塞狀態)
93 如果CAS沒成功,就繼續嘗試
94 */
95 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
96 continue;
97 //喚醒下一個可以被喚醒的節點
98 unparkSuccessor(h);
99 } else if (ws == 0 &&
100 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
101 /*
102 需要注意的是,在共享鎖模式下,不論是acquire方法還是release方法,都會呼叫到doReleaseShared的,
103 而且每個方法也可能有多個執行緒在呼叫,也就是說doReleaseShared方法會有多個執行緒在呼叫,假如此時有
104 多個執行緒進入到第89行代碼處,而其中一個執行緒先執行了第90行代碼處的if條件,將頭節點狀態改為了0
105 而剩下的執行緒就不能跳進第90行代碼處的if條件中,而只能走到第99行代碼處,ws == 0條件滿足,
106 于是剩下的執行緒就去CAS競爭修改頭節點狀態為PROPAGATE(表示需要將喚醒動作向后繼續傳播),修改成功的
107 那個執行緒就跳到了第135行代碼處,進行下個判斷邏輯,而再剩下的那些執行緒就讓它們繼續回圈就行了
108 (剩下的那些執行緒會發現head節點此時已經變成了PROPAGATE狀態,于是會在下一次回圈的第86行代碼處
109 和第135行代碼處兩次判斷head指標是否指向了同一個節點(包括之前那個CAS修改成功的執行緒和執行喚醒
110 動作的執行緒最后也會走到這里),如果相同了,說明:
111 <1>可能是當前喚醒傳播停止了(每個被喚醒的執行緒都可能會走入到本方法中的unparkSuccessor處
112 喚醒下一個節點,相當于把喚醒動作“傳播”下去,同時每次喚醒后會變更head指標,如果head不發生變動了,
113 就說明喚醒傳播停止了(注意上面所說的讀寫鎖場景,也有可能是遇到了一個獨占節點才停止的));
114 <2>可能是將要喚醒下一個節點但還沒喚醒前的瞬間
115 不管是屬于哪種情況,這些執行緒都可以退出了(第二種情況下只要等下一個節點喚醒并搶到鎖后,還是會走到
116 本方法里面的,也就是會將喚醒動作繼續傳播下去,但那個時候就不需要這些執行緒來操心了,只需要保證喚醒
117 能一直傳播下去就OK))
118
119 總結一下:因為head節點的狀態為0就說明此時是一個中間過渡狀態,最簡單的情況下只有這個執行緒以及它所
120 喚醒的下個執行緒們在一直傳遞地喚醒著,是不會走入到99行代碼處的if條件中來的,而如果有執行緒能走到這里,
121 就說明此時在doReleaseShared方法也就是本方法中有多個執行緒在同時呼叫著,PROPAGATE狀態的出現,
122 我認為是為了創造出一種區別于SIGNAL狀態的另外一種狀態(因為SIGNAL狀態的含義定義死了就是代表后一個
123 節點是阻塞狀態,所以這里不能用SIGNAL狀態來代替),這個時候將head節點由原來的0置為PROPAGATE狀態,
124 以此來保證之前的那些執行緒也可以讀取到此時舊的head節點狀態是PROPAGATE,是<0的,從而可以呼叫到
125 doReleaseShared方法繼續去喚醒下一個節點,也就是將喚醒動作傳播下去(在之前某個版本的
126 setHeadAndPropagate方法中,if條件中是沒有最后那兩個判斷新head節點狀態的條件的,如果是這樣的話,
127 我上面的這些分析就是沒問題的,但是后來不知道為什么又添加了那兩個條件,這個時候的解釋就略顯蒼白了
128 (因為即使沒有PROPAGATE狀態,這些獲取鎖的執行緒雖然拿到舊的head節點狀態是0,但是此時獲取到的新的head
129 節點也就是它們自己,其狀態肯定是<0的,所以一樣會走doReleaseShared方法),但是之前確實是這樣的,
130 也就是PROPAGATE狀態添加的本意就是為了將喚醒傳播下去,可能是后來為了修復某個bug,就又做了些改動
131 吧,這里就不再深究了)
132 */
133 continue;
134 }
135 if (h == head)
136 break;
137 }
138 }
2.3 release方法
Semaphore的release方法:
1 /**
2 * Semaphore:
3 */
4 public void release() {
5 sync.releaseShared(1);
6}
7
8 /**
9 * AbstractQueuedSynchronizer:
10 */
11 public final boolean releaseShared(int arg) {
12 //arg = 1
13 //釋放鎖資源,也就是做state+1的操作
14 if (tryReleaseShared(arg)) {
15 /*
16 喚醒后續可以被喚醒的節點
17 從這里就可以看出,在共享鎖模式下,不僅釋放鎖的方法可以喚醒節點,加鎖的方法也會觸發喚醒后續節點的操作
18 */
19 doReleaseShared();
20 return true;
21 }
22 return false;
23}
24
25 /**
26 * Semaphore:
27 * 第14行代碼處:
28 */
29 protected final boolean tryReleaseShared(int releases) {
30 //releases = 1
31 for (; ; ) {
32 int current = getState();
33 int next = current + releases;
34 //如果超出int最大值,則拋出Error,同時如果傳進來的releases本身就小于0的話,也會拋出Error
35 if (next < current)
36 throw new Error("Maximum permit count exceeded");
37 //CAS修改state+1
38 if (compareAndSetState(current, next))
39 return true;
40 }
41 }
3 PROPAGATE狀態
值得一提的是:縱觀整個AQS的原始碼,只有在doReleaseShared方法中具體用到了PROPAGATE這個狀態,在其他地方都是沒有顯式用到的,那么可能就會對這個狀態存在的意義有些許質疑了,其實在早期版本的AQS原始碼中是沒有PROPAGATE這個狀態的,之所以要引入它是為了解決一個bug(JDK-6801020):

從上面可以看到,這個bug是在Java 7中修復的(在Java 6中的一些版本中也已經添加了PROPAGATE狀態),同時在bug清單的下面也貼出了可能出現bug的測驗代碼,那么下面就來看一下離現在非常久遠的Java 5u22中的該處代碼是如何實作的:
1 private void setHeadAndPropagate(Node node, int propagate) {
2 setHead(node);
3 if (propagate > 0 && node.waitStatus != 0) {
4 Node s = node.next;
5 if (s == null || s.isShared())
6 unparkSuccessor(node);
7 }
8 }
9
10 public final boolean releaseShared(int arg) {
11 if (tryReleaseShared(arg)) {
12 Node h = head;
13 if (h != null && h.waitStatus != 0)
14 unparkSuccessor(h);
15 return true;
16 }
17 return false;
18 }
可以看到,早期版本的實作相比于現在的實作來說簡單了很多,總結起來最主要的區別有以下幾個:
- 在setHeadAndPropagate方法中,早期版本對節點waitStatus狀態的判斷只是!=0,而現在改為了<0;
- 早期版本的releaseShared方法中的執行邏輯和獨占鎖下的release方法是一樣的,而現在將具體的喚醒邏輯寫在了doReleaseShared方法里面,和setHeadAndPropagate方法共同呼叫,
而可能出現bug的測驗代碼如下:
1 import java.util.concurrent.Semaphore;
2
3 public class TestSemaphore {
4
5 private static Semaphore sem = new Semaphore(0);
6
7 private static class Thread1 extends Thread {
8 @Override
9 public void run() {
10 sem.acquireUninterruptibly();
11 }
12 }
13
14 private static class Thread2 extends Thread {
15 @Override
16 public void run() {
17 sem.release();
18 }
19 }
20
21 public static void main(String[] args) throws InterruptedException {
22 for (int i = 0; i < 10000000; i++) {
23 Thread t1 = new Thread1();
24 Thread t2 = new Thread1();
25 Thread t3 = new Thread2();
26 Thread t4 = new Thread2();
27 t1.start();
28 t2.start();
29 t3.start();
30 t4.start();
31 t1.join();
32 t2.join();
33 t3.join();
34 t4.join();
35 System.out.println(i);
36 }
37 }
38 }
其實上面所做的操作無非就是創建了四個執行緒:t1和t2用于獲取信號量,而t3和t4用于釋放信號量,其中的10000000次for回圈是為了放大出現bug的幾率,join操作是為了阻塞主執行緒,現在就可以說出出現bug的現象了:也就是這里可能會出現執行緒被hang住的情況發生(遺憾的是,我并沒有模擬出來這個bug),
可以想象這樣一種場景:假如說當前CLH佇列中有一個空節點和兩個被阻塞的節點(t1和t2想要獲取信號量但獲取不到被阻塞在CLH佇列中(state初始為0)):head->t1->t2(tail),
- 時刻1:t3呼叫release->releaseShared->tryReleaseShared,將state+1變為1,同時發現此時的head節點不為null并且waitStatus為-1,于是繼續呼叫unparkSuccessor方法,在該方法中會將head的waitStatus改為0;
- 時刻2:t1被上面t3呼叫的unparkSuccessor方法所喚醒,呼叫了tryAcquireShared,將state-1又變為了0,注意,此時還沒有呼叫接下來的setHeadAndPropagate方法;
- 時刻3:t4呼叫release->releaseShared->tryReleaseShared,將state+1變為1,同時發現此時的head節點雖然不為null,但是waitStatus為0,所以就不會執行unparkSuccessor方法;
- 時刻4:t1執行setHeadAndPropagate->setHead,將頭節點置為自己,但在此時propagate也就是剩余的state已經為0了(propagate是在時刻2時通過傳參的方式傳進來的,那個時候-1后剩余的state是0),所以也不會執行unparkSuccessor方法,
至此可以發現一輪回圈走完后,CLH佇列中的t2執行緒永遠不會被喚醒,主執行緒也就永遠處在阻塞中,這里也就出現了bug,那么來看一下現在的AQS代碼在引入了PROPAGATE狀態后,在面對同樣的場景下是如何解決這個bug的:
- 時刻1:t3呼叫release->releaseShared->tryReleaseShared,將state+1變為1,繼續呼叫doReleaseShared方法,將head的waitStatus改為0,同時呼叫unparkSuccessor方法;
- 時刻2:t1被上面t3呼叫的unparkSuccessor方法所喚醒,呼叫了tryAcquireShared,將state-1又變為了0,注意,此時還沒有呼叫接下來的setHeadAndPropagate方法;
- 時刻3:t4呼叫release->releaseShared->tryReleaseShared,將state+1變為1,同時繼續呼叫doReleaseShared方法,此時會將head的waitStatus改為PROPAGATE;
- 時刻4:t1執行setHeadAndPropagate->setHead,將新的head節點置為自己,雖然此時propagate依舊是0,但是“h.waitStatus < 0”這個條件是滿足的(h現在是PROPAGATE狀態),同時下一個節點也就是t2也是共享節點,所以會執行doReleaseShared方法,將新的head節點(t1)的waitStatus改為0,同時呼叫unparkSuccessor方法,此時也就會喚醒t2了,
至此就可以看出,在引入了PROPAGATE狀態后,可以有效避免在高并發場景下可能出現的、執行緒沒有被成功喚醒的情況出現,
4 公平鎖
4.1 tryAcquireShared方法
同ReentrantLock一樣,Semaphore的公平鎖和非公平鎖實作上的區別也非常少,只有tryAcquireShared方法是不同的,所以下面就來看一下這個方法的實作:
1 /**
2 * Semaphore:
3 */
4 protected int tryAcquireShared(int acquires) {
5 for (; ; ) {
6 /*
7 可以看到公平鎖模式下的tryAcquireShared方法和非公平鎖模式下的nonfairTryAcquireShared方法的區別
8 一樣是多呼叫了一次hasQueuedPredecessors方法,以此來判斷CLH佇列中是否有執行緒的等待獲取鎖的時間
9 比當前執行緒的還要長,如果有的話就會直接回傳-1,也就是獲取資源失敗,然后會進CLH佇列進行排隊等待
10 (體現“公平”的含義);沒有的話就會去進行state-1,然后回傳剩余的鎖資源
11 */
12 if (hasQueuedPredecessors())
13 return -1;
14 int available = getState();
15 int remaining = available - acquires;
16 if (remaining < 0 ||
17 compareAndSetState(available, remaining))
18 return remaining;
19 }
20 }
在這行干的越久真是越覺得:萬丈高樓平地起,這絕B是句真理!在應用業務里待太久很多底層的東西往往容易忽略掉,今年的年初計劃是把常用的JDK原始碼工具做一次總結,眼看年底將近,乘著最近有空,趕緊的給補上,
- ArrayList你真懂?說說foreach與iterator時remove的區別(已完結)
- 你是否想過互聯網公司一面為什么總愛問集合?聊聊經典資料結構HashMap(已完結)
- AQS原始碼深入分析之獨占模式-ReentrantLock鎖特性詳解(當前文章)(已完結)
- AQS原始碼深入分析之共享模式-為什么AQS中要有PROPAGATE這個狀態?(當前文章)
- AQS原始碼深入分析之條件佇列-Java中的阻塞佇列是如何實作的?(創作中)
- AQS原始碼深入分析之應用工具CountDownLatch(創作中)
- AQS原始碼深入分析之應用工具CyclicBarrier(創作中)
- ConcurrentHashMap原始碼分析-ConcurrentHashMap在Java 8中的實作還有bug?而且還不止一處!這個坑還比較大,后面會重點總結一下!(已完結)
- ThreadPoolExecutor原始碼分析-問爛了的Java執行緒池執行流程,其實如果問的細,很多人還是一臉懵逼?(已完結)
- ScheduledThreadPoolExecutor原始碼分析-重點屢屢定時執行緒池是如何實作延遲執行和周期執行!
- ThreadLocal原始碼分析-重點總結,記憶體泄漏,軟參考弱參考虛參考,面試經常喜歡問,我也喜歡問別個
- 紅黑樹TreeMap、LinkedHashMap(不確定要不要寫,有時間寫,看專案情況)
- 有序并且執行緒的Map容器ConcurrentSkipListMap(跳表)深入理解
- LinkedList(不確定要不要寫,有時間寫,看專案情況)
- 1T資料快速排序!十種經典排序演算法總結(已完結)
每一次總結都是對知識點掌握程度的審視,技術不易,每日精進一點,與大家共勉,
另外筆者公眾號:奇客時間,有更多精彩的文章,有興趣的同學,可以關注
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/202751.html
標籤:Java
