執行緒池 · 語雀 (yuque.com)
為什么要用執行緒池
在 HotSpot VM 的執行緒模型中,Java 執行緒被一對一映射為內核執行緒,
Java 在使用執行緒執行程式時,需要呼叫作業系統內核的 API,創建一個內核執行緒,作業系統要為執行緒分配一系列的資源;當該 Java 執行緒被終止時,這個內核執行緒也會被回收,因此 Java 執行緒的創建與銷毀的成本很高,從而增加系統的性能開銷,
除此之外,無限制地創建線同樣會給系統帶來性能問題,因為 CPU 核數是有限的,大量的執行緒背景關系切換會增加系統的性能開銷,同時無限制地創建執行緒還可能導致 OOM,
為了解決上述兩類問題,于是引入了執行緒池概念,
對于第一類問題,頻繁創建與銷毀執行緒:執行緒池復用執行緒,提高執行緒利用率,避免頻繁的創建與銷毀執行緒,
對于第二類問題,大量創建執行緒:執行緒池限制執行緒創建的最大數量,防止無限制地創建執行緒,
執行緒池提供了一種方式來管理執行緒和消費,維護基本資料統計等作業,比如統計已完成的任務數;
介紹執行緒池框架 Executor
Java 提供了一套執行緒池框架 Executor,
這個框架包括了 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 兩個核心執行緒池,
- ThreadPoolExecutor 是用來執行被提交的任務
- ScheduledThreadPoolExecutor 是用來執行定時任務,
還有一個 ForkJoinPool 則是為 ForkJoinTask 定制的執行緒池,與通常意義的執行緒池有所不同,
除此之外,Executors 類為我們提供了各種方便的靜態工廠方法來簡化執行緒池的創建,
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService { }
從類的定義我們可以看到,ScheduledThreadPoolExecutor 類繼承自 ThreadPoolExecutor 類,因此下面我們就重點看看 ThreadPoolExecutor 類是如何實作執行緒池的,
ThreadPoolExecutor 的「構造引數」和「作業行為」
ThreadPoolExecutor 的建構式非常復雜,最完備的建構式有 7 個引數,如下面代碼所示,
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
下面我們一一介紹這些引數的意義,(參考了 ThreadPoolExecutor 的 javadoc )
執行緒數
corePoolSize:核心執行緒數
maximumPoolSize:最大執行緒數
執行緒池會根據 corePoolSize 和 maximumPoolSize 這兩個引數的值,自動調整執行緒池大小,
當我們向執行緒池中提交任務時:
-
如果當前有少于 corePoolSize 個執行緒正在運行,那么將創建一個新的執行緒來處理請求,即使其他作業執行緒處于空閑狀態(也就是說,前面說的正在運行的執行緒是指,所有已經創建的執行緒,包括處于空閑狀態的執行緒)
-
如果當前有大于等于 corePoolSize 個執行緒正在運行,則嘗試把任務加到任務佇列中
- 如果任務佇列未滿,則加入成功,排隊等待執行緒處理
- 如果任務佇列已滿,并且當前有不超過 maximumPoolSize 個執行緒,則創建一個新的執行緒來處理請求
-
如果當前有 maximumPoolSize 個執行緒正在運行,并且任務佇列已滿,那么執行緒池會拒絕接收任務,并按照指定的拒絕策略處理任務
通過將 corePoolSize 和 maximumPoolSize 設定為相同的值,我們可以創建固定大小的執行緒池,
通過將 maximumPoolSize 設定為一個本質上無界的值,例如 Integer.MAX VALUE,允許執行緒池容納任意數量的執行緒,
通常,corePoolSize 和 maximumPoolSize 這兩個引數的值只在構造 ThreadPoolExecutor 時設定,但這兩個引數的值也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 動態修改,通過動態修改引數的值,我們可以做到動態配置自定義執行緒池,感興趣的可以了解一下,這個視頻里有講,
在創建完執行緒池之后,默認情況下,執行緒池中沒有任何執行緒,只有在新任務到達時執行緒才會被創建(new)和執行(start),但這可以通過使用 prestartCoreThread() 或 prestartAllCoreThreads() 方法來動態覆寫,如果使用非空佇列構造執行緒池,則可能需要預啟動執行緒,預啟動執行緒在搶購系統中也經常被用到,
非核心執行緒的存活時間
keepAliveTime:執行緒存活的實作
TimeUnit:存活時間的單位(小時、分鐘、秒、毫秒)
如果執行緒池當前有超過 corePoolSize 個執行緒,并且執行緒空閑的時間超過了 keepAliveTime,那么這些執行緒將被銷毀,這樣可以避免執行緒沒有被使用時的資源浪費,
這個引數也可以使用方法 setKeepAliveTime(long,TimeUnit) 動態修改,
默認情況下,只有存在多于 corePoolSize 個執行緒時,才會應用 keep-alive 策略;但通過 allowCoreThreadTimeOut(boolean) 方法,將引數 allowCoreThreadTimeOut 的值設定為 true,則 keep-alive 策略也可應用于不超過 corePoolSize 個執行緒時,
任務佇列
BlockingQueue
如果執行緒池當前有大于等于 corePoolSize 個執行緒正在運行,則嘗試把任務加到任務佇列中
- 如果任務佇列未滿,則加入成功,排隊等待執行緒處理
- 如果任務佇列已滿,并且當前有不超過 maximumPoolSize 個執行緒,則創建一個新的執行緒來處理請求
也就是說,當執行緒數量達到 corePoolSize 個之后,不會立即擴容執行緒池,而是先把任務堆積到任務佇列中,任務佇列滿了之后,才考慮擴容執行緒池,一直到執行緒個數達到 maximumPoolSize 為止,
這個任務佇列必須必須是 BlockingQueue 型別的,也就是必須是阻塞佇列,
阻塞佇列其實就是在佇列基礎上支持了阻塞操作,
簡單來說,阻塞操作就是:
- 如果佇列為空,那么從隊頭取資料的操作會被阻塞,直到佇列中有資料才能回傳;
- 如果佇列已滿,那么從隊尾插入資料的操作會被阻塞,直到佇列中有空閑位置并插入資料后,才能回傳,
Java 中 BlockingQueue 型別的佇列也有很多,比如:(共 8 個)
- ArrayBlockingQueue:基于陣列結構的有界阻塞佇列
- LinkedBlockingQueue:基于鏈表結構的阻塞佇列,可以在創建佇列時指定容量;如果沒有指定容量,那么其容量限制就自動被設定為 Integer.MAX_VALUE,成為了無界佇列,
- SynchronousQueue:不存盤元素的阻塞佇列,每個移除操作必須等待另一個執行緒的插入操作,反之每個插入操作也都要等待另一個執行緒的移除操作,這個佇列的容量是 0,
- PriorityBlockingQueue:具有優先級的無界阻塞佇列
- DelayQueue:支持延時獲取的無界阻塞佇列,內部用 PriorityQueue 實作
- LinkedTransferQueue:基于鏈表結構的無界阻塞佇列
- LinkedBlockingDeque:基于雙向鏈表的阻塞佇列,可以在創建佇列時指定容量
- DelayedWorkQueue:無界阻塞佇列
總結來說,BlockingQueue 型別的佇列可以從以下兩個維度劃分:
- 底層結構:陣列 or 單向鏈表 or 雙向鏈表 or 優先級佇列(DelayQueue 基于 PriorityQueue)
- 是否有界:有界 or 無界 or 既可以有界又可以無界
理論上兩個維度中兩兩組合,就可以構成一種型別的 BlockingQueue,

