前言
??CountDownLatch維護了一個計數器(還是是state欄位),呼叫countDown方法會將計數器減1,呼叫await方法會阻塞執行緒直到計數器變為0,可以用于實作一個執行緒等待所有子執行緒任務完成之后再繼續執行的邏輯,也可以實作類似簡易CyclicBarrier的功能,達到讓多個執行緒等待同時開始執行某一段邏輯目的,
??有了前面Semaphore原始碼分析和ReentrantLock原始碼分析的基礎,再來看CountDownLatch的原始碼就簡單的多了,
使用
- 一個執行緒等待其它執行緒執行完再繼續執行
......
CountDownLatch cdl = new CountDownLatch(10);
ExecutorService es = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
es.execute(() -> {
//do something
cdl.countDown();
});
}
cdl.await();
......
- 實作類似CyclicBarrier的功能,先await,再countDown
......
CountDownLatch cdl = new CountDownLatch(1);
ExecutorService es = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
es.execute(() -> {
cdl.await();
//do something
});
}
Thread.sleep(10000L);
cdl.countDown();
......
原始碼分析
??CountDownLatch的結構和ReentrantLock、Semaphore的結構類似,也是使用的內部類Sync繼承AQS的方式,并且重寫了tryAcquireShared和tryReleaseShared方法,
??還是首先來看建構式:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
??需要傳入一個大于0的count,代表CountDownLatch計數器的初始值,通過Sync的建構式最終賦值給父類AQS的state欄位,可一個看到這個state欄位用法多多,在ReentrantLock中使用0和1來標識鎖的狀態,Semaphore中用來標識信號量,此處又用來表示計數器,
??CountDownLatch要通過await方法完成阻塞,先來看看這個方法是如何實作的:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
??呼叫的是sync的acquireSharedInterruptibly方法,該方法定義在AQS中,Semaphore也呼叫的這個方法:
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
??這個方法的邏輯前面在決議SemaPhore的時候細說過了,這里不再贅述,主要就是兩個方法的呼叫,先通過tryAcquireShared方法嘗試獲取"許可",回傳值代表此次獲取后的剩余量,如果大于等于0表示獲取成功,否則表示失敗,如果失敗,那么就會進入doAcquireSharedInterruptibly方法執行入隊阻塞的邏輯,這里我們主要到CountDownLatch中看看tryAcquireShared方法的實作:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
??和Semaphore的實作中每次將state減去requires不同,這里直接判斷state是否為0,如果為0那么回傳1,表示獲取"許可"成功;如果不為0,表示失敗,則需要入隊阻塞,從這個tryAcquireShared方法就能看出CountDownLatch的邏輯了:等到state變為了0,那么所有執行緒都能獲取運行許可,
??關于doAcquireSharedInterruptibly方法,在Semaphore的文章中詳細分析了,可以看看Semaphore原始碼分析,
??那么我們接下來來到countDown方法:
public void countDown() {
sync.releaseShared(1);
}
??呼叫的是sync的releaseShared方法,該方法定義在父類AQS中,Semaphore使用的也是這個方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//當state從非
doReleaseShared();
return true;
}
return false;
}
??前面提到了CountDownLatch也重寫了tryReleaseShared方法:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
//如果state等于0了直接回傳false
//保證在并發情況下,最多只會有一個執行緒回傳true
//也包括呼叫countDown的次數超過state的初始值
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
//如果回傳true,表示state從非0變為了0
//那么后續需要喚醒阻塞執行緒
return nextc == 0;
}
}
??Semaphore在釋放信號量的時候,是將獲取的許可歸還到state中,但是CountDownLatch沒有獲取許可的邏輯(獲取許可的時候是判斷state是否等于0),所以在countDown的時候沒有釋放的邏輯,就是將state減1,然后根據state減1之后的值是否為0判斷release是否成功,如果state本來大于0,經過減1之后變為了0,那么回傳true,tryReleaseShared方法的回傳值決定了后續需不需要呼叫doReleaseShared方法喚醒阻塞執行緒,
??這里有個邏輯:如果state已經為0,那么回傳false,這個主要應對兩種情況:
- 呼叫countDown的次數超過了state的初始值
- 多執行緒并發呼叫的時候保證只有一個執行緒去完成阻塞執行緒的喚醒操作
??可以看到CountDownLatch沒有鎖的概念,countDown方法可以被一個執行緒重復呼叫,只需要對state做reduce操作,而不用關心是誰做的reduce,如果tryReleaseShared回傳true,那么表示需要在后面進入doReleaseShared方法,該方法和Semaphore中呼叫的方法是同一個,主要是喚醒阻塞執行緒或者設定PROPAGAGE狀態,這里也不再贅述~
??阻塞執行緒被喚醒之后,會在doAcquireSharedInterruptibly方法中繼續回圈,雖然和Semaphore呼叫的是同樣的方法,但是這里有不一樣的地方,所以還是提一句,我們首先回到doAcquireSharedInterruptibly方法:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//如果head.next被unpark喚醒,說明此時state==0
//那么tryAcquireShared會回傳1
int r = tryAcquireShared(arg);
//r==1
if (r >= 0) {
//node節點被喚醒后,還會繼續喚醒node.next
//這樣依次傳遞,因為在這里的r一定為1
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
??當head.next執行緒被unpark喚醒后,會進入tryAcquireShared方法判斷,由于此時state已經為0(只有當state變為0時,才會unpark喚醒執行緒),而前面提到了在CountDownLatch重寫的tryAcquireShared中,如果state==0,那么會回傳1,所以會進入setHeadAndPropagate方法:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
??該方法在Semaphore中詳細介紹過,這里我們就站在CountDownLatch的角度來看看,其實很簡單了,注意此時該方法的propagate引數值是1,那么就會進入到下面的if邏輯里,繼續喚醒下一個node,當下一個node對應的執行緒被喚醒后,同樣會進入setHeadAndPropagate方法,propagage同樣為1,那么繼續喚醒下一個node,就這樣依次將整個CLH佇列的節點都喚醒,
總結
??如果單獨把CountDownLatch拿出來看其實是很復雜的,只是CountDownLatch(包括Semaphore和ReentrantLock)都高度共用了AQS提供的一些方法,而這些方法在前面介紹Semaphore和ReentrantLock的時候已經詳細分析過,所以到本文分析CountDownLatch的時候,只需要關注它內部類Sync重寫的兩個方法:tryAcquireShared和tryReleaseShared,也就是"獲取許可"和"釋放許可"的邏輯,
??CountDownLatch在await的邏輯里,如果當前state的值大于0,那么會進入CLH佇列進行阻塞等待unpark喚醒(或者中斷喚醒);在countDown的邏輯里,就是簡單的將state-1,如果一個執行緒把state從1減為0,那么該執行緒就會負責喚醒head.next節點,head.next節點被喚醒后,又會在setHeadAndPropagate方法中喚醒next.next節點,這樣依次喚醒所有CLH佇列中的阻塞節點,當然,如果執行緒被中斷喚醒,那么也會進入cancelAcquire中進行無效節點的移除邏輯,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/277645.html
標籤:java
下一篇:Spring基礎總結(第一周課)
