摘要:AQS 的全稱為(AbstractQueuedSynchronizer),AQS 是一個用來構建鎖和同步器的框架,使用 AQS 能簡單且高效地構造出應用廣泛的大量的同步器,
本文分享自華為云社區《【高并發】AQS中的CountDownLatch、Semaphore與CyclicBarrier核心用法》,作者: 冰 河,
AQS 的全稱為(AbstractQueuedSynchronizer),AQS 是一個用來構建鎖和同步器的框架,使用 AQS 能簡單且高效地構造出應用廣泛的大量的同步器,本文主要講述AQS中的CountDownLatch、Semaphore與CyclicBarrier核心用法,
CountDownLatch
概述
同步輔助類,通過它可以阻塞當前執行緒,也就是說,能夠實作一個執行緒或者多個執行緒一直等待,直到其他執行緒執行的操作完成,使用一個給定的計數器進行初始化,該計數器的操作是原子操作,即同時只能有一個執行緒操作該計數器,
呼叫該類await()方法的執行緒會一直阻塞,直到其他執行緒呼叫該類的countDown()方法,使當前計數器的值變為0為止,每次呼叫該類的countDown()方法,當前計數器的值就會減1,當計數器的值減為0的時候,所有因呼叫await()方法而處于等待狀態的執行緒就會繼續往下執行,這種操作只能出現一次,因為該類中的計數器不能被重置,如果需要一個可以重置計數次數的版本,可以考慮使用CyclicBarrier類,
CountDownLatch支持給定時間的等待,超過一定的時間不再等待,使用時只需要在await()方法中傳入需要等待的時間即可,此時,await()方法的方法簽名如下:
public boolean await(long timeout, TimeUnit unit)
使用場景
在某些業務場景中,程式執行需要等待某個條件完成后才能繼續執行后續的操作,典型的應用為并行計算:當某個處理的運算量很大時,可以將該運算任務拆分成多個子任務,等待所有的子任務都完成之后,父任務再拿到所有子任務的運算結果進行匯總,
代碼示例
呼叫ExecutorService類的shutdown()方法,并不會第一時間內把所有執行緒全部都銷毀掉,而是讓當前已有的執行緒全部執行完,之后,再把執行緒池銷毀掉,
示例代碼如下:
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class CountDownLatchExample { private static final int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++){ final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { e.printStackTrace(); }finally { countDownLatch.countDown(); } }); } countDownLatch.await(); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws InterruptedException { Thread.sleep(100); log.info("{}", threadNum); Thread.sleep(100); } }
支持給定時間等待的示例代碼如下:
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @Slf4j public class CountDownLatchExample { private static final int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++){ final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (InterruptedException e) { e.printStackTrace(); }finally { countDownLatch.countDown(); } }); } countDownLatch.await(10, TimeUnit.MICROSECONDS); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws InterruptedException { Thread.sleep(100); log.info("{}", threadNum); } }
Semaphore
概述
控制同一時間并發執行緒的數目,能夠完成對于信號量的控制,可以控制某個資源可被同時訪問的個數,
提供了兩個核心方法——acquire()方法和release()方法,acquire()方法表示獲取一個許可,如果沒有則等待,release()方法則是在操作完成后釋放對應的許可,Semaphore維護了當前訪問的個數,通過提供同步機制來控制同時訪問的個數,Semaphore可以實作有限大小的鏈表,
使用場景
Semaphore常用于僅能提供有限訪問的資源,比如:資料庫連接數,
代碼示例
每次獲取并釋放一個許可,示例代碼如下:
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j public class SemaphoreExample { private static final int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++){ final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(); //獲取一個許可 test(threadNum); semaphore.release(); //釋放一個許可 } catch (InterruptedException e) { e.printStackTrace(); } }); } exec.shutdown(); } private static void test(int threadNum) throws InterruptedException { log.info("{}", threadNum); Thread.sleep(1000); } }
每次獲取并釋放多個許可,示例代碼如下:
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j public class SemaphoreExample { private static final int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++){ final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(3); //獲取多個許可 test(threadNum); semaphore.release(3); //釋放多個許可 } catch (InterruptedException e) { e.printStackTrace(); } }); } log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws InterruptedException { log.info("{}", threadNum); Thread.sleep(1000); } }
假設有這樣一個場景,并發太高了,即使使用Semaphore進行控制,處理起來也比較棘手,假設系統當前允許的最高并發數是3,超過3后就需要丟棄,使用Semaphore也能實作這樣的場景,示例代碼如下:
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j public class SemaphoreExample { private static final int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++){ final int threadNum = i; exec.execute(() -> { try { //嘗試獲取一個許可,也可以嘗試獲取多個許可, //支持嘗試獲取許可超時設定,超時后不再等待后續執行緒的執行 //具體可以參見Semaphore的原始碼 if (semaphore.tryAcquire()) { test(threadNum); semaphore.release(); //釋放一個許可 } } catch (InterruptedException e) { e.printStackTrace(); } }); } log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws InterruptedException { log.info("{}", threadNum); Thread.sleep(1000); } }
CyclicBarrier
概述
是一個同步輔助類,允許一組執行緒相互等待,直到到達某個公共的屏障點,通過它可以完成多個執行緒之間相互等待,只有當每個執行緒都準備就緒后,才能各自繼續往下執行后面的操作,
與CountDownLatch有相似的地方,都是使用計數器實作,當某個執行緒呼叫了CyclicBarrier的await()方法后,該執行緒就進入了等待狀態,而且計數器執行加1操作,當計數器的值達到了設定的初始值,呼叫await()方法進入等待狀態的執行緒會被喚醒,繼續執行各自后續的操作,CyclicBarrier在釋放等待執行緒后可以重用,所以,CyclicBarrier又被稱為回圈屏障,
使用場景
可以用于多執行緒計算資料,最后合并計算結果的場景
CyclicBarrier與CountDownLatch的區別
- CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法進行重置,并且可以回圈使用
- CountDownLatch主要實作1個或n個執行緒需要等待其他執行緒完成某項操作之后,才能繼續往下執行,描述的是1個或n個執行緒等待其他執行緒的關系,而CyclicBarrier主要實作了多個執行緒之間相互等待,直到所有的執行緒都滿足了條件之后,才能繼續執行后續的操作,描述的是各個執行緒內部相互等待的關系,
- CyclicBarrier能夠處理更復雜的場景,如果計算發生錯誤,可以重置計數器讓執行緒重新執行一次,
- CyclicBarrier中提供了很多有用的方法,比如:可以通過getNumberWaiting()方法獲取阻塞的執行緒數量,通過isBroken()方法判斷阻塞的執行緒是否被中斷,
代碼示例
示例代碼如下:
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class CyclicBarrierExample { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++){ final int threadNum = i; Thread.sleep(1000); executorService.execute(() -> { try { race(threadNum); } catch (Exception e) { e.printStackTrace(); } }); } executorService.shutdown(); } private static void race(int threadNum) throws Exception{ Thread.sleep(1000); log.info("{} is ready", threadNum); cyclicBarrier.await(); log.info("{} continue", threadNum); } }
設定等待超時示例代碼如下:
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.*; @Slf4j public class CyclicBarrierExample { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++){ final int threadNum = i; Thread.sleep(1000); executorService.execute(() -> { try { race(threadNum); } catch (Exception e) { e.printStackTrace(); } }); } executorService.shutdown(); } private static void race(int threadNum) throws Exception{ Thread.sleep(1000); log.info("{} is ready", threadNum); try{ cyclicBarrier.await(2000, TimeUnit.MILLISECONDS); }catch (BrokenBarrierException | TimeoutException e){ log.warn("BarrierException", e); } log.info("{} continue", threadNum); } }
在宣告CyclicBarrier的時候,還可以指定一個Runnable,當執行緒達到屏障的時候,可以優先執行Runnable中的方法,
示例代碼如下:
package io.binghe.concurrency.example.aqs; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class CyclicBarrierExample { private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { log.info("callback is running"); }); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++){ final int threadNum = i; Thread.sleep(1000); executorService.execute(() -> { try { race(threadNum); } catch (Exception e) { e.printStackTrace(); } }); } executorService.shutdown(); } private static void race(int threadNum) throws Exception{ Thread.sleep(1000); log.info("{} is ready", threadNum); cyclicBarrier.await(); log.info("{} continue", threadNum); } }
點擊關注,第一時間了解華為云新鮮技術~
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/542670.html
標籤:Java
上一篇:如何通過Java應用程式添加或洗掉 PDF 中的附件
下一篇:垃圾收集器必問系列—ZGC
