jdk執行緒池作業原理決議(二)
本篇博客是jdk執行緒池ThreadPoolExecutor作業原理決議系列博客的第二篇,在第一篇博客中從原始碼層面分析了ThreadPoolExecutor在RUNNING狀態下處理任務的核心邏輯,而在這篇博客中將會詳細講解jdk執行緒池ThreadPoolExecutor優雅停止的實作原理,
- jdk執行緒池ThreadPoolExecutor作業原理決議(自己動手實作執行緒池)(一)
ThreadPoolExecutor優雅停止原始碼分析(自己動手實作執行緒池v2版本)
ThreadPoolExecutor為了實作優雅停止功能,為執行緒池設定了一個狀態屬性,其共有5種情況,
在第一篇博客中曾介紹過,AtomicInteger型別的變數ctl同時維護了兩個業務屬性當前活躍作業執行緒個數與執行緒池狀態,其中ctl的高3位用于存放執行緒池狀態,
執行緒池作業狀態介紹
執行緒池作業狀態是單調推進的,即從運行時->停止中->完全停止,共有以下五種情況
1. RUNNING
RUNNING狀態,代表著執行緒池處于正常運行(運行時),RUNNING狀態的執行緒池能正常的接收并處理提交的任務
ThreadPoolExecutor初始化時對ctl賦予的默認屬性便是RUNNING(private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));)
RUNNING狀態下執行緒池正常作業的原理已經在第一篇博客中詳細的介紹過了,這里不再贅述,
2. SHUTDOWN
SHUTDOWN狀態,代表執行緒池處于停止對外服務的狀態(停止中),不再接收新提交的任務,但依然會將workQueue作業佇列中積壓的任務逐步處理完,
用戶可以通過呼叫shutdown方法令執行緒池由RUNNING狀態進入SHUTDOWN狀態,shutdown方法會在下文詳細展開分析,
3. STOP
STOP狀態,代表執行緒池處于停止狀態,不再接受新提交的任務(停止中),同時也不再處理workQueue作業佇列中積壓的任務,當前還在處理任務的作業執行緒將收到interrupt中斷通知
用戶可以通過呼叫shutdownNow方法令執行緒池由RUNNING或者SHUTDOWN狀態進入STOP狀態,shutdownNow方法會在下文詳細展開分析,
4. TIDYING
TIDYING狀態,代表著執行緒池即將完全終止,正在做最后的收尾作業(停止中),
在執行緒池中所有的作業執行緒都已經完全退出,且作業佇列中的任務已經被清空時會由SHUTDOWN或STOP狀態進入TIDYING狀態,
5. TERMINATED
TERMINATED狀態,代表著執行緒池完全的關閉(完全停止),

