JUC是Java編發編程中使用的工具類,全稱為java.util.concurrent,近期在大廠面試中屢屢被問到關于JUC的相關知識點問題,其重要性不言而喻,學好用好JUC可以說是每一個Java程式開發作業者不能不研究和不能不解決的問題,因此,在自己屢屢碰壁后決定對JUC的相關知識點進行一次系統的梳理,并通過博客分享的方式進行一個大概的總結,所記如下,
一、與JUC相關概念回顧
JUC是Java并發編程中使用的核心工具類,主要包括:
(1)鎖機制類Locks:Lock, Condition, ReadWriteLock等,
(2)原子操作類:Atomic:AtomicInteger, AtomicLong等,
(2)并發集合類:CopOnWriteArrayList, ConcurrentHashMap等,
(3)信號量三組工具類:CountDownLatch, CyclicBarrier, Semaphore,
(4)執行緒池相關類:Feture, Callable, Executor等,
在說JUC工具類之前我們首先回顧一下在并發編程中經常會混淆的幾個概念,執行緒和行程的區別是什么?并發和并行的區別是什么?阻塞和非阻塞的區別是什么?同步和異步的區別是什么?我覺得這幾個概念可以用生活中的例子進行類比,記起來更加方便,
行程: 打開QQ,開了一個行程;打開微信,開了一個行程;打開王者榮耀,又開了一個行程,
執行緒: 使用微信時,一邊與A好友進行文字聊天是一個執行緒,同時與B好友進行語音通話是一個執行緒,同時正在給C好友傳輸檔案又是一個執行緒,
(單行程單執行緒:一個人在一張桌子吃飯;單行程多執行緒:多個人在同一張桌子上一起吃飯)
并發: 兩個佇列交替使用一臺咖啡機,
并行: 兩個佇列使用兩臺咖啡機,
Erlang 之父 Joe Armstrong 用一張5歲小孩都能看懂的圖解釋了并發與并行的區別,如下:

