1. JUC前言知識
JUC即 java.util.concurrent
涉及三個包:
- java.util.concurrent
- java.util.concurrent.atomic
- java.util.concurrent.locks
普通的執行緒代碼:
- Thread
- Runnable 沒有回傳值、效率相比入 Callable 相對較低!
- Callable 有回傳值!【作業常用】
1.1 行程和執行緒
行程:是指一個記憶體中運行的程式,每個行程都有一個獨立的記憶體空間,一個應用程式可以同時運行多個行程,行程是資源分配的單位,
記憶:行程的英文為Process,Process也為程序,所以行程可以大概理解為程式執行的程序,
(行程也是程式的一次執行程序,是系統運行程式的基本單位; 系統運行一個程式即是一個行程從創建、運行到消亡的程序)
執行緒:行程中的一個執行單元,負責當前行程中程式的執行,一個行程中是可以有多個執行緒的,執行緒是CPU調度和執行的單位,
【java默認有兩個執行緒:main、GC】
舉例:打開word使用是一個行程,word會檢查你的拼寫,兩個執行緒:容災備份,語法檢查
行程與執行緒的區別:
- 行程:有獨立的記憶體空間,行程中的資料存放空間(堆空間和堆疊空間)是獨立的,至少有一個執行緒,
- 執行緒:堆空間是共享的,堆疊空間是獨立的,執行緒消耗的資源比行程小的多
1.2 并發與并行
并行 :指兩個或多個事件在同一時刻發生(同時發生)【多個CPU同時執行多個執行緒】
并發 :指兩個或多個事件在同一個時間段內發生,(交替執行) 【一個CPU交替執行執行緒】
拓展:
-
并發編程的本質是充分利用cpu資源
java代碼查詢cpu核數:
//查詢cpu核數 //CPU 密集型,IO密集型 System.out.println(Runtime.getRuntime().availableProcessors()); -
java真的可以開啟執行緒嗎?不能,通過原始碼可知底層開啟執行緒的start()方法是native修飾的,意思是呼叫作業系統C++的代碼
//本地方法,底層的C++ java無法直接操作硬體 private native void start0();
1.3 執行緒六種狀態
- NEW(新建)
執行緒剛被創建,但是并未啟動,還沒呼叫start方法 - Runnable(可運行)
執行緒可以在java虛擬機中運行的狀態,可能正在運行自己代碼,也可能沒有,這取決于操 作系統處理器 - Blocked(鎖阻塞)
當一個執行緒試圖獲取一個物件鎖,而該物件鎖被其他的執行緒持有,則該執行緒進入Blocked狀 態;當該執行緒持有鎖時,該執行緒將變成Runnable狀態, - Waiting(無限等待)
一個執行緒在等待另一個執行緒執行一個(喚醒)動作時,該執行緒進入Waiting狀態,
進入這個 狀態后是不能自動喚醒的,必須等待另一個執行緒呼叫notify或者notifyAll方法才能夠喚醒, - Timed Waiting(計時等待)
同waiting狀態,有幾個方法有超時引數,呼叫他們將進入Timed Waiting狀態,
這一狀態 將一直保持到超時期滿或者接收到喚醒通知,帶有超時引數的常用方法有Thread.sleep 、 Object.wait - Teminated(被終止)
因為run方法正常退出而死亡,或者因為沒有捕獲的例外終止了run方法而死亡,
上原始碼:
public enum State {
/**
* 新建
*/
NEW,
/**
* 運行
*/
RUNNABLE,
/**
* 阻塞
*/
BLOCKED,
/**
* 等待,死死的等
*/
WAITING,
/**
* 超時等待
*/
TIMED_WAITING,
/**
* 停止
*/
TERMINATED;
}
1.4 sleep與wait區別
只要是等待都需要拋出例外,中斷例外
-
來自不同的類
-
wait -> Object
-
sleep -> Thread
-
-
關于鎖的釋放
- wait會釋放鎖
- sleep睡覺了,抱著鎖睡覺,不會釋放!
-
使用的范圍是不同的
- wait必須在同步代碼塊中
- sleep可以在任何地方睡
1.5 解耦寫執行緒
學生寫法(多耦):
public class SaleTickerDemo01 {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(ticket, "A").start();
new Thread(ticket, "B").start();
new Thread(ticket, "C").start();
}
}
class Ticket implements Runnable{
private int number = 50;
public void run(){
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "買了第" + (number--) + "張票");
}
}
}
作業寫法(解耦):
public class SaleTickerDemo01 {
public static void main(String[] args) {
Ticket ticket = new Ticket();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
ticket.sale();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
ticket.sale();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 50; i++) {
ticket.sale();
}
}, "C").start();
}
}
class Ticket {
private int number = 50;
public synchronized void sale() {
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "買了第" + (number--) + "張票");
}
}
}
1.6 鎖基礎
較難,能理解就理解
1.6.1 鎖機制
通過使用synchronized關鍵字來實作鎖,這樣就能夠很好地解決執行緒之間爭搶資源的情況,那么,synchronized底層到是如何實作的呢?
我們知道,使用synchronized,一定是和某個物件相關聯的,比如我們要對某一段代碼加鎖,那么我們就需要提供一個物件來作為鎖本身:
public static void main(String[] args) {
synchronized (Main.class) {
//這里使用的是Main類的Class物件作為鎖
}
}
我們來看看,它變成位元組碼之后會用到哪些指令:

其中最關鍵的就是monitorenter指令了,可以看到之后也有monitorexit與之進行匹配(注意這里有2個),monitorenter和monitorexit分別對應加鎖和釋放鎖,在執行monitorenter之前需要嘗試獲取鎖,每個物件都有一個monitor監視器與之對應,而這里正是去獲取物件監視器的所有權,一旦monitor所有權被某個執行緒持有,那么其他執行緒將無法獲得(管程模型的一種實作),
在代碼執行完成之后,我們可以看到,一共有兩個monitorexit在等著我們,那么為什么這里會有兩個呢,按理說monitorenter和monitorexit不應該一一對應嗎,這里為什么要釋放鎖兩次呢?
首先我們來看第一個,這里在釋放鎖之后,會馬上進入到一個goto指令,跳轉到15行,而我們的15行對應的指令就是方法的回傳指令,其實正常情況下只會執行第一個monitorexit釋放鎖,在釋放鎖之后就接著同步代碼塊后面的內容繼續向下執行了,而第二個,其實是用來處理例外的,可以看到,它的位置是在12行,如果程式運行發生例外,那么就會執行第二個monitorexit,并且會繼續向下通過athrow指令拋出例外,而不是直接跳轉到15行正常運行下去,

實際上synchronized使用的鎖就是存盤在Java物件頭中的,我們知道,物件是存放在堆記憶體中的,而每個物件內部,都有一部分空間用于存盤物件頭資訊,而物件頭資訊中,則包含了Mark Word用于存放hashCode和物件的鎖資訊,在不同狀態下,它存盤的資料結構有一些不同,
1.6.2 重量級鎖
在JDK6之前,synchronized一直被稱為重量級鎖,monitor依賴于底層作業系統的Lock實作,Java的執行緒是映射到作業系統的原生執行緒上,切換成本較高,而在JDK6之后,鎖的實作得到了改進,我們先從最原始的重量級鎖開始:
我們說了,每個物件都有一個monitor與之關聯,在Java虛擬機(HotSpot)中,monitor是由ObjectMonitor實作的:
ObjectMonitor() {
_header = NULL;
_count = 0; //記錄個數
_waiters = 0,
_recursions = 0;
_object = NULL;
_owner = NULL;
_WaitSet = NULL; //處于wait狀態的執行緒,會被加入到_WaitSet
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ;
FreeNext = NULL ;
_EntryList = NULL ; //處于等待鎖block狀態的執行緒,會被加入到該串列
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;
}
每個等待鎖的執行緒都會被封裝成ObjectWaiter物件,進入到如下機制:

ObjectWaiter首先會進入 Entry Set等著,當執行緒獲取到物件的monitor后進入 The Owner 區域并把monitor中的owner變數設定為當前執行緒,同時monitor中的計數器count加1,若執行緒呼叫wait()方法,將釋放當前持有的monitor,owner變數恢復為null,count自減1,同時該執行緒進入 WaitSet集合中等待被喚醒,若當前執行緒執行完畢也將釋放monitor并復位變數的值,以便其他執行緒進入獲取物件的monitor,
雖然這樣的設計思路非常合理,但是在大多數應用上,每一個執行緒占用同步代碼塊的時間并不是很長,我們完全沒有必要將競爭中的執行緒掛起然后又喚醒,并且現代CPU基本都是多核心運行的,我們可以采用一種新的思路來實作鎖,
在JDK1.4.2時,引入了自旋鎖(JDK6之后默認開啟),它不會將處于等待狀態的執行緒掛起,而是通過無限回圈的方式,不斷檢測是否能夠獲取鎖,由于單個執行緒占用鎖的時間非常短,所以說回圈次數不會太多,可能很快就能夠拿到鎖并運行,這就是自旋鎖,當然,僅僅是在等待時間非常短的情況下,自旋鎖的表現會很好,但是如果等待時間太長,由于回圈是需要處理器繼續運算的,所以這樣只會浪費處理器資源,因此自旋鎖的等待時間是有限制的,默認情況下為10次,如果失敗,那么會進而采用重量級鎖機制,

