
碎碎念
關于JDK原始碼相關的文章這已經是第四篇了,原創不易,粉絲從幾十人到昨天的666人,真的很感謝之前幫我轉發文章的一些朋友們,

從16年開始寫技術文章,到現在博客園已經發表了222篇文章,大多數都是原創,共有800多粉絲,基本上每個月都會有文章的產出,


回顧這幾年以來寫作的心路歷程,一直都是偷偷的寫,偷偷的發,害怕被人知道,怕被人罵文章寫的太水(之前心理太脆弱了,哈哈),后面和cxuan聊過后,他建議我給他投稿試試,于是就有了那一篇的萬字的AQS文章,
最近也有好多讀者加到我的微信,問一些文章中的問題,我也都會認真解答,看到有人閱讀我的文章并有所識訓,我真的挺欣慰,這就是寫作的動力吧,
幫助別人的同時也是在幫助自己,自己學的技術和理解的內容都是有局限性的,通過寫文章結識到了很多朋友,聽聽別人的分析和見解,我也能學到很多,

每次看到博客中有人留言都很激動,也會第一時間去回復,感謝下面的公眾號大佬們之前無私的幫助,大家也可以關注一下他們,都是很nice的大佬:
Java建設者、Java團長、程式猿石頭、碼象全堆疊、Java3y、JAVA小咖秀、Bella的技術輪子、石杉的架構筆記、武培軒、程式通事
前言
Java中的執行緒池已經不是什么神秘的技術了,相信在看的讀者在專案中也都有使用過,關于執行緒池的文章也是數不勝數,我們站在巨人的肩膀上來再次梳理一下,
本文還是保持原有的風格,圖文決議,盡量做到多畫圖!全文共20000+字,建議收藏后細細品讀,閱讀期間搭配原始碼食用效果更佳!
讀完此文你將學到:
ThreadPoolExecutor中常用引數有哪些?ThreadPoolExecutor中執行緒池狀態和執行緒數量如何存盤的?ThreadPoolExecutor有哪些狀態,狀態之間流轉是什么樣子的?ThreadPoolExecutor任務處理策略?ThreadPoolExecutor常用的拒絕策略有哪些?Executors工具類提供的執行緒池有哪些?有哪些缺陷?ThreadPoolExecutor核心執行緒池中執行緒預熱功能?ThreadPoolExecutor中創建的執行緒如何被復用的?ThreadPoolExecutor中關閉執行緒池的方法shutdown與shutdownNow的區別?ThreadPoolExecutor中存在的一些擴展點?ThreadPoolExecutor支持動態調整核心執行緒數、最大執行緒數、佇列長度等一些列引數嗎?怎么操作?
本文原始碼基于JDK1.8
執行緒池基本概念
執行緒池是一種池化思想的產物,如同我們資料庫有連接池、Java中的常量池,執行緒池可以幫助我們管理執行緒、復用執行緒,減少執行緒頻繁新建、銷毀等帶來的開銷,
在Java中是通過ThreadPoolExecutor類來創建一個執行緒池的,一般我們建議專案中自己去定義執行緒池,不推薦使用JDK提供的工具類Executors去構建執行緒池,
查看阿里巴巴開發手冊中也有對執行緒池的一些建議:
【強制】創建執行緒或執行緒池時請指定有意義的執行緒名稱,方便出錯時回溯,
正例:自定義執行緒工廠,并且根據外部特征進行分組,比如,來自同一機房的呼叫,把機房編號賦值給whatFeaturOfGroup
public class UserThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger nextId = new AtomicInteger(1);
UserThreadFactory(String whatFeaturOfGroup) {
namePrefix = "From UserThreadFactory's " + whatFeaturOfGroup + "-Worker-";
}
@Override
public Thread newThread(Runnable task) {
String name = namePrefix + nextId.getAndIncrement();
Thread thread = new Thread(null, task, name, 0, false);
System.out.println(thread.getName());
return thread;
}
}
【強制】執行緒資源必須通過執行緒池提供,不允許在應用中自行顯式創建執行緒,
說明:執行緒池的好處是減少在創建和銷毀執行緒上所消耗的時間以及系統資源的開銷,解決資源不足的問題,
如果不使用執行緒池,有可能造成系統創建大量同類執行緒而導致消耗完記憶體或者“過度切換”的問題,
【強制】執行緒池不允許使用 Executors 去創建,而是通過 ThreadPoolExecutor 的方式,這
樣的處理方式讓寫的同學更加明確執行緒池的運行規則,規避資源耗盡的風險,
說明:Executors 回傳的執行緒池物件的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允許的請求佇列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM,
2) CachedThreadPool:
允許的創建執行緒數量為 Integer.MAX_VALUE,可能會創建大量的執行緒,從而導致 OOM,
執行緒池使用示例
下面是一個自定義的執行緒池,這是之前公司在用的一個執行緒池,修改其中部分屬性和備注做脫敏處理:
public class MyThreadPool {
static final Logger LOGGER = LoggerFactory.getLogger(MyThreadPool.class);
private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
private static final String THREAD_POOL_NAME = "MyThreadPool-%d";
private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
.daemon(true).build();
private static final int DEFAULT_SIZE = 500;
private static final long DEFAULT_KEEP_ALIVE = 60L;
private static ExecutorService executor;
private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE);
static {
try {
executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT + 2, DEFAULT_KEEP_ALIVE,
TimeUnit.SECONDS, executeQueue, FACTORY);
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("MyThreadPool shutting down.");
executor.shutdown();
try {
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
LOGGER.error("MyThreadPool shutdown immediately due to wait timeout.");
executor.shutdownNow();
}
} catch (InterruptedException e) {
LOGGER.error("MyThreadPool shutdown interrupted.");
executor.shutdownNow();
}
LOGGER.info("MyThreadPool shutdown complete.");
}
}));
} catch (Exception e) {
LOGGER.error("MyThreadPool init error.", e);
throw new ExceptionInInitializerError(e);
}
}
private MyThreadPool() {
}
public static boolean execute(Runnable task) {
try {
executor.execute(task);
} catch (RejectedExecutionException e) {
LOGGER.error("Task executing was rejected.", e);
return false;
}
return true;
}
public static <T> Future<T> submitTask(Callable<T> task) {
try {
return executor.submit(task);
} catch (RejectedExecutionException e) {
LOGGER.error("Task executing was rejected.", e);
throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
}
}
}
這里主要就是使用呼叫ThreadPoolExecutor建構式來構造一個執行緒池,指定自定義的ThreadFactory,里面包含我們自己執行緒池的poolName等資訊,重寫里面的execute()和submitTask()方法, 添加了系統關閉時的鉤子函式shutDownHook(),在里面呼叫執行緒池的shutdown()方法,使得系統在退出(使用ctrl c或者kill -15 pid)時能夠優雅的關閉執行緒池,
如果有看不懂的小伙伴也沒有關系,后面會詳細分析ThreadPoolExecutor中的原始碼,相信看完后面的代碼再回頭來看這個用例 就完全是小菜一碟了,
執行緒池實作原理
通過上面的示例代碼,我們需要知道創建執行緒池時幾個重要的屬性:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
corePoolSize: 執行緒池核心執行緒數量
maximumPoolSize: 執行緒池最大執行緒數量
workQueue: 執行緒池中阻塞佇列,一般指定佇列大小
執行緒池中資料模型可以簡化成下圖所示,其中Thread應該是添加的一個個Worker,這里標注的Thread是為了方便理解:

