簡介
CyclicBarrier字面意思是回圈屏障,它可以實作執行緒間的計數等待,當執行緒到達屏障點時會依次進入等待狀態,直到最后一個執行緒進入屏障點時會喚醒等待的執行緒繼續運行,
CyclicBarrier和CountDownLatch類似,區別在于CountDownLatch只能使用一次,當計數器歸零后,CountDownLatch的await等方法都會直接回傳,而CyclicBarrier是可以重復使用的,當計數器歸零后,計數器和CyclicBarrier狀態都會被重置,
CyclicBarrier的使用
構造方法介紹
CyclicBarrier(int parties):創建CyclicBarrier,指定計數器值(等待執行緒數量),
CyclicBarrier(int parties, Runnable barrierAction):創建CyclicBarrier,指定計數器值(等待執行緒數量)和計數器歸零后(最后一個執行緒到達)要執行的任務,
核心方法介紹
await():阻塞當前執行緒,直到計數器歸零被喚醒或者執行緒被中斷,
await(long timeout, TimeUnit unit):阻塞當前執行緒,直到計數器歸零被喚醒、執行緒被中斷或者超時回傳,
CyclicBarrier例子
等待所有玩家準備就緒,游戲才開始,每一輪游戲的開始意味著CyclicBarrier已經重置,可以開始新一輪的計數,
public class Demo {
public static void main(String[] args) {
//創建CyclicBarrier并指定計數器值為5,以及計數器為0后要執行的任務
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
System.out.println("---游戲開始---");
System.out.println("---五票贊成,游戲結束---");
});
Runnable runnable = () -> {
//重復使用CyclicBarrier5次
for(int i = 0; i < 5; i++){
System.out.println(Thread.currentThread().getName() + ":準備就緒");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
};
Thread thread1 = new Thread(runnable, "一號玩家");
Thread thread2 = new Thread(runnable, "二號玩家");
Thread thread3 = new Thread(runnable, "三號玩家");
Thread thread4 = new Thread(runnable, "四號玩家");
Thread thread5 = new Thread(runnable, "五號玩家");
thread1.start();
thread2.start();
thread3.start();
thread4.start();
thread5.start();
}
}
/*
* 回圈輸出5次
* 輸出結果:
* 一號玩家:準備就緒
* 三號玩家:準備就緒
* 二號玩家:準備就緒
* 五號玩家:準備就緒
* 四號玩家:準備就緒
* ---游戲開始---
* ---五票贊成,游戲結束---
* 三號玩家:準備就緒
* 一號玩家:準備就緒
* 五號玩家:準備就緒
* ......
*/
破損的CyclicBarrier
在使用CyclicBarrier中,假設總的等待執行緒數量為5,現在其中一個執行緒被中斷了,被中斷的執行緒將拋出InterruptedException例外,而其他4個執行緒將拋出BrokenBarrierException例外,
BrokenBarrierException例外表示當前的CyclicBarrier已經破損,可能不能等待所有執行緒到齊了,避免其他執行緒永久的等待,
CyclicBarrier的原始碼
CyclicBarrier是基于顯式鎖ReentrantLock來實作的,CyclicBarrier很多方法都使用顯式鎖做了同步處理,await方法的等待喚醒也是通過Condition實作的,
CyclicBarrier的成員變數:
//顯式鎖
private final ReentrantLock lock = new ReentrantLock();
//用于顯式鎖的Condition
private final Condition trip = lock.newCondition();
//執行緒數量
private final int parties;
//當所有執行緒到達屏障點后執行的任務
private final Runnable barrierCommand;
//Generation內部有一個broken變數,用于標識CyclicBarrier是否破損
private Generation generation = new Generation();
//用于遞減的執行緒數量,在每一輪結束后會被重置為parties
private int count;
await方法里是呼叫的dowait方法,dowait方法原始碼:
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//如果CyclicBarrier已破損,則拋出BrokenBarrierException例外
if (g.broken)
throw new BrokenBarrierException();
//如果當前執行緒已經中斷,則將CyclicBarrier標記為已破損并拋出InterruptedException例外
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
//index == 0表示所有執行緒都到達了屏障點
if (index == 0) { // tripped
boolean ranAction = false;
try {
//執行執行緒到齊后需要執行的任務
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//喚醒所有等待的執行緒并重置CyclicBarrier
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//執行緒沒到齊,阻塞當前執行緒
for (;;) {
try {
//不帶超時時間的等待
if (!timed)
trip.await();
//帶超時時間的等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
nextGeneration方法:
private void nextGeneration() {
//喚醒所有等待的執行緒
trip.signalAll();
//重置CyclicBarrier
count = parties;
generation = new Generation();
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/273115.html
標籤:Java
上一篇:冒泡、快排、二分查找演算法