在JDK6之后,自旋鎖得到了一次優化,自旋的次數限制不再是固定的,而是自適應變化的,比如在同一個鎖物件上,自旋等待剛剛成功獲得過鎖,并且持有鎖的執行緒正在運行,那么這次自旋也是有可能成功的,所以會允許自旋更多次,當然,如果某個鎖經常都自旋失敗,那么有可能會不再采用自旋策略,而是直接使用重量級鎖,
1.6.3 輕量級鎖
從JDK 1.6開始,為了減少獲得鎖和釋放鎖帶來的性能消耗,就引入了輕量級鎖,
輕量級鎖的目標是,在無競爭情況下,減少重量級鎖產生的性能消耗(并不是為了代替重量級鎖,實際上就是賭同一時間只有一個執行緒在占用資源),包括系統呼叫引起的內核態與用戶態切換、執行緒阻塞造成的執行緒切換等,它不像是重量級鎖那樣,需要向作業系統申請互斥量,它的運作機制如下:
在即將開始執行同步代碼塊中的內容時,會首先檢查物件的Mark Word,查看鎖物件是否被其他執行緒占用,如果沒有任何執行緒占用,那么會在當前執行緒中所處的堆疊幀中建立一個名為鎖記錄(Lock Record)的空間,用于復制并存盤物件目前的Mark Word資訊(官方稱為Displaced Mark Word),
接著,虛擬機將使用CAS操作將物件的Mark Word更新為輕量級鎖狀態(資料結構變為指向Lock Record的指標,指向的是當前的堆疊幀)
CAS(Compare And Swap)是一種無鎖演算法(我們之前在Springboot階段已經講解過了),它并不會為物件加鎖,而是在執行的時候,看看當前資料的值是不是我們預期的那樣,如果是,那就正常進行替換,如果不是,那么就替換失敗,比如有兩個執行緒都需要修改變數
i的值,默認為10,現在一個執行緒要將其修改為20,另一個要修改為30,如果他們都使用CAS演算法,那么并不會加鎖訪問i,而是直接嘗試修改i的值,但是在修改時,需要確認i是不是10,如果是,表示其他執行緒還沒對其進行修改,如果不是,那么說明其他執行緒已經將其修改,此時不能完成修改任務,修改失敗,在CPU中,CAS操作使用的是
cmpxchg指令,能夠從最底層硬體層面得到效率的提升,
如果CAS操作失敗了的話,那么說明可能這時有執行緒已經進入這個同步代碼塊了,這時虛擬機會再次檢查物件的Mark Word,是否指向當前執行緒的堆疊幀,如果是,說明不是其他執行緒,而是當前執行緒已經有了這個物件的鎖,直接放心大膽進同步代碼塊即可,如果不是,那確實是被其他執行緒占用了,
這時,輕量級鎖一開始的想法就是錯的(這時有物件在競爭資源,已經賭輸了),所以說只能將鎖膨脹為重量級鎖,按照重量級鎖的操作執行(注意鎖的膨脹是不可逆的)
所以,輕量級鎖 -> 失敗 -> 自適應自旋鎖 -> 失敗 -> 重量級鎖
解鎖程序同樣采用CAS演算法,如果物件的MarkWord仍然指向執行緒的鎖記錄,那么就用CAS操作把物件的MarkWord和復制到堆疊幀中的Displaced Mark Word進行交換,如果替換失敗,說明其他執行緒嘗試過獲取該鎖,在釋放鎖的同時,需要喚醒被掛起的執行緒,
輕量級鎖的加鎖程序:
(1)當執行緒執行代碼進入同步塊時,若Mark Word為無鎖狀態,虛擬機先在當前執行緒的堆疊幀中建立一個名為Lock Record的空間,用于存盤當前物件的Mark Word的拷貝,官方稱之為“Dispalced Mark Word”
(2)復制物件頭中的Mark Word到鎖記錄中,
(3)復制成功后,虛擬機將用CAS操作將物件的Mark Word更新為執行Lock Record的指標,并將Lock Record里的owner指標指向物件的Mark Word,如果更新成功,則執行4,否則執行5,;
(4)如果更新成功,則這個執行緒擁有了這個鎖,并將鎖標志設為00,表示處于輕量級鎖狀態
(5)如果更新失敗,虛擬機會檢查物件的Mark Word是否指向當前執行緒的堆疊幀,如果是則說明當前執行緒已經擁有這個鎖,可進入執行同步代碼,否則說明多個執行緒競爭,輕量級鎖就會膨脹為重量級鎖,Mark Word中存盤重量級鎖(互斥鎖)的指標,后面等待鎖的執行緒也要進入阻塞狀態,
1.6.4 偏向鎖
偏向鎖相比輕量級鎖更純粹,干脆就把整個同步都消除掉,不需要再進行CAS操作了,它的出現主要是得益于人們發現某些情況下某個鎖頻繁地被同一個執行緒獲取,這種情況下,我們可以對輕量級鎖進一步優化:偏向鎖實際上就是專門為單個執行緒而生的,當某個執行緒第一次獲得鎖時,如果接下來都沒有其他執行緒獲取此鎖,那么持有鎖的執行緒將不再需要進行同步操作,
通俗的講,偏向鎖就是在運行程序中,物件的鎖偏向某個執行緒,即在開啟偏向鎖機制的情況下,某個執行緒獲得鎖,當該執行緒下次再想要獲得鎖時,不需要再獲得鎖(即忽略synchronized關鍵詞),直接就可以執行同步代碼,比較適合競爭較少的情況,
可以從之前的MarkWord結構中看到,偏向鎖也會通過CAS操作記錄執行緒的ID,如果一直都是同一個執行緒獲取此鎖,那么完全沒有必要在進行額外的CAS操作,當然,如果有其他執行緒來搶了,那么偏向鎖會根據當前狀態,決定是否要恢復到未鎖定或是膨脹為輕量級鎖,
如果我們需要使用偏向鎖,可以添加-XX:+UseBiased引數來開啟,
所以,最終的鎖等級為:未鎖定 < 偏向鎖 < 輕量級鎖 < 重量級鎖
值得注意的是,如果物件通過呼叫hashCode()方法計算過物件的一致性哈希值,那么它是不支持偏向鎖的,會直接進入到輕量級鎖狀態,因為Hash是需要被保存的,而偏向鎖的Mark Word資料結構,無法保存Hash值;如果物件已經是偏向鎖狀態,再去呼叫hashCode()方法,那么會直接將鎖升級為重量級鎖,并將哈希值存放在monitor(有預留位置保存)中,

偏向鎖的獲取流程:
(1)查看Mark Word中偏向鎖的標識以及鎖標志位,若是否偏向鎖為1且鎖標志位為01,則該鎖為可偏向狀態,
(2)若為可偏向狀態,則測驗Mark Word中的執行緒ID是否與當前執行緒相同,若相同,則直接執行同步代碼,否則進入下一步,
(3)當前執行緒通過CAS操作競爭鎖,若競爭成功,則將Mark Word中執行緒ID設定為當前執行緒ID,然后執行同步代碼,若競爭失敗,進入下一步,
(4)當前執行緒通過CAS競爭鎖失敗的情況下,說明有競爭,當到達全域安全點時之前獲得偏向鎖的執行緒被掛起,偏向鎖升級為輕量級鎖,然后被阻塞在安全點的執行緒繼續往下執行同步代碼,
偏向鎖的釋放流程:
偏向鎖只有遇到其他執行緒嘗試競爭偏向鎖時,持有偏向鎖狀態的執行緒才會釋放鎖,執行緒不會主動去釋放偏向鎖,偏向鎖的撤銷需要等待全域安全點(即沒有位元組碼正在執行),它會暫停擁有偏向鎖的執行緒,撤銷后偏向鎖恢復到未鎖定狀態或輕量級鎖狀態,
1.6.5 鎖消除和鎖粗化
鎖消除和鎖粗化都是在運行時的一些優化方案,
- 鎖消除是比如我們某段代碼雖然加了鎖,但是在運行時根本不可能出現各個執行緒之間資源爭奪的情況,這種情況下,完全不需要任何加鎖機制,所以鎖會被消除,
- 鎖粗化則是我們代碼中頻繁地出現互斥同步操作,比如在一個回圈內部加鎖,這樣明顯是非常消耗性能的,所以虛擬機一旦檢測到這種操作,會將整個同步范圍進行擴展,
2. Lock鎖
2.0 Lock鎖和synchronized的區別
- Synchronized是內置Java關鍵字;Lock是一個Java類,
- Synchronized無法判斷獲取鎖的狀態;Lock可以判斷是否獲取到了鎖,(boolean b = lock.tryLock();)
- Synchronized會自動釋放鎖;Lock必須要手動釋放鎖,如果不釋放鎖,死鎖,
- Synchronized執行緒1獲得鎖阻塞時,執行緒2會一直等待下去;Lock鎖執行緒1獲得鎖阻塞時,執行緒2等待足夠長的時間后中斷等待,去做其他的事,
- Synchronized可重入鎖,不可以中斷的,非公平;Lock,可重入鎖,可以判斷鎖,非公平(可以自己設定),
lock.lockInterruptibly();方法:當兩個執行緒同時通過該方法想獲取某個鎖時,假若此時執行緒A獲取到了鎖,而執行緒B只有在等待,那么對執行緒B呼叫threadB.interrupt()方法能夠中斷執行緒B的等待程序, - Synchronized適合鎖少量的代碼同步問題;Lock適合鎖大量的同步代碼,
2.1 Lock介面的三個實作類

由jdk查詢可知,有三種類
2.2 ReentrantLock類
ReentrantLock鎖的物件是呼叫lock方法的實體物件
使用創建ReentrantLock物件代替傳統的Synchronized鎖
2.2.1 構造方法——公平鎖and非公平鎖
公平鎖:十分公平,不能插隊,
非公平鎖:十分不公平,可以插隊,(默認非公平鎖)

需要更改默認的非公平鎖為公平鎖,需要在創建物件的時候引數設定為true(默認是false)
2.2.2 ReentrantLock類的使用
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
//業務代碼 ... method body
} finally {
lock.unlock();
}
}
}
2.3 Condition介面
使用await和signal方法代替傳統的wait和notify方法


2.3.1 await() signal() 方法基本使用
就是在最原始的多執行緒synchronized寫法上修改了使用的方法
使用while的緣故還是:可能會出現虛假喚醒

