jdk執行緒池ThreadPoolExecutor作業原理決議(自己動手實作執行緒池)(一)
執行緒池介紹
在日常開發中經常會遇到需要使用其它執行緒將大量任務異步處理的場景(異步化以及提升系統的吞吐量),而在使用執行緒的程序中卻存在著兩個痛點,
- 在java等很多主流語言中每個邏輯上的執行緒底層都對應著一個系統執行緒(不考慮虛擬執行緒的情況),作業系統創建一個新執行緒是存在一定開銷的,
在需要執行大量的異步任務時,如果處理每個任務時都直接向系統申請創建一個執行緒來執行,并在任務執行完畢后再回收執行緒,則創建/銷毀大量執行緒的開銷將無法忍受, - 每個系統執行緒都會占用一定的記憶體空間,且系統在調度不同執行緒背景關系切換時存在一定的cpu開銷,因此在一定的硬體條件下,作業系統能同時維護的系統執行緒個數相對而言是比較有限的,
在使用執行緒的程序中如果沒有控制好流量,會很容易創建過多的執行緒而耗盡系統資源,令系統變得不可用,
而執行緒池正是為解決上述痛點而生的,其通過兩個手段來解決上述痛點,
池化執行緒資源
池化執行緒資源,顧名思義就是維護一個存活執行緒的集合(池子),提交任務的用戶程式不直接控制執行緒的創建和銷毀,不用每次執行任務時都申請創建一個新執行緒,而是通過執行緒池間接的獲得執行緒去處理異步任務,
執行緒池中的執行緒在執行完任務后通常也不會被系統回收掉,而是繼續待在池子中用于執行其它的任務(執行堆積的待執行任務或是等待新任務),
執行緒池通過池化執行緒資源,避免了系統反復創建/銷毀執行緒的開銷,大幅提高了處理大規模異步任務時的性能,
對執行緒資源的申請進行收口,限制系統資源的使用
如果程式都統一使用執行緒池來處理異步任務,則執行緒池內部便可以對系統資源的使用施加一定限制,
例如用戶可以指定一個執行緒池最大可維護的執行緒數量,以避免耗盡系統資源,
當用戶提交任務的速率過大,導致執行緒池中的執行緒數到達指定的最大值時依然無法滿足需求時,執行緒池可以通過丟棄部分任務或限制提交任務的流量的方式來處理這一問題,
執行緒池通過對執行緒資源的使用進行統一收口,用戶可以通過設定執行緒池的引數來控制系統資源的使用,從而避免系統資源耗盡,
jdk執行緒池ThreadPoolExecutor簡單介紹
前面介紹了執行緒池的概念,而要深入理解執行緒池的作業原理最好的辦法便是找到一個優秀的執行緒池實作來加以研究,
而自jdk1.5中引入的通用執行緒池框架ThreadPoolExecutor便是一個很好的學習物件,其內部實作不算復雜,卻在高效實作核心功能的同時還提供了較豐富的拓展能力,
下面從整體上介紹一下jdk通用執行緒池ThreadPoolExecutor的作業原理(基于jdk8),
ThreadPoolExecutor運行時作業流程
首先ThreadPoolExecutor允許用戶從兩個不同維度來控制執行緒資源的使用,即最大核心執行緒數(corePoolSize)和最大執行緒數(maximumPoolSize),
最大核心執行緒數:核心執行緒指的是通常常駐執行緒池的執行緒,常駐執行緒在執行緒池沒有任務空閑時也不會被銷毀,而是處于idle狀態,這樣在新任務到來時就能很快的進行回應,
最大執行緒數:和第一節中提到的一樣,即執行緒池中所能允許的活躍執行緒的最大數量,
在向ThreadPoolExecutor提交任務時(execute方法),會執行一系列的判斷來決定任務應該如何被執行(原始碼在下一節中具體分析),
- 首先判斷當前活躍的執行緒數是否小于指定的最大核心執行緒數corePoolSize,
如果為真,則說明當前執行緒池還未完成預熱,核心執行緒數不飽和,創建一個新執行緒來執行該任務,
如果為假,則說明當前執行緒池已完成預熱,進行下一步判斷, - 嘗試將當前任務放入作業佇列workQueue(阻塞佇列BlockingQueue),作業佇列中的任務會被執行緒池中的活躍執行緒按入隊順序逐個消費,
如果入隊成功,則說明當前作業佇列未滿,入隊的任務將會被執行緒池中的某個活躍執行緒所消費并執行,
如果入隊失敗,則說明當前作業佇列已飽和,執行緒池消費任務的速度可能太慢了,可能需要創建更多新執行緒來加速消費,進行下一步判斷, - 判斷當前活躍的執行緒數是否小于指定的最大執行緒數maximumPoolSize,
如果為真,則說明當前執行緒池所承載的執行緒數還未達到引數指定的上限,還有余量來創建新的執行緒加速消費,創建一個新執行緒來執行該任務,
如果為假,則說明當前執行緒池所承載的執行緒數達到了上限,但處理任務的速度依然不夠快,需要觸發拒絕策略,