public class MyThreadPoolExecutorV2 implements MyThreadPoolExecutor {
/**
* 當前執行緒池中存在的worker執行緒數量 + 狀態的一個聚合(通過一個原子int進行cas,來避免對兩個業務屬性欄位加鎖來保證一致性)
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 32位的有符號整數,有3位是用來存放執行緒池狀態的,所以用來維護當前作業執行緒個數的部分就只能用29位了
* 被占去的3位中,有1位原來的符號位,2位是原來的數值位
* */
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 執行緒池狀態poolStatus常量(狀態值只會由小到大,單調遞增)
* 執行緒池狀態遷移圖:
* ↗ SHUTDOWN ↘
* RUNNING ↓ TIDYING → TERMINATED
* ↘ STOP ↗
* 1 RUNNING狀態,代表著執行緒池處于正常運行的狀態,能正常的接收并處理提交的任務
* 執行緒池物件初始化時,狀態為RUNNING
* 對應邏輯:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
*
* 2 SHUTDOWN狀態,代表執行緒池處于停止對外服務的狀態,不再接收新提交的任務,但依然會將workQueue作業佇列中積壓的任務處理完
* 呼叫了shutdown方法時,狀態由RUNNING -> SHUTDOWN
* 對應邏輯:shutdown方法中的advanceRunState(SHUTDOWN);
*
* 3 STOP狀態,代表執行緒池處于停止狀態,不再接受新提交的任務,同時也不再處理workQueue作業佇列中積壓的任務,當前還在處理任務的作業執行緒將收到interrupt中斷通知
* 之前未呼叫shutdown方法,直接呼叫了shutdownNow方法,狀態由RUNNING -> STOP
* 之前先呼叫了shutdown方法,后呼叫了shutdownNow方法,狀態由SHUTDOWN -> STOP
* 對應邏輯:shutdownNow方法中的advanceRunState(STOP);
*
* 4 TIDYING狀態,代表著執行緒池即將完全終止,正在做最后的收尾作業
* 當前執行緒池狀態為SHUTDOWN,任務被消費完作業佇列workQueue為空,且作業執行緒全部退出完成作業執行緒集合workers為空時,tryTerminate方法中將狀態由SHUTDOWN->TIDYING
* 當前執行緒池狀態為STOP,作業執行緒全部退出完成作業執行緒集合workers為空時,tryTerminate方法中將狀態由STOP->TIDYING
* 對應邏輯:tryTerminate方法中的ctl.compareAndSet(c, ctlOf(TIDYING, 0)
*
* 5 TERMINATED狀態,代表著執行緒池完全的關閉,之前執行緒池已經處于TIDYING狀態,且呼叫的鉤子函式terminated已回傳
* 當前執行緒池狀態為TIDYING,呼叫的鉤子函式terminated已回傳
* 對應邏輯:tryTerminate方法中的ctl.set(ctlOf(TERMINATED, 0));
* */
// 11100000 00000000 00000000 00000000
private static final int RUNNING = -1 << COUNT_BITS;
// 00000000 00000000 00000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 00100000 00000000 00000000 00000000
private static final int STOP = 1 << COUNT_BITS;
// 01000000 00000000 00000000 00000000
private static final int TIDYING = 2 << COUNT_BITS;
// 01100000 00000000 00000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 推進執行緒池作業狀態
* */
private void advanceRunState(int targetState) {
for(;;){
// 獲得當前的執行緒池狀態
int currentCtl = this.ctl.get();
// 1 (runState >= targetState)如果當前執行緒池狀態不比傳入的targetState小
// 代表當前狀態已經比引數要制定的更加快(或者至少已經處于對應階段了),則無需更新poolStatus的狀態(或陳述句中第一個條件為false,直接break了)
// 2 (this.ctl.compareAndSet),cas的將runState更新為targetState
// 如果回傳true則說明cas更新成功直接break結束(或陳述句中第一個條件為false,第二個條件為true)
// 如果回傳false說明cas爭搶失敗,再次進入while回圈重試(或陳述句中第一個和第二個條件都是false,不break而是繼續執行回圈重試)
if (runStateAtLeast(currentCtl, targetState) ||
this.ctl.compareAndSet(
currentCtl,
ctlOf(targetState, workerCountOf(currentCtl)
))) {
break;
}
}
}
}
- 因為執行緒池狀態不是單獨存放,而是放在ctl這一32位資料的高3位的,讀寫都比較麻煩,因此提供了runStateOf和ctlOf等輔助方法(位運算)來簡化操作,
- 執行緒池的狀態是單調遞進的,由于巧妙的將狀態靠前的值設定的更小,因此通過直接比較狀態的值來判斷當前執行緒池狀態是否推進到了指定的狀態(runStateLessThan、runStateAtLeast、isRunning、advanceRunState),
jdk執行緒池ThreadPoolExecutor優雅停止具體實作原理
執行緒池的優雅停止一般要能做到以下幾點:
- 執行緒池在中止后不能再受理新的任務
- 執行緒池中止的程序中,已經提交的現存任務不能丟失(等待剩余任務執行完再關倍訓者能夠把剩余的任務吐出來還給用戶)
- 執行緒池最終關閉前,確保創建的所有作業執行緒都已退出,不會出現資源的泄露
下面我們從原始碼層面決議ThreadPoolExecutor,看看其是如何實作上述這三點的.
如何中止執行緒池
ThreadPoolExecutor執行緒池提供了shutdown和shutdownNow這兩個public方法給使用者用于發出執行緒池的停止指令,
shutdown方法
shutdown方法用于關閉執行緒池,并令執行緒池從RUNNING狀態轉變位SHUTDOWN狀態,位于SHUTDOWN狀態的執行緒池,不再接收新任務,但已提交的任務會全部被執行完,
/**
* 關閉執行緒池(不再接收新任務,但已提交的任務會全部被執行)
* 但不會等待任務徹底的執行完成(awaitTermination)
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// shutdown操作中涉及大量的資源訪問和更新,直接通過互斥鎖防并發
mainLock.lock();
try {
// 用于shutdown/shutdownNow時的安全訪問權限
checkShutdownAccess();
// 將執行緒池狀態從RUNNING推進到SHUTDOWN
advanceRunState(SHUTDOWN);
// shutdown不會立即停止所有執行緒,而僅僅先中斷idle狀態的多余執行緒進行回收,還在執行任務的執行緒就慢慢等其執行完
interruptIdleWorkers();
// 單獨為ScheduledThreadPoolExecutor開的一個鉤子函式(hook for ScheduledThreadPoolExecutor)
onShutdown();
} finally {
mainLock.unlock();
}
// 嘗試終止執行緒池
tryTerminate();
}
/**
* 用于shutdown/shutdownNow時的安全訪問權限
* 檢查當前呼叫者是否有權限去通過interrupt方法去中斷對應作業執行緒
* */
private void checkShutdownAccess() {
// 判斷jvm啟動時是否設定了安全管理器SecurityManager
SecurityManager security = System.getSecurityManager();
// 如果沒有設定,直接回傳無事發生
if (security != null) {
// 設定了權限管理器,驗證當前呼叫者是否有modifyThread的權限
// 如果沒有,checkPermission會拋出SecurityException例外
security.checkPermission(shutdownPerm);
// 通過上述校驗,檢查作業執行緒是否能夠被呼叫者訪問
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (MyWorker w : workers) {
// 檢查每一個作業執行緒中的thread物件是否有權限被呼叫者訪問
security.checkAccess(w.thread);
}
} finally {
mainLock.unlock();
}
}
}
/**
* 中斷所有處于idle狀態的執行緒
* */
private void interruptIdleWorkers() {
// 默認打斷所有idle狀態的作業執行緒
interruptIdleWorkers(false);
}
private static final boolean ONLY_ONE = true;
/**
* 中斷處于idle狀態的執行緒
* @param onlyOne 如果為ture,至多只中斷一個作業執行緒(可能一個都不中斷)
* 如果為false,中斷workers內注冊的所有作業執行緒
* */
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (MyWorker w : workers) {
Thread t = w.thread;
// 1. t.isInterrupted(),說明當前執行緒存在中斷信號,之前已經被中斷了,無需再次中斷
// 2. w.tryLock(), runWorker方法中如果作業執行緒獲取到任務開始作業,會先進行Lock加鎖
// 則這里的tryLock會加鎖失敗,回傳false, 而回傳true的話,就說明當前作業執行緒是一個idle執行緒,需要被中斷
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
// tryLock成功時,會將內部state的值設定為1,通過unlock恢復到未加鎖的狀態
w.unlock();
}
}
if (onlyOne) {
// 引數onlyOne為true,至多只中斷一個作業執行緒
// 即使上面的t.interrupt()沒有執行,也在這里跳出回圈
break;
}
}
} finally {
mainLock.unlock();
}
}
/**
* 單獨為jdk的ScheduledThreadPoolExecutor開的一個鉤子函式
* 由ScheduledThreadPoolExecutor繼承ThreadExecutor時重寫(包級別訪問權限)
* */
void onShutdown() {}
- shutdown方法在入口處使用mainLock加鎖后,通過checkShutdownAccess檢查當前是否有權限訪問作業執行緒(前提是設定了SecurityManager),如果無權限則會拋出SecurityException例外,
- 通過advanceRunState方法將執行緒池狀態推進到SHUTDOWN,
- 通過interruptIdleWorkers使用中斷指令(Thread.interrupt)喚醒所有處于idle狀態的作業執行緒(存在idle狀態的作業執行緒代表著當前作業佇列是空的),
idle的作業執行緒在被喚醒后從getTask方法中退出(getTask中對應的退出邏輯在下文中展開),進而退出runWorker方法,最終系統回收掉作業執行緒占用的各種資源(第一篇博客中runWorker的決議中提到過), - 呼叫包級別修飾的鉤子函式onShutdown,這一方法是作者專門為同為java.util.concurrent包下的ScheduledThreadPoolExecutor提供的拓展,不在本篇博客中展開,
- 前面提到SHUTDOWN狀態的執行緒池在作業執行緒都全部退出且作業佇列為空時會轉變為TIDYING狀態,因此通過呼叫tryTerminate方法嘗試終止執行緒池(當前不一定會滿足條件,比如呼叫了shutdown但作業佇列還有很多任務等待執行),
tryTerminate方法中細節比較多,下文中再展開分析,
shutdownNow方法
shutdownNow方法同樣用于關閉執行緒池,但比shutdown方法更加激進,shutdownNow方法令執行緒池從RUNNING狀態轉變為STOP狀態,不再接收新任務,而作業佇列中未完成的任務會以串列的形式回傳給shutdownNow的呼叫者,
- shutdown方法在呼叫后,雖然不再接受新任務,但會等待作業佇列中的佇列被慢慢消費掉;而shutdownNow并不會等待,而是將當前作業佇列中的所有未被撈取執行的剩余任務全部回傳給shutdownNow的呼叫者,并對所有的作業執行緒(包括非idle的執行緒)發出中斷通知,
- 這樣做的好處是執行緒池可以更快的進入終止態,而不必等剩余的任務都完成,都回傳給用戶后也不會丟任務,
/**
* 立即關閉執行緒池(不再接收新任務,作業佇列中未完成的任務會以串列的形式回傳)
* @return 當前作業佇列中未完成的任務
* */
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
// shutdown操作中涉及大量的資源訪問和更新,直接通過互斥鎖防并發
mainLock.lock();
try {
// 用于shutdown/shutdownNow時的安全訪問權限
checkShutdownAccess();
// 將執行緒池狀態從RUNNING推進到STOP
advanceRunState(STOP);
interruptWorkers();
// 將作業佇列中未完成的任務提取出來(會清空執行緒池的workQueue)
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 嘗試終止執行緒池
tryTerminate();
return tasks;
}
/**
* shutdownNow方法內,立即終止執行緒池時該方法被呼叫
* 中斷通知所有已經啟動的作業執行緒(比如等待在作業佇列上的idle作業執行緒,或者run方法內部await、sleep等,令其拋出中斷例外快速結束)
* */
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (MyWorker w : workers) {
// 遍歷所有的worker執行緒,已啟動的作業執行緒全部呼叫Thread.interrupt方法,發出中斷信號
w.interruptIfStarted();
}
} finally {
mainLock.unlock();
}
}
/**
* 將作業佇列中的任務全部轉移出來
* 用于shutdownNow緊急關閉執行緒池時將未完成的任務回傳給呼叫者,避免任務丟失
* */
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> queue = this.workQueue;
ArrayList<Runnable> taskList = new ArrayList<>();
queue.drainTo(taskList);
// 通常情況下,普通的阻塞佇列的drainTo方法可以一次性的把所有元素都轉移到taskList中
// 但jdk的DelayedQueue或者一些自定義的阻塞佇列,drainTo方法無法轉移所有的元素
// (比如DelayedQueue的drainTo方法只能轉移已經不需要延遲的元素,即getDelay()<=0)
if (!queue.isEmpty()) {
// 所以在這里打一個補丁邏輯:如果drainTo方法執行后作業佇列依然不為空,則通過更基礎的remove方法把佇列中剩余元素一個一個的回圈放到taskList中
for (Runnable r : queue.toArray(new Runnable[0])) {
if (queue.remove(r)) {
taskList.add(r);
}
}
}
return taskList;
}
- shutdownNow方法在入口處使用mainLock加鎖后,與shutdown方法一樣也通過checkShutdownAccess檢查當前是否有權限訪問作業執行緒(前提是設定了SecurityManager),如果無權限則會拋出SecurityException例外,
- 通過advanceRunState方法將執行緒池狀態推進到STOP,
- 通過interruptWorkers使用中斷指令(Thread.interrupt)喚醒所有作業執行緒(區別于shutdown中的interruptIdleWorkers),區別在于除了idle的作業執行緒,所有正在執行任務的作業執行緒也會收到中斷通知,期望其能盡快退出任務的執行,
- 通過drainQueue方法將當前作業執行緒中剩余的所有任務以List的形式統一回傳給呼叫者,
- 通過呼叫tryTerminate方法嘗試終止執行緒池,
如何保證執行緒池在中止后不能再受理新的任務?
在execute方法作為入口,提交任務的邏輯中,v2版本相比v1版本新增了一些基于執行緒池狀態的校驗(和jdk的實作保持一致了),
execute方法中的校驗
- 首先在execute方法中,向作業佇列加入新任務前(workQueue.offer)對當前執行緒池的狀態做了一個校驗(isRunning(currentCtl)),希望非RUNNING狀態的執行緒池不向作業佇列中添加新任務
但在做該檢查時可能與shutdown/shutdownNow內推進執行緒池狀態的邏輯并發執行,所以在作業佇列成功加入任務后還需要再檢查一次執行緒池狀態,如果此時已經不是RUNNING狀態則需要通過remove方法將剛入隊的任務從佇列中移除,并呼叫reject方法(拒絕策略)
addWorker方法中的校驗
- 在addWorker方法的入口處(retry:第一層回圈通過(runState >= SHUTDOWN && !(runState == SHUTDOWN && firstTask == null && !workQueue.isEmpty())))邏輯,
保證了不是RUNNING狀態的執行緒池(runState >= SHUTDOWN),無法創建新的作業執行緒(addWorker回傳false),
但有一種特殊情況:即SHUTDOWN狀態下(runState == SHUTDOWN),作業佇列不為空(!workQueue.isEmpty()),且不是第一次提交任務時創建新作業執行緒(firstTask == null),
依然允許創建新的作業執行緒,因為即使在SHUTDOWN狀態下,某一存活的作業執行緒發生中斷例外時,會呼叫processWorkerExit方法,在銷毀原有作業執行緒后依然需要呼叫addWorker重新創建一個新的(firstTask == null)
execute與shutdown/shutdownNow并發時的處理
execute提交任務時addWorker方法和shutdown/shutdownNow方法是可能并發執行的,但addWorker中有多處地方都對執行緒池的狀態進行了檢查,盡最大的可能避免執行緒池停止時繼續創建新的作業執行緒,
- retry回圈中,compareAndIncrementWorkerCount方法會cas的更新狀態(此前獲取到的ctl狀態必然是RUNNING,否則走不到這里),cas成功則會跳出retry:回圈( break retry;),
而cas失敗可能有兩種情況:
如果是workerCount發生了并發的變化,則在內層的for (;;)回圈中進行重試即可
如果執行緒池由于收到終止指令而推進了狀態,則隨后的if (runStateOf(currentCtl) != runState)將會為true,跳出到外層的回圈重試(continue retry) - 在new Worker(firstTask)后,使用mainLock獲取鎖后再一次檢查執行緒池狀態(if (runState < SHUTDOWN ||(runState == SHUTDOWN && firstTask == null))),
由于shutdown、shutdownNow也是通過mainLock加鎖后才推進的執行緒池狀態,因此這里獲取到的狀態是準確的,
如果校驗失敗(if結果為false),則workers中不會加入新創建的作業執行緒,臨時變數workerAdded=false,則作業執行緒不會啟動(t.start()),臨時變數workerStarted也為false,最后會呼叫addWorkerFailed將新創建的作業執行緒回收掉(回滾)
基于execute方法和addWorker方法中關于各項關于執行緒池停止狀態校驗,最大程度的避免了執行緒池在停止程序中新任務的提交和可能的新作業執行緒的創建,使得execute方法在執行緒池接收到停止指令后(>=SHUTDOWN),最終都會去執行reject拒絕策略邏輯,
/**
* 提交任務,并執行
* */
@Override
public void execute(Runnable command) {
if (command == null){
throw new NullPointerException("command引數不能為空");
}
int currentCtl = this.ctl.get();
if (workerCountOf(currentCtl) < this.corePoolSize) {
// 如果當前存在的worker執行緒數量低于指定的核心執行緒數量,則創建新的核心執行緒
boolean addCoreWorkerSuccess = addWorker(command,true);
if(addCoreWorkerSuccess){
// addWorker添加成功,直接回傳即可
return;
}
// addWorker失敗了
// 失敗的原因主要有以下幾個:
// 1 執行緒池的狀態出現了變化,比如呼叫了shutdown/shutdownNow方法,不再是RUNNING狀態,停止接受新的任務
// 2 多個執行緒并發的execute提交任務,導致cas失敗,重試后發現當前執行緒的個數已經超過了限制
// 3 小概率是ThreadFactory執行緒工廠沒有正確的回傳一個Thread
// 獲取最新的ctl狀態
currentCtl = this.ctl.get();
}
// 走到這里有兩種情況
// 1 因為核心執行緒超過限制(workerCountOf(currentCtl) < corePoolSize == false),需要嘗試嘗試將任務放入阻塞佇列
// 2 addWorker回傳false,創建核心作業執行緒失敗
// 判斷當前執行緒池狀態是否為running
// 如果是running狀態,則進一步執行任務入隊操作
if(isRunning(currentCtl) && this.workQueue.offer(command)){
// 執行緒池是running狀態,且workQueue.offer入隊成功
int recheck = this.ctl.get();
// 重新檢查狀態,避免在上面入隊的程序中執行緒池并發的關閉了
// 如果是isRunning=false,則進一步需要通過remove操作將剛才入隊的任務洗掉,進行回滾
if (!isRunning(recheck) && remove(command)) {
// 執行緒池關閉了,執行reject操作
reject(command);
} else if(workerCountOf(currentCtl) == 0){
// 在corePoolSize為0的情況下,當前不存在存活的核心執行緒
// 一個任務在入隊之后,如果當前執行緒池中一個執行緒都沒有,則需要兜底的創建一個非核心執行緒來處理入隊的任務
// 因此firstTask為null,目的是先讓任務先入隊后創建執行緒去拉取任務并執行
addWorker(null,false);
}else{
// 加入佇列成功,且當前存在worker執行緒,成功回傳
return;
}
}else{
// 阻塞佇列已滿,嘗試創建一個新的非核心執行緒處理
boolean addNonCoreWorkerSuccess = addWorker(command,false);
if(!addNonCoreWorkerSuccess){
// 創建非核心執行緒失敗,執行拒絕策略(失敗的原因和前面創建核心執行緒addWorker的原因類似)
reject(command);
}else{
// 創建非核心執行緒成功,成功回傳
return;
}
}
}
/**
* 向執行緒池中加入worker
* */
private boolean addWorker(Runnable firstTask, boolean core) {
// retry標識外層回圈
retry:
for (;;) {
int currentCtl = ctl.get();
int runState = runStateOf(currentCtl);
// Check if queue empty only if necessary.
// 執行緒池終止時需要回傳false,避免新的worker被創建
// 1 先判斷runState >= SHUTDOWN
// 2 runState >= SHUTDOWN時,意味著不再允許創建新的作業執行緒,但有一種情況例外
// 即SHUTDOWN狀態下(runState == SHUTDOWN),作業佇列不為空(!workQueue.isEmpty()),還需要繼續執行
// 比如在當前存活的執行緒發生中斷例外時,會呼叫processWorkerExit方法,在銷毀原有作業執行緒后呼叫addWorker重新創建一個新的(firstTask == null)
if (runState >= SHUTDOWN && !(runState == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
// 執行緒池已經是關閉狀態了,不再允許創建新的作業執行緒,回傳false
return false;
}
// 用于cas更新workerCount的內層回圈(注意這里面與jdk的寫法不同,改寫成了邏輯一致但更可讀的形式)
for (;;) {
// 判斷當前worker數量是否超過了限制
int workerCount = workerCountOf(currentCtl);
if (workerCount >= CAPACITY) {
// 當前worker數量超過了設計上允許的最大限制
return false;
}
if (core) {
// 創建的是核心執行緒,判斷當前執行緒數是否已經超過了指定的核心執行緒數
if (workerCount >= this.corePoolSize) {
// 超過了核心執行緒數,創建核心worker執行緒失敗
return false;
}
} else {
// 創建的是非核心執行緒,判斷當前執行緒數是否已經超過了指定的最大執行緒數
if (workerCount >= this.maximumPoolSize) {
// 超過了最大執行緒數,創建非核心worker執行緒失敗
return false;
}
}
// cas更新workerCount的值
boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);
if (casSuccess) {
// cas成功,跳出外層回圈
break retry;
}
// 重新檢查一下當前執行緒池的狀態與之前是否一致
currentCtl = ctl.get(); // Re-read ctl
if (runStateOf(currentCtl) != runState) {
// 從外層回圈開始continue(因為說明在這期間 執行緒池的作業狀態出現了變化,需要重新判斷)
continue retry;
}
// compareAndIncrementWorkerCount方法cas爭搶失敗,重新執行內層回圈
}
}
boolean workerStarted = false;
boolean workerAdded = false;
MyWorker newWorker = null;
try {
// 創建一個新的worker
newWorker = new MyWorker(firstTask);
final Thread myWorkerThread = newWorker.thread;
if (myWorkerThread != null) {
// MyWorker初始化時內部執行緒創建成功
// 加鎖,防止并發更新
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int runState = runStateOf(ctl.get());
// 重新檢查執行緒池運行狀態,滿足以下兩個條件的任意一個才創建新Worker
// 1 runState < SHUTDOWN
// 說明執行緒池處于RUNNING狀態正常運行,可以創建新的作業執行緒
// 2 runState == SHUTDOWN && firstTask == null
// 說明執行緒池呼叫了shutdown,但作業佇列不為空,依然需要新的Worker,
// firstTask == null標識著其不是因為外部提交新任務而創建新Worker,而是在消費SHUTDOWN前已提交的任務
if (runState < SHUTDOWN ||
(runState == SHUTDOWN && firstTask == null)) {
if (myWorkerThread.isAlive()) {
// 預檢查執行緒的狀態,剛初始化的worker執行緒必須是未喚醒的狀態
throw new IllegalThreadStateException();
}
// 加入worker集合
this.workers.add(newWorker);
int workerSize = workers.size();
if (workerSize > largestPoolSize) {
// 如果當前worker個數超過了之前記錄的最大存活執行緒數,將其更新
largestPoolSize = workerSize;
}
// 創建成功
workerAdded = true;
}
} finally {
// 無論是否發生例外,都先將主控鎖解鎖
mainLock.unlock();
}
if (workerAdded) {
// 加入成功,啟動worker執行緒
myWorkerThread.start();
// 標識為worker執行緒啟動成功,并作為回傳值回傳
workerStarted = true;
}
}
}finally {
if (!workerStarted) {
addWorkerFailed(newWorker);
}
}
return workerStarted;
}
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
// 當一個任務從作業佇列中被成功移除,可能此時作業佇列為空,嘗試判斷是否滿足執行緒池中止條件
tryTerminate();
return removed;
}
如何保證中止程序中不丟失任務?
- 通過shutdown關閉執行緒池時,SHUTDOWN狀態的執行緒池會等待所有剩余的任務執行完畢后再進入TIDYING狀態,
- 通過shutdownNow關閉執行緒池時,以回傳值的形式將剩余的任務吐出來還給用戶
中止前已提交的任務不會丟失;而中止后執行緒池也不會再接收新的任務(走拒絕策略),這兩點共同保證了提交的任務不會丟失,
如何保證執行緒池最終關閉前,所有作業執行緒都已退出?
執行緒池在收到中止命令進入SHUTDOWN或者STOP狀態時,會一直等到作業佇列為空且所有作業執行緒都中止退出后才會推進到TIDYING階段,
上面描述的條件是一個復合的條件,其只有在“收到停止指令(進入SHUTDOWN或者STOP狀態)”、"作業佇列中任務被移除或消費(作業佇列為空)"或是“作業執行緒退出(所有作業執行緒都中止退出)”這三類事件發生時才有可能滿足,
而判斷是否滿足條件并推進到TIDYING狀態的關鍵就在tryTerminate方法中,tryTerminate顧名思義便是用于嘗試終止執行緒池的,當上述任意事件觸發時便判斷是否滿足終止條件,如果滿足則將執行緒池推進到TIDYING階段,
因此在ThreadPoolExecutor中tryTerminate一共在6個地方被呼叫,分別是shutdown、shutdownNow、remove、purge、addWorkerFailed和processWorkerExit方法,
- shutdown、shutdownNow方法觸發收到停止指令的事件
- remove、purge方法觸發作業佇列中任務被移除的事件
- addWorkerFailed、processWorkerExit方法觸發作業執行緒退出的事件
tryTerminate原始碼分析
/**
* 嘗試判斷是否滿足執行緒池中止條件,如果滿足條件,將其推進到最后的TERMINATED狀態
* 注意:必須在任何可能觸發執行緒池中止的場景下呼叫(例如作業執行緒退出,或者SHUTDOWN狀態下佇列作業佇列為空等)
* */
final void tryTerminate() {
for (;;) {
int currentCtl = this.ctl.get();
if (isRunning(currentCtl)
|| runStateAtLeast(currentCtl, TIDYING)
|| (runStateOf(currentCtl) == SHUTDOWN && !workQueue.isEmpty())) {
// 1 isRunning(currentCtl)為true,說明執行緒池還在運行中,不滿足中止條件
// 2 當前執行緒池狀態已經大于等于TIDYING了,說明之前別的執行緒可能已經執行過tryTerminate,且通過了這個if校驗,不用重復執行了
// 3 當前執行緒池是SHUTDOWN狀態,但作業佇列中還有任務沒處理完,也不滿足中止條件
// 以上三個條件任意一個滿足即直接提前return回傳
return;
}
// 有兩種場景會走到這里
// 1 執行了shutdown方法(runState狀態為SHUTDOWN),且當前作業執行緒已經空了
// 2 執行了shutdownNow方法(runState狀態為STOP)
// 這個時候需要令所有的作業執行緒都主動的退出來回收資源
if (workerCountOf(currentCtl) != 0) {
// 如果當前作業執行緒個數不為0,說明還有別的作業執行緒在作業中,
// 通過interruptIdleWorkers(true),打斷其中的一個idle執行緒,嘗試令其也執行runWorker中的processWorkerExit邏輯,并執行tryTerminate
// 被中斷的那個作業執行緒也會執行同樣的邏輯(getTask方法回傳->processWorkerExit->tryTerminate)
// 這樣可以一個接著一個的不斷打斷每一個作業執行緒,令其逐步的退出(比起一次性的通知所有的idle作業執行緒,這樣相對平滑很多)
interruptIdleWorkers(ONLY_ONE);
return;
}
// 執行緒池狀態runState為SHUTDOWN或者STOP,且存活的作業執行緒個數已經為0了
// 雖然前面的interruptIdleWorkers是一個一個中斷idle執行緒的,但實際上有的作業執行緒是因為別的原因退出的(恰好workerCountOf為0了)
// 所以這里是可能存在并發的,因此通過mainLock加鎖防止并發,避免重復的terminated方法呼叫和termination.signalAll方法呼叫
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// cas的設定ctl的值為TIDYING+作業執行緒個數0(防止與別的地方ctl并發更新)
if (ctl.compareAndSet(currentCtl, ctlOf(TIDYING, 0))) {
try {
// cas成功,呼叫terminated鉤子函式
terminated();
} finally {
// 無論terminated鉤子函式是否出現例外
// cas的設定ctl的值為TERMINATED最終態+作業執行緒個數0(防止與別的地方ctl并發更新)
ctl.set(ctlOf(TERMINATED, 0));
// 通知使用awaitTermination方法等待執行緒池關閉的其它執行緒(通過termination.await等待)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// 如果上述對ctl變數的cas操作失敗了,則進行重試,再來一次回圈
// else retry on failed CAS
}
}
如何保證作業執行緒一定能成功退出?
從上面tryTerminate方法的實作中可以看到,執行緒池必須等到所有作業執行緒都全部退出(workerCount為0),作業執行緒占用的全部資源都回收后才會推進到終止態,
那么之前啟動的作業執行緒一定能通過processWorkerExit退出并銷毀嗎?答案是不一定,這主要取決于用戶是否正確的撰寫了令作業執行緒安全退出的任務邏輯,
因為只有能退出任務執行邏輯(runWorker方法中的task.run())的作業執行緒才有機會執行processWorkerExit,無法從任務中跳出(正常退出or拋例外)的作業執行緒將永遠無法退出,導致執行緒池也永遠無法推進到終態,
下面分情況討論:
- 任務中的邏輯是一定會執行完正常結束的(沒有無限回圈也沒有令執行緒陷入阻塞態的操作),那么這是沒問題的
()->{
// 會正常結束的
System.out.println("hello world!");
};
- 任務中存在需要無限回圈的邏輯,那么最好在回圈條件內監聽一個volatile的變數,當需要執行緒池停止時,修改這個變數,從而令任務從無限回圈中正常退出,
()->{
// 無限回圈
while(true){
System.out.println("hello world!");
}
};
()->{
// 無限回圈時監聽一個變數
while(!isStop) {
System.out.println("hello world!");
}
};
- 任務中存在Condition.await等會阻塞當前執行緒,令其無法自然退出的邏輯,
tryTerminate中停止作業執行緒時會呼叫Worker類的interruptIfStarted方法發出中斷指令(Thread.interrupt方法),如果被阻塞的方法是回應中斷的,那么業務代碼中不能無腦吞掉InterruptedException,而要能感知到中斷例外,在確實要關閉執行緒池時令任務退出(向上拋例外或正常退出),
而如果是不回應中斷的阻塞方法(如ReentrantLock.lock),則需要用戶自己保證這些方法最終能夠被喚醒,否則作業執行緒將無法正常退出而阻止執行緒池進入終止狀態,
()->{
try {
new ReentrantLock().newCondition().await();
} catch (InterruptedException e) {
// doSomething處理一些邏輯后,,,
// 向上拋出例外
throw new XXXException(e);
}
}
()->{
try {
new ReentrantLock().newCondition().await();
} catch (InterruptedException e) {
}
// doSomething處理一些邏輯后,,,正常退出
}
為什么不在執行緒池終止時使用Thread.stop方法強制令作業執行緒停止呢?
雖然Thread.stop能夠保證執行緒一定會被停止,但由于停止的程序中存在很嚴重的并發安全問題而被廢棄而不推薦使用了,
具體原因可以參考官方檔案(Why is Thread.stop deprecated?):https://docs.oracle.com/javase/8/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html
總結
- 本篇博客從原始碼的角度詳細分析了jdk執行緒池ThreadPoolExecutor關于優雅停止實作的原理,其中重點介紹了ThreadPoolExecutor是如何做到中止后不能再受理新的任務、中止時不丟失已提交任務以及關閉時不會發生執行緒資源的泄露等核心功能,
- 結合之前發布的第一篇關于ThreadPoolExecutor正常運行時接受并執行所提交任務的博客,雖然沒有100%的覆寫ThreadPoolExecutor的全部功能,但依然完整的講解了ThreadPoolExecutor最核心的功能,希望這兩篇博客能幫助到對jdk執行緒池實作原理感興趣的讀者,
- 本篇博客的完整代碼在我的github上:https://github.com/1399852153/Reinventing-the-wheel-for-learning(ThreadPool模塊 MyThreadPoolExecutorV2) 內容如有錯誤,還請多多指教,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/535976.html
標籤:Java
上一篇:RabbitMQ 常見問題
下一篇:第2-3-5章 洗掉附件的介面開發-檔案存盤服務系統-nginx/fastDFS/minio/阿里云oss/七牛云oss
