前幾天,技術群里有個群友問了一個關于執行緒池的問題,內容如圖所示:

關于執行緒池相關知識可以先看下這篇:為什么阿里巴巴Java開發手冊中強制要求執行緒池不允許使用Executors創建?
那么就來和大家探討下這個問題,在執行緒池中,執行緒會從 workQueue 中讀取任務來執行,最小的執行單位就是 Worker,Worker 實作了 Runnable 介面,重寫了 run 方法,這個 run 方法是讓每個執行緒去執行一個回圈,在這個回圈代碼中,去判斷是否有任務待執行,若有則直接去執行這個任務,因此執行緒數不會增加,
如下是執行緒池創建執行緒的整體流程圖:

首先會判斷執行緒池的狀態,也就是是否在運行,若執行緒為非運行狀態,則會拒絕,接下來會判斷執行緒數是否小于核心執行緒數,若小于核心執行緒數,會新建作業執行緒并執行任務,隨著任務的增多,執行緒數會慢慢增加至核心執行緒數,如果此時還有任務提交,就會判斷阻塞佇列 workQueue 是否已滿,若沒滿,則會將任務放入到阻塞佇列中,等待作業執行緒獲得并執行,如果任務提交非常多,使得阻塞佇列達到上限,會去判斷執行緒數是否小于最大執行緒數 maximumPoolSize,若小于最大執行緒數,執行緒池會添加作業執行緒并執行任務,如果仍然有大量任務提交,使得執行緒數等于最大執行緒數,如果此時還有任務提交,就會被拒絕,
現在我們對這個流程大致有所了解,那么讓我們去看看原始碼是如何實作的吧!
執行緒池的任務提交從 submit 方法來說,submit 方法是 AbstractExecutorService 抽象類定義的,主要做了兩件事情:
- 把 Runnable 和 Callable 都轉化成 FutureTask
- 使用 execute 方法執行 FutureTask
execute 方法是 ThreadPoolExecutor 中的方法,原始碼如下:
public void execute(Runnable command) {
// 若任務為空,則拋 NPE,不能執行空任務
if (command == null) {
throw new NullPointerException();
}
int c = ctl.get();
// 若作業執行緒數小于核心執行緒數,則創建新的執行緒,并把當前任務 command 作為這個執行緒的第一個任務
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) {
return;
}
c = ctl.get();
}
/**
* 至此,有以下兩種情況:
* 1.當前作業執行緒數大于等于核心執行緒數
* 2.新建執行緒失敗
* 此時會嘗試將任務添加到阻塞佇列 workQueue
*/
// 若執行緒池處于 RUNNING 狀態,將任務添加到阻塞佇列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
// 再次檢查執行緒池標記
int recheck = ctl.get();
// 如果執行緒池已不處于 RUNNING 狀態,那么移除已入隊的任務,并且執行拒絕策略
if (!isRunning(recheck) && remove(command)) {
// 任務添加到阻塞佇列失敗,執行拒絕策略
reject(command);
}
// 如果執行緒池還是 RUNNING 的,并且執行緒數為 0,那么開啟新的執行緒
else if (workerCountOf(recheck) == 0) {
addWorker(null, false);
}
}
/**
* 至此,有以下兩種情況:
* 1.執行緒池處于非運行狀態,執行緒池不再接受新的執行緒
* 2.執行緒處于運行狀態,但是阻塞佇列已滿,無法加入到阻塞佇列
* 此時會嘗試以最大執行緒數為界創建新的作業執行緒
*/
else if (!addWorker(command, false)) {
// 任務進入執行緒池失敗,執行拒絕策略
reject(command);
}
}
可以看到 execute 方法中的的核心方法為 addWorker,再去看 addWorker 方法之前,先看下 Worker 的初始化方法:
Worker(Runnable firstTask) {
// 每個任務的鎖狀態初始化為-1,這樣作業執行緒在運行之前禁止中斷
setState(-1);
this.firstTask = firstTask;
// 把 Worker 作為 thread 運行的任務
this.thread = getThreadFactory().newThread(this);
}
在 Worker 初始化時把當前 Worker 作為執行緒的構造器入參,接下來從 addWorker 方法中可以找到如下代碼:
final Thread t = w.thread;
// 如果成功添加了 Worker,就可以啟動 Worker 了
if (workerAdded) {
t.start();
workerStarted = true;
}
這塊代碼是添加 worker 成功,呼叫 start 方法啟動執行緒,Thread t = w.thread; 此時的 w 是 Worker 的參考,那么t.start();實際上執行的就是 Worker 的 run 方法,
Worker 的 run 方法中呼叫了 runWorker 方法,簡化后的 runWorker 原始碼如下:
final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
try {
task.run();
} finally {
task = null;
}
}
}
這個 while 回圈有個 getTask 方法,getTask 的主要作用是阻塞從佇列中拿任務出來,如果佇列中有任務,那么就可以拿出來執行,如果佇列中沒有任務,這個執行緒會一直阻塞到有任務為止(或者超時阻塞),其中 getTask 方法的時序圖如下:

其中執行緒復用的關鍵是 1.6 和 1.7 部分,這部分原始碼如下:
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
使用佇列的 poll 或 take 方法從佇列中拿資料,根據佇列的特性,佇列中有任務可以回傳,佇列中無任務會阻塞,
執行緒池的執行緒復用就是通過取 Worker 的 firstTask 或者通過 getTask 方法從 workQueue 中不停地取任務,并直接呼叫 Runnable 的 run 方法來執行任務,這樣就保證了每個執行緒都始終在一個回圈中,反復獲取任務,然后執行任務,從而實作了執行緒的復用,
總結
本文主要從原始碼的角度決議了 Java 執行緒池中的執行緒復用是如何實作的,歡迎大家留言交流討論,
最好的關系就是互相成就,大家的在看、轉發、留言三連就是我創作的最大動力,
更詳細的原始碼決議可以點擊鏈接查看:https://github.com/wupeixuan/JDKSourceCode1.8
參考
https://github.com/wupeixuan/JDKSourceCode1.8
面試官系統精講Java原始碼及大廠真題
Java并發編程學習寶典
Java 并發面試 78 講
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/145703.html
標籤:Java
上一篇:maven學習整理
下一篇:并發編程之volatile