ThreadPoolExecutor優雅停止
執行緒池的優雅停止一般要能做到以下幾點:
- 執行緒池在中止后不能再受理新的任務
- 執行緒池中止的程序中,已經提交的現存任務不能丟失(等待剩余任務執行完再關倍訓者能夠把剩余的任務吐出來還給用戶)
- 執行緒池最終關閉前,確保創建的所有作業執行緒都已退出,不會出現資源的泄露
執行緒池自啟動后便會有大量的作業執行緒在內部持續不斷并發的執行提交的各種任務,而要想做到優雅停止并不是一件容易的事情,
因此ThreadPoolExecutor中最復雜、細節最多的部分并不在于上文中的正常作業流程,而在于分散在各個地方但又緊密協作的,控制優雅停止的邏輯,
ThreadPoolExecutor的其它功能
除了正常的作業流程以及優雅停止的功能外,ThreadPoolExecutor還提供了一些比較好用的功能
- 提供了很多protected修飾的鉤子函式,便于用戶繼承并實作自己的執行緒池時進行一定的拓展
- 在運行時統計了總共執行的任務數等關鍵指標,并提供了對應的api便于用戶在運行時觀察運行狀態
- 允許在執行緒池運行程序中動態修改關鍵的配置引數(比如corePoolSize等),并實時的生效,
jdk執行緒池ThreadPoolExecutor原始碼決議(自己動手實作執行緒池v1版本)
如費曼所說:What I can not create I do not understand(我不能理解我創造不了的東西),
通過模仿jdk的ThreadPoolExecutor實作,從零開始實作一個執行緒池,可以迫使自己去仔細的捋清楚jdk執行緒池中設計的各種細節,加深理解而達到更好的學習效果,
前面提到ThreadPoolExecutor的核心邏輯主要分為兩部分,一是正常運行時處理提交的任務的邏輯,二是實作優雅停止的邏輯,
因此我們實作的執行緒池MyThreadPoolExecutor(以My開頭用于區分)也會分為兩個版本,v1版本只實作前一部分即正常運行時執行任務的邏輯,將有關執行緒池優雅停止的邏輯全部去除,
相比直接啃jdk最終實作的原始碼,v1版本的實作會更簡單更易理解,讓正常執行任務時的邏輯更加清晰而不會耦合太多關于優雅停止的邏輯,
執行緒池關鍵成員變數介紹
ThreadPoolExecutor中有許多的成員變數,大致可以分為三類,
可由用戶自定義的、用于控制執行緒池運行的配置引數
- volatile int corePoolSize(最大核心執行緒數量)
- volatile int maximumPoolSize(最大執行緒數量)
- volatile long keepAliveTime(idle執行緒保活時間)
- final BlockingQueue workQueue(作業佇列(阻塞佇列))
- volatile ThreadFactory threadFactory(作業執行緒工廠)
- volatile RejectedExecutionHandler handler(拒絕例外處理器)
- volatile boolean allowCoreThreadTimeOut(是否允許核心執行緒在idle超時后退出)
其中前6個配置引數都可以在ThreadPoolExecutor的建構式中指定,而allowCoreThreadTimeOut則可以通過暴露的public方法allowCoreThreadTimeOut來動態的設定,
其中大部分屬性都是volatile修飾的,目的是讓運行程序中可以用過提供的public方法動態修改這些值后,執行緒池中的作業執行緒或提交任務的用戶執行緒能及時的感知到變化(執行緒間的可見性),并進行回應(比如令核心執行緒自動的idle退出)
這些配置屬性具體如何控制執行緒池行為的原理都會在下面的原始碼決議中展開介紹,理解這些引數的作業原理后才能在實際的業務中使用執行緒池時為其設定合適的值,
僅供執行緒池內部作業時使用的屬性
- ReentrantLock mainLock(用于控制各種臨界區邏輯的并發)
- HashSet
workers(當前活躍作業執行緒Worker的集合,作業執行緒的作業原理會在下文介紹) - AtomicInteger ctl(執行緒池控制狀態,control的簡寫)
這里重點介紹一下ctl屬性,ctl雖然是一個32位的整型欄位(AtomicInteger),但實際上卻用于標識兩個業務屬性,即當前執行緒池的運行狀態和worker執行緒的總數量,
在執行緒池初始化時狀態位RUNNING,worker執行緒數量位0(private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));),
ctl的32位中的高3位用于標識執行緒池當前的狀態,剩余的29位用于標識執行緒池中worker執行緒的數量(因此理論上ThreadPoolExecutor最大可容納的執行緒數并不是231-1(32位中符號要占一位),而是229-1)
由于聚合之后單獨的讀寫某一個屬性不是很方便,所以ThreadPoolExecutor中提供了很多基于位運算的輔助函式來簡化這些邏輯,
ctl這樣聚合的設計比起拆分成兩個獨立的欄位有什么好處?
在ThreadPoolExecutor中關于優雅停止的邏輯中有很多地方是需要同時判斷當前作業執行緒數量與執行緒池狀態后,再對執行緒池狀態作業執行緒數量進行更新的(具體邏輯在下一篇v2版本的博客中展開),
且為了執行效率,不使用互斥鎖而是通過cas重試的方法來解決并發更新的問題,而對一個AtomicInteger屬性做cas重試的更新,要比同時控制兩個屬性進行cas的更新要簡單很多,執行效率也高很多,
ThreadPoolExecutor共有五種狀態,但有四種都和優雅停止有關(除了RUNNING),
但由于v1版本的MyThreadPoolExecutorV1不支持優雅停止,所以不在本篇博客中講解這些狀態具體的含義以及其是如何變化的(下一篇v2版本的博客中展開)
記錄執行緒池運行程序中的一些關鍵指標
- completedTaskCount(執行緒池自啟動后已完成的總任務數)
- largestPoolSize(執行緒池自啟動后作業執行緒個數的最大值)
在運行程序中,ThreadPoolExecutor會在對應的地方進行埋點,統計一些指標并提供相應的api給用戶實時的查詢,以提高執行緒池作業時的可觀測性,
public class MyThreadPoolExecutorV1 implements MyThreadPoolExecutor{
/**
* 指定的最大核心執行緒數量
* */
private volatile int corePoolSize;
/**
* 指定的最大執行緒數量
* */
private volatile int maximumPoolSize;
/**
* 執行緒保活時間(單位:納秒 nanos)
* */
private volatile long keepAliveTime;
/**
* 存放任務的作業佇列(阻塞佇列)
* */
private final BlockingQueue<Runnable> workQueue;
/**
* 執行緒工廠
* */
private volatile ThreadFactory threadFactory;
/**
* 拒絕策略
* */
private volatile MyRejectedExecutionHandler handler;
/**
* 是否允許核心執行緒在idle一定時間后被銷毀(和非核心執行緒一樣)
* */
private volatile boolean allowCoreThreadTimeOut;
/**
* 主控鎖
* */
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 當前執行緒池已完成的任務數量
* */
private long completedTaskCount;
/**
* 維護當前存活的worker執行緒集合
* */
private final HashSet<MyWorker> workers = new HashSet<>();
/**
* 當前執行緒池中存在的worker執行緒數量 + 狀態的一個聚合(通過一個原子int進行cas,來避免對兩個業務屬性欄位加鎖來保證一致性)
* v1版本只關心前者,即worker執行緒數量
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 32位的有符號整數,有3位是用來存放執行緒池狀態的,所以用來維護當前作業執行緒個數的部分就只能用29位了
* 被占去的3位中,有1位原來的符號位,2位是原來的數值位,
* */
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 執行緒池狀態poolStatus常量(狀態值只會由小到大,單調遞增)
* 執行緒池狀態遷移圖:
* ↗ SHUTDOWN ↘
* RUNNING ↓ TIDYING → TERMINATED
* ↘ STOP ↗
* 1 RUNNING狀態,代表著執行緒池處于正常運行的狀態,能正常的接收并處理提交的任務
* 執行緒池物件初始化時,狀態為RUNNING
* 對應邏輯:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
*
* 2 SHUTDOWN狀態,代表執行緒池處于停止對外服務的狀態,不再接收新提交的任務,但依然會將workQueue作業佇列中積壓的任務處理完
* 呼叫了shutdown方法時,狀態由RUNNING -> SHUTDOWN
* 對應邏輯:shutdown方法中的advanceRunState(SHUTDOWN);
*
* 3 STOP狀態,代表執行緒池處于停止狀態,不再接受新提交的任務,同時也不再處理workQueue作業佇列中積壓的任務,當前還在處理任務的作業執行緒將收到interrupt中斷通知
* 之前未呼叫shutdown方法,直接呼叫了shutdownNow方法,狀態由RUNNING -> STOP
* 之前先呼叫了shutdown方法,后呼叫了shutdownNow方法,狀態由SHUTDOWN -> STOP
* 對應邏輯:shutdownNow方法中的advanceRunState(STOP);
*
* 4 TIDYING狀態,代表著執行緒池即將完全終止,正在做最后的收尾作業
* 當前執行緒池狀態為SHUTDOWN,任務被消費完作業佇列workQueue為空,且作業執行緒全部退出完成作業執行緒集合workers為空時,tryTerminate方法中將狀態由SHUTDOWN->TIDYING
* 當前執行緒池狀態為STOP,作業執行緒全部退出完成作業執行緒集合workers為空時,tryTerminate方法中將狀態由STOP->TIDYING
* 對應邏輯:tryTerminate方法中的ctl.compareAndSet(c, ctlOf(TIDYING, 0)
*
* 5 TERMINATED狀態,代表著執行緒池完全的關閉,之前執行緒池已經處于TIDYING狀態,且呼叫的鉤子函式terminated已回傳
* 當前執行緒池狀態為TIDYING,呼叫的鉤子函式terminated已回傳
* 對應邏輯:tryTerminate方法中的ctl.set(ctlOf(TERMINATED, 0));
* */
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/**
* 跟蹤執行緒池曾經有過的最大執行緒數量(只能在mainLock的并發保護下更新)
*/
private int largestPoolSize;
private boolean compareAndIncrementWorkerCount(int expect) {
return this.ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
do {
// cas更新,workerCount自減1
} while (!compareAndDecrementWorkerCount(ctl.get()));
}
public MyThreadPoolExecutorV1(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
MyRejectedExecutionHandler handler) {
// 基本的引數校驗
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) {
throw new IllegalArgumentException();
}
if (unit == null || workQueue == null || threadFactory == null || handler == null) {
throw new NullPointerException();
}
// 設定成員變數
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
public ThreadFactory getThreadFactory() {
return threadFactory;
}
}
Worker作業執行緒
ThreadPoolExecutor中的作業執行緒并不是裸的Thread,而是被封裝在了一個Worker的內部類中,
Worker實作了Runnable所以可以作為一個普通的執行緒來啟動,在run方法中只是簡單的呼叫了一下runWorker(runWorker后面再展開),
Worker類有三個成員屬性:
- Thread thread(被封裝的作業執行緒物件)
- Runnable firstTask(提交任務時,創建新Worker物件時指定的第一次要執行的任務(后續執行緒就會去拉取作業佇列里的任務執行了))
- volatile long completedTasks(統計用,計算當前作業執行緒總共完成了多少個任務)
Worker內封裝的實際的作業執行緒物件thread,其在建構式中由執行緒池的執行緒工廠threadFactory生成,傳入this,所以thread在start后,便會呼叫run方法進而執行runWorker,
執行緒工廠可以由用戶在創建執行緒池時通過引數指定,因此用戶在自由控制所生成的作業執行緒的同時,也需要保證newThread能正確的回傳一個可用的執行緒物件,
除此之外,Worker物件還繼承了AbstractQueuedSynchronizer(AQS)類,簡單的實作了一個不可重入的互斥鎖,
對AQS互斥模式不太了解的讀者可以參考一下我之前關于AQS互斥模式的博客:AQS互斥模式與ReentrantLock可重入鎖原理決議
AQS中維護了一個volatile修飾的int型別的成員變數state,其具體的含義可以由使用者自己定義,
在Worker中,state的值有三種狀態:
- state=-1,標識作業執行緒還未啟動(不會被interruptIfStarted打斷)
- state=0,標識作業執行緒已經啟動,但沒有開始處理任務(可能是在等待任務,idle狀態)
- state=1,標識worker執行緒正在執行任務(runWorker方法中,成功獲得任務后,通過lock方法將state設定為1)
具體這三種情況分別在什么時候出現會在下面決議提交任務原始碼的那部分里詳細介紹,
/**
* jdk的實作中令Worker繼承AbstractQueuedSynchronizer并實作了一個不可重入的鎖
* AQS中的state屬性含義
* -1:標識作業執行緒還未啟動
* 0:標識作業執行緒已經啟動,但沒有開始處理任務(可能是在等待任務,idle狀態)
* 1:標識worker執行緒正在執行任務(runWorker中,成功獲得任務后,通過lock方法將state設定為1)
* */
private final class MyWorker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
public MyWorker(Runnable firstTask) {
this.firstTask = firstTask;
// newThread可能是null
this.thread = getThreadFactory().newThread(this);
}
@Override
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock(){
acquire(1);
}
public boolean tryLock(){
return tryAcquire(1);
}
public void unlock(){
release(1);
}
public boolean isLocked(){
return isHeldExclusively();
}
void interruptIfStarted() {
Thread t;
// 三個條件同時滿足,才去中斷Worker對應的thread
// getState() >= 0,用于過濾還未執行runWorker的,剛入隊初始化的Worker
// thread != null,用于過濾掉構造方法中ThreadFactory.newThread回傳null的Worker
// !t.isInterrupted(),用于過濾掉那些已經被其它方式中斷的Worker執行緒(比如用戶自己去觸發中斷,提前終止執行緒池中的任務)
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
execute執行提交的任務
下面介紹本篇博客的重點,即執行緒池是如何執行用戶所提交的任務的,
用戶提交任務的入口是public的execute方法,Runnable型別的引數command就是提交的要執行的任務,
MyThreadPoolExecutorV1的execute方法(相比jdk的實作v1版本去掉了關于優雅停止的邏輯)
/**
* 提交任務,并執行
* */
public void execute(Runnable command) {
if (command == null){
throw new NullPointerException("command引數不能為空");
}
int currentCtl = this.ctl.get();
if (workerCountOf(currentCtl) < this.corePoolSize) {
// 如果當前存在的worker執行緒數量低于指定的核心執行緒數量,則創建新的核心執行緒
boolean addCoreWorkerSuccess = addWorker(command,true);
if(addCoreWorkerSuccess){
// addWorker添加成功,直接回傳即可
return;
}
}
// 走到這里有兩種情況
// 1 因為核心執行緒超過限制(workerCountOf(currentCtl) < corePoolSize == false),需要嘗試嘗試將任務放入阻塞佇列
// 2 addWorker回傳false,創建核心作業執行緒失敗
if(this.workQueue.offer(command)){
// workQueue.offer入隊成功
if(workerCountOf(currentCtl) == 0){
// 在corePoolSize為0的情況下,當前不存在存活的核心執行緒
// 一個任務在入隊之后,如果當前執行緒池中一個執行緒都沒有,則需要兜底的創建一個非核心執行緒來處理入隊的任務
// 因此firstTask為null,目的是先讓任務先入隊后創建執行緒去拉取任務并執行
addWorker(null,false);
}else{
// 加入佇列成功,且當前存在worker執行緒,成功回傳
return;
}
}else{
// 阻塞佇列已滿,嘗試創建一個新的非核心執行緒處理
boolean addNonCoreWorkerSuccess = addWorker(command,false);
if(!addNonCoreWorkerSuccess){
// 創建非核心執行緒失敗,執行拒絕策略(失敗的原因和前面創建核心執行緒addWorker的原因類似)
reject(command);
}else{
// 創建非核心執行緒成功,成功回傳
return;
}
}
}
/**
* 根據指定的拒絕處理器,執行拒絕策略
* */
private void reject(Runnable command) {
this.handler.rejectedExecution(command, this);
}
可以看到,execute方法原始碼中對于任務處理的邏輯很清晰,也能與ThreadPoolExecutor運行時作業流程中所介紹的流程所匹配,
addWorker方法(創建新的作業執行緒)
在execute方法中當需要創建核心執行緒或普通執行緒時,便需要通過addWorker方法嘗試創建一個新的作業執行緒,
/**
* 向執行緒池中加入worker
* */
private boolean addWorker(Runnable firstTask, boolean core) {
// retry標識外層回圈
retry:
for (;;) {
int currentCtl = ctl.get();
// 用于cas更新workerCount的內層回圈(注意這里面與jdk的寫法不同,改寫成了邏輯一致但更可讀的形式)
for (;;) {
// 判斷當前worker數量是否超過了限制
int workerCount = workerCountOf(currentCtl);
if (workerCount >= CAPACITY) {
// 當前worker數量超過了設計上允許的最大限制
return false;
}
if (core) {
// 創建的是核心執行緒,判斷當前執行緒數是否已經超過了指定的核心執行緒數
if (workerCount >= this.corePoolSize) {
// 超過了核心執行緒數,創建核心worker執行緒失敗
return false;
}
} else {
// 創建的是非核心執行緒,判斷當前執行緒數是否已經超過了指定的最大執行緒數
if (workerCount >= this.maximumPoolSize) {
// 超過了最大執行緒數,創建非核心worker執行緒失敗
return false;
}
}
// cas更新workerCount的值
boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);
if (casSuccess) {
// cas成功,跳出外層回圈
break retry;
}
// compareAndIncrementWorkerCount方法cas爭搶失敗,重新執行內層回圈
}
}
boolean workerStarted = false;
MyWorker newWorker = null;
try {
// 創建一個新的worker
newWorker = new MyWorker(firstTask);
final Thread myWorkerThread = newWorker.thread;
if (myWorkerThread != null) {
// MyWorker初始化時內部執行緒創建成功
// 加鎖,防止并發更新
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (myWorkerThread.isAlive()) {
// 預檢查執行緒的狀態,剛初始化的worker執行緒必須是未喚醒的狀態
throw new IllegalThreadStateException();
}
// 加入worker集合
this.workers.add(newWorker);
int workerSize = workers.size();
if (workerSize > largestPoolSize) {
// 如果當前worker個數超過了之前記錄的最大存活執行緒數,將其更新
largestPoolSize = workerSize;
}
// 創建成功
} finally {
// 無論是否發生例外,都先將主控鎖解鎖
mainLock.unlock();
}
// 加入成功,啟動worker執行緒
myWorkerThread.start();
// 標識為worker執行緒啟動成功,并作為回傳值回傳
workerStarted = true;
}
}finally {
if (!workerStarted) {
addWorkerFailed(newWorker);
}
}
return workerStarted;
}
addWorker可以分為兩部分:判斷當前是否滿足創建新作業執行緒的條件、創建并啟動新的Worker作業執行緒,
判斷當前是否滿足創建新作業執行緒的條件
入口處開始的retry標識的for回圈部分,便是用于判斷是否滿足創建新作業執行緒的條件,
- 首先判斷當前作業執行緒數量是否超過了理論的最大值CAPACITY(即2^29-1),超過了則不能創建,回傳false,不創建新作業執行緒
- 根據boolean型別引數core判斷是否創建核心作業執行緒,core=true則判斷是否超過了corePoolSize的限制,core=false則判斷是否超過了maximumPoolSize的限制,不滿足則回傳false,不創建新作業執行緒
- 滿足上述限制條件后,則說明可以創建新執行緒了,compareAndIncrementWorkerCount方法進行cas的增加當前作業執行緒數,
如果cas失敗,則說明存在并發的更新了,則再一次的回圈重試,并再次的進行上述檢查,
需要注意的是:這里面有兩個for回圈的原因在于v1版本省略了優雅停止的邏輯(所以實際上v1版本能去掉內層回圈的),如果執行緒池處于停止狀態則不能再創建新作業執行緒了,因此也需要判斷執行緒池當前的狀態,
不滿足條件則也需要回傳false,不創建作業執行緒,
而且compareAndIncrementWorkerCount中cas更新ctl時,如果并發的執行緒池被停止而導致執行緒池狀態發生了變化,也會導致cas失敗重新檢查,
這也是jdk的實作中為什么把執行緒池狀態和作業執行緒數量系結在一起的原因之一,這樣在cas更新時可以原子性的同時檢查兩個欄位的并發爭搶,(更具體的細節會在下一篇博客的v2版本中介紹)
創建并啟動新的Worker作業執行緒
在通過retry那部分的層層條件檢查后,緊接著便是實際創建新作業執行緒的邏輯,
- 首先通過Worker的構造方法創建一個新的Worker物件,并將用戶提交的任務作為firstTask引數傳入,
- 判斷Worker在構造時執行緒工廠是否正確的生成了一個Thread(判空),如果thread == null的話直接回傳false,標識創建新作業執行緒失敗,
- 在mainLock的保護下,將新創建的worker執行緒加入workers集合中
- 啟動Worker中的執行緒(myWorkerThread.start()),啟動后會執行Worker類中的run方法,新的作業執行緒會執行runWorker方法(下文會展開分析runWorker)
- 如果Worker中的執行緒不是alive狀態等原因導致作業執行緒啟動失敗,則在finally中通過addWorkerFailed進行一系列的回滾操作
雖然在前面執行緒池作業流程的分析中提到了核心執行緒與非核心執行緒的概念,但Worker類中實際上并沒有核心/非核心的標識,
經過了作業執行緒啟動前的條件判斷后,新創建的作業執行緒實際上并沒有真正的核心與非核心的差別,
addWorkerFailed(addWorker的逆向回滾操作)
addWorker中作業執行緒可能會啟動失敗,所以要對addWorker中對workers集合以及workerCount等資料的操作進行回滾,
/**
* 當創建worker出現例外失敗時,對之前的操作進行回滾
* 1 如果新創建的worker加入了workers集合,將其移除
* 2 減少記錄存活的worker個數(cas更新)
* 3 檢查執行緒池是否滿足中止的狀態,防止這個存活的worker執行緒阻止執行緒池的中止(v1版本不考慮,省略了tryTerminate)
*/
private void addWorkerFailed(MyWorker myWorker) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (myWorker != null) {
// 如果新創建的worker加入了workers集合,將其移除
workers.remove(myWorker);
}
// 減少存活的worker個數
decrementWorkerCount();
// 嘗試著將當前worker執行緒終止(addWorkerFailed由作業執行緒自己呼叫)
// tryTerminate();
} finally {
mainLock.unlock();
}
}
runWorker(作業執行緒核心執行邏輯)
前面介紹了用戶如何向執行緒池提交任務,以及如何創建新作業執行緒Worker,下面介紹作業執行緒在執行緒池中是如何運行的,
- runWorker方法內部本質上是一個無限回圈,在進入主回圈之前通過unlock方法,將內部AQS父類中的state標識為0,允許被外部中斷(可以被interruptIfStarted選中而打斷)
- 之后便是主回圈,如果firstTask不為空(說明第一次啟動),則直接呼叫task.run方法,否則通過getTask方法嘗試從作業佇列中撈取一個任務來執行
- 在實際的任務執行前和執行后都呼叫對應的鉤子方法(beforeExecute、afterExecute)
- 在任務執行前通過lock方法將AQS的state方法設定為1代表當前Worker正在執行任務,并在執行完一個任務后在finally中進行unlock解鎖,令當前作業執行緒進入idle狀態,
同時清空firstTask的值(清空后下一次回圈就會通過getTask獲取任務了)并令Worker中的completedTasks統計指標也自增1 - 如果任務執行程序中出現了例外,則catch住并最終向上拋出跳出主回圈,finally中執行processWorkerExit(認為任務一旦執行出現了例外,則很可能作業執行緒內部的一些狀態已經損壞,需要重新創建一個新的作業執行緒來代替出例外的老作業執行緒)
- 有兩種情況會導致執行processWorkerExit,一種是上面說的任務執行時出現了例外,此時completedAbruptly=true;還有一種可能時getTask因為一些原因回傳了null,此時completedAbruptly=false,
completedAbruptly會作為processWorkerExit的引數傳遞,
/**
* worker作業執行緒主回圈執行邏輯
* */
private void runWorker(MyWorker myWorker) {
// 時worker執行緒的run方法呼叫的,此時的current執行緒的是worker執行緒
Thread workerThread = Thread.currentThread();
Runnable task = myWorker.firstTask;
// 已經暫存了firstTask,將其清空(有地方根據firstTask是否存在來判斷作業執行緒中負責的任務是否是新提交的)
myWorker.firstTask = null;
// 將state由初始化時的-1設定為0
// 標識著此時當前作業執行緒開始作業了,這樣可以被interruptIfStarted選中
myWorker.unlock();
// 默認執行緒是由于中斷退出的
boolean completedAbruptly = true;
try {
// worker執行緒處理主回圈,核心邏輯
while (task != null || (task = getTask()) != null) {
// 將state由0標識為1,代表著其由idle狀態變成了正在作業的狀態
// 這樣interruptIdleWorkers中的tryLock會失敗,這樣作業狀態的執行緒就不會被該方法中斷任務的正常執行
myWorker.lock();
// v1版本此處省略優雅停止相關的核心邏輯
try {
// 任務執行前的鉤子函式
beforeExecute(workerThread, task);
Throwable thrown = null;
try {
// 拿到的任務開始執行
task.run();
} catch (RuntimeException | Error x) {
// 使用thrown收集拋出的例外,傳遞給afterExecute
thrown = x;
// 同時拋出錯誤,從而中止主回圈
throw x;
} catch (Throwable x) {
// 使用thrown收集拋出的例外,傳遞給afterExecute
thrown = x;
// 同時拋出錯誤,從而中止主回圈
throw new Error(x);
} finally {
// 任務執行后的鉤子函式,如果任務執行時拋出了錯誤/例外,thrown不為null
afterExecute(task, thrown);
}
} finally {
// 將task設定為null,令下一次while回圈通過getTask獲得新任務
task = null;
// 無論執行時是否存在例外,已完成的任務數加1
myWorker.completedTasks++;
// 無論如何將myWorker解鎖,標識為idle狀態
myWorker.unlock();
}
}
// getTask回傳了null,說明沒有可執行的任務或者因為idle超時、執行緒數超過配置等原因需要回收當前執行緒,
// 執行緒正常的退出,completedAbruptly為false
completedAbruptly = false;
}finally {
// getTask回傳null,執行緒正常的退出,completedAbruptly值為false
// task.run()執行時拋出了例外/錯誤,直接跳出了主回圈,此時completedAbruptly為初始化時的默認值true
processWorkerExit(myWorker, completedAbruptly);
// processWorkerExit執行完成后,worker執行緒對應的run方法(run->runWorker)也會執行完畢
// 此時執行緒物件會進入終止態,等待作業系統回收
// 而且processWorkerExit方法內將傳入的Worker從workers集合中移除,jvm中的物件也會因為不再被參考而被GC回收
// 此時,當前作業執行緒所占用的所有資源都已釋放完畢
}
}
getTask嘗試獲取任務執行
runWorker中是通過getTask獲取任務的,getTask中包含著作業執行緒是如何從作業佇列中獲取任務的關鍵邏輯,
- 在獲取任務前,需要通過getTask檢查當前執行緒池的執行緒數量是否超過了引數配置(啟動后被動態調整了),因此需要先獲得當前執行緒池作業執行緒總數workCount,
如果當前作業執行緒數量超過了指定的最大執行緒個數maximumPoolSize限制,則說明當前執行緒需要退出了 - timed標識用于決定當前執行緒如何從作業佇列(阻塞佇列)中獲取新任務,如果timed為true則通過poll方法獲取同時指定相應的超時時間(配置引數keepAliveTime),如果timed為false則通過take方法無限期的等待,
如果作業佇列并不為空,則poll和take方法都會立即回傳一個任務物件,而當作業佇列為空時,作業執行緒則會阻塞在作業佇列上以讓出CPU(idle狀態)直到有新的任務到來而被喚醒(或者超時喚醒),
這也是存盤任務的workQueue不能是普通的佇列,而必須是阻塞佇列的原因,(對阻塞佇列作業原理不太清楚的讀者可以參考我以前的博客:自己動手實作一個阻塞佇列) - timed的值由兩方面共同決定,一是配置引數allowCoreThreadTimeOut是否為true,為true的話說明不管是核心執行緒還是非核心執行緒都需要在idle等待keepAliveTime后銷毀退出,所以allowCoreThreadTimeOut=true,則timed一定為true
二是如果allowCoreThreadTimeOut為false,說明核心執行緒不需要退出,而非核心執行緒在idle等待keepAliveTime后需要銷毀退出,則判斷當前workCount是否大于配置的corePoolSize,是的話則timed為true否則為false,
如果當前執行緒數超過了指定的最大核心執行緒數corePoolSize,則需要讓作業佇列為空時(說明執行緒池負載較低)部分idle執行緒退出,使得最侄訓躍的執行緒數減少到和corePoolSize一致,
從這里可以看到,核心與非核心執行緒的概念在ThreadPoolExecutor里是很弱的,不關心作業執行緒最初是以什么原因創建的都一視同仁,誰都可能被當作非核心執行緒而銷毀退出, - timedOut標識當前作業執行緒是否因為poll拉取任務時出現了超時,take永遠不會回傳null,因此只有poll在超時時會回傳null,當poll回傳值為null時,表明是等待了keepAliveTime時間后超時了,所以timedOut標識為true,
同時如果拉取任務時執行緒被中斷了,則捕獲InterruptedException例外,將timeOut標識為false(被中斷的就不認為是超時), - 當(workCount > maximumPoolSize)或者 (timed && timedOut)兩者滿足一個時,就說明當前執行緒應該要退出了,
此時將當前的workCount用cas的方式減去1,回傳null代表獲取任務失敗即可;如果cas失敗,則在for回圈中重試,
但有一種情況是例外的(workCount <= 1 && !workQueue.isEmpty()),即當前作業執行緒數量恰好為1,且作業佇列不為空(那么還需要當前執行緒繼續作業把作業佇列里的任務都消費掉,無論如何不能退出)
/**
* 嘗試著從阻塞佇列里獲得待執行的任務
* @return 回傳null代表作業佇列為空,沒有需要執行的任務; 或者當前worker執行緒滿足了需要退出的一些條件
* 回傳對應的任務
* */
private Runnable getTask() {
boolean timedOut = false;
for(;;) {
int currentCtl = ctl.get();
// 獲得當前作業執行緒個數
int workCount = workerCountOf(currentCtl);
// 有兩種情況需要指定超時時間的方式從阻塞佇列workQueue中獲取任務(即timed為true)
// 1.執行緒池配置引數allowCoreThreadTimeOut為true,即允許核心執行緒在idle一定時間后被銷毀
// 所以allowCoreThreadTimeOut為true時,需要令timed為true,這樣可以讓核心執行緒也在一定時間內獲取不到任務(idle狀態)而被銷毀
// 2.執行緒池配置引數allowCoreThreadTimeOut為false,但當前執行緒池中的執行緒數量workCount大于了指定的核心執行緒數量corePoolSize
// 說明當前有一些非核心的執行緒正在作業,而非核心的執行緒在idle狀態一段時間后需要被銷毀
// 所以此時也令timed為true,讓這些執行緒在keepAliveTime時間內由于佇列為空拉取不到任務而回傳null,將其銷毀
boolean timed = allowCoreThreadTimeOut || workCount > corePoolSize;
// 有共四種情況不需要往下執行,代表
// 1 (workCount > maximumPoolSize && workCount > 1)
// 當前作業執行緒個數大于了指定的maximumPoolSize(可能是由于啟動后通過setMaximumPoolSize調小了maximumPoolSize的值)
// 已經不符合執行緒池的配置引數約束了,要將多余的作業執行緒回收掉
// 且當前workCount > 1說明存在不止一個作業執行緒,意味著即使將當前作業執行緒回收后也還有其它作業執行緒能繼續處理作業佇列里的任務,直接回傳null表示自己需要被回收
// 2 (workCount > maximumPoolSize && workCount <= 1 && workQueue.isEmpty())
// 當前作業執行緒個數大于了指定的maximumPoolSize(maximumPoolSize被設定為0了)
// 已經不符合執行緒池的配置引數約束了,要將多余的作業執行緒回收掉
// 但此時workCount<=1,說明將自己這個作業執行緒回收掉后就沒有其它作業執行緒能處理作業佇列里剩余的任務了
// 所以即使maximumPoolSize設定為0,也需要等待任務被處理完,作業佇列為空之后才能回收當前執行緒,否則還會繼續拉取剩余任務
// 3 (workCount <= maximumPoolSize && (timed && timedOut) && workCount > 1)
// workCount <= maximumPoolSize符合要求
// 但是timed && timedOut,說明timed判定命中,需要以poll的方式指定超時時間,并且最近一次拉取任務超時了timedOut=true
// 進入新的一次回圈后timed && timedOut成立,說明當前worker執行緒處于idle狀態等待任務超過了規定的keepAliveTime時間,需要回收當前執行緒
// 且當前workCount > 1說明存在不止一個作業執行緒,意味著即使將當前作業執行緒回收后也還有其它作業執行緒能繼續處理作業佇列里的任務,直接回傳null表示自己需要被回收
// 4 (workCount <= maximumPoolSize && (timed && timedOut) && workQueue.isEmpty())
// workCount <= maximumPoolSize符合要求
// 但是timed && timedOut,說明timed判定命中,需要以poll的方式指定超時時間,并且最近一次拉取任務超時了timedOut=true
// 進入新的一次回圈后timed && timedOut成立,說明當前worker執行緒處于idle狀態等待任務超過了規定的keepAliveTime時間,需要回收當前執行緒
// 但此時workCount<=1,說明將自己這個作業執行緒回收掉后就沒有其它作業執行緒能處理作業佇列里剩余的任務了
// 所以即使timed && timedOut超時邏輯匹配,也需要等待任務被處理完,作業佇列為空之后才能回收當前執行緒,否則還會繼續拉取剩余任務
if ((workCount > maximumPoolSize || (timed && timedOut))
&& (workCount > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(currentCtl)) {
// 滿足上述條件,說明當前執行緒需要被銷毀了,回傳null
return null;
}
// compareAndDecrementWorkerCount方法由于并發的原因cas執行失敗,continue回圈重試
continue;
}
try {
// 根據上面的邏輯的timed標識,決定以什么方式從阻塞佇列中獲取任務
Runnable r = timed ?
// timed為true,通過poll方法指定獲取任務的超時時間(如果指定時間內沒有佇列依然為空,則回傳)
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// timed為false,通過take方法無限期的等待阻塞佇列中加入新的任務
workQueue.take();
if (r != null) {
// 獲得了新的任務,getWork正常回傳對應的任務物件
return r;
}else{
// 否則說明timed=true,且poll拉取任務時超時了
timedOut = true;
}
} catch (InterruptedException retry) {
// poll or take任務等待時worker執行緒被中斷了,捕獲中斷例外
// timeout = false,標識拉取任務時沒有超時
timedOut = false;
}
}
}
processWorkerExit(處理作業執行緒退出)
在runWorker中,如果getTask方法沒有拿到任務回傳了null或者任務在執行時拋出了例外就會在最終的finally塊中呼叫processWorkerExit方法,令當前作業執行緒銷毀退出,
- processWorkerExit方法內會將當前執行緒占用的一些資源做清理,比如從workers中移除掉當前執行緒(利于Worker物件的GC),并令當前執行緒workerCount減一(completedAbruptly=true,說明是中斷導致的退出,getTask中沒來得及減workerCount,在這里補正)
- completedAbruptly=true,說明是runWorker中任務例外導致的執行緒退出,無條件的通過addWorker重新創建一個新的作業執行緒代替當前退出的作業執行緒,
- completedAbruptly=false,在退出當前作業執行緒后,需要判斷一下退出后當前所存活的作業執行緒數量是否滿足要求,
比如allowCoreThreadTimeOut=false時,當前作業執行緒個數是否不低于corePoolSize等,如果不滿足要求則通過addWorker重新創建一個新的執行緒,
作業執行緒退出時所占用資源的回收
- processWorkerExit方法執行完畢后,當前作業執行緒就完整的從當前執行緒池中退出了(workers中沒有了參考,workerCount減1了),GC便會將記憶體中的Worker物件所占用的記憶體給回收掉,
- 同時runWorker中最后執行完processWorkerExit后,作業執行緒的run方法也return了,標識著整個執行緒正常退出了,作業系統層面上也會將執行緒轉為終止態并最侄訓收,至此,執行緒占用的所有資源就被徹底的回收干凈了,
/**
* 處理worker執行緒退出
* @param myWorker 需要退出的作業執行緒物件
* @param completedAbruptly 是否是因為中斷例外的原因,而需要回收
* */
private void processWorkerExit(MyWorker myWorker, boolean completedAbruptly) {
if (completedAbruptly) {
// 如果completedAbruptly=true,說明是任務在run方法執行時出錯導致的執行緒退出
// 而正常退出時completedAbruptly=false,在getTask中已經將workerCount的值減少了
decrementWorkerCount();
}
ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 執行緒池全域總完成任務數累加上要退出的作業執行緒已完成的任務數
this.completedTaskCount += myWorker.completedTasks;
// workers集合中將當前作業執行緒剔除
workers.remove(myWorker);
// completedTaskCount是long型別的,workers是HashSet,
// 都是非執行緒安全的,所以在mainLock的保護進行修改
} finally {
mainLock.unlock();
}
int currentCtl = this.ctl.get();
if (!completedAbruptly) {
// completedAbruptly=false,說明不是因為中斷例外而退出的
// min標識當前執行緒池允許的最小執行緒數量
// 1 如果allowCoreThreadTimeOut為true,則核心執行緒也可以被銷毀,min=0
// 2 如果allowCoreThreadTimeOut為false,則min應該為所允許的核心執行緒個數,min=corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty()) {
// 如果min為0了,但作業佇列不為空,則修正min=1,因為至少需要一個作業執行緒來將作業佇列中的任務消費、處理掉
min = 1;
}
if (workerCountOf(currentCtl) >= min) {
// 如果當前作業執行緒數大于了min,當前執行緒數量是足夠的,直接回傳(否則要執行下面的addWorker恢復)
return;
}
}
// 兩種場景會走到這里進行addWorker操作
// 1 completedAbruptly=true,說明執行緒是因為中斷例外而退出的,需要重新創建一個新的作業執行緒
// 2 completedAbruptly=false,且上面的workerCount<min,則說明當前作業執行緒數不夠,需要創建一個
// 為什么引數core傳的是false呢?
// 因為completedAbruptly=true而中斷退出的執行緒,無論當前作業執行緒數是否大于核心執行緒,都需要創建一個新的執行緒來代替原有的被退出的執行緒
addWorker(null, false);
}
動態修改配置引數
ThreadPoolExecutor除了支持啟動前通過建構式設定配置引數外,也允許在執行緒池運行的程序中動態的更改配置,而要實作動態的修改配置,麻煩程度要比啟動前靜態的指定大得多,
舉個例子,在執行緒池的運行程序中如果當前corePoolSize=20,且已經創建了20個核心執行緒時(workerCount=20),現在將corePoolSize減少為10或者增大為30時應該如何實時的生效呢?
下面通過內嵌于代碼中的注釋,詳細的說明了allowCoreThreadTimeOut、corePoolSize、maximumPoolSize這三個關鍵配置引數實作動態修改的原理,
/**
* 設定是否允許核心執行緒idle超時后退出
* */
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0) {
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
}
// 判斷一下新舊值是否相等,避免無意義的volatile變數更新,導致不必要的cpu cache同步
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value) {
// 引數值value為true,說明之前不允許核心執行緒由于idle超時而退出
// 而此時更新為true說明現在允許了,則通過interruptIdleWorkers喚醒所有的idle執行緒
// 令其走一遍runWorker中的邏輯,嘗試著讓idle超時的核心執行緒及時銷毀
interruptIdleWorkers();
}
}
}
/**
* 動態更新核心執行緒最大值corePoolSize
* */
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0) {
throw new IllegalArgumentException();
}
// 計算差異
int delta = corePoolSize - this.corePoolSize;
// 賦值
this.corePoolSize = corePoolSize;
if (workerCountOf(this.ctl.get()) > corePoolSize) {
// 更新完畢后,發現當前作業執行緒數超過了指定的值
// 喚醒所有idle執行緒,讓目前空閑的idle超時的執行緒在workerCount大于maximumPoolSize時及時銷毀
interruptIdleWorkers();
} else if (delta > 0) {
// 差異大于0,代表著新值大于舊值
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
// 我們無法確切的知道有多少新的執行緒是所需要的,
// 啟發式的預先啟動足夠的新作業執行緒用于處理作業佇列中的任務
// 但當執行此操作時作業佇列為空了,則立即停止此操作(佇列為空了說明當前負載較低,再創建更多的作業執行緒是浪費資源)
// 取差異和當前作業佇列中的最小值為k
int k = Math.min(delta, workQueue.size());
// 嘗試著一直增加新的作業執行緒,直到和k相同
// 這樣設計的目的在于控制增加的核心執行緒數量,不要一下子創建過多核心執行緒
// 舉個例子:原來的corePoolSize是10,且作業執行緒數也是10,現在新值設定為了30,新值比舊值大20,理論上應該直接創建20個核心作業執行緒
// 而作業佇列中的任務數只有10,那么這個時候直接創建20個新作業執行緒是沒必要的,只需要一個一個創建,在創建的程序中新的執行緒會盡量的消費作業佇列中的任務
// 這樣就可以以一種啟發性的方式創建合適的新作業執行緒,一定程度上節約資源,后面再有新的任務提交時,再從runWorker方法中去單獨創建核心執行緒(類似惰性創建)
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty()) {
// 其它作業執行緒在回圈的程序中也在消費作業執行緒,且用戶也可能不斷地提交任務
// 這是一個動態的程序,但一旦發現當前作業佇列為空則立即結束
break;
}
}
}
}
/**
* 動態更新最大執行緒數maximumPoolSize
* */
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
throw new IllegalArgumentException();
}
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(this.ctl.get()) > maximumPoolSize) {
// 更新完畢后,發現當前作業執行緒數超過了指定的值
// 喚醒所有idle執行緒,讓目前空閑的idle超時的執行緒在workerCount大于maximumPoolSize時及時銷毀
interruptIdleWorkers();
}
}
目前為止,通過v1版本的MyThreadPoolExecutor原始碼,已經將jdk執行緒池ThreadPoolExecutor在RUNNING狀態下提交任務,啟動作業執行緒執行任務相關的核心邏輯講解完畢了(不考慮優雅停止),
jdk執行緒池默認支持的四種拒絕策略
jdk執行緒池支持用戶傳入自定義的拒絕策略處理器,只需要傳入實作了RejectedExecutionHandler介面的物件就行,
而jdk在ThreadPoolExecutor中提供了默認的四種拒絕策略方便用戶使用,
- AbortPolicy
拒絕接受任務時會拋出RejectedExecutionException,能讓提交任務的一方感知到例外的策略,適用于大多數場景,也是jdk默認的拒絕策略, - DiscardPolicy
直接丟棄任務的拒絕策略,簡單的直接丟棄任務,適用于對任務執行成功率要求不高的場合 - DiscardOldestPolicy
丟棄當前作業佇列中最早入隊的任務,然后將當前任務重新提交,適用于后出現的任務能夠完全代替之前任務的場合(追求最終一致性) - CallerRunsPolicy
令呼叫者執行緒自己執行所提交任務的拒絕策略,在執行緒池壓力過大時,讓提交任務的執行緒自己執行該任務(異步變同步),能有效地降低執行緒池的壓力,也不會丟失任務,但可能導致整體業務吞吐量大幅降低,
上面介紹的四種jdk默認拒絕策略分別適應不同的業務場景,需要用戶仔細考慮最適合的拒絕策略,同時靈活的、基于介面的設計也開放的支持用戶去自己實作更貼合自己業務的拒絕策略處理器,
/**
* 默認的拒絕策略:AbortPolicy
* */
private static final MyRejectedExecutionHandler defaultHandler = new MyAbortPolicy();
/**
* 拋出RejectedExecutionException的拒絕策略
* 評價:能讓提交任務的一方感知到例外的策略,比較通用,也是jdk默認的拒絕策略
* */
public static class MyAbortPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
// 直接拋出例外
throw new RejectedExecutionException("Task " + command.toString() +
" rejected from " + executor.toString());
}
}
/**
* 令呼叫者執行緒自己執行command任務的拒絕策略
* 評價:在執行緒池壓力過大時,讓提交任務的執行緒自己執行該任務(異步變同步),
* 能夠有效地降低執行緒池的壓力,也不會丟失任務,但可能導致整體業務吞吐量大幅降低
* */
public static class MyCallerRunsPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
// 如果當前執行緒池不是shutdown狀態,則令呼叫者執行緒自己執行command任務
command.run();
}else{
// 如果已經是shutdown狀態了,就什么也不做直接丟棄任務
}
}
}
/**
* 直接丟棄任務的拒絕策略
* 評價:簡單的直接丟棄任務,適用于對任務執行成功率要求不高的場合
* */
public static class MyDiscardPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
// 什么也不做的,直接回傳
// 效果就是command任務被無聲無息的丟棄了,沒有例外
}
}
/**
* 丟棄當前作業佇列中最早入隊的任務,然后將當前任務重新提交
* 評價:適用于后出現的任務能夠完全代替之前任務的場合(追求最終一致性)
* */
public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
// 如果當前執行緒池不是shutdown狀態,則丟棄當前作業佇列中最早入隊的任務,然后將當前任務重新提交
executor.getQueue().poll();
executor.execute(command);
}else{
// 如果已經是shutdown狀態了,就什么也不做直接丟棄任務
}
}
}
jdk默認的四種執行緒池實作
jdk中除了提供了默認的拒絕策略,還在Executors類中提供了四種基于ThreadPoolExecutor的、比較常用的執行緒池,以簡化用戶對執行緒池的使用,
這四種執行緒池可以通過Executors提供的public方法來分別創建:
newFixedThreadPool
newFixedThreadPool方法創建一個作業執行緒數量固定的執行緒池,其創建ThreadPoolExecutor時傳入的核心執行緒數corePoolSize和最大執行緒數maximumPoolSize是相等的,
因此其作業佇列傳入是一個無界的LinkedBlockingQueue,無界的作業佇列意味著永遠都不會創建新的非核心執行緒,
在默認allowCoreThreadTimeOut為false的情況下,執行緒池中的所有執行緒都是不會因為idle超時而銷毀的核心執行緒,
適用場景:由于作業執行緒數量固定,“fixedThreadPool”適用于任務流量較為穩定的場景
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newCachedThreadPool
newCachedThreadPool方法創建一個作業執行緒數量有巨大彈性的執行緒池,其核心執行緒數corePoolSize=0而最大執行緒數maximumPoolSize為Integer.MAX_VALUE,60s的保活時間,
同時其作業佇列是SynchronousQueue,是一種佇列容量為0、無法快取任何任務的阻塞佇列(任何時候插入資料(offer)時必須有消費者執行緒消費,否則生產者執行緒將會被阻塞),
這也意味著“cachedThreadPool”中沒有核心執行緒,所有作業執行緒在任務負載較低時都會在60s的idle后被銷毀;同時當負載較高,新任務到來時由于所有的作業執行緒都在執行其它任務,將會立即創建一個新的非核心執行緒來處理任務,
適用場景:由于可以無限制的創建新執行緒來做到及時回應任務,“cachedThreadPool”適用于任務流量較大且不穩定,對任務延遲容忍度較低的場景
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newSingleThreadExecutor
newSingleThreadExecutor方法創建一個單執行緒的執行緒池,其核心執行緒數corePoolSize=1且最大執行緒數maximumPoolSize也為1,其作業佇列是無界佇列,
這意味著“singleThreadExecutor”中任何提交的任務都將嚴格按照先入先出的順序被執行,
適用場景:“singleThreadExecutor”適用于任務量較小、對任務延遲容忍度較高、并要求任務順序執行的場景,
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newScheduledThreadPool
newScheduledThreadPool方法創建一個支持定時任務、延遲任務執行的執行緒池(關于jdk定時任務執行緒池ScheduledThreadPoolExecutor的作業原理會在未來的博客中展開)
適用場景:“scheduledThreadPool”適用于需要任務定時或者延遲執行的場景,
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
jdk默認提供的執行緒池的缺陷
- 無論是newCachedThreadPool還是newFixedThreadPool、newSingleThreadExecutor,其設定的最大執行緒數量(Integer.MAX_VALUE)和無界的作業佇列(new LinkedBlockingQueue
())都缺乏必要的限制,
在生產環境中很容易因為任務流量過大導致創建過多的作業執行緒或令無界的作業佇列堆積大量的任務物件而耗盡CPU和記憶體等系統資源,最終導致程式崩潰,
這也是為什么阿里巴巴的開發規范中推薦使用更基礎的ThreadPoolExecutor建構式來創建所需要的執行緒池, - 只有在了解ThreadPoolExecutor作業原理以及各項配置引數的具體作用后,才能根據具體的業務和硬體配置來設定最合適的引數值,
總結
- 這篇博客中我們首先介紹了執行緒池的基本概念,隨后在原始碼層面決議了jdk默認的執行緒池ThreadPoolExecutor在執行所提交任務的整體作業原理(RUNNING狀態),
并在最后簡單的分析了jdk默認提供的四種拒絕策略和四種執行緒池的適用場景, - 希望通過這篇博客能讓讀者更好的理解執行緒池的作業原理,并在作業中更好的使用執行緒池,關于ThreadPoolExecutor優雅停止的原理會在下一篇博客中進行詳細的分析,盡請期待,
- 本篇博客的完整代碼在我的github上:https://github.com/1399852153/Reinventing-the-wheel-for-learning(ThreadPool模塊) 內容如有錯誤,還請多多指教,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/531414.html
標籤:Java
上一篇:RabbitMQ學習筆記
下一篇:淺談PHP設計模式的原型模式
