尊重原創著作權: https://www.gewuweb.com/hot/6051.html
Java十年老程式員分享:ThreadPoolExcutor 原理探究
尊重原創著作權: https://www.gewuweb.com/sitemap.html
概論
執行緒池(英語:thread
pool):一種執行緒使用模式,執行緒過多會帶來調度開銷,進而影響快取區域性和整體性能,而執行緒池維護著多個執行緒,等待著監督管理者分配可并發執行的任務,這避免了在處理短時間任務時創建與銷毀執行緒的代價,執行緒池不僅能夠保證內核的充分利用,還能防止過分調度,可用執行緒數量應該取決于可用的并發處理器、處理器內核、記憶體、網路
sockets 等的數量, 例如,執行緒數一般取 cpu 數量 +2 比較合適,執行緒數過多會導致額外的執行緒切換開銷,
Java 中的執行緒池是用 ThreadPoolExecutor 類來實作的. 本文就對該類的原始碼來分析一下這個類內部對于執行緒的創建,
管理以及后臺任務的調度等方面的執行原理,
先看一下執行緒池的類圖:
上圖的目的主要是為了讓大家知道執行緒池相關類之間的關系,至少賺個眼熟,以后看到不會有害怕的感覺,
Executor 框架介面
Executor 框架是一個根據一組執行策略呼叫,調度,執行和控制的異步任務的框架,目的是提供一種將”任務提交”與”任務如何運行”分離開來的機制,
下面是 ThreadPoolExeCutor 類圖,Executors 其實是一個工具類,里面提供了好多靜態方法,這些方法根據用戶選擇回傳不同的執行緒實體,
從上圖也可以看出來,ThreadPoolExeCutor 是執行緒池的核心,
J.U.C 中有三個 Executor 介面:
- ** Executor ** :一個運行新任務的簡單介面;
- ** ExecutorService ** :擴展了 Executor 介面,添加了一些用來管理執行器生命周期和任務生命周期的方法;
- ** ScheduledExecutorService ** :擴展了 ExecutorService,支持 Future 和定期執行任務,
其實通過這些介面就可以看到一些設計思想,每個介面的名字和其任務是完全匹配的,不會因為 Executor
中只有一個方法,就將其放到其他介面中,這也是很重要的單一原則,
ThreadPoolExeCutor 分析
再去具體分析 ThreadPoolExeCutor 運行邏輯前,先看下面的流程圖:
該圖是 ThreadPoolExeCutor 整個運行程序的一個概括,整個原始碼的核心邏輯總結起來就是:
- ** 創建執行緒: ** 要知道如何去創建執行緒,控制執行緒數量,執行緒的存活與銷毀;
- ** 添加任務: ** 任務添加后如何處理,是立刻執行,還是先保存;
- ** 執行任務: ** 如何獲取任務,任務執行失敗后如何處理?
下面將進入原始碼分析,來深入理解 ThreadPoolExeCutor 的設計思想,
建構式
先來看建構式:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 注意 workQueue, threadFactory, handler 是不可以為null 的,為慷訓直接拋出錯誤
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- ** corePoolSize 核心執行緒數 : ** 表示核心執行緒池的大小,當提交一個任務時,如果當前核心執行緒池的執行緒個數沒有達到 corePoolSize,則會創建新的執行緒來執行所提交的任務,即使當前核心執行緒池有空閑的執行緒,如果當前核心執行緒池的執行緒個數已經達到了corePoolSize,則不再重新創建執行緒,如果呼叫了 prestartCoreThread() 或者 prestartAllCoreThreads() ,執行緒池創建的時候所有的核心執行緒都會被創建并且啟動,若 corePoolSize == 0,則任務執行完之后,沒有任何請求進入時,銷毀執行緒池的執行緒,若 corePoolSize > 0,即使本地任務執行完畢,核心執行緒也不會被銷毀,corePoolSize 其實可以理解為可保留的空閑執行緒數,
- ** maximumPoolSize: ** 表示執行緒池能夠容納同時執行的最大執行緒數,如果當阻塞佇列已滿時,并且當前執行緒池執行緒個數沒有超過 maximumPoolSize 的話,就會創建新的執行緒來執行任務,注意 maximumPoolSize >= 1 必須大于等于 1,maximumPoolSize == corePoolSize ,即是固定大小執行緒池, 實際上最大容量是由 CAPACITY 控制 ,
- ** keepAliveTime: ** 執行緒空閑時間,當空閑時間達到 keepAliveTime值時,執行緒會被銷毀,直到只剩下 corePoolSize 個執行緒為止,避免浪費記憶體和句柄資源,默認情況,當執行緒池的執行緒數 > corePoolSize 時,keepAliveTime 才會起作用,但當 ThreadPoolExecutor 的 allowCoreThreadTimeOut 變數設定為 true 時,核心執行緒超時后會被回收,
- ** unit : ** 時間單位,為 keepAliveTime 指定時間單位,
- ** workQueue ** 快取佇列,當請求的執行緒數 > corePoolSize 時,執行緒進入 BlockingQueue 阻塞佇列,可以使用 ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue,
- ** threadFactory ** 創建執行緒的工程類,可以通過指定執行緒工廠為每個創建出來的執行緒設定更有意義的名字,如果出現并發問題,也方便查找問題原因,
- ** handler ** 執行拒絕策略的物件,當執行緒池的阻塞佇列已滿和指定的執行緒都已經開啟,說明當前執行緒池已經處于飽和狀態了,那么就需要采用一種策略來處理這種情況,采用的策略有這幾種:
* AbortPolicy : 直接拒絕所提交的任務,并拋出 **RejectedExecutionException** 例外;
* CallerRunsPolicy :只用呼叫者所在的執行緒來執行任務;
* DiscardPolicy :不處理直接丟棄掉任務;
* DiscardOldestPolicy :丟棄掉阻塞佇列中存放時間最久的任務,執行當前任務
屬性定義
看完建構式之后,再來看下該類里面的變數,有助于進一步理解整個代碼運行邏輯,下面是一些比較重要的變數:
[-1]原 = 1000 0001
這里需要對一些操作做些解釋,
- ** Integer.SIZE: ** 對于不同平臺,其位數不一樣,目前常見的是 32 位;
- ** (1 << COUNT_BITS) - 1: ** 首先是將 1 左移 COUNT_BITS 位,也就是第 COUNT_BITS + 1 位是1,其余都是 0;-1 操作則是將后面前面的 COUNT_BITS 位都變成 1,
- ** -1 << COUNT_BITS: ** -1 的原碼是 10000000 00000000 00000000 00000001 ,反碼是 111111111 11111111 11111111 11111110 ,補碼 +1,然后左移 29 位是 11100000 00000000 00000000 00000000;這里轉為十進制是負數,
- ** ~CAPACITY : ** 取反,最高三位是1;
總結:這里巧妙利用 bit 操作來將執行緒數量和運行狀態聯系在一起,減少了變數的存在和記憶體的占用,其中五種狀態的十進制排序: ** RUNNING <
SHUTDOWN < STOP < TIDYING < TERMINATED **
執行緒池狀態
** 執行緒池狀態含義: **
- ** RUNNING: ** 接受新任務并且處理阻塞佇列里的任務;
- ** SHUTDOWN: ** 拒絕新任務但是處理阻塞佇列里的任務;
- ** STOP: ** 拒絕新任務并且拋棄阻塞佇列里的任務同時會中斷正在處理的任務;
- ** TIDYING: ** 所有任務都執行完(包含阻塞佇列里面任務)當前執行緒池活動執行緒為 0,將要呼叫 terminated 方法
- ** TERMINATED: ** 終止狀態,terminated 方法呼叫完成以后的狀態;
** 執行緒池狀態轉換: **
- ** RUNNING -> SHUTDOWN: ** 顯式呼叫 shutdown() 方法,或者隱式呼叫了 finalize(),它里面呼叫了shutdown()方法,
- ** RUNNING or SHUTDOWN)-> STOP: ** 顯式 shutdownNow() 方法;
- ** SHUTDOWN -> TIDYING: ** 當執行緒池和任務佇列都為空的時候;
- ** STOP -> TIDYING: ** 當執行緒池為空的時候;
- ** TIDYING -> TERMINATED: ** 當 terminated() hook 方法執行完成時候;
原碼,反碼,補碼知識小劇場:
** 1. 原碼: ** 原碼就是符號位加上真值的絕對值, 即用第一位表示符號,其余位表示值. 比如如果是 8 位二進制:
[+1]原 = 0000 0001
[-1]原 = 1000 0001
負數原碼第一位是符號位.
** 2. 反碼: ** 反碼的表示方法是,正數的反碼是其本身,負數的反碼是在其原碼的基礎上, 符號位不變,其余各個位取反.
[+1] = [0000 0001]原 = [0000 0001]反
[-1] = [1000 0001]原 = [1111 1110]反
** 3. 補碼: ** 補碼的表示方法是,正數的補碼就是其本身,負數的補碼是在其原碼的基礎上, 符號位不變, 其余各位取反, 最后 +1.
(即在反碼的基礎上 +1)
[+1] = [0000 0001]原 = [0000 0001]反 = [0000 0001]補
[-1] = [1000 0001]原 = [1111 1110]反 = [1111 1111]補
4. 總結
在知道一個數原碼的情況下:
正數: ** 反碼,補碼 就是本身自己 **
負數: ** 反碼是高位符號位不變,其余位取反,補碼:反碼+1 **
** 5. 左移: ** 當數值左、右移時,先將數值轉化為其補碼形式,移完后,再轉換成對應的原碼
左移:高位丟棄,低位補零
[+1] = [00000001]補
[0000 0001]補 << 1 = [0000 0010]補 = [0000 0010]原 = [+2]
[-1] = [1000 0001]原 = [1111 1111]補
[1111 1111]補 << 1 = [1111 1110]補 = [1000 0010]原 = [-2]
其中,再次提醒,負數的補碼是反碼+1;負數的反碼是補碼-1;
** 6. 右移: ** 高位保持不變,低位丟棄
[+127] = [0111 1111]原 = [0111 1111]補
[0111 1111]補 >> 1 = [0011 1111]補 = [0011 1111]原 = [+63]
[-127] = [1111 1111]原 = [1000 0001]補
[1000 0001]補 >> 1 = [1100 0000]補 = [1100 0000]元 = [-64]
execute 方法分析
通過 ThreadPoolExecutor 創建執行緒池后,提交任務后執行程序是怎樣的,下面來通過原始碼來看一看,execute 方法原始碼如下:
execute 方法執行邏輯有這樣幾種情況:
- 如果當前運行的執行緒少于 corePoolSize,則會創建新的執行緒來執行新的任務;
- 如果運行的執行緒個數等于或者大于 corePoolSize,則會將提交的任務存放到阻塞佇列 workQueue 中;
- 如果當前 workQueue 佇列已滿的話,則會創建新的執行緒來執行任務;
- 如果執行緒個數已經超過了 maximumPoolSize,則會使用飽和策略 RejectedExecutionHandler 來進行處理,
這里要注意一下 addWorker(null, false) 也就是創建一個執行緒,但并沒有傳入任務,因為任務已經被添加到 workQueue 中了,所以
worker 在執行的時候,會直接從 workQueue 中獲取任務,所以,在 workerCountOf(recheck) == 0 時執行
addWorker(null, false) 也是為了保證執行緒池在 RUNNING 狀態下必須要有一個執行緒來執行任務,
需要注意的是,執行緒池的設計思想就是使用了 核心執行緒池 corePoolSize,阻塞佇列 workQueue 和執行緒池
maximumPoolSize ,這樣的快取策略來處理任務,實際上這樣的設計思想在需要框架中都會使用,
需要注意執行緒和任務之間的區別,任務是保存在 workQueue 中的,執行緒是從執行緒池里面取的,由 CAPACITY 控制容量,
addWorker 方法分析
addWorker 方法的主要作業是在執行緒池中創建一個新的執行緒并執行,firstTask 引數用于指定新增的執行緒執行的第一個任務,core 引數為 true
表示在新增執行緒時會判斷當前活動執行緒數是否少于 corePoolSize,false 表示新增執行緒前需要判斷當前活動執行緒數是否少于
maximumPoolSize,代碼如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 獲取運行狀態
int rs = runStateOf(c);
/*
* 這個if判斷
* 如果rs >= SHUTDOWN,則表示此時不再接收新任務;
* 接著判斷以下3個條件,只要有1個不滿足,則回傳false:
* 1. rs == SHUTDOWN,這時表示關閉狀態,不再接受新提交的任務,但卻可以繼續處理阻塞佇列中已保存的任務
* 2. firsTask為空
* 3. 阻塞佇列不為空
*
* 首先考慮rs == SHUTDOWN的情況
* 這種情況下不會接受新提交的任務,所以在firstTask不為空的時候會回傳false;
* 然后,如果firstTask為空,并且workQueue也為空,則回傳false,
* 因為佇列中已經沒有任務了,不需要再添加執行緒了
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 獲取執行緒數
int wc = workerCountOf(c);
// 如果wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),回傳false;
// 這里的core是addWorker方法的第二個引數,如果為true表示根據corePoolSize來比較,
// 如果為false則根據maximumPoolSize來比較,
//
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 嘗試增加workerCount,如果成功,則跳出第一個for回圈
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失敗,則重新獲取ctl的值
c = ctl.get(); // Re-read ctl
// 如果當前的運行狀態不等于rs,說明狀態已被改變,回傳第一個for回圈繼續執行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根據firstTask來創建Worker物件
w = new Worker(firstTask);
// 每一個Worker物件都會創建一個執行緒
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING狀態;
// 如果rs是RUNNING狀態或者rs是SHUTDOWN狀態并且firstTask為null,向執行緒池中添加執行緒,
// 因為在SHUTDOWN時不會在添加新的任務,但還是會執行workQueue中的任務
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一個HashSet
workers.add(w);
int s = workers.size();
// largestPoolSize記錄著執行緒池中出現過的最大執行緒數量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 啟動執行緒
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
這里需要注意有以下幾點:
- 在獲取鎖后重新檢查執行緒池的狀態,這是因為其他執行緒可可能在本方法獲取鎖前改變了執行緒池的狀態,比如呼叫了shutdown方法,添加成功則啟動任務執行,
- t.start() 會呼叫 Worker 類中的 run 方法,Worker 本身實作了 Runnable 介面,原因在創建執行緒得時候,將 Worker 實體傳入了 t 當中,可參見 Worker 類的建構式,
- wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) 每次呼叫 addWorker 來添加執行緒會先判斷當前執行緒數是否超過了CAPACITY,然后再去判斷是否超 corePoolSize 或 maximumPoolSize,說明執行緒數實際上是由 CAPACITY 來控制的,
內部類 Worker 分析
上面分析程序中,提到了一個 Worker 類,對于某些對原始碼不是很熟悉得同學可能有點不清楚,下面就來看看 Worker 的原始碼:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 注意此處傳入的是this
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
// 這里其實會呼叫外部的 runWorker 方法來執行自己,
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
// 如果已經設定過1了,這時候在設定1就會回傳false,也就是不可重入
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 提供安全中斷執行緒得方法
void interruptIfStarted() {
Thread t;
// 一開始 setstate(-1) 避免了還沒開始運行就被中斷可能
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
首先看到的是 Worker 繼承了(AbstractQueuedSynchronizer) AQS,并實作了 Runnable 介面,說明 Worker
本身也是執行緒,然后看其建構式可以發現,內部有兩個屬性變數分別是 Runnable 和 Thread
實體,該類其實就是對傳進來得屬性做了一個封裝,并加入了獲取鎖的邏輯(繼承了 AQS ),具體可參考文章:透過 ReentrantLock 分析 AQS
的實作原理
Worker 繼承了 AQS,使用 AQS 來實作獨占鎖的功能,為什么不使用 ReentrantLock 來實作呢?可以看到 tryAcquire
方法,它是不允許重入的,而 ReentrantLock 是允許重入的:
- lock 方法一旦獲取了獨占鎖,表示當前執行緒正在執行任務中;
- 如果正在執行任務,則不應該中斷執行緒;
- 如果該執行緒現在不是獨占鎖的狀態,也就是空閑的狀態,說明它沒有在處理任務,這時可以對該執行緒進行中斷;
- 執行緒池在執行 shutdown 方法或 tryTerminate 方法時會呼叫 interruptIdleWorkers 方法來中斷空閑的執行緒,interruptIdleWorkers 方法會使用 tryLock 方法來判斷執行緒池中的執行緒是否是空閑狀態;
- 之所以設定為不可重入,是因為我們不希望任務在呼叫像 setCorePoolSize 這樣的執行緒池控制方法時重新獲取鎖,如果使用 ReentrantLock,它是可重入的,這樣如果在任務中呼叫了如 setCorePoolSize 這類執行緒池控制的方法,會中斷正在運行的執行緒,因為 size 小了,需要中斷一些執行緒 ,
所以,Worker 繼承自 AQS,用于判斷執行緒是否空閑以及是否可以被中斷,
此外,在構造方法中執行了 setState(-1); ,把 state 變數設定為 -1,為什么這么做呢?是因為 AQS 中默認的 state 是
0,如果剛創建了一個 Worker 物件,還沒有執行任務時,這時就不應該被中斷,看一下 tryAquire 方法:
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
正因為如此,在 runWorker 方法中會先呼叫 Worker 物件的 unlock 方法將 state 設定為 0,tryAcquire 方法是根據
state 是否是 0 來判斷的,所以, setState(-1); 將 state 設定為 -1 是為了禁止在執行任務前對執行緒進行中斷,
runWorker 方法分析
前面提到了內部類 Worker 的 run 方法呼叫了外部類 runWorker,下面來看下 runWork 的具體邏輯,
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // status 設定為0,允許中斷,也可以避免再次加鎖失敗
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// 要派發task的時候,需要上鎖
w.lock();
// 如果執行緒池當前狀態至少是stop,則設定中斷標志;
// 如果執行緒池當前狀態是RUNNININ,則重置中斷標志,重置后需要重新
//檢查下執行緒池狀態,因為當重置中斷標志時候,可能呼叫了執行緒池的shutdown方法
//改變了執行緒池狀態,
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//任務執行前干一些事情
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//執行任務
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任務執行完畢后干一些事情
afterExecute(task, thrown);
}
} finally {
task = null;
//統計當前worker完成了多少個任務
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//執行清了作業
processWorkerExit(w, completedAbruptly);
}
}
總結一下 runWorker 方法的執行程序:
- while 回圈不斷地通過 getTask() 方法從阻塞佇列中取任務;
- 如果執行緒池正在停止,那么要保證當前執行緒是中斷狀態,否則要保證當前執行緒不是中斷狀態;
- 呼叫 task.run() 執行任務;
- 如果 task 為 null 則跳出回圈,執行 processWorkerExit 方法;
- runWorker 方法執行完畢,也代表著 Worker 中的 run 方法執行完畢,銷毀執行緒,
這里的 beforeExecute 方法和 afterExecute 方法在 ThreadPoolExecutor 類中是空的,留給子類來實作,
completedAbruptly 變數來表示在執行任務程序中是否出現了例外,在 processWorkerExit 方法中會對該變數的值進行判斷,
getTask 方法分析
getTask 方法是從阻塞佇列里面獲取任務,具體代碼邏輯如下:
private Runnable getTask() {
// timeOut變數的值表示上次從阻塞佇列中取任務時是否超時
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
/*
* 如果執行緒池狀態rs >= SHUTDOWN,也就是非RUNNING狀態,再進行以下判斷:
* 1. rs >= STOP,執行緒池是否正在stop;
* 2. 阻塞佇列是否為空,
* 如果以上條件滿足,則將workerCount減1并回傳null,
* 因為如果當前執行緒池狀態的值是SHUTDOWN或以上時,不允許再向阻塞佇列中添加任務,
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed變數用于判斷是否需要進行超時控制,
// allowCoreThreadTimeOut默認是false,也就是核心執行緒不允許進行超時;
// wc > corePoolSize,表示當前執行緒池中的執行緒數量大于核心執行緒數量;
// 對于超過核心執行緒數量的這些執行緒,需要進行超時控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* wc > maximumPoolSize的情況是因為可能在此方法執行階段同時執行了setMaximumPoolSize方法;
* timed && timedOut 如果為true,表示當前操作需要進行超時控制,并且上次從阻塞佇列中獲取任務發生了超時
* 接下來判斷,如果有效執行緒數量大于1,或者阻塞佇列是空的,那么嘗試將workerCount減1;
* 如果減1失敗,則回傳重試,
* 如果wc == 1時,也就說明當前執行緒是執行緒池中唯一的一個執行緒了,
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
* 根據timed來判斷,如果為true,則通過阻塞佇列的poll方法進行超時控制,如果在keepAliveTime時間內沒有獲取到任務,則回傳null;
* 否則通過take方法,如果這時佇列為空,則take方法會阻塞直到佇列不為空,
*
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果 r == null,說明已經超時,timedOut設定為true
timedOut = true;
} catch (InterruptedException retry) {
// 如果獲取任務時當前執行緒發生了中斷,則設定timedOut為false并回傳回圈重試
timedOut = false;
}
}
}
其實到這里后,你會發現在 ThreadPoolExcute 內部有幾個重要的檢驗:
- ** 判斷當前的運行狀態 ** ,根據運行狀態來做處理,如果當前都停止運行了,那很多操作也就沒必要了;
- ** 判斷當前執行緒池的數量 ** ,然后將該資料和 corePoolSize 以及 maximumPoolSize 進行比較,然后再去決定下一步該做啥;
首先是第一個 if 判斷,當運行狀態處于非 RUNNING 狀態,此外 rs >= STOP(執行緒池是否正在 stop)或阻塞佇列是否為空,則將
workerCount 見 1 并回傳 null,為什么要減 1 呢,因為此處其實是去獲取一個
task,但是發現處于停止狀態了,也就是沒必要再去獲取運行任務了,那這個執行緒就沒有存在的意義了,后續也會在 processWorkerExit
將該執行緒移除,
第二個 if 條件目的是控制執行緒池的有效執行緒數量,由上文中的分析可以知道,在執行 execute 方法時,如果當前執行緒池的執行緒數量超過了
corePoolSize 且小于 maximumPoolSize,并且 workQueue 已滿時,則可以增加作業執行緒,但這時如果超時沒有獲取到任務,也就是
timedOut 為 true 的情況,說明 workQueue 已經為空了,也就說明了當前執行緒池中不需要那么多執行緒來執行任務了,可以把多于
corePoolSize 數量的執行緒銷毀掉,保持執行緒數量在 corePoolSize 即可,
什么時候會銷毀?當然是 runWorker 方法執行完之后,也就是 Worker 中的 run 方法執行完,由 JVM 自動回收,
getTask 方法回傳 null 時,在 runWorker 方法中會跳出 while 回圈,然后會執行 processWorkerExit 方法,
processWorkerExit 方法
下面再看 processWorkerExit 方法的具體邏輯:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly值為true,則說明執行緒執行時出現了例外,需要將workerCount減1;
// 如果執行緒執行時沒有出現例外,說明在getTask()方法中已經已經對workerCount進行了減1操作,這里就不必再減了,
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//統計完成的任務數
completedTaskCount += w.completedTasks;
// 從workers中移除,也就表示著從執行緒池中移除了一個作業執行緒
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根據執行緒池狀態進行判斷是否結束執行緒池
tryTerminate();
int c = ctl.get();
/*
* 當執行緒池是RUNNING或SHUTDOWN狀態時,如果worker是例外結束,那么會直接addWorker;
* 如果allowCoreThreadTimeOut=true,并且等待佇列有任務,至少保留一個worker;
* 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize,
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
至此,processWorkerExit 執行完之后,作業執行緒被銷毀,以上就是整個作業執行緒的生命周期,但是這有兩點需要注意:
- 大家想想什么時候才會呼叫這個方法,任務干完了才會呼叫,那么沒事做了,就需要看下是否有必要結束執行緒池,這時候就會呼叫 tryTerminate,
- 如果此時執行緒處于 STOP 狀態以下,那么就會判斷核心執行緒數是否達到了規定的數量,沒有的話,就會繼續創建一個執行緒,
tryTerminate方法
tryTerminate 方法根據執行緒池狀態進行判斷是否結束執行緒池,代碼如下:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/*
* 當前執行緒池的狀態為以下幾種情況時,直接回傳:
* 1. RUNNING,因為還在運行中,不能停止;
* 2. TIDYING或TERMINATED,因為執行緒池中已經沒有正在運行的執行緒了;
* 3. SHUTDOWN并且等待佇列非空,這時要執行完workQueue中的task;
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果執行緒數量不為0,則中斷一個空閑的作業執行緒,并回傳
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 這里嘗試設定狀態為TIDYING,如果設定成功,則呼叫terminated方法
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// terminated方法默認什么都不做,留給子類實作
terminated();
} finally {
// 設定狀態為TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
interruptIdleWorkers( boolean onlyOne ) 如果 ONLY_ONE = true 那么就的最多
讓一個空閑執行緒發生中斷,ONLY_ONE = false 時是所有空閑執行緒都會發生中斷,那執行緒什么時候會處于空閑狀態呢?
一是執行緒數量很多,任務都完成了;二是執行緒在 getTask 方法中執行 workQueue.take() 時,如果不執行中斷會一直阻塞,
所以每次在作業執行緒結束時呼叫 tryTerminate 方法來嘗試中斷一個空閑作業執行緒,避免在佇列為空時取任務一直阻塞的情況,
shutdown方法
shutdown 方法要將執行緒池切換到 SHUTDOWN 狀態,并呼叫 interruptIdleWorkers 方法請求中斷所有空閑的
worker,最后呼叫 tryTerminate 嘗試結束執行緒池,
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 安全策略判斷
checkShutdownAccess();
// 切換狀態為SHUTDOWN
advanceRunState(SHUTDOWN);
// 中斷空閑執行緒
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 嘗試結束執行緒池
tryTerminate();
}
這里思考一個問題:在 runWorker 方法中,執行任務時對 Worker 物件 w 進行了 lock
操作,為什么要在執行任務的時候對每個作業執行緒都加鎖呢?
下面仔細分析一下:
- 在 getTask 方法中,如果這時執行緒池的狀態是 SHUTDOWN 并且 workQueue 為空,那么就應該回傳 null 來結束這個作業執行緒,而使執行緒池進入 SHUTDOWN 狀態需要呼叫shutdown 方法;
- shutdown 方法會呼叫 interruptIdleWorkers 來中斷空閑的執行緒,interruptIdleWorkers 持有 mainLock,會遍歷 workers 來逐個判斷作業執行緒是否空閑,但 getTask 方法中沒有mainLock;
- 在 getTask 中,如果判斷當前執行緒池狀態是 RUNNING,并且阻塞佇列為空,那么會呼叫 workQueue.take() 進行阻塞;
- 如果在判斷當前執行緒池狀態是 RUNNING 后,這時呼叫了 shutdown 方法把狀態改為了 SHUTDOWN,這時如果不進行中斷,那么當前的作業執行緒在呼叫了 workQueue.take() 后會一直阻塞而不會被銷毀,因為在 SHUTDOWN 狀態下不允許再有新的任務添加到 workQueue 中,這樣一來執行緒池永遠都關閉不了了;
- 由上可知,shutdown 方法與 getTask 方法(從佇列中獲取任務時)存在靜態條件;
- 解決這一問題就需要用到執行緒的中斷,也就是為什么要用 interruptIdleWorkers 方法,在呼叫 workQueue.take() 時,如果發現當前執行緒在執行之前或者執行期間是中斷狀態,則會拋出 InterruptedException,解除阻塞的狀態;
- 但是要中斷作業執行緒,還要判斷作業執行緒是否是空閑的,如果作業執行緒正在處理任務,就不應該發生中斷;
- 所以 Worker 繼承自 AQS,在作業執行緒處理任務時會進行 lock,interruptIdleWorkers 在進行中斷時會使用 tryLock 來判斷該作業執行緒是否正在處理任務,如果 tryLock 回傳 true,說明該作業執行緒當前未執行任務,這時才可以被中斷,
下面就來分析一下 interruptIdleWorkers 方法,
interruptIdleWorkers方法
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
interruptIdleWorkers 遍歷 workers 中所有的作業執行緒,若執行緒沒有被中斷 tryLock 成功,就中斷該執行緒,
** 為什么需要持有 mainLock ?因為 workers 是 HashSet 型別的,不能保證執行緒安全, **
shutdownNow方法
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 中斷所有作業執行緒,無論是否空閑
interruptWorkers();
// 取出佇列中沒有被執行的任務
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutdownNow 方法與 shutdown 方法類似,不同的地方在于:
- 設定狀態為 STOP;
- 中斷所有作業執行緒,無論是否是空閑的;
- 取出阻塞佇列中沒有被執行的任務并回傳,
shutdownNow 方法執行完之后呼叫 tryTerminate 方法,該方法在上文已經分析過了,目的就是使執行緒池的狀態設定為 TERMINATED,
執行緒池的監控
通過執行緒池提供的引數進行監控,執行緒池里有一些屬性在監控執行緒池的時候可以使用
- ** getTaskCount ** :執行緒池已經執行的和未執行的任務總數;
- ** getCompletedTaskCount ** :執行緒池已完成的任務數量,該值小于等于 taskCount;
- ** getLargestPoolSize ** :執行緒池曾經創建過的最大執行緒數量,通過這個資料可以知道執行緒池是否滿過,也就是達到了maximumPoolSize;
- ** getPoolSize ** :執行緒池當前的執行緒數量;
- ** getActiveCount ** :當前執行緒池中正在執行任務的執行緒數量,
通過這些方法,可以對執行緒池進行監控,在 ThreadPoolExecutor 類中提供了幾個空方法,如 beforeExecute
方法,afterExecute 方法和 terminated
方法,可以擴展這些方法在執行前或執行后增加一些新的操作,例如統計執行緒池的執行任務的時間等,可以繼承自 ThreadPoolExecutor 來進行擴展,
到此,關于 ThreadPoolExecutor 的內容就講完了,
downNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 中斷所有作業執行緒,無論是否空閑
interruptWorkers();
// 取出佇列中沒有被執行的任務
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutdownNow 方法與 shutdown 方法類似,不同的地方在于:
- 設定狀態為 STOP;
- 中斷所有作業執行緒,無論是否是空閑的;
- 取出阻塞佇列中沒有被執行的任務并回傳,
shutdownNow 方法執行完之后呼叫 tryTerminate 方法,該方法在上文已經分析過了,目的就是使執行緒池的狀態設定為 TERMINATED,
執行緒池的監控
通過執行緒池提供的引數進行監控,執行緒池里有一些屬性在監控執行緒池的時候可以使用
- ** getTaskCount ** :執行緒池已經執行的和未執行的任務總數;
- ** getCompletedTaskCount ** :執行緒池已完成的任務數量,該值小于等于 taskCount;
- ** getLargestPoolSize ** :執行緒池曾經創建過的最大執行緒數量,通過這個資料可以知道執行緒池是否滿過,也就是達到了maximumPoolSize;
- ** getPoolSize ** :執行緒池當前的執行緒數量;
- ** getActiveCount ** :當前執行緒池中正在執行任務的執行緒數量,
通過這些方法,可以對執行緒池進行監控,在 ThreadPoolExecutor 類中提供了幾個空方法,如 beforeExecute
方法,afterExecute 方法和 terminated
方法,可以擴展這些方法在執行前或執行后增加一些新的操作,例如統計執行緒池的執行任務的時間等,可以繼承自 ThreadPoolExecutor 來進行擴展,
到此,關于 ThreadPoolExecutor 的內容就講完了,
ainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 中斷所有作業執行緒,無論是否空閑
interruptWorkers();
// 取出佇列中沒有被執行的任務
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutdownNow 方法與 shutdown 方法類似,不同的地方在于:
- 設定狀態為 STOP;
- 中斷所有作業執行緒,無論是否是空閑的;
- 取出阻塞佇列中沒有被執行的任務并回傳,
shutdownNow 方法執行完之后呼叫 tryTerminate 方法,該方法在上文已經分析過了,目的就是使執行緒池的狀態設定為 TERMINATED,
執行緒池的監控
通過執行緒池提供的引數進行監控,執行緒池里有一些屬性在監控執行緒池的時候可以使用
- ** getTaskCount ** :執行緒池已經執行的和未執行的任務總數;
- ** getCompletedTaskCount ** :執行緒池已完成的任務數量,該值小于等于 taskCount;
- ** getLargestPoolSize ** :執行緒池曾經創建過的最大執行緒數量,通過這個資料可以知道執行緒池是否滿過,也就是達到了maximumPoolSize;
- ** getPoolSize ** :執行緒池當前的執行緒數量;
- ** getActiveCount ** :當前執行緒池中正在執行任務的執行緒數量,
通過這些方法,可以對執行緒池進行監控,在 ThreadPoolExecutor 類中提供了幾個空方法,如 beforeExecute
方法,afterExecute 方法和 terminated
方法,可以擴展這些方法在執行前或執行后增加一些新的操作,例如統計執行緒池的執行任務的時間等,可以繼承自 ThreadPoolExecutor 來進行擴展,
到此,關于 ThreadPoolExecutor 的內容就講完了,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/467018.html
標籤:其他