執行緒池中提交一個任務具體執行流程如下圖:

提交任務時,比較當前執行緒池中執行緒數量和核心執行緒數的大小,根據比較結果走不同的任務處理策略,這個下面會有詳細說明,
執行緒池中核心方法呼叫鏈路:

TheadPoolExecutor原始碼初探
TheadPoolExecutor中常用屬性和方法較多,我們可以先分析下這些,然后一步步往下深入,常用屬性和方法如下:

具體代碼如下:
public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
}
- ctl
ctl代表當前執行緒池狀態和執行緒池執行緒數量的結合體,高3位標識當前執行緒池運行狀態,后29位標識執行緒數量,ctlOf方法就是rs(執行緒池運行狀態)和wc(執行緒數量)按位或操作
- COUNT_BITS
COUNT_BITS = Integer.SIZE - 3 = 29,在ctl中,低29位用于存放當前執行緒池中執行緒的數量
- CAPACITY
CAPACITY = (1 << COUNT_BITS) - 1
我們來計算一下:
1 << 29 = 0010 0000 0000 0000 0000 0000 0000 0000
(1 << 29) - 1 = 0001 1111 1111 1111 1111 1111 1111 1111
這個屬性是用來執行緒池能裝載執行緒的最大數量,也可以用來做一些位運算操作,
- 執行緒池幾種狀態
RUNNING:
(1) 狀態說明:執行緒池處在RUNNING狀態時,能夠接收新任務,以及對已添加的任務進行處理,
(2) 狀態切換:執行緒池的初始化狀態是RUNNING,換句話說,執行緒池被一旦被創建,就處于RUNNING狀態,并且執行緒池中的任務數為0
SHUTDOWN:
(1) 狀態說明:執行緒池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務,
(2) 狀態切換:呼叫執行緒池的shutdown()介面時,執行緒池由RUNNING -> SHUTDOWN
STOP:
(1) 狀態說明:執行緒池處在STOP狀態時,不接收新任務,不處理已添加的任務,并且會中斷正在處理的任務,
(2) 狀態切換:呼叫執行緒池的shutdownNow()介面時,執行緒池由(RUNNING or SHUTDOWN ) -> STOP
TIDYING:
(1) 狀態說明:當所有的任務已終止,ctl記錄的"任務數量"為0,執行緒池會變為TIDYING狀態,當執行緒池變為TIDYING狀態時,會執行鉤子函式terminated(),terminated()在ThreadPoolExecutor類中是空的,若用戶想在執行緒池變為TIDYING時,進行相應的處理;可以通過多載terminated()函式來實作,
(2) 狀態切換:當執行緒池在SHUTDOWN狀態下,阻塞佇列為空并且執行緒池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING,
當執行緒池在STOP狀態下,執行緒池中執行的任務為空時,就會由STOP -> TIDYING
TERMINATED:
(1) 狀態說明:執行緒池徹底終止,就變成TERMINATED狀態,
(2) 狀態切換:執行緒池處在TIDYING狀態時,執行完terminated()之后,就會由 TIDYING -> TERMINATED
狀態的變化流轉:

