大家好,我是小黑,一個在互聯網茍且偷生的農民工,
在上一期我們講了Thread.join()方法和CountDownLatch,這兩者都可以做到等待一個執行緒執行完畢之后當前執行緒繼續執行,并且CountDownLatch要更優秀,能滿足同時等待多個執行緒執行,我們通過查看原始碼知道CountDownLatch是通過AQS實作的,
那么在java.util.concurrent包中除了像CountDownLatch這樣的并發控制工具外,還有哪些呢?今天帶大家一起來看一看,
CountDownLatch
等待一個或多個執行緒直到執行緒中執行的一組操作完成的同步輔助工具,
CountDownLatch從字面理解為“計數器“,回顧昨天的內容,CountDownLatch可以實作等待其他執行緒執行,并且可以指定等待時間,
舉個例子,比如有一個考試,在開考之前老師要等學生到考場,如果所有學生都提前到達,老師可以提前發試卷,但是如果到考試時間有學生還沒有到,老師則可以不等,直接開始,我們通過CountDownLatch來模擬一下,
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch count = new CountDownLatch(5);
for (int i = 1; i <= 5; i++) {
new Student("學生" + i, count).start();
}
// 只等待5秒,5秒之后開始發試卷
count.await(5, TimeUnit.SECONDS);
System.out.println("所有學生已到達,老師開始發卷子~");
}
}
class Student extends Thread {
private final CountDownLatch count;
public Student(String name, CountDownLatch count) {
super(name);
this.count = count;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "到達考場~");
count.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

首先我們看一下代碼,主執行緒中老師等待的時間是5秒,所以5秒中之后開始考試;每個Student在run方法中會sleep() 2秒模擬每個學生到達花費的時間最少是2秒;
我們從結果來看,5個學生都在老師開發發卷子之前到達了考場,說明5個Student到達考場的時間并沒有超過5秒,所以肯定的是這5個執行緒不是串行執行的;
老師在等到之后確實開始考試了;如果把老師等待的時間往小調整,或者增大某個執行緒到達考場的時間,會發現會在到達考場之前開始發卷子,篇幅原因這里就不放代碼了,通過這個例子想必你已經掌握了CountDownLatch的用法,
Semaphore
一個計數信號量, 在概念上,信號量維持一組許可證,
Semaphore字面意思翻譯是信號量,信號量通常用于限制執行緒數量,而不是限制訪問某些共享資源,
我們還是通過生活中的例子來模擬,比如說,我們去座摩天輪,一個摩天輪上能容納的游客人數是固定的,所以在有人要上去之前需要先看是否還有剩余的空位,如果有則放行,如果沒有則讓游客等待,直到有人從摩天輪上離開,我們使用Semaphore來模擬這個場景,
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3, true);
for (int i = 1; i <= 5; i++) {
new Visitor("游客" + i, semaphore).start();
}
}
}
class Visitor extends Thread {
private Semaphore semaphore;
public Visitor(String name, Semaphore semaphore) {
super(name);
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "坐上了摩天輪,真開心~");
TimeUnit.SECONDS.sleep(2);
System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "從摩天輪下來了-------");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