2.3.2 Condition實作精準通知喚醒
使用ReentrantLock創建的物件lock來創建多個condition物件,每次等待和喚醒都可以指定 如:conditionA.await(); conditionB.signal();
舉例:
public class C {
public static void main(String[] args) {
Data3 data3 = new Data3();
//A執行完,呼叫B,B執行完,呼叫C,C執行完,呼叫A
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data3.printA();
}
}, "A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data3.printB();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data3.printC();
}
}, "C").start();
}
}
class Data3 {
private Lock lock = new ReentrantLock();
Condition conditionA = lock.newCondition();
Condition conditionB = lock.newCondition();
Condition conditionC = lock.newCondition();
private char ch = 'A';
public void printA() {
lock.lock();
try {
while (ch != 'A') {
//等待
conditionA.await();
}
System.out.println(Thread.currentThread().getName() + "--->A");
//喚醒
ch = 'B';
conditionB.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
while (ch != 'B') {
//等待
conditionB.await();
}
System.out.println(Thread.currentThread().getName() + "--->B");
//喚醒
ch = 'C';
conditionC.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
while (ch != 'C') {
//等待
conditionC.await();
}
System.out.println(Thread.currentThread().getName() + "--->C");
//喚醒
ch = 'A';
conditionA.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
結果:執行順序變成以此執行
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
A--->A
B--->B
C--->C
3. 生產者與消費者
3.1 傳統的synchronized寫法
synchronized+wait+notifyall
public class A {
public static void main(String[] args) {
Data data = https://www.cnblogs.com/buchizicai/archive/2023/05/04/new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
class Data {
private int number = 0;
public synchronized void increment() throws InterruptedException {
if (number != 0) {
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "==>" + number);
//通知其他執行緒,我+1完畢了
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
if (number == 0) {
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "==>" + number);
//通知其他執行緒,我-1完畢了
this.notifyAll();
}
}
輸出:發現有問題,出現了負數和大于1的數,這就是虛假喚醒問題(看下面)
A==>1
C==>0
B==>1
A==>2
B==>3
C==>2
C==>1
C==>0
B==>1
A==>2
B==>3
C==>2
C==>1
C==>0
B==>1
A==>2
B==>3
D==>2
D==>1
D==>0
C==>-1
C==>-2
C==>-3
D==>-4
D==>-5
D==>-6
D==>-7
D==>-8
D==>-9
D==>-10
B==>-9
A==>-8
B==>-7
A==>-6
B==>-5
A==>-4
B==>-3
A==>-2
3.2 Lock寫法
ReentrantLock類 和 Condition介面:lock+await+signal
public class PC {
public static void main(String[] args) {
Data data = https://www.cnblogs.com/buchizicai/archive/2023/05/04/new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.decrement();
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.increment();
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.decrement();
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.increment();
}
},"D").start();
}
}
class Data {
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increment() {
lock.lock();
try {
if (number > 0) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
condition.signalAll();
} finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
if (number <= 0) {
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
condition.signalAll();
} finally {
lock.unlock();
}
}
}
輸出:發現有問題,出現了負數,這就是虛假喚醒問題(看下面)
B=>1
A=>0
B=>1
A=>0
C=>-1
B=>0
B=>1
A=>0
C=>-1
B=>0
B=>1
A=>0
C=>-1
D=>0
D=>1
B=>2
A=>1
A=>0
C=>-1
D=>0
D=>1
B=>2
A=>1
A=>0
C=>-1
D=>0
D=>1
B=>2
A=>1
A=>0
C=>-1
D=>0
D=>1
B=>2
C=>1
C=>0
D=>1
C=>0
D=>1
C=>0
3.2 虛假喚醒問題
3.2.1 問題描述
白話:一個if條件里有等待陳述句,兩個消費者都進入這個等待;當生產者生產了1數量的產品并喚醒消費者,此時之前處于等待的兩個消費者會被喚醒,然后進行消費;導致最后消費的產品出現負數,因為產品只有一個,而消費者消費了兩次,
虛假喚醒是一種現象,它只會出現在多執行緒環境中,指的是在多執行緒環境下,多個執行緒等待在同一個條件上,等到條件滿足時,所有等待的執行緒都被喚醒,但由于多個執行緒執行的順序不同,后面競爭到鎖的執行緒在獲得時間片時條件已經不再滿足,執行緒應該繼續睡眠但是卻繼續往下運行的一種現象,
舉例:
public class A {
public static void main(String[] args) {
Data data = https://www.cnblogs.com/buchizicai/archive/2023/05/04/new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
data.decrement();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "D").start();
}
}
class Data {
private int number = 0;
public synchronized void increment() throws InterruptedException {
if (number != 0) {
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "==>" + number);
//通知其他執行緒,我+1完畢了
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
if (number == 0) {
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "==>" + number);
//通知其他執行緒,我-1完畢了
this.notifyAll();
}
}
結果:出現了大于1的情況,以及小于0的情況
A==>1
C==>0
B==>1
A==>2
B==>3
C==>2
C==>1
C==>0
B==>1
A==>2
B==>3
C==>2
C==>1
C==>0
B==>1
A==>2
B==>3
D==>2
D==>1
D==>0
C==>-1
C==>-2
C==>-3
D==>-4
D==>-5
D==>-6
D==>-7
D==>-8
D==>-9
D==>-10
B==>-9
A==>-8
B==>-7
A==>-6
B==>-5
A==>-4
B==>-3
A==>-2
3.2.2 解決方法

?? jdk8給出的解決方案為:將增加和減少的方法中的if修改為while,如:
用if判斷的話,喚醒后執行緒會從wait之后的代碼開始運行,但是不會重新判斷if條件,直接繼續運行if代碼塊之后的代碼,而如果使用while的話,也會從wait之后的代碼運行,但是喚醒后會重新判斷回圈條件,如果不成立再執行while代碼塊之后的代碼塊,成立的話繼續wait,
解決方案原理:
拿兩個加法執行緒A、B來說,比如A先執行,執行時呼叫了wait方法,那它會等待,此時會釋放鎖,那么執行緒B獲得鎖并且也會執行wait方法,兩個加執行緒一起等待被喚醒,此時減執行緒中的某一個執行緒執行完畢并且喚醒了這倆加執行緒,那么這倆加執行緒不會一起執行,其中A獲取了鎖并且加1,執行完畢之后B再執行,如果是if的話,那么A修改完num后,喚醒其他執行緒并釋放鎖,此時B搶到了鎖,B不會再去判斷num的值,直接會給num+1,如果是while的話,A執行完之后,B搶到了鎖的話B還會去判斷num的值,因此就不會執行,
synchronized的最終版本:
public class A {
public static void main(String[] args) {
Data data = https://www.cnblogs.com/buchizicai/archive/2023/05/04/new Data();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
},"A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
}, "B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.increment();
}
}, "C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
data.decrement();
}
}, "D").start();
}
}
class Data {
private int number = 0;
public synchronized void increment() throws InterruptedException {
while (number != 0) {
//等待
this.wait();
}
number++;
System.out.println(Thread.currentThread().getName() + "==>" + number);
//通知其他執行緒,我+1完畢了
this.notifyAll();
}
public synchronized void decrement() throws InterruptedException {
while (number == 0) {
//等待
this.wait();
}
number--;
System.out.println(Thread.currentThread().getName() + "==>" + number);
//通知其他執行緒,我-1完畢了
this.notifyAll();
}
}
Lock的最終版本:
public class PC {
public static void main(String[] args) {
Data data = https://www.cnblogs.com/buchizicai/archive/2023/05/04/new Data();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.decrement();
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.increment();
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.decrement();
}
},"C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
data.increment();
}
},"D").start();
}
}
class Data {
private int number = 0;
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void increment() {
lock.lock();
try {
while (number > 0) {
condition.await();
}
number++;
System.out.println(Thread.currentThread().getName() + "=>" + number);
condition.signalAll();
} finally {
lock.unlock();
}
}
public void decrement() {
lock.lock();
try {
while (number <= 0) {
condition.await();
}
number--;
System.out.println(Thread.currentThread().getName() + "=>" + number);
condition.signalAll();
} finally {
lock.unlock();
}
}
}
3.3 Condition精準喚醒生產者消費者
使用了精準喚醒就不會出現虛假喚醒問題,也就不需要while了
(因為虛假喚醒就是不確定釋放的鎖給誰才出現的,現在有了精準喚醒就不會出現虛假喚醒問題)
使用Condition介面的特點,指定物件等待以及指定物件的喚醒
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionTest {
public static void main(String[] args) {
ConditionTest01 conditionTest01 = new ConditionTest01();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
conditionTest01.method01();
}
},"AAAA").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
conditionTest01.method02();
}
},"BBBB").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
conditionTest01.method03();
}
},"CCCC").start();
}
}
class ConditionTest01 {
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();
private int number = 2; //作為一個識別符號,用于判斷是哪個執行緒休眠
public void method01() {
lock.lock();
try {
// 這里為了測驗是否真的精準喚醒不會虛假喚醒而使用if 實際使用上就應該按上文說的用while
if (number != 1){
condition1.await();
}
number = 2;
System.out.println(Thread.currentThread().getName()+"喚醒了condition2");
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void method02() {
lock.lock();
try {
if (number != 2){
condition2.await();
}
number = 3;
System.out.println(Thread.currentThread().getName()+"喚醒了condition3");
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void method03() {
lock.lock();
try {
if (number != 3){
condition3.await();
}
number = 1;
System.out.println(Thread.currentThread().getName()+"喚醒了condition1");
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
4. ReadWriteLock讀寫鎖
讀寫鎖維護了一個讀鎖和一個寫鎖,這兩個鎖的機制是不同的,
- 讀鎖:在沒有任何執行緒占用寫鎖的情況下,同一時間可以有多個執行緒加讀鎖,
- 寫鎖:在沒有任何執行緒占用讀鎖的情況下,同一時間只能有一個執行緒加寫鎖,
4.0 基本使用
在業務中有讀寫操作的時候可以使用,不再只會用Lock鎖
讀寫鎖也可以分為:獨占鎖/排他鎖(寫鎖)、共享鎖(讀鎖)

讀寫鎖在面對讀和寫操作時使用的鎖是不一樣的,
基本使用:
- 創建讀寫鎖物件:
ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - 讀操作的鎖為:
readWriteLock.writeLock().lock();和readWriteLock.writeLock().unlock(); - 寫操作的鎖為:
readWriteLock.readLock().lock();和readWriteLock.readLock().unlock();
舉例:模擬讀和寫兩個操作
/**
* 獨占鎖(寫鎖)一次只能被一個執行緒占有
* 共享鎖(讀鎖)多個執行緒可以同時占有
* ReadWriteLock
* 讀-讀 可以共存!
* 讀-寫 不能共存!
* 寫-寫 不能共存!
*
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCacheLock myCache = new MyCacheLock();
for (int i = 1; i <= 5; i++) {
final String temp = String.valueOf(i);
new Thread(() -> {
myCache.put(temp, temp);
}, temp).start();
}
for (int i = 1; i <= 5; i++) {
final String temp = String.valueOf(i);
new Thread(() -> {
myCache.get(temp);
}, temp).start();
}
}
}
//加了讀寫鎖的讀寫操作,寫的時候是A寫完才能到B寫,讀的話可以一起進來讀
class MyCacheLock {
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//存,寫
public void put(String key, Object value) {
//上鎖
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "寫入" + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "寫入ok");
} catch (Exception e) {
e.printStackTrace();
} finally {
//解鎖(一般是放在try/catch中的finally部分保證會執行
readWriteLock.writeLock().unlock();
}
}
//取,讀
public void get(String key) {
//上鎖
readWriteLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "讀取" + key);
map.get(key);
System.out.println(Thread.currentThread().getName() + "讀取ok");
} catch (Exception e) {
e.printStackTrace();
} finally {
//解鎖(一般是放在try/catch中的finally部分保證會執行
readWriteLock.readLock().unlock();
}
}
}
//不加鎖的讀寫操作:出問題,A寫還沒寫完就被讀了,并且中間B也進來寫了可能會造成資料覆寫
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
//存,寫
public void put(String key, Object value) {
System.out.println(Thread.currentThread().getName() + "寫入" + key);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "寫入ok");
}
//取,讀
public void get(String key) {
System.out.println(Thread.currentThread().getName() + "讀取" + key);
map.get(key);
System.out.println(Thread.currentThread().getName() + "讀取ok");
}
}
加了讀寫鎖的結果:寫的時候是A寫完才能到B寫,讀的話可以一起進來讀
1寫入1
1寫入ok
2寫入2
2寫入ok
3寫入3
3寫入ok
4寫入4
4寫入ok
5寫入5
5寫入ok
1讀取
1讀取ok
2讀取
4讀取
4讀取ok
3讀取
5讀取
3讀取ok
2讀取ok
5讀取ok
4.1 讀鎖作業方式
讀優先鎖期望的是,讀鎖能被更多的執行緒持有,以便提高讀執行緒的并發性,它的作業方式是:當讀執行緒 A 先持有了讀鎖,寫執行緒 B 在獲取寫鎖的時候,會被阻塞,并且在阻塞程序中,后續來的讀執行緒 C 仍然可以成功獲取讀鎖,最后直到讀執行緒 A 和 C 釋放讀鎖后,寫執行緒 B 才可以成功獲取讀鎖,如下圖:

4.2 寫鎖作業方式
寫優先鎖是優先服務寫執行緒,其作業方式是:當讀執行緒 A 先持有了讀鎖,寫執行緒 B 在獲取寫鎖的時候,會被阻塞,并且在阻塞程序中,后續來的讀執行緒 C 獲取讀鎖時會失敗,于是讀執行緒 C 將被阻塞在獲取讀鎖的操作,這樣只要讀執行緒 A 釋放讀鎖后,寫執行緒 B 就可以成功獲取讀鎖,如下圖:

4.3 產生的問題
如果一直有讀執行緒獲取讀鎖,那么寫執行緒將永遠獲取不到寫鎖,這就造成了寫執行緒「饑餓」的現象,
寫優先鎖可以保證寫執行緒不會餓死,但是如果一直有寫執行緒獲取寫鎖,讀執行緒也會被「餓死」,
解決方式:
既然不管優先讀鎖還是寫鎖,對方可能會出現餓死問題,那么我們就不偏袒任何一方,搞個「公平讀寫鎖」,
公平讀寫鎖比較簡單的一種方式是:用佇列把獲取鎖的執行緒排隊,不管是寫執行緒還是讀執行緒都按照先進先出的原則加鎖即可,這樣讀執行緒仍然可以并發,也不會出現「饑餓」的現象,
4.4 鎖升級和鎖降級
A B是執行緒,下面的前提都是拿同一個物件鎖
- A拿了寫鎖,則B讀/寫鎖都不能拿,
- A拿了讀鎖,則B只能拿讀鎖,不能拿寫鎖
- A先拿了寫鎖再拿了讀鎖,則B讀/寫鎖都不能拿,直到A釋放寫鎖才能拿
- A先拿了讀鎖是不能再拿寫鎖的!(ReentrantReadWriteLock是不支持鎖升級)
鎖降級指的是:寫鎖降級為讀鎖,當一個執行緒持有寫鎖的情況下,雖然其他執行緒不能加讀鎖,但是執行緒自己是可以加讀鎖的:
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.writeLock().lock();
lock.readLock().lock();
System.out.println("成功加讀鎖!");
}
那么,如果我們在同時加了寫鎖和讀鎖的情況下,釋放寫鎖,是否其他的執行緒就可以一起加讀鎖了呢?
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.writeLock().lock();
lock.readLock().lock();
new Thread(() -> {
System.out.println("開始加讀鎖!");
lock.readLock().lock();
System.out.println("讀鎖添加成功!");
}).start();
TimeUnit.SECONDS.sleep(1);
lock.writeLock().unlock(); //如果釋放寫鎖,會怎么樣?
}
可以看到,一旦寫鎖被釋放,那么主執行緒就只剩下讀鎖了,因為讀鎖可以被多個執行緒共享,所以這時第二個執行緒也添加了讀鎖,而這種操作,就被稱之為"鎖降級"(注意不是先釋放寫鎖再加讀鎖,而是持有寫鎖的情況下申請讀鎖再釋放寫鎖)
注意在僅持有讀鎖的情況下去申請寫鎖,屬于"鎖升級",ReentrantReadWriteLock是不支持的:
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.readLock().lock();
lock.writeLock().lock();
System.out.println("所升級成功!");
}
可以看到執行緒直接卡在加寫鎖的那一句了,
5. Callable
實作Callable介面是創建執行緒的方式之一,原始碼可知本Callable介面是一個函式式介面

5.1 介紹
callable的三個特點:
- 可以有回傳值
- 可以拋出例外
- 方法不同,call()
下面這些api就是在描述一個圖:Call也是Thread實作的,但它是通過偽裝成Runnable進Thread的構造方法里,
如何偽裝呢?Runable有一個實作類FutureTask的構造方法可以裝入Callable,從而偽裝成了Runnable,
jdk介紹:

Runnable的Api檔案

FutureTask的Api檔案


5.2 Callable基本使用
- 實作介面Callable,并實作call方法
- 創建Runnable實作類物件,并裝入FutureTask構造方法內并創建FutureTask物件
- 將前面的FutureTask物件裝入Thread構造方法內,然后啟動執行緒start();
public class CallableTest {
public static void main(String[] args) {
MyThread thread = new MyThread();
//Callable --- Runnable 中間轉換(適配類)
FutureTask<String> futureTask = new FutureTask<>(thread);
//結果會被快取,提高效率
new Thread(futureTask, "A").start();
new Thread(futureTask, "B").start();
//獲取回傳值
try {
//這個get方法可能會產生阻塞,把它放到最后
String s = futureTask.get();
//或者使用異步通信來處理
System.out.println(s);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
class MyThread implements Callable<String> {
@Override
public String call() {
System.out.println("call()");
//可能是耗時的操作
return "123";
}
}
輸出:
產生疑問——為什么只輸出了一次call?因為FutureTask有快取!
注意:futureTask.get(); 可能會產生阻塞,所以要把它放到最后
call()
123
6. BlockingQueue阻塞佇列
為執行緒池打基礎
阻塞佇列的原生:

阻塞佇列的特點:

阻塞佇列BlockingQueue是一個介面,它的實作類有:
① ArrayBlockingQueue(陣列寫的佇列) ② LinkedBlockingQueue(鏈表寫的佇列) ③ SynchronousQueue同步佇列
6.1 BlockingQueue四組API
ArrayBlockingQueue 和 LinkedBlockingQueue有以下四種API,但SynchronousQueue沒有第一列,只有后面三列
API之間可以混合使用,只要代碼邏輯沒錯即可
| 方式 | 有回傳值,拋出例外 | 有回傳值,不拋出例外 | 阻塞等待 | 超時等待 |
|---|---|---|---|---|
| 添加 | add() | offer() | put() | offer( , , ) |
| 移除 | remove() | poll() | take() | poll( , ) |
| 查看佇列首元素 | element() | peek() | - | - |
/**
* 有回傳值,拋出例外
*/
public static void test1() {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
//拋出例外 java.lang.IllegalStateException: Queue full
//System.out.println(blockingQueue.add("d"));
//查看佇列首元素
System.out.println(blockingQueue.element());
System.out.println("===================");
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//拋出例外 java.util.NoSuchElementException
//System.out.println(blockingQueue.remove());
}
/**
* 有回傳值,不拋出例外
*/
public static void test2() {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
//不拋出例外,回傳false
System.out.println(blockingQueue.offer("d"));
//查看佇列首元素
System.out.println(blockingQueue.peek());
System.out.println("===================");
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//不拋出例外,回傳null
System.out.println(blockingQueue.poll());
}
/**
* 阻塞等待,一直等
*/
public static void test3() {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
try {
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
//佇列沒有位置了,一直阻塞
//blockingQueue.put("d");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
//佇列沒有元素了,一直阻塞
//System.out.println(blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 超時等待
*/
public static void test4() {
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3);
try {
//添加元素
System.out.println(blockingQueue.offer("a", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("b", 2, TimeUnit.SECONDS));
System.out.println(blockingQueue.offer("c", 2, TimeUnit.SECONDS));
//嘗試進去佇列等待超過2秒進不去,則回傳false
System.out.println(blockingQueue.offer("d", 2, TimeUnit.SECONDS));
System.out.println("================");
//移除隊尾元素
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
//從佇列中取等待超過2秒沒取到,則回傳null
System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
6.2 SynchronousQueue同步佇列
同步佇列:是一個容量等于1的佇列,進去一個元素,必須等待取出來之后,才能再往里面放一個元素!
舉例:
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " put a");
blockingQueue.put("a");
System.out.println(Thread.currentThread().getName() + " put b");
blockingQueue.put("b");
System.out.println(Thread.currentThread().getName() + " put c");
blockingQueue.put("c");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T1").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " ==> " + blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " ==> " + blockingQueue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + " ==> " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T2").start();
}
}
輸出
T1 put a
T2 ==> a
T1 put b
T2 ==> b
T1 put c
T2 ==> c
7. 執行緒池?
7.1 介紹
之前使用的new Thread().start;來創建執行緒有問題:如果并發的執行緒數量很多,并且每個執行緒都是執行一個時間很短的任務就結束了,這樣頻繁創建執行緒就會大大降低系統的效率,因為頻繁創建執行緒和銷毀執行緒需要時間,所以就有了執行緒池的產生,
執行緒池(jdk1.5產生的)是:執行緒可以復用,就是執行完一個任務,并不被銷毀,而是可以繼續執行其他的任務
執行緒池的優點:
- 降低資源消耗,減少了創建和銷毀執行緒的次數,每個作業執行緒都可以被重復利用,可執行多個任務,
- 提高回應速度,當任務到達時,任務可以不需要等到執行緒創建就能立即執行,
- 提高執行緒的可管理性,可以根據系統的承受能力,調整執行緒池中作業線執行緒的數目,防止因為消耗過多的記憶體,而把服務器累趴下(每個執行緒需要大約1MB記憶體,執行緒開的越多,消耗的記憶體也就越大,最后死機),
7.2 執行緒池的簡要作業模型

解釋
- 執行緒池的作業模型主要兩部分組成,一部分是運行Runnable的Thread物件,另一部分就是阻塞佇列,
- 由執行緒池創建的Thread物件其內部的run方法會通過阻塞佇列的take方法獲取一個Runnable物件,然后執行這個Runnable物件的run方法
- 在Thread的run方法中呼叫Runnable物件的run方法
- 當Runnable物件的run方法執行完畢以后,Thread中的run方法又回圈的從阻塞佇列中獲取下一個Runnable物件繼續執行
- 這樣就實作了Thread物件的重復利用,也就減少了創建執行緒和銷毀執行緒所消耗的資源,
7.3 Executors類
7.3.0 基本操作
作用
在JDK1.5的時候java提供了執行緒池
java.util.concurrent.Executors類:執行緒池的工廠類,用來生產執行緒池
方法
- execute只能提交Runnable型別的任務,沒有回傳值,而submit既能提交Runnable型別任務也能提交Callable型別任務,回傳Future型別,
- execute方法提交的任務例外是直接拋出的,而submit方法是是捕獲了例外的,當呼叫FutureTask的get方法時,才會拋出例外,
static ExecutorService newFixedThreadPool(int nThreads) //創建一個可重用固定執行緒數的執行緒池,以共享的無界佇列方式來運行這些執行緒 int nThreads:創建執行緒池中執行緒的個數
submit(Runnable task) //提交一個 Runnable 任務用于執行
execute(Runnable task) //提交一個 Runnable 任務用于執行
oid shutdown() //用于銷毀執行緒池,一般不建議使用 //注意:執行緒池銷毀之后,就在記憶體中消失了,就不能在執行執行緒任務了
使用步驟
阿里巴巴開發手冊中講:執行緒池不能用Executors類,因為會嚴重占用資源,一般是用 ThreadPoolExecutor,下面拒絕策略有 ThreadPoolExecutor版本的執行緒池,
- 使用執行緒池工廠類Executors提供的靜態方法newFixedThreadPool生產一個指定執行緒數量的執行緒池
- 呼叫執行緒池ExecutorService中的方法submit,傳遞執行緒任務,執行執行緒任務

public static void main(String[] args) {
//1.使用執行緒池工廠類Executors提供的靜態方法newFixedThreadPool生產一個指定執行緒數量的執行緒池
ExecutorService ex = Executors.newFixedThreadPool(2);
//2.呼叫執行緒池ExecutorService中的方法submit,傳遞執行緒任務,執行執行緒任務
// 相當于new Thread(new Runnable(){}).start();
ex.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"執行緒任務1執行了!");
}
});
ex.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"執行緒任務2執行了!");
}
});
ex.shutdown();//銷毀執行緒比
ex.submit(new Runnable() { //會報錯
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"執行緒任務3執行了!");
}
});
}
7.3.1 三大方法(創建物件方式
三大方法指的是三種創建執行緒物件的方法,以及他們創建出來的執行緒池的特點
-
第1大方法:單個執行緒
ExecutorService threadExecutor = Executors.newSingleThreadExecutor(); -
第2大方法:創建一個固定的執行緒池大小
ExecutorService threadExecutor = Executors.newFixedThreadPool(5); -
第3大方法:可伸縮的,遇強則強,遇弱則弱(最大可以達到Integer.MAXOfValue)
ExecutorService threadExecutor = Executors.newCachedThreadPool();本例中隨著for增大而增大,但不是線性增長
public class Demo01 {
public static void main(String[] args) {
//第1大方法:單個執行緒
// ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
//第2大方法:創建一個固定的執行緒池大小
// ExecutorService threadExecutor = Executors.newFixedThreadPool(5);
//第3大方法:可伸縮的,遇強則強,遇弱則弱(最大可以達到Integer.MAXOfValue)
ExecutorService threadExecutor = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 10; i++) {
//使用了執行緒池之后,使用執行緒池來創建執行緒
threadExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//執行緒池用完,程式結束,關閉執行緒池
threadExecutor.shutdown();
}
}
}
輸出:
第1大方法
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
pool-1-thread-1 ok
第2大方法
pool-1-thread-1 ok
pool-1-thread-5 ok
pool-1-thread-5 ok
pool-1-thread-5 ok
pool-1-thread-5 ok
pool-1-thread-3 ok
pool-1-thread-2 ok
pool-1-thread-5 ok
pool-1-thread-4 ok
pool-1-thread-1 ok
第3大方法
pool-1-thread-2 ok
pool-1-thread-1 ok
pool-1-thread-4 ok
pool-1-thread-3 ok
pool-1-thread-6 ok
pool-1-thread-5 ok
pool-1-thread-7 ok
pool-1-thread-9 ok
pool-1-thread-10 ok
pool-1-thread-8 ok
7.3.2 七大引數(建構式引數
七大引數指的是 ThreadPoolExecutor 類的構造方法需要七個引數
Executors類構造方法原始碼:對Executors類三種構造方法原始碼分析可得都是通過一個類 ThreadPoolExecutor 來實作的,所以我們直接再去研究 ThreadPoolExecutor類的構造方法
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ThreadPoolExecutor類構造方法原始碼:有七個引數
public ThreadPoolExecutor(int corePoolSize, //核心執行緒池大小
int maximumPoolSize, //最大小執行緒池大小
long keepAliveTime, //存活時間,超時了沒有呼叫就會釋放
TimeUnit unit,//超時單位
BlockingQueue<Runnable> workQueue,//阻塞佇列
ThreadFactory threadFactory,//執行緒工廠,創建執行緒的,一般不用動(用默認)
RejectedExecutionHandler handler //拒絕策略(根據需求自行選擇四種策略之一)
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
引數的講解:客人是執行緒,舊視窗是核心執行緒池大小,新視窗+舊視窗是最大執行緒池大小,候客區是阻塞佇列,拒絕客人的是拒絕策略,
客人來銀行辦理業務,當客人少于兩人的時候就直接去舊視窗,若客人多了就先坐候客區,然后慢慢的客人越來越多,舊視窗和候客區都滿了就開放右邊三個新視窗給客人辦理業務,客人又來了一大波,此時所有視窗和候客區滿了就拒絕客人進來,

7.3.3 四種拒絕策略
四種拒絕策略
- 佇列滿了,執行緒數達到最大執行緒數,還有執行緒過來,不處理這個執行緒,拋出例外
new ThreadPoolExecutor.AbortPolicy()【默認】 - 直接讓提交任務的執行緒運行這個任務,比如在主執行緒向執行緒池提交了任務,那么就直接由主執行緒執行,
new ThreadPoolExecutor.CallerRunsPolicy() - 佇列滿了,丟掉任務,不會拋出例外
new ThreadPoolExecutor.DiscardPolicy() - 佇列滿了,嘗試和最早的競爭,競爭失敗丟掉任務,也不會拋出例外
new ThreadPoolExecutor.DiscardOldestPolicy()

使用 ThreadPoolExecutor創建執行緒(推薦)
public class Demo01 {
public static void main(String[] args) {
//自定義執行緒池
ExecutorService threadExecutor = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
//四種拒絕策略:
//佇列滿了,執行緒數達到最大執行緒數,還有執行緒過來,不處理這個執行緒,拋出例外
// new ThreadPoolExecutor.AbortPolicy()
//哪里來的就去哪里
// new ThreadPoolExecutor.CallerRunsPolicy()
//佇列滿了,丟掉任務,不會拋出例外
// new ThreadPoolExecutor.DiscardPolicy()
//佇列滿了,嘗試和最早的競爭,競爭失敗丟掉任務,也不會拋出例外
new ThreadPoolExecutor.DiscardOldestPolicy()
);
try {
//最大承載:Deque + Max 超過,RejectedExecutionException
for (int i = 0; i < 9; i++) {
//使用了執行緒池之后,使用執行緒池來創建執行緒
threadExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//執行緒池用完,程式結束,關閉執行緒池
threadExecutor.shutdown();
}
}
}
結果:
new ThreadPoolExecutor.AbortPolicy() 輸出
pool-1-thread-2 ok
pool-1-thread-3 ok
pool-1-thread-1 ok
pool-1-thread-3 ok
pool-1-thread-2 ok
pool-1-thread-4 ok
pool-1-thread-5 ok
pool-1-thread-1 ok
java.util.concurrent.RejectedExecutionException: Task com.zyy.pool.Demo01$$Lambda$1/1096979270@7ba4f24f rejected from java.util.concurrent.ThreadPoolExecutor@3b9a45b3[Running, pool size = 5, active threads = 0, queued tasks = 0, completed tasks = 8]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.zyy.pool.Demo01.main(Demo01.java:40)
new ThreadPoolExecutor.CallerRunsPolicy() 輸出
pool-1-thread-2 ok
main ok
pool-1-thread-4 ok
pool-1-thread-3 ok
pool-1-thread-1 ok
pool-1-thread-3 ok
pool-1-thread-4 ok
pool-1-thread-5 ok
pool-1-thread-2 ok
new ThreadPoolExecutor.DiscardPolicy() 輸出
pool-1-thread-2 ok
pool-1-thread-1 ok
pool-1-thread-2 ok
pool-1-thread-1 ok
pool-1-thread-2 ok
pool-1-thread-3 ok
pool-1-thread-4 ok
pool-1-thread-5 ok
new ThreadPoolExecutor.DiscardOldestPolicy() 輸出
pool-1-thread-2 ok
pool-1-thread-3 ok
pool-1-thread-4 ok
pool-1-thread-1 ok
pool-1-thread-4 ok
pool-1-thread-2 ok
pool-1-thread-3 ok
pool-1-thread-5 ok
7.3.4 拓展:最大執行緒數應該如何設定?
有兩種設定方式:CPU密集型和IO密集型
-
CPU密集型,幾核,就是幾,可以保證CPU效率最高
-
IO密集型 (判斷你程式中十分耗IO的執行緒)
如程式中有15個大型任務,IO十分消耗資源,一般設定為2倍,為30
以CPU密集型為例:
獲取CPU核數的方法
//獲取CPU核數
System.out.println(Runtime.getRuntime().availableProcessors());
代碼優化:用上面獲取CPU核數的方法來替代最大執行緒數(這樣可以避免不同性能的電腦都可以跑起來,并且保證性能最大化)
public class Demo01 {
public static void main(String[] args) {
//自定義執行緒池
ExecutorService threadExecutor = new ThreadPoolExecutor(
4,
Runtime.getRuntime().availableProcessors(),
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
//佇列滿了,丟掉任務,不會拋出例外
new ThreadPoolExecutor.DiscardPolicy()
);
try {
//最大承載:Deque + Max 超過,RejectedExecutionException
for (int i = 0; i < 10; i++) {
//使用了執行緒池之后,使用執行緒池來創建執行緒
threadExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " ok");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//執行緒池用完,程式結束,關閉執行緒池
threadExecutor.shutdown();
}
}
}
8.并發工具類
8.1 CountDownLatch 計數器鎖
也可以理解為減法計數器,因為countDown方法減一計數

基本使用:
- 創建CountDownLatch物件并指定計數數量
- 在每個執行緒的最后都要使用countDown方法減一計數
- 在main主執行緒內子執行緒后寫await等待計數器歸零(該代碼是等待六個執行緒執行完),再繼續執行await后的代碼
public class CountDownLatchDemo {
public static void main(String[] args) {
//總數是6 必須要執行的任務的時候再使用
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+" go out");
// 數量-1
countDownLatch.countDown();
}).start();
}
try {
//等待計數器歸零,然后再往下執行
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("close door");
}
}
8.2 CyclicBarr 回圈屏障
也可以理解為加法計數器

