Semaphore
Semaphore 字面意思是信號量的意思,它的作用是控制訪問特定資源的執行緒數目,應用場景:資源訪問,服務限流, Semaphore 實作AbstractQueuedSynchronizer的方法與ReentrantLock一樣
Semaphore構造方法
public Semaphore(int permits) {------permits 表示能同時有多少個執行緒訪問我們的資源
sync = new NonfairSync(permits); -------------默認創建的是非公平鎖,
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);-------傳入的permits做i為了state的值,作為資源總數
}
semaphore.acquire();獲取資源,原始碼實作
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);---------每次申請一次資源
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();-----------------執行緒無效直接拋例外
if (tryAcquireShared(arg) < 0) --------------------拿不到資源,需要進行入隊操作
doAcquireSharedInterruptibly(arg); ---------入隊操作
}
final int nonfairTryAcquireShared(int acquires) { --------獲取資源的操作
for (;;) {
int available = getState(); --------------拿到現有的資源
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) -----------原子操作,多執行緒情況下會可能失敗,所以無線回圈自旋下去,直到成功;
return remaining;---------------------如果大于等于0那么就是拿到了資源,如果小于0,那么執行緒就要進入等待佇列
}
}
為什么要用死回圈----compareAndSetState這個是cas原子操作,失敗之后要回圈重復繼續操作,直到成功,死回圈也就結束了,
private void doAcquireSharedInterruptibly(int arg)-------------執行緒入隊操作
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);---------------注意這里是以共享的方式入隊
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) { --------新入隊的節點會判斷他上一個節點是不是頭節點,如果是頭節點會再次嘗試獲取資源,
int r = tryAcquireShared(arg);
if (r >= 0) { -------------如果獲取到資源,那么這個阻塞佇列就要清空了,里面沒有在等待的執行緒了,
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && ----------如果獲取不到資源,那么就要執行緒阻塞了
parkAndCheckInterrupt()) -----------parkAndCheckInterrupt這個方法會將執行緒阻塞(掛起),執行緒都阻塞了,這個死回圈就不會執行了,這也就是為什么juc原始碼寫了很多
死回圈都沒問題地原因,我們可以借鑒,當執行緒被喚醒之后又開始這個死回圈,嘗試拿資源(非公平鎖有可 能拿不到),
拿不到再次被阻塞掛起,
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { ---------------判斷執行緒能否被正常阻塞
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) -----------------如果上一個節點是有效的在等待的執行緒,那么該執行緒就可以插入到佇列后面
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { -----------如果上一個節點是無效的,那就查找上上個節點是不是有效的,直到找到那個有效的節點,然后將該節點插入到那個有效節點后面,
中間的無效節點從鏈表中洗掉,后面的節點要找前面
的節點這也就說明了為什么我們地等待佇列要設計成雙鏈表,不光有next,next這種找后驅節點地操作還有pre .pre這樣前驅節點,所以需要雙鏈表,
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
semaphore.release();釋放資源,原始碼分析
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases; -----------獲取當前的資源然后給資源加回去
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next)) -----------------CAS演算法還資源,死回圈,直到成功還回去,死回圈結束,
return true;
}
}
資源還回去之后執行doReleaseShared方法喚醒其他執行緒搶資源
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) { --------發現阻塞佇列有阻塞執行緒
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); ---------跳過頭節點,喚醒下一個節點
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t; ------回圈找到waitStatus<0能喚醒的節點呼叫unpark方法喚醒執行緒,
}
if (s != null)
LockSupport.unpark(s.thread);
}
CountDownLatch是什么?
CountDownLatch這個類能夠使一個執行緒等待其他執行緒完成各自的作業后再執行,例如,應用程式的主執行緒希望在負責啟動框架服務的執行緒已經啟動所有的框架服務之后再執行, CountDownLatch如何作業? CountDownLatch是通過一個計數器來實作的,計數器的初始值為執行緒的數量,每當一個執行緒完成了自己的任務后,計數器的值就會減1,當計數器值到達0時,它表示所有的執行緒已經完成了任務,然后在閉鎖上等待的執行緒就可以恢復執行任務, API CountDownLatch.countDown()分執行緒執行------ 執行緒執行完任務之后處于等待狀態 CountDownLatch.await()? 主執行緒執行 -------監控所有執行緒,所有執行緒結束之后主執行緒繼續走下去, CountDownLatch 不可重用CyclicBarrier
柵欄屏障,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續運行,CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告CyclicBarrier我已經到達了屏障,然后當前執行緒被阻塞, API cyclicBarrier.await()? 應用場景 可以用于多執行緒計算資料,最后合并計算結果的場景,例如,用一個Excel保存了用戶所有銀行流水,每個Sheet保存一個賬戶近一年的每筆銀行流水,現在需要統計用戶的日均銀行流水,先用多執行緒處理每個sheet里的銀行流水,都執行完之后,得到每個sheet的日均銀行流水,最后,再用barrierAction用這些執行緒的計算結果,計算出整個Excel的日均銀行流水 CyclicBarrier 可以重用,轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/140443.html
標籤:Java
下一篇:集合類Set底層資料結構總結