執行緒工廠
ThreadFactory:執行緒工廠,用來創建執行緒
新執行緒是使用 ThreadFactory 創建的,
如果沒有指定,則 ThreadPoolExecutor 的構造方法默認使用 Executors.defaultThreadFactory(),它將創建執行緒,使其全部位于同一個執行緒組中(ThreadGroup),并具有相同的優先級(默認都為 NORM_PRIORITY )和非守護執行緒狀態,
通過提供不同的 ThreadFactory,我們可以更改執行緒的名稱、執行緒組、優先級、是否設定為守護執行緒等,
如果 ThreadFactory#newThread() 方法創建執行緒失敗回傳 null ,程式將繼續執行,但可能無法執行任何任務,
執行緒應該擁有“modifyThread”運行時權限,如果作業執行緒或執行緒池的其他執行緒不具備此權限,則服務可能降級:配置更改可能無法及時生效,并且關閉執行緒池可能處于可以終止但尚未完成的狀態,
// ThreadPoolExecutor 類的成員變數
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
// 檢查執行緒是否擁有 modifyThread 運行時權限
// 該方法在 shutdown()、shutdownNow() 中被呼叫
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (ThreadPoolExecutor.Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
執行緒組
執行緒組的 javadoc:https://docs.oracle.com/javase/9/docs/api/java/lang/ThreadGroup.html
執行緒組表示一組執行緒,除此之外,執行緒組還可以包括其他執行緒組,
執行緒組形成一個樹,其中除初始執行緒組之外的每個執行緒組都有一個父執行緒組,
允許執行緒訪問有關其自己的執行緒組的資訊,但不能訪問有關其執行緒組的父執行緒組或任何其他執行緒組的資訊,
執行緒組的作用:
- 用執行緒組來批量管理、控制一組執行緒,比如:銷毀執行緒組及其所有子組
- 有效地對執行緒或執行緒組物件進行組織,
每個執行緒(Thread)必然存在于?個執行緒組(ThreadGroup)中,執行緒不能獨立于執行緒組存在,
如果 new Thread 時沒有顯式指定所在的執行緒組,那么默認將父執行緒 (執行當前 new Thread 的執行緒)所在的執行緒組設定為自己所在的執行緒組,
JVM 創建的 system 執行緒組是執行緒組樹結構的跟執行緒組,
system 執行緒組是用來處理 JVM 的系統任務的執行緒組,例如物件的銷毀等,
system 執行緒組的直接子執行緒組是 main 執行緒組,這個執行緒組至少包含一個 main 執行緒,用于執行 main 方法,
main 執行緒組的子執行緒組就是應用程式創建的執行緒組,
拒絕策略
RejectedExecutionHandler:拒絕策略
如果我們把任務提交到執行緒池時,被執行緒池拒絕接收了,執行緒池會按照指定的拒絕策略處理任務,
執行緒池拒絕接收我們提交的任務的原因(時機)可能有以下兩個:
- 執行緒池中的等待被執行的任務過多,任務佇列已滿,并且執行緒數達到 maximumPoolSize
- 執行緒池已經處于非 RUNNING 狀態
Java 執行緒池框架提供了以下 4 種拒絕策略:
- AbortPolicy:直接拋出 RejectedExecutionException 例外
- CallerRunsPolicy:用呼叫者所在執行緒(提交任務的執行緒)來執行任務
- DiscardOldestPolicy:丟棄任務佇列里最早的任務,把提交的任務加入任務佇列
- DiscardPolicy:直接丟棄提交的任務,
除了使用以上 Java 執行緒池框架提供的拒絕策略之外,我們還可以自定義拒絕策略,
實作自定義拒絕策略的步驟:
- 定義一個拒絕策略類(xxxPolicy),實作 RejectedExecutionHandler 介面
- 實作介面里的唯一方法
void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
在實際作業中,自定義的拒絕策略往往和降級策略配合使用,例如將任務資訊插入資料庫或者訊息引擎系統(Kafka、RocketMQ、...)等存盤系統,啟用一個專門用作補償的執行緒池進行補償,
所謂降級就是在服務無法正常提供功能的情況下,采取的補救措施,
具體采用何種降級手段,這要看具體場景,
執行緒池的生命周期
對于有生命周期的事物,要學好它,只要能搞懂生命周期中各個節點的狀態轉換機制就可以了,
執行緒池的運行狀態
// Integer.SIZE = 32
private static final int COUNT_BITS = Integer.SIZE - 3;
// runState 存盤在數字的高階位中
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
進入 RUNNING 狀態后:可以接收新的任務,可以處理任務佇列中的任務,
進入 SHUTDOWN 狀態后:不接收新的任務,但是可以處理任務佇列中的任務,(平緩的關閉程序)
進入 STOP 狀態后:不接收新的任務,并且不處理任務佇列中的任務(佇列中的任務,以集合的形式回傳),同時中斷所有正在執行的任務,(立即關閉)
進入 TIDYING 狀態時:執行緒池中的作業執行緒數為 0,任務佇列為空,代表所有的任務都已經處理完成,
TERMINATED:進入 TIDYING 狀態后,會執行 terminated() hook方法,當執行完該方法后,執行緒池進入 TERMINATED 運行狀態;并喚醒等待執行緒池終止的執行緒,所有呼叫 awaitTermination() 的執行緒繼續向下執行,
terminated() 默認是空方法,需要我們繼承 ThreadPoolExecutor 類,自行重寫,自定義執行緒池終止前的處理邏輯,
TIDYING 和 TERMINATED 的行為體現在 ThreadPoolExecutor#tryTerminate() 方法中,感興趣自行查看
在 shutdown()、shutdownNow() 方法的最后都有呼叫 tryTerminate() 方法,
執行緒池的狀態轉換
RUNNING:創建執行緒池后,執行緒池進入 RUNNING 運行狀態,
SHUTDOWN:執行 shutdown() 方法后,執行緒池進入 SHUTDOWN 運行狀態,
STOP:執行 shutdownNow() 方法后,執行緒池進入 STOP 運行狀態,
TIDYING:當作業執行緒數為 0,任務佇列為空,執行緒池進入 TIDYING 運行狀態,
TERMINATED:當 執行完 terminated() 方法后,執行緒池進入 TERMINATED運行狀態,

執行緒池的 API
下面我們看看,和執行緒池的運行狀態相關的 API

shutdown():不接收新的任務,但是可以處理任務佇列中的任務,(平緩的關閉程序)
shutdownNow():不接收新的任務,并且不處理任務佇列中的任務(回傳等待被執行的任務的串列),同時中斷所有正在執行的任務,(立即關閉)
isShutdown():如果執行緒池的狀態為 RUNNING,則回傳 false,否則回傳 true(描述的是非 RUNNING 狀態)
isTerminated():如果執行緒池的狀態為 TERMINATED,則回傳 true,否則回傳 false
awaitTermination(long timeout, TimeUnit unit):阻塞,直到以下三個事件之一發生就回傳(無論哪個先發生)
- 所有任務在一個關閉請求后完成執行(也就是執行緒池的狀態為 TERMINATED),回傳 true
- 發生超時,阻塞的時間超過指定的引數,回傳 false
- 當前執行緒被中斷,拋出 InterruptedException 例外
上面的是 ExecutorService 介面中的方法,下面看看 ThreadPoolExecutor 定義的方法:
- isTerminating():如果執行緒池的狀態為非 RUNNING,非 TERMINATED,回傳 true,否則回傳 false(也就是說,如果正在終止但尚未終止,回傳 true)
檢測執行緒池是否正處于正常狀態(RUNNING),使用 isShutdown()
檢測執行緒池是否處于正在關閉,但是尚未關閉的狀態,使用 isTerminating()
檢測執行緒池是否已經關閉(TERMINATED),使用 isTerminated()
執行緒「有超時的等待」或者「永久等待」執行緒池關閉,使用 awaitTermination()
Executors 提供的六種執行緒池
Executors 目前提供了 6 種不同配置的執行緒池創建:FixedThreadPool 、SingleThreadExecutor、CachedThreadPool、ScheduledThreadPool、SingleThreadScheduledExecutor、WorkStealingPool
我們重點介紹前 5 種,
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool 的建構式的引數如代碼所示:
-
核心執行緒數和最大執行緒數相同,意味著 FixedThreadPool 是一個固定大小的執行緒池,可控制并發的執行緒數
-
執行緒的存活時間為:0 秒,意味著:
- 如果默認 keep-alive 策略只應用于多于 corePoolSize 個執行緒時,那么即使執行緒空閑,也不會被銷毀
- 如果通過 allowCoreThreadTimeOut() 設定 keep-alive 策略也可應用于不超過 corePoolSize 個執行緒,那么當執行緒執行完一個任務后,就會被銷毀(我想,沒人會這么做)
-
任務佇列為:LinkedBlockingQueue,在這里是無界佇列
這里需要我們注意的是:
LinkedBlockingQueue 可以在創建佇列時指定容量;如果沒有指定容量,那么其容量限制就自動被設定為 Integer.MAX_VALUE,成為了無界佇列,
顯然這里使用的 LinkedBlockingQueue 沒有指定容量,是無界佇列,
使用無界佇列會對執行緒池帶來一些影響,比如當執行緒數達到 corePoolSize 后:
- 新任務將在無界佇列中等待,因此執行緒數根本不會超過 corePoolSize,由于執行緒數不會超過 corePoolSize,所以 maximumPoolSize 就會失效,keepAliveTime 也就跟著失去了意義,
- LinkedBlockingQueue 會堆積,從而導致 java.lang.OutOfMemoryError 錯誤 (OOM 錯誤),
FixedThreadPool 的 execute() 運行示意圖如下:

我們來看看 FixedThreadPool 的 execute() 運行程序:
- 如果當前運行的執行緒數少于 corePoolSize,則創建新執行緒來執行任務;
- 如果當前運行的執行緒數等于 corePoolSize,將提交的任務加入 LinkedBlockingQueue;
- 執行緒執行完執行緒池中的任務后,會反復從 LinkedBlockingQueue 獲取任務來執行,
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor 的建構式的引數如代碼所示:
- 核心執行緒數和最大執行緒數都為 1,意味著 SingleThreadPool 是一種創建單個執行緒數的執行緒池,那么它可以保證任務按照先進先出的順序執行
- 執行緒的存活時間為:0 秒
- 任務佇列為:LinkedBlockingQueue
我們可以看到,SingleThreadExecutor 的 corePoolSize 和 maximumPoolSize 這兩個引數被設定為 1,其他引數與 FixedThreadPool 相同,
SingleThreadExecutor 使用的作業佇列也是無界佇列 LinkedBlockingQueue ,產生的影響和 FixedThreadPool 相同,
SingleThreadExecutor 的 execute() 運行示意圖如下:

我們來看看 SingleThreadExecutor 的 execute() 運行程序:
- 如果當前運行的執行緒數少于 corePoolSize,也就是執行緒池中無運行的執行緒,那么就創建一個新執行緒來執行任務;
- 如果當前執行緒池中有一個運行的執行緒,那么就將提交的任務加入 LinkedBlockingQueue;
- 執行緒執行完任務后,會反復從 LinkedBlockingQueue 獲取任務來執行,
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool 的建構式的引數如代碼所示:
- 核心執行緒數為:0
- 最大執行緒數為:Integer. MAX_ VALUE,意味著 CachedThreadPool 可能會創建過多執行緒
- 執行緒的存活時間為:60 秒,意味著 CachedThreadPool 中的空閑執行緒等待新任務的最長時間為 60 秒,空閑執行緒等待超過 60 秒后將會被銷毀
- 任務佇列為:SynchronousQueue,SynchronousQueue 是一個沒有容量為 0 的阻塞佇列,每個插入操作必須等待另一個執行緒的對應移除操作,反之亦然,
因為設定核心執行緒數為 0,非核心執行緒的存活時間為 60 s,poll() 的超時時間為 60 s,所以:
- 首次把一個任務提交到執行緒池時,直接嘗試把任務加入到 SynchronousQueue 阻塞佇列,因為此時沒有另一個執行緒執行 SynchronousQueue.poll() 等待取資料,所以任務加入阻塞佇列失敗,直接創建新的作業執行緒執行任務,
- 新創建的作業執行緒將任務執行完成后,會執行 poll() 等待取資料,這個 poll 操作會讓空閑執行緒最多在 SynchronousQueue 中等待 60 秒鐘,
- 如果 60 秒內把一個新的任務加入到 SynchronousQueue 阻塞佇列,這個空閑執行緒將執行提交的新任務;
- 如果 60 秒內沒有新任務,這個空閑執行緒將被銷毀,
CachedThreadPool 的特點:
- 如果主執行緒提交任務的速度高于執行緒池中執行緒處理任務的速度時,CachedThreadPool 會不斷創建新執行緒,極端情況下,CachedThreadPool 會因為創建過多執行緒而耗盡 CPU 和記憶體資源,導致記憶體溢位,
- 長時間沒有任務時,CachedThreadPool 不會消耗什么資源
- 因此,CachedThreadPool 是一種用來處理大量短時間作業任務的執行緒池
CachedThreadPool 的 execute() 運行示意圖如下:

我們來看看 CachedThreadPool 的 execute() 運行程序:
-
首先執行 SynchronousQueue.offer(Runnable task),
- 如果當前 maximumPool 中有空閑執行緒正在執行 SynchronousQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),那么主執行緒執行 offer 操作與空閑執行緒執行的 poll 操作配對成功,主執行緒把任務交給空閑執行緒執行,execute() 方法執行完成;
- 否則(當前 maximumPool 中沒有空閑執行緒在執行 poll())執行下面的步驟 2,
-
當初始 maximumPool 為空,或者 maximumPool 中當前沒有空閑執行緒時,將沒有執行緒執行 SynchronousQueue.poll(keepAliveTime, TimeUnit. NANOSECONDS),這種情況下,步驟 1 就會失敗,此時 CachedThreadPool 會創建一個新執行緒執行任務,execute() 方法執行完成,
-
在步驟 2 中新創建的執行緒將任務執行完后,會執 SynchronousQueue.poll(keepAliveTime, TimeUnit. NANOSECONDS),這個 poll 操作會讓空閑執行緒最多在 SynchronousQueue 中等待 60 秒鐘,
- 如果 60 秒內主執行緒提交了一個新任務,主執行緒執行步驟 1,這個空閑執行緒將執行主執行緒提交的新任務;
- 否則,這個空閑執行緒將終止,
由于空閑 60 秒的空閑執行緒會被終止,因此長時間保持空閑的 CachedThreadPool 不會使用任何資源,
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new ScheduledThreadPoolExecutor.DelayedWorkQueue());
}
ScheduledThreadPool 執行緒池,它創建一個可以執行延遲任務的執行緒池,
ScheduledThreadPool 的建構式的引數如代碼所示:
- 核心執行緒數被設定為指定的引數,由程式員構造執行緒池時傳入
- 最大執行緒數為:Integer. MAX_ VALUE
- 任務佇列為:DelayedWorkQueue,是一個無界佇列,所以設定 maximumPoolSize 的大小不會生效
ScheduledThreadPoolExecutor 的任務傳遞示意圖如下:

