尊重原創著作權: https://www.gewuweb.com/hot/10338.html
「超詳細」Java執行緒池原始碼決議
尊重原創著作權: https://www.gewuweb.com/sitemap.html
繞不開的執行緒池
只看ThreadPoolExecutor的英文語意就能知道這是一個與執行緒池有關的類,
執行緒池是一種池化技術,Java中類似的池化技術有很多,
常見的有:
- 資料庫連接池
- redis連接池
- http連接池
- 記憶體池
- 執行緒池
池化技術的作用:把一些能夠復用的東西(比如說連接、執行緒)放到初始化好的池中,便于資源統一管理,
這樣做的好處:
避免重復創建、銷毀、調度的開銷,提高性能 保證內核的充分利用,防止過分調度 自定義引數配置達到最佳的使用效果
ThreadPoolExecutor 知識點
Java中創建執行緒池的方法
不推薦
通過Executors類的靜態方法創建如下執行緒池
- FixedThreadPool (固定個數)
- ScheduledThreadPool (執行周期性任務)
- WorkStealingPool (根據當前電腦CPU處理器數量生成相應執行緒數)
- CachedThreadPool (帶快取功能)
- SingleThreadPool (單個執行緒)
推薦
通過ThreadPoolExecutor創建執行緒池
// 給執行緒定義有業務含義的名稱
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-%s").build();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5, // 執行緒池核心執行緒數
10, // 執行緒池最大執行緒數,達到最大值后執行緒池不會再增加執行緒
1000, // 執行緒池中超過corePoolSize數目的空閑執行緒最大存活時間
TimeUnit.MILLISECONDS, // 時間單位,毫秒
new LinkedBlockingQueue<>(50), // 作業執行緒等待佇列
threadFactory, // 自定義執行緒工廠
new ThreadPoolExecutor.AbortPolicy()); // 執行緒池滿時的拒絕策略
復制代碼
為什么
先來看看阿里巴巴出品的《Java開發手冊》中怎么說?
再來看看原始碼怎么寫?
以SingleThreadPool為例,其實作也是通過ThreadPoolExecutor的構造方法創建的執行緒池,
之所以不推薦的原因是其使用了LinkedBlockingQueue作為作業執行緒的等待佇列,其是一種無界緩沖等待佇列,該佇列的默認構造器定義的長度為Integer.MAX_VALUE
FixedThreadPool同理
CachedThreadPool采用了SynchronousQueue佇列,也是一種無界無緩沖等待佇列,而且其最大執行緒數是Integer.MAX_VALUE
ScheduledThreadPool采用了DelayedWorkQueue佇列,是一種無界阻塞佇列,其最大執行緒數是Integer.MAX_VALUE
以上四種執行緒池都有OOM的風險
相反,在使用ThreadPoolExecutor時,我們可以指定有界/無界阻塞佇列,并指定初始長度,
ThreadPoolExecutor原始碼分析
執行緒池生命周期
執行緒池狀態
|
狀態釋義
---|---
RUNNING
|
執行緒池被創建后的初始狀態,能接受新提交的任務,并且也能處理阻塞佇列中的任務
SHUTDOWN
|
關閉狀態,不再接受新提交的任務,但仍可以繼續處理已進入阻塞佇列中的任務
STOP
|
會中斷正在處理任務的執行緒,不能再接受新任務,也不繼續處理佇列中的任務
TIDYING
|
所有的任務都已終止,workerCount(有效作業執行緒數)為0
TERMINATED
|
執行緒池徹底終止運行
Tips:千萬不要把執行緒池的狀態和執行緒的狀態弄混了,補一張網上的執行緒狀態圖
Tips:當執行緒呼叫start(),執行緒在JVM中不一定立即執行,有可能要等待作業系統分配資源,此時為READY狀態,當執行緒獲得資源時進入RUNNING狀態,才會真正開始執行,
拒絕策略
- CallerRunsPolicy(在當前執行緒中執行)
- AbortPolicy(直接拋出RejectedExecutionException)
- DiscardPolicy(直接丟棄執行緒)
- DiscardOldestPolicy(丟棄一個未被處理的最久的執行緒,然后重試)
當沒有顯示指明拒絕策略時,默認使用AbortPolicy
ThreadPoolExecutor類圖
通過IDEA的Diagrams工具查看UML類圖,繼承關系一目了然
ThreadPoolExecutor類中的方法很多,最核心就是構造執行緒池的方法和執行執行緒任務的方法,先前已經給出了標準的構造方法,接下來就講一講如何執行執行緒任務...
任務執行機制
- 通過執行execute方法
該方法無回傳值,為ThreadPoolExecutor自帶方法,傳入Runnable型別物件
- 通過執行submit方法
該方法回傳值為Future物件,為抽象類AbstractExecutorService的方法,被ThreadPoolExecutor繼承,其內部實作也是呼叫了介面類Executor的execute方法,通過上面的類圖可以看到,該方法的實作依然是ThreadPoolExecutor的execute方法
execute()執行流程圖
execute()原始碼解讀
// 使用原子操作類AtomicInteger的ctl變數,前3位記錄執行緒池的狀態,后29位記錄執行緒數
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer的范圍為[-2^31,2^31 -1], Integer.SIZE-3 =32-3= 29,用來輔助左移位運算
private static final int COUNT_BITS = Integer.SIZE - 3;
// 高三位用來存盤執行緒池運行狀態,其余位數表示執行緒池的容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 執行緒池狀態以常量值被存盤在高三位中
private static final int RUNNING = -1 << COUNT_BITS; // 執行緒池接受新任務并會處理阻塞佇列中的任務
private static final int SHUTDOWN = 0 << COUNT_BITS; // 執行緒池不接受新任務,但會處理阻塞佇列中的任務
private static final int STOP = 1 << COUNT_BITS; // 執行緒池不接受新的任務且不會處理阻塞佇列中的任務,并且會中斷正在執行的任務
private static final int TIDYING = 2 << COUNT_BITS; // 所有任務都執行完成,且作業執行緒數為0,將呼叫terminated方法
private static final int TERMINATED = 3 << COUNT_BITS; // 最終狀態,為執行terminated()方法后的狀態
// ctl變數的封箱拆箱相關的方法
private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲取執行緒池運行狀態
private static int workerCountOf(int c) { return c & CAPACITY; } // 獲取執行緒池運行執行緒數
private static int ctlOf(int rs, int wc) { return rs | wc; } // 獲取ctl物件
復制代碼
public void execute(Runnable command) {
if (command == null) // 任務為空,拋出NPE
throw new NullPointerException();
int c = ctl.get(); // 獲取當前作業執行緒數和執行緒池運行狀態(共32位,前3位為運行狀態,后29位為運行執行緒數)
if (workerCountOf(c) < corePoolSize) { // 如果當前作業執行緒數小于核心執行緒數
if (addWorker(command, true)) // 在addWorker中創建作業執行緒并執行任務
return;
c = ctl.get();
}
// 核心執行緒數已滿(作業執行緒數>核心執行緒數)才會走下面的邏輯
if (isRunning(c) && workQueue.offer(command)) { // 如果當前執行緒池狀態為RUNNING,并且任務成功添加到阻塞佇列
int recheck = ctl.get(); // 雙重檢查,因為從上次檢查到進入此方法,執行緒池可能已成為SHUTDOWN狀態
if (! isRunning(recheck) && remove(command)) // 如果當前執行緒池狀態不是RUNNING則從佇列洗掉任務
reject(command); // 執行拒絕策略
else if (workerCountOf(recheck) == 0) // 當執行緒池中的workerCount為0時,此時workQueue中還有待執行的任務,則新增一個addWorker,消費workqueue中的任務
addWorker(null, false);
}
// 阻塞佇列已滿才會走下面的邏輯
else if (!addWorker(command, false)) // 嘗試增加作業執行緒執行command
// 如果當前執行緒池為SHUTDOWN狀態或者執行緒池已飽和
reject(command); // 執行拒絕策略
}
復制代碼
private boolean addWorker(Runnable firstTask, boolean core) {
retry: // 回圈退出標志位
for (;;) { // 無限回圈
int c = ctl.get();
int rs = runStateOf(c); // 執行緒池狀態
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) // 換成更直觀的條件陳述句
// (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
)
// 回傳false的條件就可以分解為:
//(1)執行緒池狀態為STOP,TIDYING,TERMINATED
//(2)執行緒池狀態為SHUTDOWN,且要執行的任務不為空
//(3)執行緒池狀態為SHUTDOWN,且任務佇列為空
return false;
// cas自旋增加執行緒個數
for (;;) {
int wc = workerCountOf(c); // 當前作業執行緒數
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 作業執行緒數>=執行緒池容量 || 作業執行緒數>=(核心執行緒數||最大執行緒數)
return false;
if (compareAndIncrementWorkerCount(c)) // 執行cas操作,添加執行緒個數
break retry; // 添加成功,退出外層回圈
// 通過cas添加失敗
c = ctl.get();
// 執行緒池狀態是否變化,變化則跳到外層回圈重試重新獲取執行緒池狀態,否者內層回圈重新cas
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 簡單總結上面的CAS程序:
//(1)內層回圈作用是使用cas增加執行緒個數,如果執行緒個數超限則回傳false,否者進行cas
//(2)cas成功則退出雙回圈,否者cas失敗了,要看當前執行緒池的狀態是否變化了
//(3)如果變了,則重新進入外層回圈重新獲取執行緒池狀態,否者重新進入內層回圈繼續進行cas
// 走到這里說明cas成功,執行緒數+1,但并未被執行
boolean workerStarted = false; // 作業執行緒呼叫start()方法標志
boolean workerAdded = false; // 作業執行緒被添加標志
Worker w = null;
try {
w = new Worker(firstTask); // 創建作業執行緒實體
final Thread t = w.thread; // 獲取作業執行緒持有的執行緒實體
if (t != null) {
final ReentrantLock mainLock = this.mainLock; // 使用全域可重入鎖
mainLock.lock(); // 加鎖,控制并發
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get()); // 獲取當前執行緒池狀態
// 執行緒池狀態為RUNNING或者(執行緒池狀態為SHUTDOWN并且沒有新任務時)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 檢查執行緒是否處于活躍狀態
throw new IllegalThreadStateException();
workers.add(w); // 執行緒加入到存放作業執行緒的HashSet容器,workers全域唯一并被mainLock持有
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock(); // finally塊中釋放鎖
}
if (workerAdded) { // 執行緒添加成功
t.start(); // 呼叫執行緒的start()方法
workerStarted = true;
}
}
} finally {
if (! workerStarted) // 如果執行緒啟動失敗,則執行addWorkerFailed方法
addWorkerFailed(w);
}
return workerStarted;
}
復制代碼
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w); // 執行緒啟動失敗時,需將前面添加的執行緒洗掉
decrementWorkerCount(); // ctl變數中的作業執行緒數-1
tryTerminate(); // 嘗試將執行緒池轉變成TERMINATE狀態
} finally {
mainLock.unlock();
}
}
復制代碼
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 以下情況不會進入TERMINATED狀態:
//(1)當前執行緒池為RUNNING狀態
//(2)在TIDYING及以上狀態
//(3)SHUTDOWN狀態并且作業佇列不為空
//(4)當前活躍執行緒數不等于0
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // 作業執行緒數!=0
interruptIdleWorkers(ONLY_ONE); // 中斷一個正在等待任務的執行緒
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 通過CAS自旋判斷直到當前執行緒池運行狀態為TIDYING并且活躍執行緒數為0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); // 呼叫執行緒terminated()
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // 設定執行緒池狀態為TERMINATED,作業執行緒數為0
termination.signalAll(); // 通過呼叫Condition介面的signalAll()喚醒所有等待的執行緒
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
復制代碼
Worker原始碼解讀
Worker是ThreadPoolExecutor類的內部類,此處只講最重要的建構式和run方法
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
// 該worker正在運行的執行緒
final Thread thread;
// 將要運行的初始任務
Runnable firstTask;
// 每個執行緒的任務計數器
volatile long completedTasks;
// 構造方法
Worker(Runnable firstTask) {
setState(-1); // 呼叫runWorker()前禁止中斷
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 通過ThreadFactory創建一個執行緒
}
// 實作了Runnable介面的run方法
public void run() {
runWorker(this);
}
... // 此處省略了其他方法
}
復制代碼
Worker實作了Runable介面,在呼叫start()方法后,實際執行的是run方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 獲取作業執行緒中用來執行任務的執行緒實體
w.firstTask = null;
w.unlock(); // status設定為0,允許中斷
boolean completedAbruptly = true; // 執行緒意外終止標志
try {
// 如果當前任務不為空,則直接執行;否則呼叫getTask()從任務佇列中取出一個任務執行
while (task != null || (task = getTask()) != null) {
w.lock(); // 加鎖,保證下方臨界區代碼的執行緒安全
// 如果狀態值大于等于STOP且當前執行緒還沒有被中斷,則主動中斷執行緒
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); // 中斷當前執行緒
try {
beforeExecute(wt, task); // 任務執行前的回呼,空實作,可以在子類中自定義
Throwable thrown = null;
try {
task.run(); // 執行執行緒的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 任務執行后的回呼,空實作,可以在子類中自定義
}
} finally {
task = null; // 將回圈變數task設定為null,表示已處理完成
w.completedTasks++; // 當前已完成的任務數+1
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
復制代碼
從任務佇列中取出一個任務
private Runnable getTask() {
boolean timedOut = false; // 通過timeOut變數表示執行緒是否空閑時間超時了
// 無限回圈
for (;;) {
int c = ctl.get(); // 執行緒池資訊
int rs = runStateOf(c); // 執行緒池當前狀態
// 如果執行緒池狀態>=SHUTDOWN并且作業佇列為空 或 執行緒池狀態>=STOP,則回傳null,讓當前worker被銷毀
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // 作業執行緒數-1
return null;
}
int wc = workerCountOf(c); // 獲取當前執行緒池的作業執行緒數
// 當前執行緒是否允許超時銷毀的標志
// 允許超時銷毀:當執行緒池允許核心執行緒超時 或 作業執行緒數>核心執行緒數
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果(當前執行緒數大于最大執行緒數 或 (允許超時銷毀 且 當前發生了空閑時間超時))
// 且(當前執行緒數大于1 或 阻塞佇列為空)
// 則減少worker計數并回傳null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 根據執行緒是否允許超時判斷用poll還是take(會阻塞)方法從任務佇列頭部取出一個任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r; // 回傳從佇列中取出的任務
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
復制代碼
總結一下哪些情況getTask()會回傳null:
執行緒池狀態為SHUTDOWN且任務佇列為空 執行緒池狀態為STOP、TIDYING、TERMINATED 執行緒池執行緒數大于最大執行緒數
執行緒可以被超時回收的情況下等待新任務超時
作業執行緒退出
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly為true則表示任務執行程序中拋出了未處理的例外
// 所以還沒有正確地減少worker計數,這里需要減少一次worker計數
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 把將被銷毀的執行緒已完成的任務數累加到執行緒池的完成任務總數上
completedTaskCount += w.completedTasks;
workers.remove(w); // 從作業執行緒集合中移除該作業執行緒
} finally {
mainLock.unlock();
}
// 嘗試結束執行緒池
tryTerminate();
int c = ctl.get();
// 如果是RUNNING 或 SHUTDOWN狀態
if (runStateLessThan(c, STOP)) {
// worker是正常執行完
if (!completedAbruptly) {
// 如果允許核心執行緒超時則最小執行緒數是0,否則最小執行緒數等于核心執行緒數
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果阻塞佇列非空,則至少要有一個執行緒繼續執行剩下的任務
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果當前執行緒數已經滿足最小執行緒數要求,則不需要再創建替代執行緒
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 重新創建一個worker來代替被銷毀的執行緒
addWorker(null, false);
}
}
Worker(null, false);
}
}
尊重原創著作權: https://www.gewuweb.com/sitemap.html
尊重原創著作權: https://www.gewuweb.com/hot/13313.html
「超詳細」Java執行緒池原始碼決議
繞不開的執行緒池
只看ThreadPoolExecutor的英文語意就能知道這是一個與執行緒池有關的類,
執行緒池是一種池化技術,Java中類似的池化技術有很多,
常見的有:
- 資料庫連接池
- redis連接池
- http連接池
- 記憶體池
- 執行緒池
池化技術的作用:把一些能夠復用的東西(比如說連接、執行緒)放到初始化好的池中,便于資源統一管理,
這樣做的好處:
避免重復創建、銷毀、調度的開銷,提高性能 保證內核的充分利用,防止過分調度 自定義引數配置達到最佳的使用效果
ThreadPoolExecutor 知識點
Java中創建執行緒池的方法
不推薦
通過Executors類的靜態方法創建如下執行緒池
- FixedThreadPool (固定個數)
- ScheduledThreadPool (執行周期性任務)
- WorkStealingPool (根據當前電腦CPU處理器數量生成相應執行緒數)
- CachedThreadPool (帶快取功能)
- SingleThreadPool (單個執行緒)
推薦
通過ThreadPoolExecutor創建執行緒池
// 給執行緒定義有業務含義的名稱
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-%s").build();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5, // 執行緒池核心執行緒數
10, // 執行緒池最大執行緒數,達到最大值后執行緒池不會再增加執行緒
1000, // 執行緒池中超過corePoolSize數目的空閑執行緒最大存活時間
TimeUnit.MILLISECONDS, // 時間單位,毫秒
new LinkedBlockingQueue<>(50), // 作業執行緒等待佇列
threadFactory, // 自定義執行緒工廠
new ThreadPoolExecutor.AbortPolicy()); // 執行緒池滿時的拒絕策略
復制代碼
為什么
先來看看阿里巴巴出品的《Java開發手冊》中怎么說?
再來看看原始碼怎么寫?
以SingleThreadPool為例,其實作也是通過ThreadPoolExecutor的構造方法創建的執行緒池,
之所以不推薦的原因是其使用了LinkedBlockingQueue作為作業執行緒的等待佇列,其是一種無界緩沖等待佇列,該佇列的默認構造器定義的長度為Integer.MAX_VALUE
FixedThreadPool同理
CachedThreadPool采用了SynchronousQueue佇列,也是一種無界無緩沖等待佇列,而且其最大執行緒數是Integer.MAX_VALUE
ScheduledThreadPool采用了DelayedWorkQueue佇列,是一種無界阻塞佇列,其最大執行緒數是Integer.MAX_VALUE
以上四種執行緒池都有OOM的風險
相反,在使用ThreadPoolExecutor時,我們可以指定有界/無界阻塞佇列,并指定初始長度,
ThreadPoolExecutor原始碼分析
執行緒池生命周期
執行緒池狀態
|
狀態釋義
---|---
RUNNING
|
執行緒池被創建后的初始狀態,能接受新提交的任務,并且也能處理阻塞佇列中的任務
SHUTDOWN
|
關閉狀態,不再接受新提交的任務,但仍可以繼續處理已進入阻塞佇列中的任務
STOP
|
會中斷正在處理任務的執行緒,不能再接受新任務,也不繼續處理佇列中的任務
TIDYING
|
所有的任務都已終止,workerCount(有效作業執行緒數)為0
TERMINATED
|
執行緒池徹底終止運行
Tips:千萬不要把執行緒池的狀態和執行緒的狀態弄混了,補一張網上的執行緒狀態圖
Tips:當執行緒呼叫start(),執行緒在JVM中不一定立即執行,有可能要等待作業系統分配資源,此時為READY狀態,當執行緒獲得資源時進入RUNNING狀態,才會真正開始執行,
拒絕策略
- CallerRunsPolicy(在當前執行緒中執行)
- AbortPolicy(直接拋出RejectedExecutionException)
- DiscardPolicy(直接丟棄執行緒)
- DiscardOldestPolicy(丟棄一個未被處理的最久的執行緒,然后重試)
當沒有顯示指明拒絕策略時,默認使用AbortPolicy
ThreadPoolExecutor類圖
通過IDEA的Diagrams工具查看UML類圖,繼承關系一目了然
ThreadPoolExecutor類中的方法很多,最核心就是構造執行緒池的方法和執行執行緒任務的方法,先前已經給出了標準的構造方法,接下來就講一講如何執行執行緒任務...
任務執行機制
- 通過執行execute方法
該方法無回傳值,為ThreadPoolExecutor自帶方法,傳入Runnable型別物件
- 通過執行submit方法
該方法回傳值為Future物件,為抽象類AbstractExecutorService的方法,被ThreadPoolExecutor繼承,其內部實作也是呼叫了介面類Executor的execute方法,通過上面的類圖可以看到,該方法的實作依然是ThreadPoolExecutor的execute方法
execute()執行流程圖
execute()原始碼解讀
// 使用原子操作類AtomicInteger的ctl變數,前3位記錄執行緒池的狀態,后29位記錄執行緒數
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Integer的范圍為[-2^31,2^31 -1], Integer.SIZE-3 =32-3= 29,用來輔助左移位運算
private static final int COUNT_BITS = Integer.SIZE - 3;
// 高三位用來存盤執行緒池運行狀態,其余位數表示執行緒池的容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 執行緒池狀態以常量值被存盤在高三位中
private static final int RUNNING = -1 << COUNT_BITS; // 執行緒池接受新任務并會處理阻塞佇列中的任務
private static final int SHUTDOWN = 0 << COUNT_BITS; // 執行緒池不接受新任務,但會處理阻塞佇列中的任務
private static final int STOP = 1 << COUNT_BITS; // 執行緒池不接受新的任務且不會處理阻塞佇列中的任務,并且會中斷正在執行的任務
private static final int TIDYING = 2 << COUNT_BITS; // 所有任務都執行完成,且作業執行緒數為0,將呼叫terminated方法
private static final int TERMINATED = 3 << COUNT_BITS; // 最終狀態,為執行terminated()方法后的狀態
// ctl變數的封箱拆箱相關的方法
private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲取執行緒池運行狀態
private static int workerCountOf(int c) { return c & CAPACITY; } // 獲取執行緒池運行執行緒數
private static int ctlOf(int rs, int wc) { return rs | wc; } // 獲取ctl物件
復制代碼
public void execute(Runnable command) {
if (command == null) // 任務為空,拋出NPE
throw new NullPointerException();
int c = ctl.get(); // 獲取當前作業執行緒數和執行緒池運行狀態(共32位,前3位為運行狀態,后29位為運行執行緒數)
if (workerCountOf(c) < corePoolSize) { // 如果當前作業執行緒數小于核心執行緒數
if (addWorker(command, true)) // 在addWorker中創建作業執行緒并執行任務
return;
c = ctl.get();
}
// 核心執行緒數已滿(作業執行緒數>核心執行緒數)才會走下面的邏輯
if (isRunning(c) && workQueue.offer(command)) { // 如果當前執行緒池狀態為RUNNING,并且任務成功添加到阻塞佇列
int recheck = ctl.get(); // 雙重檢查,因為從上次檢查到進入此方法,執行緒池可能已成為SHUTDOWN狀態
if (! isRunning(recheck) && remove(command)) // 如果當前執行緒池狀態不是RUNNING則從佇列洗掉任務
reject(command); // 執行拒絕策略
else if (workerCountOf(recheck) == 0) // 當執行緒池中的workerCount為0時,此時workQueue中還有待執行的任務,則新增一個addWorker,消費workqueue中的任務
addWorker(null, false);
}
// 阻塞佇列已滿才會走下面的邏輯
else if (!addWorker(command, false)) // 嘗試增加作業執行緒執行command
// 如果當前執行緒池為SHUTDOWN狀態或者執行緒池已飽和
reject(command); // 執行拒絕策略
}
復制代碼
private boolean addWorker(Runnable firstTask, boolean core) {
retry: // 回圈退出標志位
for (;;) { // 無限回圈
int c = ctl.get();
int rs = runStateOf(c); // 執行緒池狀態
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()) // 換成更直觀的條件陳述句
// (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
)
// 回傳false的條件就可以分解為:
//(1)執行緒池狀態為STOP,TIDYING,TERMINATED
//(2)執行緒池狀態為SHUTDOWN,且要執行的任務不為空
//(3)執行緒池狀態為SHUTDOWN,且任務佇列為空
return false;
// cas自旋增加執行緒個數
for (;;) {
int wc = workerCountOf(c); // 當前作業執行緒數
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 作業執行緒數>=執行緒池容量 || 作業執行緒數>=(核心執行緒數||最大執行緒數)
return false;
if (compareAndIncrementWorkerCount(c)) // 執行cas操作,添加執行緒個數
break retry; // 添加成功,退出外層回圈
// 通過cas添加失敗
c = ctl.get();
// 執行緒池狀態是否變化,變化則跳到外層回圈重試重新獲取執行緒池狀態,否者內層回圈重新cas
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 簡單總結上面的CAS程序:
//(1)內層回圈作用是使用cas增加執行緒個數,如果執行緒個數超限則回傳false,否者進行cas
//(2)cas成功則退出雙回圈,否者cas失敗了,要看當前執行緒池的狀態是否變化了
//(3)如果變了,則重新進入外層回圈重新獲取執行緒池狀態,否者重新進入內層回圈繼續進行cas
// 走到這里說明cas成功,執行緒數+1,但并未被執行
boolean workerStarted = false; // 作業執行緒呼叫start()方法標志
boolean workerAdded = false; // 作業執行緒被添加標志
Worker w = null;
try {
w = new Worker(firstTask); // 創建作業執行緒實體
final Thread t = w.thread; // 獲取作業執行緒持有的執行緒實體
if (t != null) {
final ReentrantLock mainLock = this.mainLock; // 使用全域可重入鎖
mainLock.lock(); // 加鎖,控制并發
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get()); // 獲取當前執行緒池狀態
// 執行緒池狀態為RUNNING或者(執行緒池狀態為SHUTDOWN并且沒有新任務時)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 檢查執行緒是否處于活躍狀態
throw new IllegalThreadStateException();
workers.add(w); // 執行緒加入到存放作業執行緒的HashSet容器,workers全域唯一并被mainLock持有
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock(); // finally塊中釋放鎖
}
if (workerAdded) { // 執行緒添加成功
t.start(); // 呼叫執行緒的start()方法
workerStarted = true;
}
}
} finally {
if (! workerStarted) // 如果執行緒啟動失敗,則執行addWorkerFailed方法
addWorkerFailed(w);
}
return workerStarted;
}
復制代碼
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w); // 執行緒啟動失敗時,需將前面添加的執行緒洗掉
decrementWorkerCount(); // ctl變數中的作業執行緒數-1
tryTerminate(); // 嘗試將執行緒池轉變成TERMINATE狀態
} finally {
mainLock.unlock();
}
}
復制代碼
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 以下情況不會進入TERMINATED狀態:
//(1)當前執行緒池為RUNNING狀態
//(2)在TIDYING及以上狀態
//(3)SHUTDOWN狀態并且作業佇列不為空
//(4)當前活躍執行緒數不等于0
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // 作業執行緒數!=0
interruptIdleWorkers(ONLY_ONE); // 中斷一個正在等待任務的執行緒
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 通過CAS自旋判斷直到當前執行緒池運行狀態為TIDYING并且活躍執行緒數為0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated(); // 呼叫執行緒terminated()
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // 設定執行緒池狀態為TERMINATED,作業執行緒數為0
termination.signalAll(); // 通過呼叫Condition介面的signalAll()喚醒所有等待的執行緒
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
復制代碼
Worker原始碼解讀
Worker是ThreadPoolExecutor類的內部類,此處只講最重要的建構式和run方法
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
// 該worker正在運行的執行緒
final Thread thread;
// 將要運行的初始任務
Runnable firstTask;
// 每個執行緒的任務計數器
volatile long completedTasks;
// 構造方法
Worker(Runnable firstTask) {
setState(-1); // 呼叫runWorker()前禁止中斷
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 通過ThreadFactory創建一個執行緒
}
// 實作了Runnable介面的run方法
public void run() {
runWorker(this);
}
... // 此處省略了其他方法
}
復制代碼
Worker實作了Runable介面,在呼叫start()方法后,實際執行的是run方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 獲取作業執行緒中用來執行任務的執行緒實體
w.firstTask = null;
w.unlock(); // status設定為0,允許中斷
boolean completedAbruptly = true; // 執行緒意外終止標志
try {
// 如果當前任務不為空,則直接執行;否則呼叫getTask()從任務佇列中取出一個任務執行
while (task != null || (task = getTask()) != null) {
w.lock(); // 加鎖,保證下方臨界區代碼的執行緒安全
// 如果狀態值大于等于STOP且當前執行緒還沒有被中斷,則主動中斷執行緒
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt(); // 中斷當前執行緒
try {
beforeExecute(wt, task); // 任務執行前的回呼,空實作,可以在子類中自定義
Throwable thrown = null;
try {
task.run(); // 執行執行緒的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 任務執行后的回呼,空實作,可以在子類中自定義
}
} finally {
task = null; // 將回圈變數task設定為null,表示已處理完成
w.completedTasks++; // 當前已完成的任務數+1
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
復制代碼
從任務佇列中取出一個任務
private Runnable getTask() {
boolean timedOut = false; // 通過timeOut變數表示執行緒是否空閑時間超時了
// 無限回圈
for (;;) {
int c = ctl.get(); // 執行緒池資訊
int rs = runStateOf(c); // 執行緒池當前狀態
// 如果執行緒池狀態>=SHUTDOWN并且作業佇列為空 或 執行緒池狀態>=STOP,則回傳null,讓當前worker被銷毀
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // 作業執行緒數-1
return null;
}
int wc = workerCountOf(c); // 獲取當前執行緒池的作業執行緒數
// 當前執行緒是否允許超時銷毀的標志
// 允許超時銷毀:當執行緒池允許核心執行緒超時 或 作業執行緒數>核心執行緒數
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果(當前執行緒數大于最大執行緒數 或 (允許超時銷毀 且 當前發生了空閑時間超時))
// 且(當前執行緒數大于1 或 阻塞佇列為空)
// 則減少worker計數并回傳null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 根據執行緒是否允許超時判斷用poll還是take(會阻塞)方法從任務佇列頭部取出一個任務
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r; // 回傳從佇列中取出的任務
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
復制代碼
總結一下哪些情況getTask()會回傳null:
執行緒池狀態為SHUTDOWN且任務佇列為空 執行緒池狀態為STOP、TIDYING、TERMINATED 執行緒池執行緒數大于最大執行緒數
執行緒可以被超時回收的情況下等待新任務超時
作業執行緒退出
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly為true則表示任務執行程序中拋出了未處理的例外
// 所以還沒有正確地減少worker計數,這里需要減少一次worker計數
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 把將被銷毀的執行緒已完成的任務數累加到執行緒池的完成任務總數上
completedTaskCount += w.completedTasks;
workers.remove(w); // 從作業執行緒集合中移除該作業執行緒
} finally {
mainLock.unlock();
}
// 嘗試結束執行緒池
tryTerminate();
int c = ctl.get();
// 如果是RUNNING 或 SHUTDOWN狀態
if (runStateLessThan(c, STOP)) {
// worker是正常執行完
if (!completedAbruptly) {
// 如果允許核心執行緒超時則最小執行緒數是0,否則最小執行緒數等于核心執行緒數
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果阻塞佇列非空,則至少要有一個執行緒繼續執行剩下的任務
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果當前執行緒數已經滿足最小執行緒數要求,則不需要再創建替代執行緒
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 重新創建一個worker來代替被銷毀的執行緒
addWorker(null, false);
}
}
Worker(null, false);
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/467085.html
標籤:其他
