在上一篇博文《圖解執行緒池原理》中,大體上介紹了執行緒池的作業原理,
這一篇從原始碼層面,細致剖析,文章會很長,
如果上篇文章內容沒吸收,先看上篇,先易后難嘛,
本文原始碼是 java 1.8 版本
一、示例代碼
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(2);
for(int i =1; i <= 10; i++){
int index = i;
pool.execute(new Runnable() {
@Override
public void run() {
String currentThreadName = Thread.currentThread().getName();
log.info("第{}次任務結束,執行者:{}", index, currentThreadName);
}
});
}
pool.shutdown();
System.out.println("All thread is over");
}
類圖關系, Executor 是最頂級介面,只有一個 execute() 方法,
最終大數的實作方法,在 ThreadPoolExecutor 這個類中(就是今天講的類),

創建執行緒池,最終呼叫的是 ThreadPoolExecutor 類中的方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
這里有7 個引數,功能如下
- corePoolSize 核心執行緒數
- maximumPoolSize 最大執行緒數
- keepAliveTime 執行緒空閑時,最大存活時間
- unit 存活時間的單位
- workQueue 阻塞佇列(存放任務)
- threadFactory 執行緒工廠(生產執行緒用)
- handler 拒絕策略
二、執行緒控制引數
先看這個,這個COUNT_BITS ,就是數字 29,下面會用到,
private static final int COUNT_BITS = Integer.SIZE - 3;
CAPACITY 最大執行緒數,(二進制數就是 3 個 0, 29個1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

執行緒池運行狀態
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
如圖,int 值的高三位代表執行緒運行狀態,后 29 位記錄作業執行緒數量

執行緒池的五種狀態,轉化關系如圖所示

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
ctl 是原子遞增的一個數,這個數高3位,表示執行緒池的運行狀態,后29位是記錄運行的執行緒數,

每增加一個執行緒,ctl 增加 1, 每銷毀1個執行緒,ctl 減小1
runStateOf 就是獲取執行緒池的運行狀態,即 ctl 的高三位

圖片上,兩個數字,進行與運算,得到的 ctl 值的高三位,低29位一定都是0,
現在不明白,后文用到的時候就明白了,
同樣的,workerCountOf 是獲取正在運行的執行緒數量,即 ctl 的低29位,

圖片上的兩個數字,進行與運算,得到的是 ctl 的低29位的值,(高 3 位一定是0啦)
圖中這種狀態,執行緒池是 RUNNING 狀態(看高3位),運行的執行緒數量是 13(低29位的值)
Doug Lea 真的是很牛,用一個數字,即控制了執行緒池的運行狀態,又記錄了執行緒的數量,
三、執行緒池的運行
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 1、作業執行緒數量小于核心執行緒數,創建執行緒
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 2、將任務放入阻塞佇列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 3、隊滿時,繼續創建執行緒,直至作業執行緒數量達到最大值,
reject(command); // 4、執行拒絕策略
}
先明確一點:
傳入進來的任務,要么被 addWorker() 方法接收了,要么被放到阻塞佇列里,要么被拒絕策略給收了,
拿上一篇的例子來說,病人進來,要么被醫生叫走了,要么呆在大廳里,等著被醫生叫走,要么被拒絕策略收走了,
第一步:
workerCountOf(c) < corePoolSize 這個判斷,前半部分是獲取運行的執行緒數,
這個不明白,往上翻看,看看那個圖,
如果該判斷成立,走 addWorker() 方法,
第二步:
作業執行緒數量達到了corePoolSize,那準備往阻塞佇列中放,當然在在執行緒池的處于 RUNNING 狀態,
// 方法很容易懂,ctl 是負數就可以了
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
如果將任務成功放入阻塞佇列了,緊接著做了一個判斷 ! isRunning(recheck) && remove(command),
這是執行緒池不是運行狀態,就從阻塞佇列中洗掉任務,
仔細想想,前后判斷了兩次運行狀態,這里是做了極端情況,并發處理,不明說了,
第二個判斷 workerCountOf(recheck) == 0,這個是干啥的?開始我也沒想清楚,
后來明白了,這個是所有醫生都休息了,然后佇列有任務,派個醫生穿上防護服,去等著病人,
這個具體后面再分析,大概知道下,不明白沒關系,后面再解釋,
第三步:
任務放到阻塞佇列失敗了,那就創建作業執行緒,
第四步:
前面幾步都失敗了,執行拒絕策略,
總體上這個方法就講完了,沒有鎖、沒有CAS,不怕有并發么?
鎖控制是在addWorker() 方法中,這是盡可能減小鎖的粒度,提高性能,
然后講重頭戲,addWorker() ,先打下預防針,不太好理解,
- addWorker() 方法決議
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
先說這一段
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
看到這一大串,正常都會暈,我也暈,即某些情況下,addWorker() 方法直接回傳 false
我看了很多次,這段代碼與下面的同一個意思,
if (rs > SHUTDOWN) return false;
if (rs = SHUTDOWN && firstTask != null) return false;
if (rs = SHUTDOWN && workQueue.isEmpty()) return false;
也就是說,執行緒池是 RUNNING 可以執行 addWorker() 方法,
執行緒池是 SHUTDOWN 且佇列不為空,且是空任務時,可以執行 addWorker() 方法,
其他情況一律不得執行,為什么第二個條件,那么復雜,后面講到再說,
看下 for 回圈
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 執行緒池運行狀態
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 控制執行緒數量
return false;
if (compareAndIncrementWorkerCount(c)) // 執行緒數量+1
break retry; // 跳出外層 for 回圈,執行下面的操作
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry; // 如果執行緒運行狀態變化,從外層回圈開始,重新執行,否則從內層回圈開始,重新執行,
}
}
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
這個雙層 for 回圈,就是做了一件事,在符合的條件時,ctl 增加1,
這里 break retry 和 continue retry 的作用,都寫了注釋,語法不熟悉的,得問度娘,
然后再看下半段
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 創建執行緒
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 獲取全域鎖
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // 前面說過addWorker 執行的條件,RUNNING, 或者 SHUTDOWN 且 空任務
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 核心,將worker放入集合中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 啟動執行緒
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); // 啟動失敗的執行緒,做相應處理,
}
return workerStarted;
這段代碼中 new Worker(firstTask); 是一個重點要說的, 這是 Worker 這個類的繼承關系圖