基本使用:
- 創建CyclicBarrier物件并指定 目標執行緒數量,以及 達到目標執行緒數量后再執行的執行緒
- 在每個執行緒的最后都要使用await方法,讓當前執行緒等待直到到達目標執行緒數量后,才允許執行緒繼續執行await后的代碼并執行構造方法里的執行緒
public class CyclicBarrierDemo {
public static void main(String[] args) {
/**
* 集齊七顆龍珠,召喚神龍
*/
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, ()->{
System.out.println("召喚神龍成功!");
});
for (int i = 1; i <= 7; i++) {
final int temp = i;
new Thread(() -> {
System.out.println("收集"+temp+"星龍珠");
try {
//阻塞
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
輸出
收集1星龍珠
收集2星龍珠
收集3星龍珠
收集5星龍珠
收集4星龍珠
收集6星龍珠
收集7星龍珠
召喚神龍成功!
加/減法計數器的區別:
- 減法是卡執行緒后await后的代碼;
- 而加法卡的是執行緒內await后的代碼;
8.3 Semaphore信號量
信號量(Semaphore),有時被稱為信號燈,是在多執行緒環境下使用的一種設施,是可以用來保證兩個或多個關鍵代碼段不被并發呼叫,在進入一個關鍵代碼段之前,執行緒必須獲取一個信號量;一旦該關鍵代碼段完成了,那么該執行緒必須釋放信號量,其它想進入該關鍵代碼段的執行緒必須等待直到第一個執行緒釋放信號量,
作用:多個共享資源互斥使用!并發限流,控制最大執行緒數!

基本使用:
semaphore.acquire(); //獲得,假設已經滿了,等待,等待被釋放為止!
semaphore.release();//釋放,會將當前的信號量釋放,然后喚醒等待執行緒!
舉例:搶車位 5輛車,3個停車位
public class SemaphoreDemo {
public static void main(String[] args) {
//3個停車位 限流
Semaphore semaphore = new Semaphore(3);//限流數
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
//獲得
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "獲得停車位");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//釋放
semaphore.release();
System.out.println(Thread.currentThread().getName() + "離開停車位");
}
}, String.valueOf(i)).start();
}
}
}
輸出:
2獲得停車位
1獲得停車位
3獲得停車位
2離開停車位
5獲得停車位
3離開停車位
4獲得停車位
1離開停車位
4離開停車位
5離開停車位
8.4 Exchanger 資料交換
執行緒之間的資料傳遞也可以這么簡單,
使用Exchanger,它能夠實作執行緒之間的資料交換:
public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();
new Thread(() -> {
try {
System.out.println("收到主執行緒傳遞的交換資料:"+exchanger.exchange("AAAA"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
System.out.println("收到子執行緒傳遞的交換資料:"+exchanger.exchange("BBBB"));
}
在呼叫exchange方法后,當前執行緒會等待其他執行緒呼叫同一個exchanger物件的exchange方法,當另一個執行緒也呼叫之后,方法會回傳對方執行緒傳入的引數,
8.5 ForkJoin分支合并
8.5.1 Fork/Join 框架介紹
在JDK7時,出現了一個新 的框架用于并行執行任務,它的目的是為了把大型任務拆分為多個小任務,最后匯總多個小任務的結果,得到整大任務的結果,并且這些小任務都是同時在進行,大大提高運算效率,Fork就是拆分,Join就是合并,
我們來演示一下實際的情況,比如一個算式:18x7+36x8+9x77+8x53,可以拆分為四個小任務:18x7、36x8、9x77、8x53,最后我們只需要將這四個任務的結果加起來,就是我們原本算式的結果了,有點歸并排序的味道,

它不僅僅只是拆分任務并使用多執行緒,而且還可以利用作業竊取演算法,提高執行緒的利用率,
作業竊取演算法:是指某個執行緒從其他佇列里竊取任務來執行,一個大任務分割為若干個互不依賴的子任務,為了減少執行緒間的競爭,把這些子任務分別放到不同的佇列里,并為每個佇列創建一個單獨的執行緒來執行佇列里的任務,執行緒和佇列一一對應,但是有的執行緒會先把自己佇列里的任務干完,而其他執行緒對應的佇列里還有任務待處理,干完活的執行緒與其等著,不如幫其他執行緒干活,于是它就去其他執行緒的佇列里竊取一個任務來執行,

8.5.2 Fork/Join的使用:
ForkJoin分支合并是通過ForkJoinPool介面實作的,
ForkJoinPool介面的實作類有兩種:RecursiveAction沒有回傳值、RecursiveTask有回傳值


使用步驟:(以RecursiveTask為例)
步驟:
-
計算類繼承RecursiveTask
-
計算類重寫compute方法
-
main函式中使用ForkJoinPool類物件呼叫方法
forkJoinPool.execute(ForkJoinTask<?> task)【括號內是繼承了RecursiveTask的計算類物件】,開啟ForkJoin
舉例:這里以計算1-1000的和為例,我們可以將其拆分為8個小段的數相加,比如1-125、126-250... ,最后再匯總即可,它也是依靠執行緒池來實作的:
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ForkJoinPool pool = new ForkJoinPool();
SubTask subtask = new SubTask(1, 1000);
System.out.println(pool.submit(subtask).get());//get方法獲取結果
}
//繼承RecursiveTask,這樣才可以作為一個任務,泛型就是計算結果型別
private static class SubTask extends RecursiveTask<Integer> {
private final int start; //比如我們要計算一個范圍內所有數的和,那么就需要限定一下范圍,這里用了兩個int存放
private final int end;
public SubTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if(end - start > 125) { //每個任務最多計算125個數的和,如果大于繼續拆分,小于就可以開始算了
SubTask subTask1 = new SubTask(start, (end + start) / 2);
subTask1.fork(); //會繼續劃分子任務執行
SubTask subTask2 = new SubTask((end + start) / 2 + 1, end);
subTask2.fork(); //會繼續劃分子任務執行
return subTask1.join() + subTask2.join(); //回傳計算結果給上一級(越玩越有遞回那味了)
} else {
System.out.println(Thread.currentThread().getName()+" 開始計算 "+start+"-"+end+" 的值!");
int res = 0;
for (int i = start; i <= end; i++) {
res += i;
}
return res; //回傳的結果會作為join的結果
}
}
}
}
ForkJoinPool-1-worker-2 開始計算 1-125 的值!
ForkJoinPool-1-worker-2 開始計算 126-250 的值!
ForkJoinPool-1-worker-0 開始計算 376-500 的值!
ForkJoinPool-1-worker-6 開始計算 751-875 的值!
ForkJoinPool-1-worker-3 開始計算 626-750 的值!
ForkJoinPool-1-worker-5 開始計算 501-625 的值!
ForkJoinPool-1-worker-4 開始計算 251-375 的值!
ForkJoinPool-1-worker-7 開始計算 876-1000 的值!
500500
可以看到,結果非常正確,但是整個計算任務實際上是拆分為了8個子任務同時完成的,結合多執行緒,原本的單執行緒任務,在多執行緒的加持下速度成倍提升,
拓展:
- 包括Arrays工具類提供的并行排序也是利用了ForkJoinPool來實作:并行排序的性能在多核心CPU環境下,肯定是優于普通排序的,并且排序規模越大優勢越顯著,
public static void parallelSort(byte[] a) {
int n = a.length, p, g;
if (n <= MIN_ARRAY_SORT_GRAN ||
(p = ForkJoinPool.getCommonPoolParallelism()) == 1)
DualPivotQuicksort.sort(a, 0, n - 1);
else
new ArraysParallelSortHelpers.FJByte.Sorter
(null, a, new byte[n], 0, n, 0,
((g = n / (p << 2)) <= MIN_ARRAY_SORT_GRAN) ?
MIN_ARRAY_SORT_GRAN : g).invoke();
}
- 三種計算方式的效率對比(Stream并行流最好)
public class Test {
public static void main(String[] args) {
//耗時:6295
// test1();
//耗時:4401
// test2();
//耗時:294
test3();
}
//普通方式
public static void test1() {
Long sum = 0L;
long start = System.currentTimeMillis();
for (Long i = 1L; i <= 10_0000_0000L; i++) {
sum += i;
}
long end = System.currentTimeMillis();
System.out.println("sum:" + sum + " 時間:" + (end - start));
}
public static void test2() {
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> forkJoinTask = new ForkJoinDemo(0L, 10_0000_0000L);
ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinTask);
Long sum = null;
try {
sum = submit.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println("sum:" + sum + " 時間:" + (end - start));
}
public static void test3() {
long start = System.currentTimeMillis();
//Stream 并行流
long sum = LongStream.range(0L, 10_0000_0001L).parallel().reduce(0, Long::sum);
long end = System.currentTimeMillis();
System.out.println("sum:" + sum + " 時間:" + (end - start));
}
}
9. 異步回呼
同步:指等待資源(阻塞)
異步:指設立哨兵,資源空閑通知執行緒,否則該執行緒去做其他事情(非阻塞)
9.1 CompletableFuture
CompletableFuture 在 Java 里面被用于異步編程,異步通常意味著非阻塞,可以使得我們的任務單獨運行在與主執行緒分離的其他執行緒中,并且通過回呼可以在主執行緒中得到異步任務的執行狀態,是否完成,和是否例外等資訊

CompletableFuture 實作了 Future, CompletionStage 介面,實作了 Future介面就可以兼容現在有執行緒池框架,而 CompletionStage 介面才是異步編程的介面抽象,里面定義多種異步方法,通過這兩者集合,從而打造出了強大的CompletableFuture 類:
- 異步呼叫沒有回傳值方法runAsync
- 異步呼叫有回傳值方法supplyAsync
主執行緒呼叫 get 方法會阻塞
public class CompletableFutureTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 異步呼叫沒有回傳值
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName()+" : CompletableFuture");
});
completableFuture.get();
// 異步呼叫
// mq訊息佇列
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName()+" : CompletableFuture1");
// 模擬例外
int i = 10/0;
return 1024;
});
// 完成之后呼叫得回傳值
completableFuture1.whenComplete((Integer t, Throwable u)->{
System.out.println("-----t:"+t); // 方法的回傳值
System.out.println("-----u:"+u); // 例外的回傳資訊
}).get();
}
}
具體whenComplete的源代碼為:
t為回傳結果,u為例外資訊
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
9.2 Future 與 CompletableFutured對比
Future是Callable那章的內容
對比這兩種方法,一個為同步一個為異步
Futrue 在 Java 里面,通常用來表示一個異步任務的參考,比如我們將任務提交到執行緒池里面,然后我們會得到一個 Futrue,在 Future 里面有 isDone 方法來 判斷任務是否處理結束,還有 get 方法可以一直阻塞直到任務結束然后獲取結果,但整體來說這種方式,還是同步的,因為需要客戶端不斷阻塞等待或者不斷輪詢才能知道任務是否完成
- 不支持手動完成: 我提交了一個任務,但是執行太慢了,我通過其他路徑已經獲取到了任務結果,現在沒法把這個任務結果通知到正在執行的執行緒,所以必須主動取消或者一直等待它執行完成
- 不支持進一步的非阻塞呼叫: 通過 Future 的 get 方法會一直阻塞到任務完成,但是想在獲取任務之后執行額外的任務,因為 Future不支持回呼函式,所以無法實作這個功能
- 不支持鏈式呼叫: 對于 Future 的執行結果,我們想繼續傳到下一個 Future 處理使用,從而形成一個鏈式的 pipline 呼叫,這在 Future 中是沒法實作的,
- 不支持多個 Future 合并: 比如我們有 10 個 Future 并行執行,我們想在所有的 Future 運行完畢之后,執行某些函式,是沒法通過 Future 實作的,
- 不支持例外處理:Future 的 API 沒有任何的例外處理的 api,所以在異步運行時,如果出了問題是不好定位的
的, - 不支持例外處理:Future 的 API 沒有任何的例外處理的 api,所以在異步運行時,如果出了問題是不好定位的
10. Volatile
10.1 JMM
推薦好文:https://baijiahao.baidu.com/s?id=1709086005694976168&wfr=spider&for=pc
10.1.1 JMM是什么
JMM(Java Memory Model),Java的記憶體模型,不存在的東西,概念!約定!
10.1.2 JMM的作用
快取一致性的協議,用來定義資料讀寫的規則,
JMM定義了執行緒作業記憶體和主記憶體的抽象關系:執行緒的共享變數存盤在主記憶體中,每個執行緒都有一個私有的本地作業記憶體,
使用volatile關鍵字來解決共享變數的可見性的問題,
Java記憶體模型是圍繞著并發編程中原子性、可見性、有序性這三個特征來建立的,
10.1.3 JMM的操作