從運行結果我們可以看出,游客1,2,3同時坐上去,但是游客4,5這時候是沒有坐上去的,只能等有人下來之后,游客4,5才能再上去,因為我們在創建Semaphore時只給了3個許可,當1,2,3占用之后,4,5獲取不到,只能等待許可再次可用時才能獲取,
我們來看看Semaphore都有哪些方法,
構造方法
// 指定許可數量
new Semaphore(3);
// 指定許可數量,同時設定等待執行緒用公平的方式獲取許可
new Semaphore(3, true);
// 指定許可數量,同時設定等待執行緒用非公平的方式獲取許可,默認為false
new Semaphore(3, false);
成員方法
// 獲取許可,默認只獲取1個,阻塞直到獲取成功,或執行緒中斷interrupted
semaphore.acquire();
// 獲取給定數量的許可,阻塞直到獲取成功,或執行緒中斷interrupted
semaphore.acquire(2);
// 和acquire()一樣,但是不可以被中斷
semaphore.acquireUninterruptibly();
// 和acquire(2)一樣,但是不可以被中斷
semaphore.acquireUninterruptibly(2);
// 嘗試獲取1個許可,如果成功則回傳true,失敗則立馬回傳false
semaphore.tryAcquire();
// 嘗試獲取給定數量的許可,如果成功則回傳true,失敗則立馬回傳false
semaphore.tryAcquire(2);
// 嘗試獲取1個許可知道超時,如果獲取成功回傳true,反之回傳false
semaphore.tryAcquire(5, TimeUnit.SECONDS);
// 嘗試獲取給定數量的許可知道超時,如果獲取成功回傳true,反之回傳false
semaphore.tryAcquire(2, 1, TimeUnit.SECONDS);
// 釋放1個許可
semaphore.release();
// 釋放給定數量的許可
semaphore.release(2);
如果看過我之前AQS原始碼決議的朋友應該能猜到,Semaphore的底層也是通過AQS來實作的,是使用AQS的共享鎖相關的實作,
感興趣的同學可以回顧這篇文章,
CyclicBarrier
允許一組執行緒全部等待彼此達到共同屏障點的同步輔助工具,
CyclicBarrier從字面理解為“回圈柵欄”,可以理解為一個可以回圈使用的屏障,它的作用就是等待一組執行緒都完成執行后再進行下一步,
老樣子,我們再來舉個例子(我咋有這么多例子可把我牛逼壞了),
我們進場會和朋友聚餐,那江湖規矩,要等大家都到了,才能開始吃,等大家都吃的差不多了,大家一起散場,有沒有發現可上面考試的例子有點像,
我們來使用CyclicBarrier來模擬一下這個場景,
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
for (int i = 1; i <= 3; i++) {
new BBQer("吃貨" + i, cyclicBarrier).start();
}
}
}
class BBQer extends Thread {
CyclicBarrier cyclicBarrier;
public BBQer(String name, CyclicBarrier cyclicBarrier) {
super(name);
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "已到達戰場~");
// 等待其他人到場
cyclicBarrier.await();
System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "已饑餓難耐了,開始戰斗~");
TimeUnit.SECONDS.sleep(1);
// 等待其他人吃完
cyclicBarrier.await();
System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "吃飽喝足,回家睡覺");
} catch (Exception e) {
e.printStackTrace();
}
}
}

通過這里例子大家應該明白CyclicBarrier的使用場景了吧,主要體現在可以回圈上,這一點和CountDownLatch有很大的區別,CountDownLatch是一個計數器,只能有一次計數完成,之后不能再繼續歸零計算了,而CyclicBarrier可以回圈設定這個屏障,
我們再來看一下CyclicBarrier都有哪些常用的方法,
構造方法
// 創建有指定個數執行緒的回圈屏障
new CyclicBarrier(3);
// 創建有指定個數執行緒的回圈屏障,在所有執行緒到達屏障后,運行Runnable的run()方法
new CyclicBarrier(3, Runnable);
成員方法
// 等其他所有執行緒到達
cyclicBarrier.await();
// 等所有執行緒到達,超時只有放棄等待
cyclicBarrier.await(1, TimeUnit.SECONDS);
// 獲取當前正在等待的執行緒數
cyclicBarrier.getNumberWaiting();
// 重置等待狀態到初始狀態
cyclicBarrier.reset();
Phaser
一個可重復使用的同步屏障,功能類似于CyclicBarrier和CountDownLatch但支持更靈活的使用,
Phaser從字面意思可以理解為”階段器“,通過上面這段話可以感覺到要比CyclicBarrier和CountDownLatch更牛逼一些,更加的靈活,
我們這次不重新舉新的例子,還用上面的吃飯的例子,加入說我們上面吃飯的例子,如果說在大家開始吃的時候,另一個朋友打電話說他也要來,這時候總不能不讓來吧,應該讓他來和我們一起吃,并且吃完一起走,
而這種場景通過上面說到的CountDownLatch,CyclicBarrier還是Semaphore都是不能做到的,我們來看看使用Phaser如何解決,
public class PhaserDemo {
public static void main(String[] args) throws InterruptedException {
// 剛開始飯局是3個人
Phaser phaser = new Phaser(3);
for (int i = 1; i <= 3; i++) {
new Foodie("吃貨" + i, phaser).start();
}
TimeUnit.SECONDS.sleep(2);
phaser.register();
new Foodie("新來的", phaser).start();
System.out.println("飯局人數:" + phaser.getRegisteredParties());
}
}
class Foodie extends Thread {
private Phaser phaser;
public Foodie(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
phaseOne();
phaseTwo();
phaseThree();
} catch (Exception e) {
e.printStackTrace();
}
}
public void phaseOne() throws InterruptedException {
// 新來的不用等其他人
if (Thread.currentThread().getName().equals("新來的")) {
return;
}
TimeUnit.SECONDS.sleep(1);
System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "已到達戰場~");
// 到達飯局并加入等待
phaser.arriveAndAwaitAdvance();
}
public void phaseTwo() throws InterruptedException {
System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "已饑餓難耐了,開始戰斗~");
TimeUnit.SECONDS.sleep(2);
System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "吃完了~");
phaser.arriveAndAwaitAdvance();
}
public void phaseThree() {
System.out.println(LocalDateTime.now() + Thread.currentThread().getName() + "回家睡覺");
phaser.arriveAndDeregister();
}
}