看下這個類的簡化代碼
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread; // 執行緒
Runnable firstTask; // 任務
volatile long completedTasks; // 已完成的任務數
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
Worker 繼承了 AbstractQueuedSynchronizer,就有了可重入鎖的功能,
實作了 Runnable,就重寫了 run() 方法,
new Worker(firstTask) 時,將任務賦值了,創建了執行緒,并且加了鎖,
這里還要說明下,ThreadPoolExecutor 類中的一個全域變數 workers
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
原始碼的注釋寫的很清楚,workers 是存放 worker 物件的集合,
且只有拿到鎖的worker物件,才會被放到該集合中,
后半段的核心邏輯是創立執行緒,即 new 一個 Worker物件,把物件放到集合中,然后啟動執行緒,
至此,execute() 方法講的差不多了,除了拒絕策略沒說,
四、回圈執行任務
- addWorker 方法決議
addWorker 方法中會呼叫 Thread 類中的 start() 方法,然后JVM 會在合適的時間呼叫 run() 方法,
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker 的本質是執行 task.run(),而 task 獲取 是將 阻塞佇列取空為止,
順便說一句,第一次執行時,task 是 worker 在初始化時的那個任務,之后是從阻塞佇列中取任務,
這種設計,實作了一個執行緒執行了多個任務,
說細節:
w.unlock(); // allow interrupts 這行代碼,是不是和我一樣,剛開始覺得很奇怪,
怎么就好端端的來一個 釋放鎖呢? 這是因為在 new Worker物件時,加了鎖
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker:執行runWorker 不會被中斷
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
關于 AQS 的加鎖與解鎖,這里不詳細解釋,不理解的可以看我之前的博文《ReentrtantLock 分析》
while (task != null || (task = getTask()) != null) 這是回圈取任務,等下詳細說,
下面這一段看著都會頭暈,誰看誰頭暈,
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
原始碼注釋說的很明確,
當執行緒池狀態,大于等于 STOP 時,保證作業執行緒都有中斷標志,
當執行緒池狀態,小于STOP時,保證作業執行緒都沒有中斷標志,
這里詳細解釋下: 先看前半部分
runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
runStateAtLeast(ctl.get(), STOP) = A
那 偽代碼就是 A || (Thread.interrupted() && A)
情況1:A 為 true,那后面不會執行了,整體為true
情況2:A 為false, 執行 Thread.interrupted() ,即清除執行緒的中斷標志,
(Thread.interrupted() && A) 這個條件是 false, 整體結果為false,
前半部分說完了,說后半部分&& !wt.isInterrupted(),
上文 情況2時,不會執行后半部分,即判斷結束了,A 為 false,清除執行緒中斷標志,
這也就實作了,執行緒池狀態小于 STOP,作業執行緒保證沒有中斷標志,
情況1時,執行 wt.isInterrupted(), 如果有中斷,回傳true,取反后,整體是false,
如果沒中斷,回傳false,取反后,整體是 true,那會執行 wt.interrupt(),即打個中斷標志,
不管怎樣,最終執行緒會有中斷標志,
也就實作了,執行緒池狀態大于 等于STOP,作業執行緒保證有中斷標志,
有時候,代碼不好理解,真不是讀代碼的人笨,也不是寫代碼的人,寫的不好
.
.
- processWorkerExit 方法決議
繼續說 runWorker 方法,當阻塞佇列中取出的任務是null時,會跳出 while 回圈,執行 processWorkerExit(),
這個方法是在 finally 中執行的,正常情況下,是從阻塞佇列中取任務,取到的是null,此時 completedAbruptly是 false,
當然非正常情況,某個環節拋出了例外,執行了這個方法,此時completedAbruptly 是 true.
具體是哪個環節會拋出例外,后面再說,
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w); // 洗掉執行緒
} finally {
mainLock.unlock();
}
tryTerminate(); // 優雅的嘗試關閉執行緒池
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
如果是出了例外,才執行這個方法的,那作業執行緒的數量減一,這個好理解,
不論是否出例外,都會在加鎖的情況下,洗掉 worker,也就是洗掉了執行緒,
當然,在洗掉執行緒之前,會統計個執行緒完成了多少個任務,
tryTerminate() 這個方法,等到 shutDown() 時再詳細說,這里只要知道,這個方法是有可能關閉執行緒池的,
繼續說這個方法:processWorkerExit ,在最后,當執行緒池狀態是 RUNNING、SHUTDOWN 時,有可能會再次呼叫 addWorker 方法,只是傳入的任務是null,
首先是 completedAbruptly 為 true 時,呼叫 addWorker,這個好理解,上個方法例外退出了,那再起一個執行緒,繼續消費任務,
其次是completedAbruptly 為 false 時,即真的是從阻塞佇列中取的任務是null,作業執行緒的數量小于 min, 這時會呼叫 addWorker,
順便說一句,當作業執行緒數量,小于或等于 corePoolSize,正常不會進到這個方法的,為啥,等會兒說這個,
int min = allowCoreThreadTimeOut ? 0 : corePoolSize 這里 allowCoreThreadTimeOut 默認是false,這里 min 是可以等于corePoolSize的,這種情況的出現,想不明白一定不是作者寫錯了,
corePoolSize 是可以重置的, setCorePoolSize(int corePoolSize) ,特定時間的重置 corePoolSize ,這里的代碼判斷就有必要了,
總之這個方法會銷毀執行緒,也會在必要的情況下創建執行緒,保證作業執行緒不小于corePoolSize ,
.
.
- getTask() 方法決議
現在退回來,說 runWorker 方法中的這一段 while (task != null || (task = getTask()) != null)
當 getTask() 方法回傳 null 時,runWorker 會執行結束,進入 processWorkerExit 方法中,銷毀執行緒,
我曾經錯誤的以為,當阻塞佇列中取不到任務了,進入processWorkerExit 這個方法中,
銷毀作業執行緒,但同時維護corePoolSize ,還會創建執行緒,
那不就有可能,前面創建,后面銷毀,回圈不停啦!
后來我發現我錯了,原因就在 getTadk() 的原始碼中,
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait, and if the queue is
* non-empty, this worker is not the last thread in the pool.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
英文好的,可以先看下原始碼的注釋
這又是一個無限回圈,先看這段執行緒池運行狀態的判斷,看著會暈
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
這個的意思就是
rs = SHUTDOWN && workQueue.isEmpty() 執行 大括號里的代碼
rs > SHUTDOWN 時,執行大括號里的代碼
也就是說,這兩種情況下,回傳null,進入 processWorkerExit 方法銷毀執行緒了,
其它情況(rs = RUNNING,或者 rs = SHUTDOWN 并且 阻塞佇列不為空),要嘗試回傳任務,
接著看這一行
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
timed 這個引數,你可以理解為,是否支持超時機制,
false 不支持,從后文的代碼可看出,取任務時,呼叫 workQueue.take() ,阻塞,代碼就停到這里了,一直等到,其它執行緒往佇列中放了任務,阻塞被喚醒,繼續執行,
true 支持, 取任務時,呼叫 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),即在一定時間內,取不到任務,直接回傳 null,
allowCoreThreadTimeOut 引數的默認值是 false, 那咱們就只討論 wc > corePoolSize,
當 wc > corePoolSize 時,timed 為 true,即支持超時,一定時間取不到任務,回傳 null,
結合前面所講,可以總結這么兩句話:
當作業執行緒大于 corePoolSize,取任務時,沒有任務,操作會超時,回傳null,進入下一次回圈,
當作業執行緒 不大于 corePoolSize,取任務時,沒有任務,會被阻塞,即代碼停止執行,直到有新的任務進來,
再看下面這一段 ,可以理解為 是否具備銷毀執行緒的條件,
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
timedOut 這個引數,可以理解為:是否經歷過取任務超時,false 沒有經歷過, true 經歷過,
條件1: wc > maximumPoolSize || (timed && timedOut)
條件2: wc > 1 || workQueue.isEmpty()
當wc > maximumPoolSize 條件1就滿足了,什么時候出現呢? 當重置 maximumPoolSize時會出現這種情況,
那條件1,可以用文字描述為:要么作業執行緒數大于 最大執行緒數,要么是 經歷過取任務超時(支持超時機制,才有可能經歷過取任務超時),才可能去銷毀執行緒,
條件1滿足的情況下,才會判斷條件2
wc > 1 這個說明當前作業執行緒,至少是 2 個,可以銷毀 1 個,
workQueue.isEmpty() 這個說明阻塞佇列中沒有任務了,可以放心銷毀執行緒,
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
如果 compareAndDecrementWorkerCount 執行成功了,那就直接回傳null,
如果執行失敗了,那就從頭開始,重新判斷,這個不難理解,并發控制的真好,
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
前面說的理解好了,這里就容易理解了,
支持超時等待,就呼叫 非阻塞 的 poll 方法,不支持的呼叫 阻塞 方法 take(),
創建執行緒池時,指定的引數 最大執行緒存活時間 keepAliveTime, 就是在這里發揮功效的,
至此,getTask() 方法講完了,籠統的概括:
當作業執行緒 小于等于 corePoolSize,取任務會呼叫 阻塞方法 take(),
即佇列中沒有任務會被阻塞,也就是說,執行緒不會被銷毀,
當作業執行緒 大于 corePoolSize, 取任務會呼叫 非阻塞的 poll() 方法,
即佇列中沒有任務時,會超時,在下個回圈中,回傳 null 進入銷毀執行緒的流程,
.
.
五、執行緒池的關閉
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 檢查有沒有權限關閉執行緒池
advanceRunState(SHUTDOWN); // 更改執行緒池的狀態
interruptIdleWorkers(); // 中斷執行緒
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
這里的代碼很簡潔,來一個一個分析
- 1、
checkShutdownAccess()方法決議
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
這個方法邏輯很清楚,先檢查有沒有權限關閉執行緒池,
如果有,再檢查是否可以中斷每個作業執行緒,
沒有相關權限,會拋出例外,
.
- 2、
advanceRunState()方法決議
這個方法,最終是把執行緒池的運行狀態,設定為一個大于等于0 的狀態,即大于等于 SHUTDOWN
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
入參是 SHUTDOWN,如果執行緒池的運行狀態 大于等于 SHUTDOWN,直接跳出,
否則,執行 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))) 這一段
workerCountOf(c) 是作業執行緒的數量,執行這它的時候,執行緒池的狀是 RUNNING,
那 workerCountOf(c) 是大于等于0的,所以 ctlOf(targetState, workerCountOf(c))) 計算出來也是大于0的,
那 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) 執行的結果就是,執行緒池的狀態設定為大于等于 0,
.
- 3、
interruptIdleWorkers()方法決議
這個方法,是中斷空閑執行緒,
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
首先 mainLock.lock() 表明,整個程序是加鎖進行的,且是拿到主鎖才執行,
!t.isInterrupted() && w.tryLock() 這個條件,
若 t.isInterrupted() 回傳 true,即執行緒本身有中斷標志,取反后是false,后面的就不再執行了,
順便說一句,在 決議 runWorker() 方法時,分析過,
當執行緒池狀態,大于等于 STOP 時,保證作業執行緒都有中斷標志,
當執行緒池狀態,小于STOP時,保證作業執行緒都沒有中斷標志,
作業執行緒沒有中斷標志的情況下,會執行 w.tryLock() 方法,
有沒有疑問,這是什么意思呀?
前面說過,Worker 物件 繼承了 AQS,也就是說,它可以實作加鎖,
回頭可以再看下 runWorker() 方法,在執行這個方法之前,不會被中斷,
在執行任務時,是加鎖的,也不會被中斷,
在執行 getTask() 方法時,是可以被中斷的,
public boolean tryLock() { return tryAcquire(1); }
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
這段加鎖的代碼很容易理解,接著往下說,
在執行任務的執行緒不會被中斷,這也就實作了,即使執行緒池關閉了,正在執行任務的執行緒,依然會把任務給執行完,
拿到鎖之后,會執行 t.interrupt(), 即給執行緒打上中斷標志,
本例中,傳入的引數是 false,那會對所有的空閑執行緒打上中斷標志,
.
那下一個問題,執行緒被打上中斷標志后,有啥影響呢?
結論:在執行 getTask() 方法時,會拋出例外,跳出這個方法,進入processWorkerExit() 銷毀執行緒,
而且此時,執行緒池的狀態若是等于SHUTDOWN,就會會呼叫 addWorker(null, false) ,
前面分析 addWorker() 方法時,我們知道只有兩種情況下,會創建一個新的 Worker,
不記得的話,翻回去看下, 結合現在的場景,只有佇列不為空的時候,分創建新的 Worker,
若佇列為空,不會創建新的Worker,
想想,佇列不為空,說明還有任務存在,銷毀了一個執行緒,再創建一個,也正常,
由此,把前面分析過的幾個方法都串了起來,
如果說,上面的分析你理解了,那下面這句話你會有新的認識:
當執行緒池是SHUTDOWN 狀態時,佇列不為空時,是可以提交空任務,
當執行緒池是STOP 狀態時,不可以提交任務,
.
咱們還有一個問題沒有說,中斷的執行緒怎么就拋出例外了?
在 getTask() 方法中, 取任務的那行代碼
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
在 BlockQueue這個介面中,定義了這兩個方法,是會拋出例外的,
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
E take() throws InterruptedException;
以 LinkedBlockingQueue的實作為例子,詳細資訊可以看了之前的博文《LinkedBlockingQueue原始碼圖解》
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly(); // 執行緒有中斷,直接拋出例外
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
下面是 ReentrantLock 中 lockInterruptibly() 方法的實作
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
至此為止,interruptIdleWorkers() 這個方法決議完了,總結下大概就是:
執行緒池狀態大于等于SHUTDOWN 時,會呼叫該方法,將所有空閑的執行緒打個中斷標志,
被打中斷標志的執行緒,會在 getTask() 方法中拋出例外,從而在后一個方法中銷毀執行緒,
若銷毀執行緒后,阻塞佇列中還有任務,還是可以新建 Worker,繼續消費佇列中的任務,
.
- 4、
tryTerminate()方法決議
onShutdown() 這個方法,在 ThreadPoolExecutor 類中是一個空方法,這里就不講了,
tryTerminate() 這個方法,在 processWorkerExit() 里也呼叫過,當時沒講,放在這里決議
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
先看這一段,判斷執行緒池狀態的
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
執行緒池處于運行狀態,不處理,這個好理解,
執行緒池狀態 大于等于TIDYING,不處理,這個說明已經在成功執行過 tryTerminate 方法了,
執行緒池狀態 是SHUTDOWN 且 阻塞佇列不為空,不處理,因為有任務,得讓它干活,不能中斷執行緒,
除此之外,就兩種狀態需要處理了:
執行緒池狀態是STOP,可以關閉執行緒池,
執行緒池狀態是SHUTDOWN 且阻塞佇列為空,
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE); // 中斷一個空閑執行緒,
return;
}
這段代碼的意思我懂,當作業執行緒大于0時, 中斷一個空閑的執行緒,
interruptIdleWorkers 這個方法,前面講過了,只是引數變成了true,只中斷一個空閑執行緒,
在 shutdown() 這個方法中,所有空閑執行緒都被中斷了,簡化的考慮,這段沒用,
但是在 processWorkerExit() 方法中,呼叫 tryTerminate() 是有意義的,
它可以讓阻塞的執行緒,從 getTask() 方法中,跳出來,進入processWorkerExit() 被銷毀掉,
同時會觸發再次中斷一個空閑執行緒,使其從getTask() 方法中,跳出來,進入processWorkerExit() 被銷毀掉,
這就形成了一個回圈,把空閑的執行緒一個一個處理掉了,
那么回過頭來想想, shutdown() 呼叫 interruptIdleWorkers(true),也是有意義的,
因為正在執行任務的執行緒,執行完任務,是可能變成空閑執行緒的,(大概知道就行了,不深入解釋)
再往后,獲取主鎖,修改執行緒池的狀態,
先是設定為 TIDYING狀態,再執行 terminated(),最后執行緒池狀態設定為 TERMINATED,
其中 terminated() 在 ThreadPoolExecutor 這個類中,是一個空方法,不用講了,
最后 termination.signalAll(), 這個是喚醒阻塞的執行緒,這個方法本文的方法中未曾涉及,不講了,
至此執行緒池原始碼決議,完畢,
六、總結
本文從原始碼層面,詳細分析了,執行緒池的創建、運行、關閉,
對應的就是 execute(), addWorker(), runWorker(), shutDown() 這幾個方法,
同時與簡單梳理了,這幾個方法之間相互關聯的地方,
上篇博文《圖解執行緒池原理》,從大方向上介紹了執行緒池的原理 ,本篇深入原始碼,詳細剖析了這幾個方法,
關于執行緒池的四種拒絕策略,單獨寫了一篇《執行緒池四種拒絕策略》,
本文不再分析,拒絕策略本身沒有復雜的邏輯,代碼也不難理解,
還有一些方法,比如 shutdownNow()、awaitTermination() 等 不再詳細分析,文章已經夠長啦!
能把本文所講的東西說出來,面試官基本該滿意了吧,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/259712.html
標籤:java
上一篇:為什么要配置path環境變數