主記憶體:對應堆中存放物件的實體的部分,
作業記憶體:對應執行緒的虛擬機堆疊的部磁區域,虛擬機可能會對這部分記憶體進行優化,將其放在CPU的暫存器或是高速快取中,比如在訪問陣列時,由于陣列是一段連續的記憶體空間,所以可以將一部分連續空間放入到CPU高速快取中,那么之后如果我們順序讀取這個陣列,那么大概率會直接快取命中,
JMM定義了8種操作來完成(每一種操作都是原子的、不可再拆分的)
- lock(鎖定):作用于主記憶體的變數,它把一個變數標識為一條執行緒獨占的狀態,
- unlock(解鎖):作用于主記憶體的變數,它把一個處于鎖定狀態的變數釋放出來,釋放后的變數才可以被其他執行緒鎖定,
- read(讀取):作用于主記憶體的變數,它把一個變數的值從主記憶體傳輸到執行緒的作業記憶體中,以便隨后的load動作使用,
- load(載入):作用于作業記憶體的變數,它把read操作從主記憶體中得到的變數值放入作業記憶體的變數副本中,
- use(使用):作用于作業記憶體的變數,它把作業記憶體中一個變數的值傳遞給執行引擎(每當虛擬機遇到一個需要使用到該變數的值的位元組碼指令時將會執行這個操作),
- assign(賦值):作用于作業記憶體的變數,它把一個從執行引擎接收到的值賦給作業記憶體的變數(每當虛擬機遇到一個給該變數賦值的位元組碼指令時執行這個操作),
- store(存盤):作用于作業記憶體的變數,它把作業記憶體中一個變數的值傳送到主記憶體中,以便隨后的write操作使用,
- write(寫入):作用于主記憶體的變數,它把store操作從作業記憶體中得到的變數的值放入主記憶體的變數中,
10.1.4 JMM定義的規則
關于JMM的一些同步的約定
- 執行緒解鎖前,必須把共享變數即可刷回主存
- 執行緒加鎖前,必須 讀取主存中的最新值到作業記憶體中
- 加鎖和解鎖是同一把鎖
8種操作必須滿足的規則:
- 不允許read和load、store和write操作之一單獨出現,(不允許一個變數從主記憶體讀取了但作業記憶體不接受;或者從作業記憶體發起回寫了但主記憶體不接受的情況出現)
- 不允許一個執行緒丟棄它的最近的assign操作,(變數在作業記憶體中改變了值之后,必須把該變化同步回主記憶體)
- 不允許一個執行緒無原因地(沒有發生過任何assign操作)把資料從執行緒的作業記憶體同步回主記憶體,
- 一個新的變數只能在主記憶體中“誕生”,不允許在作業記憶體中直接使用一個未被初始化(load或assign)的變數,(就是對一個變數實施use、store操作之前,必須先執行過了load和assign操作)
- 一個變數在同一時刻只允許一條執行緒對其進行lock操作,但lock操作可以被同一條執行緒重復執行多次,多次執行lock后,只有執行相同次數的unlock操作,變數才會被解鎖,
- 如果對一個變數執行lock操作,那將會清空作業記憶體中此變數的值,在執行引擎使用這個變數前,需要重新執行load或assign操作初始化變數的值,
- 如果一個變數事先沒有被lock操作鎖定,那就不允許對它執行unlock操作,也不允許去unlock一個被其他執行緒鎖定住的變數,
- 對一個變數執行unlock操作之前,必須先把此變數同步回主記憶體中(執行store、write操作),
10.1.5 happens-before先行發生原則
經過我們前面的講解,相信各位已經了解了JMM記憶體模型以及重排序等機制帶來的優點和缺點,綜上,JMM提出了happens-before(先行發生)原則,定義一些禁止編譯優化的場景,來向各位程式員做一些保證,只要我們是按照原則進行編程,那么就能夠保持并發編程的正確性,具體如下:
- 程式次序規則:同一個執行緒中,按照程式的順序,前面的操作happens-before后續的任何操作,
- 同一個執行緒內,代碼的執行結果是有序的,其實就是,可能會發生指令重排,但是保證代碼的執行結果一定是和按照順序執行得到的一致,程式前面對某一個變數的修改一定對后續操作可見的,不可能會出現前面才把a修改為1,接著讀a居然是修改前的結果,這也是程式運行最基本的要求,
- 監視器鎖規則:對一個鎖的解鎖操作,happens-before后續對這個鎖的加鎖操作,
- 就是無論是在單執行緒環境還是多執行緒環境,對于同一個鎖來說,一個執行緒對這個鎖解鎖之后,另一個執行緒獲取了這個鎖都能看到前一個執行緒的操作結果,比如前一個執行緒將變數
x的值修改為了12并解鎖,之后另一個執行緒拿到了這把鎖,對之前執行緒的操作是可見的,可以得到x是前一個執行緒修改后的結果12(所以synchronized是有happens-before規則的)
- 就是無論是在單執行緒環境還是多執行緒環境,對于同一個鎖來說,一個執行緒對這個鎖解鎖之后,另一個執行緒獲取了這個鎖都能看到前一個執行緒的操作結果,比如前一個執行緒將變數
- volatile變數規則:對一個volatile變數的寫操作happens-before后續對這個變數的讀操作,
- 就是如果一個執行緒先去寫一個
volatile變數,緊接著另一個執行緒去讀這個變數,那么這個寫操作的結果一定對讀的這個變數的執行緒可見,
- 就是如果一個執行緒先去寫一個
- 執行緒啟動規則:主執行緒A啟動執行緒B,執行緒B中可以看到主執行緒啟動B之前的操作,
- 在主執行緒A執行程序中,啟動子執行緒B,那么執行緒A在啟動子執行緒B之前對共享變數的修改結果對執行緒B可見,
- 執行緒加入規則:如果執行緒A執行操作
join()執行緒B并成功回傳,那么執行緒B中的任意操作happens-before執行緒Ajoin()操作成功回傳, - 傳遞性規則:如果A happens-before B,B happens-before C,那么A happens-before C,
那么我們來從happens-before原則的角度,來解釋一下下面的程式結果:
public class Main {
private static int a = 0;
private static int b = 0;
public static void main(String[] args) {
a = 10;
b = a + 1;
new Thread(() -> {
if(b > 10) System.out.println(a);
}).start();
}
}
首先我們定義以上出現的操作:
- A:將變數
a的值修改為10 - B:將變數
b的值修改為a + 1 - C:主執行緒啟動了一個新的執行緒,并在新的執行緒中獲取
b,進行判斷,如果為true那么就列印a
首先我們來分析,由于是同一個執行緒,并且B是一個賦值操作且讀取了A,那么按照程式次序規則,A happens-before B,接著在B之后,馬上執行了C,按照執行緒啟動規則,在新的執行緒啟動之前,當前執行緒之前的所有操作對新的執行緒是可見的,所以 B happens-before C,最后根據傳遞性規則,由于A happens-before B,B happens-before C,所以A happens-before C,因此在新的執行緒中會輸出a修改后的結果10,
10.2 Volatile三大特性
前置知識:
原子性:一個或多個程式指令,要么全部正確執行完畢不能被打斷,或者全部不執行,
可見性:當一個執行緒修改了某個共享變數的值,其它執行緒應當能夠立即看到修改后的值,
有序性:程式執行代碼指令的順序應當保證按照程式指定的順序執行,即便是編譯優化,也應當保證程式源語一致,
Volatile是java虛擬機提供輕量級的同步機制,有如下特性:
- 保證可見性
- 不保證原子性
- 保證有序性:禁止指令重排
10.2.1 保證可見性
保證可見性驗證:
public class JMMDemo {
//加了volatile保證可見性
private volatile static int number = 0;
public static void main(String[] args) {
//main
new Thread(() -> {
//子執行緒
while (number == 0) {
}
}).start();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//我們想number = 1,之后子執行緒會停止回圈,然而結果是子程式的回圈并沒有停止
//這里有個問題就是主記憶體的值已經被修改了,但子執行緒沒檢測到值被修改
number = 1;
System.out.println("number "+number);
}
}
10.2.2 不保證原子性
1)問題
不保證原子性驗證:
執行緒A在執行任務的時候,不能被打擾的,也不能被分割,要么同時成功,要么同時失敗,
/**
* 不保證原子性
*/
public class VDemo02 {
//這里加了volatile是不能保證原子性的
private volatile static int number = 0;
public static void add() {
//不是一個原子性操作
number ++;
}
public static void main(String[] args) {
//理論上num結果應該為2萬
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
//條件:當前執行緒大于2的時候主執行緒就禮讓,出該回圈時子執行緒一定是都完成了,只剩下主執行緒和gc執行緒在了
while (Thread.activeCount() > 2) {
//main gc
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+"-->"+number);
}
}
輸出(每次結果不固定)
main-->18795
用javap得到編譯后的執行檔案可以看出,++操作被拆成了四部,在這四步中就可能出現++失敗(可能是++值被覆寫),不能保證原子性