- runStateOf()
計算執行緒池運行狀態的,就是計算ctl前三位的數值,`unStateOf() = c & ~CAPACITY,CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111,那么~CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000,它與任何數按位與的話都是只看這個數前三位
- workerCountOf()
計算執行緒池的執行緒數量,就是看ctl的后29位,workerCountOf() = c & CAPACITY, CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111與任何數按位與,就是看這個數的后29位
- ctlOf(int rs, int wt)
在獲取當前執行緒池ctl的時候會用到,在后面原始碼中會有很多地方呼叫, 傳遞的引數rs代表執行緒池狀態,wt代表當前執行緒次執行緒(worker)的數量
- runStateLessThan(int c, int s)
return c < s,c一般傳遞的是當前執行緒池的ctl值,比較當前執行緒池ctl所表示的狀態是否小于某個狀態s
- runStateAtLeast(int c, int s)
return c >= s,c一般傳遞的是當前執行緒池的ctl值,比較當前執行緒池ctl所表示的狀態,是否大于等于某個狀態s
- isRunning(int c)
c < SHUTDOWN, 判斷當前執行緒池是否是RUNNING狀態,因為只有RUNNING的值小于SHUTDOWN
- compareAndIncrementWorkerCount()/compareAndDecrementWorkerCount()
使用CAS方式 讓ctl值分別加一減一 ,成功回傳true, 失敗回傳false
- decrementWorkerCount()
將ctl值減一,這個方法用了do...while回圈,直到成功為止
- completedTaskCount
記錄執行緒池所完成任務總數 ,當worker退出時會將 worker完成的任務累積到completedTaskCount
- Worker
執行緒池內部類,繼承自AQS且實作Runnable介面,Worker內部有一個Thread thread是worker內部封裝的作業執行緒,Runnable firstTask用來接收用戶提交的任務資料,在初始化Worker時候會設定state為-1(初始化狀態),通過threadFactory創建一個執行緒,
- ThreadPoolExecutor初始化引數
corePoolSize: 核心執行緒數限制
maximumPoolSize: 最大執行緒限制
keepAliveTime: 非核心的空閑執行緒等待新任務的時間 unit: 時間單位,配合allowCoreThreadTimeOut也會清理核心執行緒池中的執行緒,
workQueue: 任務佇列,最好選用有界佇列,指定佇列長度
threadFactory: 執行緒工廠,最好自定義執行緒工廠,可以自定義每個執行緒的名稱
handler: 拒絕策略,默認是AbortPolicy
execute()原始碼分析
當有任務提交到執行緒池時,就會直接呼叫ThreadPoolExecutor.execute()方法,執行流程如下:

從流程圖可看,添加任務會有三個分支判斷,原始碼如下:
java.util.concurrent.ThreadPoolExecutor.execute():
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
c在這里代表執行緒池ctl的值,包含作業任務數量以及執行緒池的狀態,上面有解釋過,
接著看下面幾個分支代碼:
分支一: if (workerCountOf(c) < corePoolSize) ,條件成立表示當前執行緒數量小于核心執行緒數,此次提交任務,直接創建一個新的worker,
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
如果執行緒數小于核心執行緒數,執行addWorker操作,這個后面會講這個方法的細節,如果添加成功則直接回傳,失敗后會重新計算ctl的值,然后執行分支二,
針對addWorker()執行失敗的情況,有以下幾種可能:
- 存在并發情況,
execute()方法是可能有多個執行緒同時呼叫的,當多個執行緒同時workerCountOf(c) < corePoolSize成立后,就會向執行緒池中創建worker,這個時候執行緒池的核心執行緒數可能已經達到,在addWorker中還會再次判斷,所以會有任務添加失敗,

- 當前執行緒池狀態發生改變,例如執行緒A執行
addWorker()方法時,執行緒B修改執行緒池狀態,導致執行緒池不是RUNNING狀態,此時執行緒A執行addWorker()就有可能失敗,

分支二: if (isRunning(c) && workQueue.offer(command)) {}
通過分支一流程的分析,我們可以知道執行到這個分支說明**當前執行緒數量已經達到corePoolSize或者addWorker()執行失敗,我們先看看分支二執行流程:

首先判斷當前執行緒池是否處于RUNNING狀態,如果是則嘗試將task放入到workQueue中,workQueue是我們在初始化ThreadPoolExecutor時傳入進來的阻塞佇列,
如果當前任務成功添加到阻塞佇列中,再次獲取ctl賦值給recheck變數,然后執行:
if (!isRunning(recheck) && remove(command))
reject(command);
再次判斷當前執行緒池是否為RUNNINT狀態,如果不是則說明提交任務到佇列之后,執行緒池狀態被其他執行緒給修改了,比如呼叫shutdown()/shutdownNow()等,這種情況就需要把剛剛提交到佇列中的的任務洗掉掉,
再看下remove()方法:
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate();
return removed;
}
如果任務提交到佇列之后,執行緒池中的執行緒還未將這個任務消費,那么就可以remove成功,呼叫reject()方法來執行拒絕策略,
如果在改變執行緒池狀態之前,佇列中的資料已經被消費了,此時remove()就會失敗,

接著走else if中的邏輯:
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
走這個else if邏輯有兩種可能,執行緒池是RUNNING狀態或者執行緒池狀態被改變且workQueue中添加的任務已經被消費導致remove()失敗,
如果是RUNNING狀態,執行緒池中的執行緒數量是0,此時workQueue中還有待執行的任務,就需要新增一個worker(addWorker里面會有創建執行緒的操作),繼續消費workqueue中的任務,

這里要注意一下addWorker(null, false),也就是創建一個執行緒,但并沒有傳入任務,因為任務已經被添加到workQueue中了,所以worker在執行的時候,會直接從workQueue中獲取任務,在workerCountOf(recheck) == 0時執行addWorker(null, false)也是為了保證執行緒池在RUNNING狀態下必須要有一個執行緒來執行任務,可以理解為一種擔保兜底機制,
至于執行緒池中執行緒為何可以為0?這個如果我們設定了allowCoreThreadTimeOut=true,那么核心執行緒也是允許被回收的,后面getTask()中代碼有提及,
分支三: else if (!addWorker(command, false)) {}
通過分支一和分之二的分析,進入這個分支的前置條件:執行緒數超過核心執行緒數且workQueue中資料已滿,
else if (!addWorker(command, false)),執行添加worker操作,如果執行失敗就直接走reject()拒絕策略,這里添加失敗可能是執行緒數已經超過了maximumPoolSize,

addWorker()原始碼分析
上面分析提交任務的方法execute()時多次用到addWorker方法,接收任務后將任務添加到Worker中,
Worker是ThreadPoolExecutor中的內部類,繼承自AQS且實作了Runnable介面, 類中包含Thread thread,它是worker內部封裝的作業執行緒,還有firstTask屬性,它是一個可執行的Runnable物件,在Worker的建構式中,使用執行緒工廠創建了一個執行緒,當thread啟動的時候,會以worker.run()為入口啟動執行緒,這里會直接呼叫到runWorker()中,
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
流程如下圖:

這里再回頭看下addWorker(Runnable firstTask, boolean core) 方法,這個方法主要是添加一個Worker到執行緒池中并執行,firstTask引數用于指定新增的執行緒執行的第一個任務,core引數為true表示在新增執行緒時會判斷當前活動執行緒數是否少于corePoolSize,false表示在新增執行緒時會判斷當前活動執行緒數是否少于maximumPoolSize
addWorker方法整體執行流程圖如下:

接著看下原始碼:
java.util.concurrent.ThreadPoolExecutor.addWorker():
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
這里是有兩層for回圈,外層回圈主要是判斷執行緒池的狀態,如果狀態不合法就直接回傳false.
只有兩種情況屬于合法狀態:
RUNNING狀態SHUTDOWN狀態時,佇列中還有未處理的任務,且提交的任務為空,SHUTDOWN含義就是不再接收新任務,可以繼續處理阻塞佇列的任務,
第二層回圈是通過CAS操作更新workCount數量,如果更新成功則往執行緒池中中添加執行緒,這個所謂的執行緒池就是一個HashSet陣列,添加失敗時判斷失敗原因,CAS失敗有兩種原因:執行緒池狀態被改變或者并發情況修改執行緒池中workCount數量,這兩種情況都會導致ctl值被修改,如果是第二種原因導致的失敗,繼續自旋更新workCount數量,
接著繼續分析回圈內部的實作,先看看第一層回圈:c代表執行緒池ctl值,rs代表執行緒池運行狀態,
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
條件一:rs >= SHUTDOWN 成立, 說明當前執行緒池狀態不是RUNNING狀態
條件二: !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
我們之前提到過,創建任務有兩種情況:
1)RUNNING狀態可以提交任務,
2)SHUTDOWN狀態下如果傳遞的任務是空且阻塞佇列中還有任務未處理的情況才是允許創建任務繼續處理的,因為阻塞佇列中的任務仍然需要繼續處理,
上面的條件一和條件二就是處理SHUTDOWN狀態下任務創建操作的判斷,
接著分析第二層回圈,先是判斷執行緒池workCount數量是否大于可創建的最大值,或者是否超過了核心執行緒數/最大執行緒數,如果是則直接回傳,addWorker()操作失敗,
接著使用compareAndIncrementWorkerCount(c)將執行緒池中workCount+1,這里使用的是CAS操作,如果成功則直接跳出最外層回圈,
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
如果CAS失敗,說明此時有競爭,會重新獲取ctl的值,判斷競爭失敗的原因是添加workCount數量還是修改執行緒池狀態導致的,如果執行緒池狀態未發生改變,就繼續回圈嘗試CAS增加workCount數量,接著看回圈結束后邏輯:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
這里workerStarted代表worker是否已經啟動,workerAdded代表創建的worker是否添加到池子中,這里所謂的池子就是全域定義的一個HashSet結構的workers變數,
接著根據傳遞的firstTask來構建一個Worker,在Worker的構造方法中也會通過ThreadFactory創建一個執行緒,這里判斷t != null是因為用戶可以自定義ThreadFactory,如果這里用戶不是創建執行緒而是直接回傳null則會出現一些問題,所以需要判斷一下,
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
}
在往池子中添加Worker的時候,是需要先加鎖的,因為針對全域的workers操作并不是執行緒安全的,
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
繼續看下面代碼,rs代表當前執行緒池的狀態,這里還是判斷執行緒池的狀態,如果rs < SHUTDOWN代表執行緒池狀態是RUNNING狀態,此時可以直接操作,
如果是SHUTDOWN狀態,需要滿足firstTask == null才可以繼續操作,因為在SHUTDOWN狀態時不會再添加新的任務,但還是可以繼續處理workQueue中的任務,
t.isAlive() 當執行緒start后,執行緒isAlive會回傳true,這里還是防止自定義的ThreadFactory創建執行緒回傳給外部之前,將執行緒start了,由此可見Doug lea考慮問題真的很全面,
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
}
接著將創建的Worker添加到workers集合中,設定largestPoolSize,這個屬性是執行緒池生命周期內執行緒數最大值,一般是做統計資料用的, 最后修改workerAdded = true,代表當前提交的任務所創建的Worker已經添加到池子中了,
添加worker成功后,呼叫執行緒的start()方法啟動執行緒,因為Worker中重寫了run()方法,最后會執行Worker.run(),最后設定workerStarted = true后釋放全域鎖,
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
orkerAdded = true;
}
這里再回頭看看workerAdded = false的情形,如果執行緒池在lock之前,狀態發生了變化,導致添加失敗,此時workerAdded也會為false,最后執行addWorkerFailed(work)操作,這個方法是將Work從workers中移除掉,然后將workCount數量減一,最后執行tryTerminate()來嘗試關閉執行緒池,這個方法后面會細說,
runWorker()原始碼分析
在Worker類中的run方法呼叫了runWorker來執行任務,上面addWorker()方法正常的執行邏輯會創建一個Worker,然后啟動Worker中的執行緒,這里其實就會執行到runWorker方法,

runWorker的執行邏輯很簡單,啟動一個執行緒,執行當前傳遞的task任務,執行完后又不斷的從workQueue中獲取任務繼續執行,如果當前workCount數量小于核心執行緒數且佇列中沒有了任務,當前執行緒會被阻塞,這個就是getTask()的邏輯,一會會講到,
如果當前執行緒數大于核心執行緒數且佇列中沒有任務,就會回傳null,在runWorker這邊退出回圈,回收多余的worker資料,

原始碼如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
這里w.unlock()是為了初始化當前Work中state==0,然后設定獨占執行緒為null,因為在shutDown()方法中會嘗試獲取Worker中的鎖,如果獲取成功代表當前執行緒沒有被加鎖處于空閑狀態,給當前執行緒一個中斷信號,所以這里在執行執行緒任務的時候需要加鎖,防止呼叫shutDown()的時候給當前worker執行緒一個中斷信號,
判斷task是否為空,如果是一個空任務,那么就去workQueue中獲取任務,如果兩者都為空就會退出回圈,
while (task != null || (task = getTask()) != null) {}
最核心的就是呼叫task.run()啟動當前任務,這里面還有兩個可擴展的方法,分別是beforeExecute()/afterExecute(),我們可以在任務執行前和執行后分別自定義一些操作,其中afterExecute()可以接收到任務拋出的例外資訊,方便我們做后續處理,
while (task != null || (task = getTask()) != null) {
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
如果退出回圈,說明getTask()方法回傳null,會執行到finally中的processWorkerExit(w, completedAbruptly)方法,此方法是用來清理執行緒池中添加的work資料,completedAbruptly=true代表是例外情況下退出,
try {
while (task != null || (task = getTask()) != null) {
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
runWorker()中只是啟動了當前執行緒作業,還需要源源不斷通過getTask()方法從workQueue來獲取任務執行,在workQueue沒有任務的時候,根據執行緒池workCount和核心執行緒數的對比結果來使用processWorkerExit()執行清理作業,
getTask()原始碼分析
getTask方法用于從阻塞佇列中獲取任務,如果當前執行緒小于核心執行緒,那么當阻塞佇列中沒有任務時就會阻塞,反之會等待keepAliveTime后回傳,
這個就是keepAliveTime的使用含義:非核心的空閑執行緒等待新任務的時間,當然如果這里設定了allowCoreThreadTimeOut=true也會回收核心執行緒,
具體代碼如下:
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
這里核心代碼就是從workQueue中取任務,采用poll還是take取決于allowCoreThreadTimeOut和執行緒數量,allowCoreThreadTimeOut在構造ThreadLocalExecutor后設定的,默認為false,如果設定為true則代表核心執行緒數下的執行緒也是可以被回收的,如果使用take則表明workQueue中沒有任務當前執行緒就會被阻塞掛起,直到有了新的任務才會被喚醒,

在這里擴展下阻塞佇列的部分方法的含義,這里主要是看poll()和take()的使用區別:
阻塞佇列插入方法:
boolean add(E e):佇列沒有滿,則插入資料并回傳true;佇列滿時,拋出例外 java.lang.IllegalStateException: Queue full,
boolean offer(E e):佇列沒有滿,則插入資料并回傳true;佇列滿時,回傳false,
void put(E e):佇列沒有滿,則插入資料;佇列滿時,阻塞呼叫此方法執行緒,直到佇列有空閑空間時此執行緒進入就緒狀態,
boolean offer(E e, long timeout, TimeUnit unit):佇列沒有滿,插入資料并回傳true;佇列滿時,阻塞呼叫此方法執行緒,若指定等待的時間內還不能往佇列中插入資料,回傳false,
阻塞佇列移除(獲取)方法:
E remove():佇列非空,則以FIFO原則移除資料,并回傳該資料的值;佇列為空,拋出例外 java.util.NoSuchElementException,
E poll(): 佇列非空,移除資料,并回傳該資料的值;佇列為空,回傳null,
E take(): 佇列非空,移除資料,并回傳該資料的值;佇列為空,阻塞呼叫此方法執行緒,直到佇列為非空時此執行緒進入就緒狀態,
E poll(long timeout, TimeUnit unit):佇列非空,移除資料,并回傳該資料的值;佇列為空,阻塞呼叫此方法執行緒,若指定等待的時間內佇列都沒有資料可取,回傳null,
阻塞佇列檢查方法:
E element(): 佇列非空,則回傳隊首元素;佇列為空,拋出例外 java.util.NoSuchElementException,
E peek(): 佇列非空,則回傳隊首元素;佇列為空,回傳null,
processWorkerExit()原始碼分析
此方法的含義是清理當前執行緒,從執行緒池中移除掉剛剛添加的worker物件,

執行processWorkerExit()代表在runWorker()執行緒跳出了當前回圈,一般有兩種情況:
task.run()內部拋出例外,直接結束回圈,然后執行processWorkerExit()getTask()回傳為空,代表執行緒數量大于核心數量且workQueue中沒有任務,此時需要執行processWorkerExit()來清理多余的Worker物件
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)、
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
針對于執行緒池workers的操作都會進行加鎖處理,然后將當前Worker從池子中移除,累加當前執行緒池完成的任務總數completedTaskCount,
接著呼叫tryTerminate()嘗試關閉執行緒池,這個方法后面有詳細說明,
接著判斷if (runStateLessThan(c, STOP)) {},含義是當前執行緒池狀態小于STOP,即當前執行緒池狀態當前執行緒池狀態為 RUNNING 或 SHUTDOWN,判斷當前執行緒是否是正常退出,如果當前執行緒是正常退出,那么completedAbruptly=false,接著判斷執行緒池中是否還擁有足夠多的的執行緒,因為例外退出可能導致執行緒池中執行緒數量不足,此時就要執行addWorker()為執行緒池添加新的worker資料,看下面的詳細分析:
執行最后的addWorke()有三種可能:
1)當前執行緒在執行task時 發生例外,這里一定要創建一個新worker頂上去,
2)如果!workQueue.isEmpty()說明任務佇列中還有任務,這種情況下最起碼要留一個執行緒,因為當前狀態為 RUNNING || SHUTDOWN這是前提條件,
3)當前執行緒數量 < corePoolSize值,此時會創建執行緒,維護執行緒池數量在corePoolSize個水平,
tryTerminate()原始碼分析
上面移除Worker的方法中有一個tryTerminate()方法的呼叫,這個方法是根據執行緒池狀態嘗試關閉執行緒池,
執行流程如下:

實作原始碼如下:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
首先是判斷執行緒池狀態:
條件一:isRunning(c) 成立,直接回傳就行,執行緒池很正常!
條件二:runStateAtLeast(c, TIDYING) 說明 已經有其它執行緒 在執行 TIDYING -> TERMINATED狀態了,當前執行緒直接回去,
條件三:(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())
SHUTDOWN特殊情況,如果是這種情況,直接回去,得等佇列中的任務處理完畢后,再轉化狀態,
接著執行:
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
走到這個邏輯,說明執行緒池狀態 >= STOP或者執行緒池狀態為SHUTDOWN且佇列已經空了
當前執行緒池中的執行緒數量 > 0,呼叫interruptIdleWorkers()中斷一個空閑執行緒,然后回傳,我們來分析下,在getTask()回傳為空時會執行退出邏輯processWorkerExit(),這里就會呼叫tryTerminate()方法嘗試關閉執行緒池,
如果此時執行緒池狀態滿足執行緒池狀態 >= STOP或者執行緒池狀態為SHUTDOWN且佇列已經空了,如果此時執行緒池中執行緒數不為0,就會中斷一個空閑執行緒,
為什么這里只中斷一個執行緒呢?這里的設計思想是,如果執行緒數量特別多的話,只有一個執行緒去做喚醒空閑worker的任務可能會比較吃力,所以,就給了每個 被喚醒的worker執行緒 ,在真正退出之前協助 喚醒一個空閑執行緒的任務,提供吞吐量的一種常用手段,
我們順便看下interruptIdleWorkers()原始碼:
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
遍歷workers,如果執行緒是空閑狀態(空閑狀態:queue.take()和queue.poll()回傳空),則給其一個中斷信號,如果是處于workQueue阻塞的執行緒,會被喚醒,喚醒后,進入下一次自旋時,可能會return null執行退出相關的邏輯,接著又會呼叫processWorkerExit()->tryTerminate(),回到上面場景,當前執行緒退出的時候還是會繼續喚醒下一個空現執行緒,
接著往下看tryTerminate的剩余邏輯:
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
執行到這里的執行緒是誰?
workerCountOf(c) == 0 時,會來到這里,
最后一個退出的執行緒, 在 (執行緒池狀態 >= STOP || 執行緒池狀態為 SHUTDOWN 且 佇列已經空了)
執行緒喚醒后,都會執行退出邏輯,退出程序中 會 先將 workerCount計數 -1 => ctl -1,
呼叫tryTerminate 方法之前,已經減過了,所以0時,表示這是最后一個退出的執行緒了,
獲取全域鎖,進行加鎖操作,通過CAS設定執行緒池狀態為TIDYING狀態,設定成功則執行terminated()方法,這也是一個自定義擴展的方法,當執行緒池中止的時候會呼叫此方法,
最后設定執行緒池狀態為TERMINATED狀態,喚醒呼叫awaitTermination()方法的執行緒,
awaitTermination()原始碼分析
該方法是判斷執行緒池狀態是否達到TERMINATED,如果達到了則直接回傳true,沒有達到則會await掛起當前執行緒指定的時間,
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
在每次執行tryTerminate()后會喚醒所有被await的執行緒,繼續判斷執行緒池狀態,
shutDown()/shutDownNow()原始碼分析
shutDown和shutDown()方法都是直接改變執行緒池狀態的方法,一般我們在系統關閉之前會呼叫此方法優雅的關閉執行緒池,
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutdown和shutdownNow方法呼叫差不多,只是shutdown是將執行緒池狀態設定為SHUTDOWN,shutdownNow是將執行緒池狀態設定為STOP,
shutdownNow會回傳所有未處理的task集合,
來看看它們共同呼叫的一些方法:
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
這個方法是設定執行緒池狀態為指定狀態,runStateAtLeast(c, targetState),判斷當前執行緒池ctl值,如果小于targetState則會往后執行,
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))),通過CAS指令,修改ctl中執行緒池狀態為傳入的targetState,
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
interruptIdleWorkers含義是為空閑的執行緒設定中斷標識,這里要清楚worker什么時候空閑?我們在上面講解runWorker()方法時,執行task.run()之前,要針對Worker物件加鎖,設定Worker中的state值為1,防止運行的worker被添加中斷標識,接著執行getTask()方法,獲取阻塞佇列中的任務,如果是queue.take()則會阻塞掛起當前執行緒,釋放鎖,此時執行緒處于空閑狀態,如果是queue.pool()回傳為空,runWorker()會釋放鎖,此時執行緒也是空閑狀態,
執行interrupt()后處于queue阻塞的執行緒,會被喚醒,喚醒后,進入下一次自旋判斷執行緒池狀態是否改變,如果改變可能直接回傳空,這里具體參看runWorker()和getTask()方法,
onShutdown()也是一個擴展方法,需要子類去重寫,這里代表當執行緒池關閉后需要做的事情,drainQueue()方法是獲取workQueue中現有的的任務串列,
問題回顧
-
ThreadPoolExecutor中常用引數有哪些?
上面介紹過了,參見的引數是指ThreadPoolExecutor的構造引數,一般面試的時候都會先問這個,要解釋每個引數的含義及作用, -
ThreadPoolExecutor中執行緒池狀態和執行緒數量如何存盤的?
通過AtomicInteger型別的變數ctl來存盤,前3位代表執行緒池狀態,后29位代表執行緒池中執行緒數量, -
ThreadPoolExecutor有哪些狀態,狀態之間流轉是什么樣子的?
RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED

-
ThreadPoolExecutor任務處理策略?
這個問題就是考察execute()的執行程序,只要看過原始碼就不會有問題,

-
ThreadPoolExecutor常用的拒絕策略有哪些?
策略處理該任務,執行緒池提供了4種策略:
1)AbortPolicy:直接拋出例外,默認策略
2)CallerRunsPolicy:用呼叫者所在的執行緒來執行任務
3)DiscardOldestPolicy:丟棄阻塞佇列中靠最前的任務,并執行當前任務
4)DiscardPolicy:直接丟棄任務
當然執行緒池是支持自定義拒絕策略的,需要實作RejectedExecutionHandler介面中rejectedExecution()方法即可, -
Executors工具類提供的執行緒池有哪些?有哪些缺陷?
1) FixedThreadPool 和 SingleThreadPool:允許的請求佇列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM,
2) CachedThreadPool:允許的創建執行緒數量為 Integer.MAX_VALUE,可能會創建大量的執行緒,從而導致 OOM,
所以阿里巴巴也建議我們要自定義執行緒池核心執行緒數以及阻塞佇列的長度, -
ThreadPoolExecutor核心執行緒池中執行緒預熱功能?
在創建執行緒池后,可以使用prestartAllCoreThreads()來預熱核心執行緒池,public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n; } -
ThreadPoolExecutor中創建的執行緒如何被復用的?
這個主要是看runWorker()和getTask()兩個方法的執行流程,當執行任務時呼叫runWorker()方法,執行完成后會繼續從workQueue中獲取任務繼續執行,已達到執行緒復用的效果,當然這里還有一些細節,可以回頭看看上面的原始碼決議, -
ThreadPoolExecutor中關閉執行緒池的方法shutdown與shutdownNow的區別?
最大的區別就是shutdown()會將執行緒池狀態變為SHUTDOWN,此時新任務不能被提交,workQueue中還存有的任務可以繼續執行,同時會像執行緒池中空閑的狀態發出中斷信號,
shutdownNow()方法是將執行緒池的狀態設定為STOP,此時新任務不能被提交,執行緒池中所有執行緒都會收到中斷的信號,如果執行緒處于wait狀態,那么中斷狀態會被清除,同時拋出InterruptedException, -
ThreadPoolExecutor中存在的一些擴展點?
鉤子方法:
1)beforeExecute()/afterExecute():runWorker()中執行緒執行前和執行后會呼叫的鉤子方法
2)terminated:執行緒池的狀態從TIDYING狀態流轉為TERMINATED狀態時terminated方法會被呼叫的鉤子方法,
3)onShutdown:當我們執行shutdown()方法時預留的鉤子方法, -
ThreadPoolExecutor支持動態調整核心執行緒數、最大執行緒數、佇列長度等一些列引數嗎?怎么操作?
運行期間可動態調整引數的方法:
1)setCorePoolSize():動態調整執行緒池核心執行緒數
2)setMaximumPoolSize():動態調整執行緒池最大執行緒數
3)setKeepAliveTime(): 空閑執行緒存活時間,如果設定了allowsCoreThreadTimeOut=true,核心執行緒也會被回收,默認只回收非核心執行緒
4)allowsCoreThreadTimeOut():是否允許回收核心執行緒,如果是true,在getTask()方法中,獲取workQueue就采用workQueue.poll(keepAliveTime),如果超過等待時間就會被回收,
總結
這篇執行緒池原始碼覆寫到了ThreadPoolExecutor中大部分代碼,我相信認真閱讀完后肯定會對執行緒池有更深刻的理解,如有疑問或者建議可關注公眾號給我私信,我都會一一為大家解答,
另外推薦一個我的up主朋友,他自己錄制了好多學習視頻并分享在B站上了,大家有時間可以看一下(PS:非恰飯非利益相關,良心推薦):小劉講原始碼-B站UP主

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/184109.html
標籤:Java