我們來看看 ScheduledThreadPoolExecutor 的任務傳遞程序:
- 當呼叫 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法和 scheduleWithFixedDelay() 方法時,會向 ScheduledThreadPoolExecutor 的 DelayedWorkQueue 添加一個實作了 RunnableScheduledFutur 介面的 ScheduledFutureTask,
- 執行緒池中的執行緒從 DelayedWorkQueue 中獲取 ScheduledFutureTask,然后執行任務,
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):創建并執行一個周期性任務,該任務在給定的初始延遲之后首先執行,執行完成后在給定的延遲之后再執行,
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):創建并執行一個周期性操作,該任務在給定的初始延遲后首先執行,然后在給定的周期之后執行,也就是說,任務將在 initialDelay 之后開始,然后是在 initialDelay + period 之后開始,然后是在 initialDelay + 2 * period 之后開始,依此類推,
總結來說:
scheduleAtFixedRate 是以上一個任務開始的時間計時,period 時間過去后,檢測上一個任務是否執行完畢:
- 如果上一個任務執行完畢,則當前任務立即執行
- 如果上一個任務沒有執行完畢,則需要等上一個任務執行完畢后立即執行
scheduleWithFixedDelay 是以上一個任務結束時開始計時,period 時間過去后,立即執行,
注意:
通過 ScheduledExecutorService 執行的周期性任務,如果任務執行的程序中拋出了例外,那么 ScheduledExecutorService 就會停止執行任務,且不會再周期性地執行該任務了,
所以你如果想保證任務一直被周期執行,那么需要 catch 一切可能的例外,
newSingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new Executors.DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
newSingleThreadScheduledExecutor() 和 newScheduledThreadPool(int corePoolSize),創建的都是以個 ScheduledExecutorService,可以執行定時或周期性的任務,區別在于單一作業執行緒還是多個作業執行緒,
WorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
newWorkStealingPool(int parallelism),這是一個經常被人忽略的執行緒池,Java 8 才加入這個創建方法,其內部會構建 ForkJoinPool,利用 Work-Stealing 演算法,并行地處理任務,不保證處理順序,
總結
FixedThreadPool 和 SingleThreadExecutor 的任務佇列都是 LinkedBlockingQueue,沒有數量限制,默認是 Integer.MAX_VALUE;
CachedThreadPool 和 scheduledThreadPool 中最大執行緒數默認 Integer.MAX_VALUE,沒有限制,
當執行緒過多的時候,這兩類執行緒池就都容易造成 OutOfMemoryError,所以我們在使用執行緒池時,最好根據實際情況自定義這些核心引數,
上面提到的幾種執行緒池,只有 CachedThreadPool 的執行緒存活時間大于 0,為 60 秒,其余執行緒池的執行緒存活時間都為 0 秒,
參考資料
18 | 如何設定執行緒池大小? · 語雀 (yuque.com)
Java中的執行緒池——如何創建及使用Executors的四種執行緒池-極客時間 (geekbang.org)
深入淺出 Java Concurrency (30): 執行緒池 part 3 Executor 生命周期 - xylz,imxylz - BlogJava
本文來自博客園,作者:真正的飛魚,轉載請注明原文鏈接:https://www.cnblogs.com/feiyu2/p/ThreadPool.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/502306.html
標籤:其他
上一篇:串列型別