2)解決方案
可以使用lock和synchronized鎖來保證原子性,但如果不加lock和synchronized(因為同步鎖性能差),怎么樣保證原子性?
使用原子類,解決原子性問題

public class VDemo02 {
//這里加了volatile是不能保證原子性的
private volatile static AtomicInteger number = new AtomicInteger(0);
public static void add() {
//不是一個原子性的操作
// number ++;
//這里進原始碼看就是一個自旋鎖CAS
number.getAndIncrement();
}
public static void main(String[] args) {
//理論上num結果應該為2萬
for (int i = 1; i <= 20; i++) {
new Thread(()->{
for (int j = 0; j < 1000; j++) {
add();
}
}).start();
}
while (Thread.activeCount() > 2) {
//main gc
Thread.yield();
}
System.out.println(Thread.currentThread().getName()+"-->"+number);
}
}
3)拓展
-
原子類為啥可以做到原子性?
進入到Unsafe類中會發現大都方法都被native修飾:這些類的底層都直接和作業系統掛鉤,在記憶體中修改值!Unsafe是一個很特殊的存在
看增加方法可知
number.getAndIncrement();原始碼可知:是一個CAS自旋鎖【具體的看下面CAS】

10.2.3 保證有序性:避免指令重排
保證有序性是通過避免指令重排來實作的
1)什么是指令重排?
-- 你寫的程式,計算機并不是按照你寫那樣去執行的,
代碼到執行經過:源代碼 --》 編譯器優化的重排 --》指令并行也可能會重排 --》記憶體系統也會重排 --》執行
處理器在進行指令重排的時候,是根據資料之間的依賴性來重排的,
如:
int x = 1;// 1
int y = 1;// 2
x = x + 5;// 3
y = x * x;// 4
//我們所期望的執行順序:1234 但是可能執行的時候會變成2134 1324
//不可能是 4123,因為第四行代碼依賴于第一行代碼,第三行代碼依賴于第一行代碼
2)指令重排可能造成影響的結果
a b x y 這四個值默認值都是0
| 執行緒A | 執行緒B |
|---|---|
| x=a | y=b |
| b=1 | a=2 |
正常的結果:x=0;y=0;
此時由于指令重排執行緒A執行順序更改,執行緒B執行順序更改
| 執行緒A | 執行緒B |
|---|---|
| b=1 | a=2 |
| x=a | y=b |
重排導致的例外結果:x=2;y=1;
3)volatile避免指令重排
volatile使用記憶體屏障(是cpu指令)作用:
- 保證特定的操作的執行順序
- 可以保證某些變數的記憶體可見性(利用這些特性volatile實作了可見性)