從運行結果我們可以看到,剛開始1,2,3已經開始戰斗了(完成了第一個階段,進入第二個階段),這是后來了個新朋友加入,那飯局的人數變成了4人,然后再逐步完成后面的階段,
同樣我們來看一下Phaser的方法,
構造方法
// 創建一個階段器,沒有注冊方,沒有父級和初始階段
Phaser()
// 創建一個階段器,指定注冊方數量
Phaser(int parties)
// 相當于 Phaser(Phaser parent, 0)
Phaser(Phaser parent)
// 創建一個階段器,通過給定的父級階段器和給定的注冊方數
Phaser(Phaser parent, int parties)
成員方法
// 抵達這個階段,并不等待別人到達
arrive()
// 到達并阻塞等待其他到達
arriveAndAwaitAdvance()
// 到達并注銷
arriveAndDeregister()
// 等待從指定階段進入,如果不在該階段或階段器已終止則立即回傳
awaitAdvance(int phase)
//同awaitAdvance(int phase),可被中斷
awaitAdvanceInterruptibly(int phase)
//同awaitAdvance(int phase),可被中斷,可超時
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)
// 添加一個新的未到達節點
register()
// 添加指定個數新的未到達節點
bulkRegister(int parties)
// 強制終止階段器
forceTermination()
// 獲取到達數量
getArrivedParties()
// 獲取父階段器
getParent()
// 獲取根階段器
getRoot()
// 回傳當前階段數
getPhase()
// 回傳已注冊了的節點
getRegisteredParties()
// 回傳未到達的節點
getUnarrivedParties()
// 判斷是否終止
isTerminated()
//在即將進行的節點提前執行動作的可覆寫方法,并控制終止,
onAdvance(int phase, int registeredParties)
總結
我們來簡單總結一下,今天主要介紹JUC包中的執行緒同步工具,
CountDownLatch:主要用于計數,可完成等待多個執行緒執行,計數器每次減1,減到0之后釋放等待執行緒;歸零后無法重置,不可重復利用;
Semaphore:通常用于限制資源訪問,如限流,通過acquire()獲取后加1,release()之后減1,沒有許可時獲取會阻塞;
CyclicBarrier:回圈屏障,相比CountDownLatch,await()方法每次加1,加到指定值釋放等待執行緒;加到指定值之后會重置,可回圈利用;
Phaser:支持CountDownLatch和CyclicBarrier的功能,可以做到替換,并且可以動態的添加或減少設定值,
好的,本期內容就到這里,我們下期見,關注我的公眾號【小黑說Java】,更多干貨內容,

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/298234.html
標籤:Java