阻塞: 排隊打飯,什么都不做等著輪到自己打飯為止,
非阻塞: 奶茶店買奶茶,當奶茶還做好前,先找個位置打兩盤王者,等奶茶做好后再去拿,
同步: 去肯德基買全家桶,等全家桶做好后通知我,我自己去前臺拿,
異步: 去酒店吃飯,等菜做好后端到我的桌面上,
(資料就緒后,需要自己去讀就是同步,資料就緒直接讀好再回呼給程式就是異步,可以理解為菜做好后如果是自己去拿就是同步,一手包辦送到我桌面就是異步)
二、鎖機制類
1、synchronized同步鎖回顧
在講JUC中的鎖機制前先回顧一下synchronized同步鎖,
使用口訣:執行緒操作資源類
題目:三個售票員賣出30張票,使用synchronized的代碼如下:
//資源類
public class Ticket0 {
private int count = 30;
public synchronized void sale() {
if(count > 0) {
System.out.println(Thread.currentThread().getName() + "賣出第" + (count--) +"張票,還剩" + count + "張票");
}
}
}
//執行緒
public class TicketDemo02 {
public static void main(String[] args) {
Ticket0 ticket = new Ticket0();
//售票員1
new Thread(()-> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "AAA").start();
//售票員2
new Thread(()-> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "BBB").start();
//售票員3
new Thread(()-> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "CCC").start();
}
}
2、使用JUC的Lock
同樣的解決三個售票員賣出30張票的問題,使用Lock的代碼如下:
//資源類
public class Ticket {
private int count = 30;
private ReentrantLock lock = new ReentrantLock();
public void sale() {
lock.lock();
try {
if(count > 0) {
System.out.println(Thread.currentThread().getName() + "賣出第" + (count--) +"張票,還剩" + count + "張票");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
//執行緒
public class TicketDemo02 {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(()-> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "AAA").start();
new Thread(()-> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "BBB").start();
new Thread(()-> {
for (int i = 0; i < 40; i++) {
ticket.sale();
}
}, "CCC").start();
}
}
單從賣票這個問題上來看,synchronized和Lock的效果一樣,那么他們的區別在哪里呢?
(1)synchronized是一個關鍵字,在jvm層面;而Lock是一個Java類,在API層面,
(2)synchronized無法判斷是否已獲取鎖的狀態,而Lock可以通過tryLock方法判斷獲取鎖的狀態
(3)synchronized自動釋放鎖,Lock需要手動釋放鎖,一般會在finally中使用unlock方法釋放,否則容易導致死鎖,
(4)synchronized是同步阻塞,而Lock是同步非阻塞
(5)synchronized適合代碼少量同步的同步問題,Lock適合代碼大量同步的同步問題,
3、執行緒間的通信
執行緒間的通信間的通信比較典型的就是生產者消費者模式,我們可以把現場間的通信歸納為以下口訣:
(1)執行緒操作資源類
(2)判斷、干活、通知
(3)多執行緒互動中,必須要防止虛假喚醒(在資源類做判斷等待時,必須使用while,禁止使用if)
題目:兩個執行緒,可以操作初始值為0的一個變數實作一個執行緒對該變數加1,一個執行緒對該變數減1實作交替10輪,變數最后初始值為0,
先回顧以往使用synchronized+wait/notify的實作方式,代碼如下:
//資源類
public class AirConditionor {
private int num = 0;
public synchronized void increament() throws InterruptedException {
//這里必須使用while,否則會產生虛假喚醒現象
while (num != 0) {
this.wait();
}
num++;
System.out.println(Thread.currentThread().getName() + ":" + num);
this.notifyAll();
}
public synchronized void decreament() throws InterruptedException {
//這里必須使用while,否則會產生虛假喚醒現象
while (num == 0) {
this.wait();
}
num--;
System.out.println(Thread.currentThread().getName() + ":" + num);
this.notifyAll();
}
}
//執行緒
public class ThreadWaitNotifyDemo02 {
public static void main(String[] args) {
AirConditionor airConditionor = new AirConditionor();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
airConditionor.increament();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "AAA").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
airConditionor.decreament();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "BBB").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
airConditionor.increament();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "CCC").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
airConditionor.decreament();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "DDD").start();
}
}
使用Lock進行執行緒間的通信,通過Condition介面來實作,相同的問題,代碼如下:
//資源類
public class AirConditionor2 {
private int num = 0;
private ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increament() {
lock.lock();
try {
while (num != 0) {
condition.await();
}
num++;
System.out.println(Thread.currentThread().getName() + ":" + num);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void decreament() {
lock.lock();
try {
while (num == 0) {
condition.await();
}
num--;
System.out.println(Thread.currentThread().getName() + ":" + num);
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
//執行緒
public class ThreadWaitNotifyDemo03 {
public static void main(String[] args) {
AirConditionor airConditionor2 = new AirConditionor();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
airConditionor2.increament();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "AAA").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
airConditionor2.decreament();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "BBB").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
airConditionor2.increament();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "CCC").start();
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
airConditionor2.decreament();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "DDD").start();
}
}
直接這樣看以上問題的例子好像沒什么區別,那么synchronized+wait/notify和lock+await/signal的主要區別在哪里呢?主要體現在執行緒間定制化呼叫通信中,可以理解為前者是使用炮彈轟炸,后者是使用導彈精準打擊,題目例子如下:三個執行緒啟動,AAA列印5次,BBB列印10次,CCC列印15次,如此回傳列印10輪,
可以通過Lock實作Condition介面創建多個,每一個功能對應一個Condition,通過相應的等待喚醒方法來實作精準控制,代碼如下:
//資源類
public class MyThread {
/**
* 0代表列印5次,1代表列印10次,2代表列印15次
*/
private int num = 0;
Lock lock = new ReentrantLock();
Condition condition0 = lock.newCondition();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
public void print5() {
lock.lock();
try {
while (num != 0) {
condition0.await();
}
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + ":" + i);
}
num = 1;
condition1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print10() {
lock.lock();
try {
while (num != 1) {
condition1.await();
}
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + ":" + i);
}
num = 2;
condition2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print15() {
lock.lock();
try {
while (num != 2) {
condition2.await();
}
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + ":" + i);
}
num = 0;
condition0.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
//執行緒
public class ThreadOrderAccessDemo04 {
public static void main(String[] args) {
MyThread myThread = new MyThread();
new Thread(()->{
for (int i = 0; i < 10; i++) {
myThread.print5();
}
}, "AAA").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
myThread.print10();
}
}, "BBB").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
myThread.print15();
}
}, "CCC").start();
}
}
4、鎖的8個問題
對于資源類方法的靜態和靜態,加鎖和不加鎖,衍生出令人混淆的8個關于鎖的問題:
以一個含有sendEmail()和sendMs()及sayHello()的方法的Phone類為例子,問題如下:
(1)標準訪問,先列印郵件
public class Phone {
public synchronized void sendEmail() throws Exception {
System.out.println("*****sendEmail");
}
public synchronized void sendMs() throws Exception {
System.out.println("*****sendMs");
}
}
public class Lock8 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(() -> {
try {
phone.sendEmail();
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
Thread.sleep(100); //目的是先使執行緒A先訪問到資源
new Thread(() -> {
try {
phone.sendMs();
//phone.sayHello();
} catch (Exception e) {
e.printStackTrace();
}
}, "B").start();
}
}
(2)郵件設定暫停4秒方法,先列印郵件
public class Phone {
public synchronized void sendEmail() throws Exception {
TimeUnit.SECONDS.sleep(4);
System.out.println("*****sendEmail");
}
public synchronized void sendMs() throws Exception {
System.out.println("*****sendMs");
}
}
public class Lock8 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(() -> {
try {
phone.sendEmail();
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
Thread.sleep(100); //目的是先使執行緒A先訪問到資源
new Thread(() -> {
try {
phone.sendMs();
//phone.sayHello();
} catch (Exception e) {
e.printStackTrace();
}
}, "B").start();
}
}
(3)新增sayHello方法,先列印sayHello
public class Phone {
public synchronized void sendEmail() throws Exception {
TimeUnit.SECONDS.sleep(4);
System.out.println("*****sendEmail");
}
public synchronized void sendMs() throws Exception {
System.out.println("*****sendMs");
}
public void sayHello() throws InterruptedException {
System.out.println("*****sayHello");
}
}
public class Lock8 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(() -> {
try {
phone.sendEmail();
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
new Thread(() -> {
try {
// phone.sendMs();
phone.sayHello();
} catch (Exception e) {
e.printStackTrace();
}
}, "B").start();
}
}
(4)兩部手機,先列印短信
public class Phone {
public synchronized void sendEmail() throws Exception {
TimeUnit.SECONDS.sleep(4);
System.out.println("*****sendEmail");
}
public synchronized void sendMs() throws Exception {
System.out.println("*****sendMs");
}
public void sayHello() throws InterruptedException {
System.out.println("*****sayHello");
}
}
public class Lock8 {
public static void main(String[] args) {
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(() -> {
try {
phone1.sendEmail();
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
new Thread(() -> {
try {
phone2.sendMs();
//phone2.sayHello();
} catch (Exception e) {
e.printStackTrace();
}
}, "B").start();
}
}
(5)兩個靜態同步方法,同一部手機,先列印郵件
public class Phone {
public static synchronized void sendEmail() throws Exception {
TimeUnit.SECONDS.sleep(4);
System.out.println("*****sendEmail");
}
public static synchronized void sendMs() throws Exception {
System.out.println("*****sendMs");
}
public void sayHello() throws InterruptedException {
System.out.println("*****sayHello");
}
}
public class Lock8 {
public static void main(String[] args) {
//Phone phone1 = new Phone();
//Phone phone2 = new Phone();
new Thread(() -> {
try {
Phone.sendEmail();
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
new Thread(() -> {
try {
Phone.sendMs();
//phone2.sayHello();
} catch (Exception e) {
e.printStackTrace();
}
}, "B").start();
}
}
(6)兩個靜態同步方法,同兩部手機,先列印郵件,鎖的同一個位元組碼物件
public class Phone {
public static synchronized void sendEmail() throws Exception {
TimeUnit.SECONDS.sleep(4);
System.out.println("*****sendEmail");
}
public static synchronized void sendMs() throws Exception {
System.out.println("*****sendMs");
}
public void sayHello() throws InterruptedException {
System.out.println("*****sayHello");
}
}
public class Lock8 {
public static void main(String[] args) {
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(() -> {
try {
Phone.sendEmail();
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
new Thread(() -> {
try {
Phone.sendMs();
//phone2.sayHello();
} catch (Exception e) {
e.printStackTrace();
}
}, "B").start();
}
}
(7)一個靜態同步方法,一個普通同步方法,同一部手機,先列印短信
public class Phone {
public static synchronized void sendEmail() throws Exception {
TimeUnit.SECONDS.sleep(4);
System.out.println("*****sendEmail");
}
public synchronized void sendMs() throws Exception {
System.out.println("*****sendMs");
}
public void sayHello() throws InterruptedException {
System.out.println("*****sayHello");
}
}
public class Lock8 {
public static void main(String[] args) {
Phone phone1 = new Phone();
//Phone phone2 = new Phone();
new Thread(() -> {
try {
phone1.sendEmail();
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
new Thread(() -> {
try {
//phone2.sendMs();
phone1.sendMs();
} catch (Exception e) {
e.printStackTrace();
}
}, "B").start();
}
}
(8)一個靜態同步方法,一個普通同步方法,同二部手機,先列印短信
public class Phone {
public static synchronized void sendEmail() throws Exception {
TimeUnit.SECONDS.sleep(4);
System.out.println("*****sendEmail");
}
public synchronized void sendMs() throws Exception {
System.out.println("*****sendMs");
}
public void sayHello() throws InterruptedException {
System.out.println("*****sayHello");
}
}
public class Lock8 {
public static void main(String[] args) {
Phone phone1 = new Phone();
Phone phone2 = new Phone();
new Thread(() -> {
try {
phone1.sendEmail();
} catch (Exception e) {
e.printStackTrace();
}
}, "A").start();
new Thread(() -> {
try {
//phone2.sendMs();
phone2.sendMs();
} catch (Exception e) {
e.printStackTrace();
}
}, "B").start();
}
}
總結分析:
(1)普通同步方法,鎖的是當前實體物件;靜態同步方法鎖的是當前類的Class物件,
(2)普通同步方法、靜態同步方法及普通方法相互之前不會有競態條件,
5、ReadWriteLock讀寫鎖
ReadWriteLock的原理可以總結為:寫的時候獨占,讀的時候共享,讀-讀能共存、讀-寫不能共存、寫-寫不能共存,代碼例子如下:
public class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "---寫入資料" + value);
TimeUnit.SECONDS.sleep(3);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "---寫入完成");
} catch (Exception e) {
e.printStackTrace();
}finally {
readWriteLock.writeLock().unlock();
}
}
public void get(String key) {
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "讀取資料");
//TimeUnit.SECONDS.sleep(3);
Object result = map.get(key);
System.out.println(Thread.currentThread().getName() + "讀取完成" + result);
} catch (Exception e) {
e.printStackTrace();
} finally {
readWriteLock.readLock().unlock();
}
}
}
========
輸出:
5---寫入資料5
5---寫入完成
4---寫入資料4
4---寫入完成
1---寫入資料1
1---寫入完成
2---寫入資料2
2---寫入完成
3---寫入資料3
3---寫入完成
1讀取資料
1讀取完成1
2讀取資料
3讀取資料
4讀取資料
2讀取完成2
4讀取完成4
3讀取完成3
5讀取資料
5讀取完成5
三、并發集合類
在多執行緒環境下,使用ArrayList、HasSet及HashMap集合類時,會產生執行緒安全問題,報java.util.ConcurrentModificationException例外,那么為什么會導致產生這樣的例外呢?
我們可以類比成一個上課簽到的場景,簽到表相當于集合容器,多個學生相當于多個執行緒,由于缺乏紀律規則提示,大家都爭搶著往這個簽到表中簽名,導致一個學生還沒簽到完,就被另個學生把簽到表搶去,導致產生了混亂,甚至產生踩踏事件,
1、ArrayList執行緒不安全問題解決方案
(1)使用uitil包下的Vector類
(2)Collections.synchronizedList(new ArrayList<>())
(3)java.util.concurrent 包下的new CopyOnWriteArrayList<>()
public class NoSafeDemo {
public static void main(String[] args) {
List<String> list1 = new Vector<>();
List<String> list2 = Collections.synchronizedList(new ArrayList<>());
List<String> list3 = new CopyOnWriteArrayList<>();
for (int i = 0; i < 50; i++) {
new Thread(()-> {
list1.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list1);
list2.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list2);
list3.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list3);
}, String.valueOf(i)).start();
}
}
}
方法(1)和(2)底層都是使用了synchronized實作,而CopyOnWrite容器底層使用了寫時復制,讀寫分離的思想實作,

核心原始碼如下:
public boolean add(E e) {
synchronized (lock) {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
}
}
實作原理:當往一個容器中添加元素時,不直接往Object[]添加,而是現將當前元素進行Copy并把陣列的長度+1,復制出新的陣列Object[] newElements,然后向新的容器添加元素,最后把原容器的參考指向新的容器 setArray(newElements);這種設計思想的好處是并發讀的時候不需要鎖,因為當前容器不會添加任何元素,提高了效率,
2、HashSet執行緒不安全問題解決方案
HashSet的執行緒不安全問題解決方案和ArrayList相似,
(1)使用Collections.synchronizedSet(new HashSet<>())
(2)java.util.concurrent 包下的CopyOnWriteArraySet
3、HashMap執行緒不安全問題解決方案
(1)使用Collections.synchronizedMap(new HashMap<>())
(2)java.util.concurrent 包下的ConcurrentHashMap
public class NoSafeDemo03 {
public static void main(String[] args) {
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
for (int i = 0; i < 30; i++) {
new Thread(()-> {
map.put(UUID.randomUUID().toString().substring(0, 7), "aaa");
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
原理分析:ConcurrentHashMap使用分段鎖的思想,容器中有多把鎖,每一把鎖鎖一段資料,這樣在多執行緒訪問時,不同段的資料不存在鎖的競爭,這樣就可以有效的提高并發效率,
4、BlockingQueue阻塞佇列
在多執行緒的阻塞,是在某種情況下會掛起執行緒,一旦條件滿足,掛起的執行緒又會自動喚醒,使用BlockingQueue的好處是我們不需要關心什么時候需要阻塞執行緒,什么時候需要喚醒執行緒,BlockingQueue都給一手包辦,
(1)當佇列是空的,從佇列中獲取元素的操作將會被阻塞
(2)當佇列是滿的,從佇列中添加元素的操作將會被阻塞
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
//由于添加元素超過佇列大小,會產生阻塞
blockingQueue.put("aaa");
blockingQueue.put("aaa");
blockingQueue.put("aaa");
blockingQueue.put("aaa");
}
}
四、執行緒池
1、執行緒的獲取方法及Callable介面
獲得執行緒的方法一般情況下有一下三種:
(1)繼承Thread類并重寫run方法,
(2)實作Runnable介面并重寫run方法,
(3)實作java.util.concurrent包下的Callable介面并重寫call方法,
使用Callable創建執行緒的方法代碼如下:
//資源類
public class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
//更加細粒度化的精確控制
System.out.println(Thread.currentThread().getName() +"come in here");
TimeUnit.SECONDS.sleep(4);
return 1024;
}
}
//執行緒
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//MyThread myThread = new MyThread();
FutureTask futureTask = new FutureTask(new MyThread());
new Thread(futureTask, "AAA").start();
new Thread(futureTask, "BBB").start();
System.out.println(Thread.currentThread().getName() + "計算完成!");
System.out.println(futureTask.get());
}
}
因為Thread(Runnable target) 的輸入變數為Runnable , 實作Callable介面的MyThread 不能輸入到Thread中,那么FutureTask(Callable callable) 相當于一個中間人,目的是通過FutureTask使Runnable和Callable產生聯系,
優點: 使用FutureTask的好處是對于主執行緒來說是同步非阻塞的,例子:老師上著課,口渴了,去買水不合適,講課執行緒繼續,我可以單起個執行緒找班長幫忙買水,水買回來了放桌上,我需要的時候再去get,(注意:get方法而獲取結果只有在計算完成時獲取,否則會一直阻塞直到任務轉入完成狀態)
Callable介面和Runnable介面的區別?
(1)前者有回傳值,后者沒有回傳值
(2)前者可以拋出例外,后者不可拋出例外
(3)前者的實作方法為call(),后者的實作方法為run()
2、為什么時候執行緒池?
執行緒池做的作業主要是控制運行的執行緒數量,處理程序中將任務放進佇列,然后在執行緒創建后啟動這些任務,如果執行緒數量超過了核心執行緒的最大數量,超出數量的執行緒排隊等候,等其他執行緒執行完畢,再從佇列中取出任務來執行,
主要特點為: 執行緒的復用;控制最大的并發數;管理執行緒

3、使用Excutors創建執行緒池的3種方式
(1)newFixedThreadPool(int nThreads) 方法創建,創建的執行緒池corePoolSize和maxinumPoolSize值是相等的,它使用的是LinkedBlockingQueue
//創建固定執行緒池
ExecutorService threadPool = newFixedThreadPool(5);
(2)newSingleThreadExecutor()方法創建,創建的執行緒池corePoolSize和maximumPoolSize值都是1,它使用的是LinkedBlockingQueue
//一個池子一個作業執行緒
ExecutorService threadPool = newSingleThreadExecutor();
(3)newCachedThreadPool()方法創建,創建的執行緒池corePoolSize為0,maximumPoolSize值都是Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建執行緒運行,當執行緒空閑超過60秒,就銷毀執行緒,
//一池N執行緒,有N個受理視窗,創建多少個由實際情況而定,擴拓展的
ExecutorService threadPool2 = newCachedThreadPool();
以上三種創建執行緒池的底層都是通過ThreadPoolExecutor創建,底層到嗎如下:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
阿里巴巴開發手冊關于執行緒池不允許使用Executors創建,而是通過ThreadPoolExecutor的方式創建,這樣的處理方式能讓撰寫代碼的工程師更加明確執行緒池的運行規則,避免資源耗盡的風險,弊端如下:
(1)使用FixedThreadPool和SingleThreadPool允許請求的佇列長度為Integer.MAX_VALUE,可能會堆積大量請求,導致OOM,
(2)CachedThreadPool允許創建最大的執行緒數為Integer.MAX_VALUE,創建大量的執行緒,導致OOM
4、執行緒池ThreadPoolExecutor的7大引數
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {......}
(1)corePoolSize:執行緒池中的常駐核心執行緒數
(2)maximumPoolSize:執行緒池中能容納的最大執行緒數
(3)keepAliveTime:多余空閑執行緒的存貨時間,當前執行緒數量超過corePoolSize且空閑時間達到keepAlive時,多余執行緒會被自動銷毀,使執行緒池中的執行緒數量保存為corePoolSize
(4)unit:keepAliveTime的時間單位
(5)workQueue:任務佇列,被提交但是未被執行的任務
(6)threadFactory:生成執行緒池中作業執行緒的執行緒工廠,用于創建執行緒
(7)handler:拒接策略,當佇列滿了的時候,并且作業執行緒大于或等于執行緒池的最大執行緒數時,如何拒絕請求執行的runnable的策略
5、執行緒池的底層作業原理

(1)創建了執行緒后,開始等待請求
(2)當呼叫一個Excuror()方法添加一個請求任務時,執行緒池會作出以下判斷:
- 如果正在運行的執行緒數小于corePoolSize,那么馬上創建這個執行緒運行這個任務
- 如果正在運行的大于corePoolSize,那么將這個任務放入佇列
- 如果佇列滿了,但是運行執行緒數小于maximumPoolSize,那么創建非核心執行緒來運行這個任務
- 如果佇列滿了且運行的執行緒數大于maximumPoolSize,那么執行緒會啟動拒絕策略來執行
(3)當一個執行緒完成任務時,他會從一個佇列中取下一個任務執行,
(4)當一個執行緒無事可做,且超過keepAliveTime時,執行緒會判斷:
- 如果當前運行的執行緒數大于corePoolSize,那么這個執行緒將會被停掉
- 所以執行緒池的所有任務完成后,它最侄訓收縮到corePoolSize的大小
可以把整個程序看成是銀行網點辦理業務,假設一共有5個視窗(maximumPoolSize),候客區有3個座位(workQueue的大小),今天開放的只有2個視窗(corePoolSize),存在以下情況:
(1)當來的客戶不大于2個人時,直接辦理
(2)當來的客戶大于2個人不大于5個人時,剩余的顧客在候客區等待
(3)當來的客戶大于5個人不大于8個人時,擴展不大于3的視窗數給候客區的客戶辦理業務
(4)當來的客戶大于8個人時,銀行坐不下了,告訴坐不下的客戶去其他地方或者遲點再來
6、ThreadPoolExecutor的拒絕策略
(1)AbortPolicy(默認):直接拋出RejectedExecutionExeception
public class MyThreadPoolDemo02 {
public static void main(String[] args) throws InterruptedException {
//CPU最大核數->CPU密集型
System.out.println(Runtime.getRuntime().availableProcessors());
//IO密集型,
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,3,2L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
for (int i = 0; i < 5; i++) {
//TimeUnit.SECONDS.sleep(1);
threadPoolExecutor.execute(()-> {
System.out.println(Thread.currentThread().getName() + "辦理業務");
});
}
}
}
======================
輸出:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.john.demo01.MyThreadPoolDemo02$$Lambda$1/901506536@7c75222b rejected from java.util.concurrent.ThreadPoolExecutor@4c203ea1[Running, pool size = 3, active threads = 1, queued tasks = 2, completed tasks = 1]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2104)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:848)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1397)
at com.john.demo01.MyThreadPoolDemo02.main(MyThreadPoolDemo02.java:26)
pool-1-thread-1辦理業務
pool-1-thread-2辦理業務
pool-1-thread-1辦理業務
pool-1-thread-3辦理業務
pool-1-thread-1辦理業務
pool-1-thread-2辦理業務
(2)CallerRunsPolicy:將某些任務退回給呼叫者,從而降低新任務的流量,
public class MyThreadPoolDemo02 {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
......
new ThreadPoolExecutor.CallerRunsPolicy()
);
......
}
===============
輸出:
pool-1-thread-1辦理業務
pool-1-thread-1辦理業務
pool-1-thread-1辦理業務
main辦理業務
pool-1-thread-1辦理業務
pool-1-thread-1辦理業務
pool-1-thread-1辦理業務
pool-1-thread-2辦理業務
pool-1-thread-1辦理業務
pool-1-thread-3辦理業務
(3)DiscardOldestPolicy:拋棄佇列中最久的任務,把新任務添加到佇列中
public class MyThreadPoolDemo02 {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
......
new ThreadPoolExecutor.DiscardOldestPolicy()
);
for (int i = 0; i < 10; i++) {
//TimeUnit.SECONDS.sleep(1);
threadPoolExecutor.execute(()-> {
System.out.println(Thread.currentThread().getName() + "辦理業務");
});
}
}
===========
輸出:
pool-1-thread-1辦理業務
pool-1-thread-3辦理業務
pool-1-thread-2辦理業務
pool-1-thread-3辦理業務
pool-1-thread-1辦理業務
pool-1-thread-2辦理業務
(4)DiscardPolicy:丟棄無法處理的任務,不予任何處理,也不拋出例外,如允許任務失敗,這是一種最好的策略,
public class MyThreadPoolDemo02 {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
......
new ThreadPoolExecutor.DiscardPolicy()
);
......
}
===========
輸出:
pool-1-thread-1辦理業務
pool-1-thread-2辦理業務
pool-1-thread-1辦理業務
pool-1-thread-2辦理業務
pool-1-thread-1辦理業務
pool-1-thread-3辦理業務
(Runtime.getRuntime().availableProcessors()獲取CPU的最大核數,CPU密集型最大執行緒數一般最大核數+1)
五、信號量工具類
1、CountDownLatch 減少計數
CountDownLatch主要方法,當一個或多個執行緒呼叫await方法,這些執行緒會阻塞,其他執行緒呼叫countDown方法會將計數器減1,當計數器減為0時,因await方法阻塞的執行緒會被喚醒,繼續執行,
例子:6個同學陸續離開教室后值班同學才可以關門,代碼如下:
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " 離開教室");
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "班長關門走人");
}
}
2、CycliBarrier回圈柵欄
CycliBarrier要做的事情是讓一組執行緒達到一個屏障時被阻塞,之后最后一個執行緒達到屏障,屏障才會開門,所有執行緒才會繼續干活,執行緒進入屏障通過CycliBarrier的await方法,
例子:集齊7顆龍珠就可以召喚神龍
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, ()->{
System.out.println("召喚神龍");
});
for (int i = 1; i <= 7; i++) {
final int tmp = i;
new Thread(() -> {
try {
System.out.println("拿到第" + tmp + "顆龍龍珠");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
}
3、Semaphore信號燈
Semaphore一般用于多個共享資源的互斥和并發執行緒數的控制,通過acquire獲取信號量(信號量減1),要么一直等待下去,直到有執行緒release釋放信號量(信號量+1),喚醒等待的執行緒,
例子:搶車位,6輛車,只有3個車位,使用Semaphore進行流量控制的代碼如下:
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "搶到了車位");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "離開了車位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
六、異步回呼
異步回呼使用的CompletableFuture類,它的用法與JS前端開發的異步呼叫箭頭函式有點相似,其方法引數大多為函式式介面,因此我們需先線了解一下阿函式式介面的使用,Java內置有四大函式式介面:

使用CompletableFuture完成異步回呼,代碼如下:
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//異步回呼
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println("有回傳值,insert mysql ok");
//int a = 1 / 0;
return 1024;
});
Integer result = completableFuture2.whenComplete((t, u) -> {
System.out.println("******t " + t);
System.out.println("******u " + u);
}).exceptionally(f -> {
System.out.println(f.getMessage());
return 444;
}).get();
System.out.println(result);
}
}
============
輸出:
有回傳值,insert mysql ok
******t 1024
******u null
1024
如果在異步回呼程序中產出例外,執行exceptionally中的方法:
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException
//異步回呼
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
System.out.println("有回傳值,insert mysql ok");
int a = 1 / 0; //人為加入例外操作
return 1024;
});
//
Integer result = completableFuture2.whenComplete((t, u) -> {
System.out.println("******t " + t);
System.out.println("******u " + u);
}).exceptionally(f -> {
System.out.println(f.getMessage());
return 444;
}).get();
System.out.println(result);
}
}
==========
輸出:
有回傳值,insert mysql ok
******t null
******u java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
java.lang.ArithmeticException: / by zero
444
綜上,對于JUC并發編程的總結完畢,學而時習之,不亦說乎?通過這種理論,實踐,小總結的方式加深了對JUC的理解,
參考資料:
(1)尚硅谷周陽老師的JUC課程
(2)阿里巴巴Java開發手冊
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/279361.html
標籤:其他
上一篇:前端?規模構建演進實踐