volatile可以保證可見性,不能保證原子性,由于記憶體屏障,可以保證避免指令重排的現象產生!
11. CAS
11.1 原理
CAS(Compare-and-Swap),即比較并替換,是一種實作并發演算法時常用到的技術,Java并發包中的很多類都使用了CAS技術,
問題引入:前面volatile不保證原子性的問題例子
解決方案:前面volatile保證原子性的解決方案和拓展中使用到了CAS
看完解決方案可以得出:
CAS的思想:比較并替換,CAS需要有3個運算元:記憶體地址V,舊的預期值A,即將要更新的目標值B,
CAS指令執行時,當且僅當記憶體地址V的值與預期值A相等時,將記憶體地址V的值修改為B,否則就什么都不做,整個比較并替換的操作是一個原子操作,
compareAndSwapInt 是Unsafe類的方法,Unsafe是CAS核心類,由于java方法無法訪問底層系統,需要通過本地(native)方法來訪問,Unsafe相當于一個后門,基于該類可以直接操作特定記憶體的資料,Unsafe類存在于sun.misc包中,其內部方法操作可以像C的指標一樣直接操作記憶體,因為java中CAS操作的執行依賴于Unsafe類的方法,
注意Unsafe類中所有方法都是native修飾的,也就是說Unsafe類中的方法都直接呼叫作業系統底層資源執行相應任務,
CAS并發原語體現在JAVA語言中就是sun.misc.Unsafe類中的各個方法,呼叫Unsafe中的CAS方法,JVM會幫我們實作出CAS匯編指令,
這是一種完全依賴于硬體的功能,通過它實作了原子操作,再次強調,由于CAS是一種系統原語,原語屬于作業系統用語范疇,是由若干條指令組成的,用于完成某個功能的一個程序,并且原語的執行必須是連續的,在執行程序中不允許被中斷,也就是說CAS是一條CPU的原子指令,不會造成所謂的資料不一致性問題
拓展:
-
CP如何實作原子操作?
CPU 處理器速度遠遠大于在主記憶體中的,為了解決速度差異,在他們之間架設了多級快取,如 L1、L2、L3 級別的快取,這些快取離CPU越近就越快,將頻繁操作的資料快取到這里,加快訪問速度
現在都是多核 CPU 處理器,每個 CPU 處理器內維護了一塊位元組的記憶體,每個內核內部維護著一塊位元組的快取,當多執行緒并發讀寫時,就會出現快取資料不一致的情況,
此時,處理器提供:
-
總線鎖定
當一個處理器要操作共享變數時,在 BUS 總線上發出一個 Lock 信號,其他處理就無法操作這個共享變數了,
缺點很明顯,總線鎖定在阻塞其它處理器獲取該共享變數的操作請求時,也可能會導致大量阻塞,從而增加系統的性能開銷,
-
快取鎖定
后來的處理器都提供了快取鎖定機制,也就說當某個處理器對快取中的共享變數進行了操作,其他處理器會有個嗅探機制,將其他處理器的該共享變數的快取失效,待其他執行緒讀取時會重新從主記憶體中讀取最新的資料,基于 MESI 快取一致性協議來實作的,現代的處理器基本都支持和使用的快取鎖定機制,
注意:
有如下兩種情況處理器不會使用快取鎖定:
(1)當操作的資料跨多個快取行,或沒被快取在處理器內部,則處理器會使用總線鎖定,
(2)有些處理器不支持快取鎖定,比如:Intel 486 和 Pentium 處理器也會呼叫總線鎖定,
-
11.2 CAS存在的三大問題
- 回圈時間長開銷很大,
- 只能保證一個變數的原子操作,
- ABA問題,
11.2.1 回圈時間長開銷很大
CAS 通常是配合無限回圈一起使用的,我們可以看到 getAndAddInt 方法執行時,如果 CAS 失敗,會一直進行嘗試,如果 CAS 長時間一直不成功,可能會給 CPU 帶來很大的開銷,
11.2.2 只能保證一個變數的原子操作
當對一個變數執行操作時,我們可以使用回圈 CAS 的方式來保證原子操作,但是對多個變數操作時,CAS 目前無法直接保證操作的原子性,但是我們可以通過以下兩種辦法來解決:1)使用互斥鎖來保證原子性;2)將多個變數封裝成物件,通過 AtomicReference 來保證原子性,
11.2.3 ABA問題
1)什么是ABA問題
ABA問題(貍貓換太子):執行緒A和B去操作同一變數a,在主存中a=0,執行緒A是要更改a為2,執行緒B是將a改成1又將a改回0,若執行緒B執行完了,執行A還在執行,此時執行緒A比對后就會發現a是0繼續的更改為2,這里執行緒A沒有發現此時的a=0不是最開始的0了,這就是ABA問題,

public class SACDemo {
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(2020);
//======搗亂的執行緒=========
System.out.println(atomicInteger.compareAndSet(2020, 2021));
System.out.println(atomicInteger.get());
System.out.println(atomicInteger.compareAndSet(2021, 2020));
System.out.println(atomicInteger.get());
//======期望的執行緒=========
System.out.println(atomicInteger.compareAndSet(2020, 1993));
System.out.println(atomicInteger.get());
}
}
2)解決ABA問題
解決ABA問題,引入原子參考!對應的思想:樂觀鎖(帶版本號的原子操作!)
注意
Integer使用了物件快取機制,默認范圍是-128~127,推薦使用靜態工廠方法valueOf獲取物件實體,而不是new,因為valueOf使用快取,而new一定會創建新的物件分配新的記憶體空間,【阿里巴巴手冊??】
public class SACDemo {
public static void main(String[] args) {
Integer int_1993 = 1993;
Integer int_2020 = 2020;
Integer int_2021 = 2021;
//AtomicStampedReference<Integer> 注意,如果泛型是一個包裝類,注意物件的參考問題
AtomicStampedReference<Integer> reference = new AtomicStampedReference<>(int_2020, 1);
new Thread(() -> {
//reference.getStamp()是版本號
System.out.println("A1->" + reference.getStamp());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("A 2020->2021 " + reference.compareAndSet(int_2020, int_2021, reference.getStamp(), reference.getStamp() + 1));
System.out.println("A2->" + reference.getStamp());
System.out.println("A 2021->2020 " + reference.compareAndSet(int_2021, int_2020, reference.getStamp(), reference.getStamp() + 1));
System.out.println("A3->" + reference.getStamp());
}, "A").start();
new Thread(() -> {
System.out.println("B1->" + reference.getStamp());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("B 2020->1993 " + reference.compareAndSet(int_2020, int_1993, reference.getStamp(), reference.getStamp() + 1));
System.out.println("B2->" + reference.getStamp());
}, "B").start();
}
}
輸出
A1->1
B1->1
A 2020->2021 true
A2->2
B 2020->1993 false
B2->3
A 2021->2020 true
A3->3
12. 鎖的集合
12.0 各大鎖的概念
共享鎖(S鎖):又稱為讀鎖
排它鎖/獨占鎖(X鎖):又稱為寫鎖(互斥鎖也是一種獨占鎖)
悲觀鎖:總是假設最壞的情況,每次去拿資料的時候都認為別人會修改,所以每次在拿資料的時候都會上鎖,這樣別人想拿這個資料就會阻塞直到它拿到鎖,傳統的關系型資料庫里邊就用到了很多這種鎖機制,比如行鎖,表鎖等,讀鎖,寫鎖等,都是在做操作之前先上鎖,再比如Java里面的同步原語synchronized關鍵字的實作也是悲觀鎖,
樂觀鎖:顧名思義,就是很樂觀,每次去拿資料的時候都認為別人不會修改,所以不會上鎖,但是在更新的時候會判斷一下在此期間別人有沒有去更新這個資料,可以使用版本號等機制,樂觀鎖適用于多讀的應用型別,這樣可以提高吞吐量,像資料庫提供的類似于write_condition機制,其實都是提供的樂觀鎖,在Java中java.util.concurrent.atomic包下面的原子變數類就是使用了樂觀鎖的一種實作方式CAS實作的,
12.1 公平鎖與非公平鎖
12.1.1 介紹
-
公平鎖:多個執行緒按照申請鎖的順序去獲得鎖,執行緒會直接進入佇列去排隊,永遠都是佇列的第一位才能得到鎖,
注意:在高并發的情況下公平鎖不一樣是公平的!(原因是AQS原理,但本篇沒講AQS)
-
非公平鎖:多個執行緒去獲取鎖的時候,會直接去嘗試獲取,獲取不到,再去進入等待佇列,如果能獲取到,就直接獲取到鎖,
簡單來說,公平鎖不讓插隊,都老老實實排著;非公平鎖讓插隊,但是排隊的人讓不讓你插隊就是另一回事了,
ReentrantLock的無參構造方法中,是這樣寫的:【默認是非公平的】
public ReentrantLock() {
sync = new NonfairSync(); //看名字貌似是非公平的
}
有參構造方法:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
12.1.2 驗證公平鎖
這里我們選擇使用第二個構造方法,可以選擇是否為公平鎖實作:
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock(false);
Runnable action = () -> {
System.out.println("執行緒 "+Thread.currentThread().getName()+" 開始獲取鎖...");
lock.lock();
System.out.println("執行緒 "+Thread.currentThread().getName()+" 成功獲取鎖!");
lock.unlock();
};
for (int i = 0; i < 10; i++) { //建立10個執行緒
new Thread(action, "T"+i).start();
}
}
這里我們只需要對比將在1秒后開始獲取鎖...和成功獲取鎖!的順序是否一致即可,如果是一致,那說明所有的執行緒都是按順序排隊獲取的鎖,如果不是,那說明肯定是有執行緒插隊了,
結果:
執行緒 T0 開始獲取鎖...
執行緒 T3 開始獲取鎖...
執行緒 T2 開始獲取鎖...
執行緒 T1 開始獲取鎖...
執行緒 T7 開始獲取鎖...
執行緒 T6 開始獲取鎖...
執行緒 T5 開始獲取鎖...
執行緒 T0 成功獲取鎖!
執行緒 T4 開始獲取鎖...
執行緒 T9 開始獲取鎖...
執行緒 T8 開始獲取鎖...
執行緒 T4 成功獲取鎖!
執行緒 T3 成功獲取鎖!
執行緒 T2 成功獲取鎖!
執行緒 T1 成功獲取鎖!
執行緒 T7 成功獲取鎖!
執行緒 T6 成功獲取鎖!
執行緒 T5 成功獲取鎖!
執行緒 T9 成功獲取鎖!
執行緒 T8 成功獲取鎖!
運行結果可以發現,在公平模式下,確實是按照順序進行的,而在非公平模式下,一般會出現這種情況:執行緒剛開始獲取鎖馬上就能搶到,并且此時之前早就開始的執行緒還在等待狀態,很明顯的插隊行為,
12.1 可重入鎖
可重入性(遞回鎖):就是一個執行緒不用釋放,可以重復的獲取一個鎖n次,只是在釋放的時候,也需要相應的釋放n次,(簡單來說:A執行緒在某背景關系中獲得了某鎖,當A執行緒想要在次獲取該鎖時,不會應為鎖已經被自己占用,而需要先等到鎖的釋放)假使A執行緒即獲得了鎖,又在等待鎖的釋放,就會造成死鎖,
-
同一個執行緒可以對同一個物件連續拿鎖,但記住最后一定要遵守鎖了幾次就一定要解鎖幾次!(若未unlock則其他需要鎖的執行緒永遠無法啟動)
-
但不同執行緒對同一個物件拿鎖就無法連續拿鎖,必須等別的執行緒釋放了該物件鎖才能拿到鎖

Synchronized和reentrantLock都是可重入鎖
12.1.1 Synchronized版
兩個Synchronized鎖的物件是phone實體物件
public class Demo01 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(() -> {
phone.sms();
}, "A").start();
new Thread(() -> {
phone.sms();
}, "B").start();
}
}
class Phone {
public synchronized void sms() {
System.out.println(Thread.currentThread().getName() + " sms");
//這里也有鎖
call();
}
public synchronized void call() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " call");
}
}
12.1.2 Lock版
public class Demo02 {
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.lock(); //連續加鎖2次
new Thread(() -> {
System.out.println("執行緒2想要獲取鎖");
lock.lock();
System.out.println("執行緒2成功獲取到鎖");
}).start();
lock.unlock();
System.out.println("執行緒1釋放了一次鎖");
TimeUnit.SECONDS.sleep(1);
lock.unlock();
System.out.println("執行緒1再次釋放了一次鎖"); //釋放兩次后其他執行緒才能加鎖
}
}
結果:主執行緒——執行緒1,子執行緒——執行緒2, 一個ReentrantLock物件所以執行緒1和2的鎖是同一把,如何執行緒1先拿了鎖,所以執行緒2只能等執行緒1釋放鎖紅才能拿到鎖
執行緒1釋放了一次鎖
執行緒2想要獲取鎖
執行緒1再次釋放了一次鎖
執行緒2成功獲取到鎖
12.2 自旋鎖
自旋鎖是專為防止多處理器并發而引入的一種鎖,自旋鎖是通過 CPU 提供的 CAS 函式(Compare And Swap),在「用戶態」完成加鎖和解鎖操作,不會主動產生執行緒背景關系切換,所以相比互斥鎖來說,會快一些,開銷也小一些,
前面Volatile的不能保證原子性的解決方法使用的就是CAS自旋鎖

自旋鎖加鎖的兩步驟及程序:
-
第一步,查看鎖的狀態,如果鎖是空閑的,則執行第二步;
-
第二步,將鎖設定為當前執行緒持有;
CAS 函式就把這兩個步驟合并成一條硬體級指令,形成原子指令,這樣就保證了這兩個步驟是不可分割的,要么一次性執行完兩個步驟,要么兩個步驟都不執行,
使用自旋鎖的時候,當發生多執行緒競爭鎖的情況,加鎖失敗的執行緒會「忙等待」,直到它拿到鎖,這里的「忙等待」可以用 while 回圈等待實作,不過最好是使用 CPU 提供的 PAUSE 指令來實作「忙等待」,因為可以減少回圈等待時的耗電量,
注意:
- 需要注意,在單核 CPU 上,需要搶占式的調度器(即不斷通過時鐘中斷一個執行緒,運行其他執行緒),否則,自旋鎖在單 CPU 上無法使用,因為一個自旋的執行緒永遠不會放棄 CPU,
1)原理:
一個執行單元要想訪問被自旋鎖保護的共享資源,必須先得到鎖,在訪問完共享資源后,必須釋放鎖,如果在獲取自旋鎖時,沒有任何執行單元保持該鎖,那么將立即得到鎖;如果在獲取自旋鎖時鎖已經有保持者,那么獲取鎖操作將自旋在那里,直到該自旋鎖的保持者釋放了鎖,
2)弊端:
1、死鎖,試圖遞回地獲得自旋鎖必然會引起死鎖:遞回程式的持有實體在第二個實體回圈,以試圖獲得相同自旋鎖時,不會釋放此自旋鎖,
在遞回程式中使用自旋鎖應遵守下列策略:
遞回程式決不能在持有自旋鎖時呼叫它自己,也決不能在遞回呼叫時試圖獲得相同的自旋鎖,此外如果一個行程已經將資源鎖定,那么,即使其它申請這個資源的行程不停地瘋狂"自旋",也無法獲得資源,從而進入死回圈,
2、過多占用cpu資源,如果不加限制,由于申請者一直在回圈等待,因此自旋鎖在鎖定的時候,如果不成功,不會睡眠,會持續的嘗試,單cpu的時候自旋鎖會讓其它process動不了. 因此,一般自旋鎖實作會有一個引數限定最多持續嘗試次數. 超出后, 自旋鎖放棄當前time slice. 等下一次機會,
12.3 互斥鎖
每個物件都對應于一個可稱為" 互斥鎖" 的標記,這個標記用來保證在任一時刻,只能有一個執行緒訪問該物件,
互斥鎖mutex:獨占鎖;開銷大
應用:synchronized關鍵字、ReentrantLock類
互斥鎖原理:
互斥鎖是一種「獨占鎖」,比如當執行緒 A 加鎖成功后,此時互斥鎖已經被執行緒 A 獨占了,只要執行緒 A 沒有釋放手中的鎖,執行緒 B 加鎖就會失敗,于是就會釋放 CPU 讓給其他執行緒既然執行緒 B 釋放掉了 CPU,自然執行緒 B 加鎖的代碼就會被阻塞, 對于互斥鎖加鎖失敗而阻塞的現象,是由作業系統內核實作的,當加鎖失敗時,內核會將執行緒置為「睡眠」狀態,等到鎖被釋放后,內核會在合適的時機喚醒執行緒,當這個執行緒成功獲取到鎖后,于是就可以繼續執行,如下圖:

所以,互斥鎖加鎖失敗時,會從用戶態陷入到內核態,讓內核幫我們切換執行緒,雖然簡化了使用鎖的難度,但是存在一定的性能開銷成本,
-
那這個開銷成本是什么呢?
會有兩次執行緒背景關系切換的成本:
- 當執行緒加鎖失敗時,內核會把執行緒的狀態從「運行」狀態設定為「睡眠」狀態,然后把 CPU 切換給其他執行緒運行;
- 接著,當鎖被釋放時,之前「睡眠」狀態的執行緒會變為「就緒」狀態,然后內核會在合適的時間,把 CPU 切換給該執行緒運行,
-
執行緒的背景關系切換的是什么?
當兩個執行緒是屬于同一個行程,因為虛擬記憶體是共享的,所以在切換時,虛擬記憶體這些資源就保持不動,只需要切換執行緒的私有資料、暫存器等不共享的資料,
上下切換的耗時有大佬統計過,大概在幾十納秒到幾微秒之間,如果你鎖住的代碼執行時間比較短,那可能背景關系切換的時間都比你鎖住的代碼執行時間還要長,
所以,如果你能確定被鎖住的代碼執行時間很短,就不應該用互斥鎖,而應該選用自旋鎖,否則使用互斥鎖,
互斥鎖和自旋鎖區別
1)加鎖失敗后處理方式不同
當加鎖失敗時,互斥鎖用「執行緒切換」來應對,自旋鎖則用「忙等待」來應對,
-
互斥鎖加鎖失敗后,執行緒會釋放 CPU執行緒代碼阻塞 ,給其他執行緒;
-
自旋鎖加鎖失敗后,執行緒會忙等待,直到它拿到鎖;
2)適用范圍不同
-
互斥鎖mutex:獨占鎖;開銷大,
pthread_mutex_lock(pthread_mutex_t *mutex);pthread_mutex_unlock(pthread_mutex_t *mutex); -
自旋鎖spin lock:輕量級的鎖,開銷小;適用于短時間內對鎖的使用,
如果自旋鎖已經被其他的執行單元保持,呼叫者就一直回圈在那里判斷該自旋鎖是否被釋放
pthread_spin_lock(pthread_spinlock_t *lock);pthread_spin_unlock(pthread_spinlock_t *lock);
13. 死鎖排查
13.1 死鎖現象

死鎖測驗,怎么排除死鎖?
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new MyThread(lockA, lockB), "T1").start();
new Thread(new MyThread(lockB, lockA), "T2").start();
}
}
class MyThread implements Runnable {
private String lockA;
private String lockB;
public MyThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA) {
System.out.println(Thread.currentThread().getName() + " lock:" + lockA + " want to get " + lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + " lock:" + lockB + " want to get " + lockA);
}
}
}
}
輸出(下面輸出卡住,因為死鎖了)
T1 lock:lockA want to get lockB
T2 lock:lockB want to get lockA
13.2 排查方式
打開idea的命令列輸入口
- 使用jps定位行程號
jps -l

- 使用
jstack 埠號找到死鎖問題


本文來自博客園,作者:不吃紫菜,遵循CC 4.0 BY-SA著作權協議,
轉載請附上原文出處鏈接:https://www.cnblogs.com/buchizicai/p/17278656.html及本宣告;
本文著作權歸作者所有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/551602.html
標籤:其他
上一篇:【pandas基礎】--資料讀取
下一篇:返回列表




