一、序言
Java多執行緒編程執行緒池被廣泛使用,甚至成為了標配,
執行緒池本質是池化技術的應用,和連接池類似,創建連接與關閉連接屬于耗時操作,創建執行緒與銷毀執行緒也屬于重操作,為了提高效率,先提前創建好一批執行緒,當有需要使用執行緒時從執行緒池取出,用完后放回執行緒池,這樣避免了頻繁創建與銷毀執行緒,
// 任務
Runnable runnable = () -> System.out.println(Thread.currentThread().getId());
在應用中優先選用執行緒池執行異步任務,根據不同的場景選用不同的執行緒池,提高異步任務執行效率,
1、普通執行
new Thread(runnable).start();
2、執行緒池執行
Executors.newSingleThreadExecutor().execute(runnable)
二、執行緒池基礎
(一)核心引數
1、核心引數
執行緒池的核心引數決定了池的型別,進而決定了池的特性,
| 引數 | 解釋 | 行為 |
|---|---|---|
| corePoolSize | 核心執行緒數 | 池中長期維護的執行緒數量,不主動回收 |
| maximumPoolSize | 最大執行緒數 | 最大執行緒數大于等于核心執行緒數 |
| keepAliveTime | 執行緒最大空閑時間 | 非核心執行緒最大空閑時間,超時回收執行緒 |
| workQueue | 作業佇列 | 作業佇列直接決定執行緒池的型別 |
2、引數與池的關系
Executors類默認創建執行緒池與引數對應關系,
| 執行緒池 | corePoolSize | maximumPoolSize | keepAliveTime | workQueue |
|---|---|---|---|---|
| newCachedThreadPool | 0 | Integer.MAX_VALUE | 60 | SynchronousQueue |
| newSingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue |
| newFixedThreadPool | N | N | 0 | LinkedBlockingQueue |
| newScheduledThreadPool | N | Integer.MAX_VALUE | 0 | DelayedWorkQueue |
(二)執行緒池對比
根據使用場景選擇對應的執行緒池,
1、通用對比
| 執行緒池 | 特點 | 適用場景 |
|---|---|---|
| newCachedThreadPool | 超時未使用的執行緒回自動銷毀,有新任務時自動創建 | 適用于低頻、輕量級的任務,回收執行緒的目的是節約執行緒長時間空閑而占有的資源, |
| newSingleThreadExecutor | 執行緒池中有且只有一個執行緒 | 順序執行任務 |
| newFixedThreadPool | 執行緒池中有固定數量的執行緒,且一直存在 | 適用于高頻的任務,即執行緒在大多數時間里都處于作業狀態, |
| newScheduledThreadPool | 定時執行緒池 | 與定時調度相關聯 |
2、拓展對比
維護僅有一個執行緒的執行緒池有如下兩種方式,正常使用的情況下,二者差異不大;復雜使用環境下,二者存在細微的差異,用newSingleThreadExecutor方式創建的執行緒池在任何時刻至多只有一個執行緒,因此可以理解為用異步的方式執行順序任務;后者初始化的時候也只有一個執行緒,使用程序中可能會出現最大執行緒數超過1的情況,這時要求線性執行的任務會并行執行,業務邏輯可能會出現問題,與實際場景有關,
private final static ExecutorService executor = Executors.newSingleThreadExecutor();
private final static ExecutorService executor = Executors.newFixedThreadPool(1);
(三)執行緒池原理

執行緒池主要處理流程,任務提交之后是怎么執行的,大致如下:
- 判斷核心執行緒池是否已滿,如果不是,則創建執行緒執行任務
- 如果核心執行緒池滿了,判斷佇列是否滿了,如果佇列沒滿,將任務放在佇列中
- 如果佇列滿了,則判斷執行緒池是否已滿,如果沒滿,創建執行緒執行任務
- 如果執行緒池也滿了,則按照拒絕策略對任務進行處理
(四)提交任務的方式
往執行緒池中提交任務,主要有兩種方法:提交無回傳值的任務和提交有回傳值的任務,
1、無回傳值任務
execute用于提交不需要回傳結果的任務,
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(() -> System.out.println("hello"));
}
2、有回傳值任務
submit()用于提交一個需要回傳果的任務,
該方法回傳一個Future物件,通過呼叫這個物件的get()方法,我們就能獲得回傳結果,get()方法會一直阻塞,直到回傳結果回傳,
我們也可以使用它的多載方法get(long timeout, TimeUnit unit),這個方法也會阻塞,但是在超時時間內仍然沒有回傳結果時,將拋出例外TimeoutException,
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Long> future = executor.submit(() -> {
System.out.println("task is executed");
return System.currentTimeMillis();
});
System.out.println("task execute time is: " + future.get());
}
在提交任務時,如果無回傳值任務,優先使用
execute,
(無)關閉執行緒池
在執行緒池使用完成之后,我們需要對執行緒池中的資源進行釋放操作,這就涉及到關閉功能,我們可以呼叫執行緒池物件的shutdown()和shutdownNow()方法來關閉執行緒池,
這兩個方法都是關閉操作,又有什么不同呢?
shutdown()會將執行緒池狀態置為SHUTDOWN,不再接受新的任務,同時會等待執行緒池中已有的任務執行完成再結束,shutdownNow()會將執行緒池狀態置為SHUTDOWN,對所有執行緒執行interrupt()操作,清空佇列,并將佇列中的任務回傳回來,
另外,關閉執行緒池涉及到兩個回傳boolean的方法,isShutdown()和isTerminated,分別表示是否關閉和是否終止,
三、Executors
Executors是一個執行緒池工廠,提供了很多的工廠方法,我們來看看它大概能創建哪些執行緒池,
// 創建單一執行緒的執行緒池
public static ExecutorService newSingleThreadExecutor();
// 創建固定數量的執行緒池
public static ExecutorService newFixedThreadPool(int nThreads);
// 創建帶快取的執行緒池
public static ExecutorService newCachedThreadPool();
// 創建定時調度的執行緒池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
// 創建流式(fork-join)執行緒池
public static ExecutorService newWorkStealingPool();
1、創建單一執行緒的執行緒池
任何時候執行緒池中至多只有一個執行緒,當執行緒執行例外終止時會自動創建一個新執行緒替換,如果既有異步執行任務的需求又希望任務得以順序執行,那么此型別執行緒池是首選,
若多個任務被提交到此執行緒池,那么會被快取到佇列,當執行緒空閑的時候,按照FIFO的方式進行處理,
2、創建固定數量的執行緒池
創建核心執行緒與最大執行緒數相等的固定執行緒數的執行緒池,任何時刻至多有固定數目的執行緒,當執行緒因例外而終止時則會自動創建執行緒替換,
當有新任務加入時,如果池內執行緒均處于活躍狀態,則任務進入等待佇列中,直到有空閑執行緒,佇列中的任務才會被順序執行;如果池內有非活躍執行緒,則任務可以立刻得以執行,
- 如果執行緒的數量未達到指定數量,則創建執行緒來執行任務
- 如果執行緒池的數量達到了指定數量,并且有執行緒是空閑的,則取出空閑執行緒執行任務
- 如果沒有執行緒是空閑的,則將任務快取到佇列(佇列長度為
Integer.MAX_VALUE),當執行緒空閑的時候,按照FIFO的方式進行處理
3、創建可伸縮的執行緒池
這種方式創建的執行緒池,核心執行緒池的長度為0,執行緒池最大長度為Integer.MAX_VALUE,由于本身使用SynchronousQueue作為等待佇列的緣故,導致往佇列里面每插入一個元素,必須等待另一個執行緒從這個佇列洗掉一個元素,
- 執行緒池可維護0到Integer.MAX_VALUE個執行緒資源,空閑執行緒默認情況下超過60秒未使用則會被銷毀,長期閑置的池占用較少的資源,
- 當有新任務加入時,如果池中有空閑且尚未銷毀的執行緒,則將任務交給此執行緒執行;如果沒有可用的執行緒,則創建一個新執行緒執行任務并添加到池中,
4、創建定時調度的執行緒池
和上面3個工廠方法回傳的執行緒池型別有所不同,它回傳的是ScheduledThreadPoolExecutor型別的執行緒池,平時我們實作定時調度功能的時候,可能更多的是使用第三方類別庫,比如:quartz等,但是對于更底層的功能,我們仍然需要了解,
四、手動創建執行緒池
理論上,我們可以通過Executors來創建執行緒池,這種方式非常簡單,但正是因為簡單,所以限制了執行緒池的功能,比如:無長度限制的佇列,可能因為任務堆積導致OOM,這是非常嚴重的bug,應盡可能地避免,怎么避免?歸根結底,還是需要我們通過更底層的方式來創建執行緒池,
拋開定時調度的執行緒池不管,我們看看ThreadPoolExecutor,它提供了好幾個構造方法,但是最底層的構造方法卻只有一個,那么,我們就從這個構造方法著手分析,
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
這個構造方法有7個引數,我們逐一來進行分析,
corePoolSize,執行緒池中的核心執行緒數maximumPoolSize,執行緒池中的最大執行緒數keepAliveTime,空閑時間,當執行緒池數量超過核心執行緒數時,多余的空閑執行緒存活的時間,即:這些執行緒多久被銷毀,unit,空閑時間的單位,可以是毫秒、秒、分鐘、小時和天,等等workQueue,等待佇列,執行緒池中的執行緒數超過核心執行緒數時,任務將放在等待佇列,它是一個BlockingQueue型別的物件threadFactory,執行緒工廠,我們可以使用它來創建一個執行緒handler,拒絕策略,當執行緒池和等待佇列都滿了之后,需要通過該物件的回呼函式進行回呼處理
這些引數里面,基本型別的引數都比較簡單,我們不做進一步的分析,我們更關心的是workQueue、threadFactory和handler,接下來我們將進一步分析,
(一)等待佇列-workQueue
等待佇列是BlockingQueue型別的,理論上只要是它的子類,我們都可以用來作為等待佇列,
同時,jdk內部自帶一些阻塞佇列,我們來看看大概有哪些,
ArrayBlockingQueue,佇列是有界的,基于陣列實作的阻塞佇列LinkedBlockingQueue,佇列可以有界,也可以無界,基于鏈表實作的阻塞佇列SynchronousQueue,不存盤元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作將一直處于阻塞狀態,該佇列也是Executors.newCachedThreadPool()的默認佇列PriorityBlockingQueue,帶優先級的無界阻塞佇列
通常情況下,我們需要指定阻塞佇列的上界(比如1024),另外,如果執行的任務很多,我們可能需要將任務進行分類,然后將不同分類的任務放到不同的執行緒池中執行,
(二)執行緒工廠-threadFactory
ThreadFactory是一個介面,只有一個方法,既然是執行緒工廠,那么我們就可以用它生產一個執行緒物件,來看看這個介面的定義,
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
Executors的實作使用了默認的執行緒工廠-DefaultThreadFactory,它的實作主要用于創建一個執行緒,執行緒的名字為pool-{poolNum}-thread-{threadNum},
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
很多時候,我們需要自定義執行緒名字,我們只需要自己實作ThreadFactory,用于創建特定場景的執行緒即可,
(三)拒絕策略-handler
所謂拒絕策略,就是當執行緒池滿了、佇列也滿了的時候,我們對任務采取的措施,或者丟棄、或者執行、或者其他...
jdk自帶4種拒絕策略,我們來看看,
CallerRunsPolicy// 在呼叫者執行緒執行AbortPolicy// 直接拋出RejectedExecutionException例外DiscardPolicy// 任務直接丟棄,不做任何處理DiscardOldestPolicy// 丟棄佇列里最舊的那個任務,再嘗試執行當前任務
這四種策略各有優劣,比較常用的是DiscardPolicy,但是這種策略有一個弊端就是任務執行的軌跡不會被記錄下來,所以,我們往往需要實作自定義的拒絕策略, 通過實作RejectedExecutionHandler介面的方式,
五、其它
配置執行緒池的引數
前面我們講到了手動創建執行緒池涉及到的幾個引數,那么我們要如何設定這些引數才算是正確的應用呢?實際上,需要根據任務的特性來分析,
- 任務的性質:CPU密集型、IO密集型和混雜型
- 任務的優先級:高中低
- 任務執行的時間:長中短
- 任務的依賴性:是否依賴資料庫或者其他系統資源
不同的性質的任務,我們采取的配置將有所不同,在《Java并發編程實踐》中有相應的計算公式,
通常來說,如果任務屬于CPU密集型,那么我們可以將執行緒池數量設定成CPU的個數,以減少執行緒切換帶來的開銷,如果任務屬于IO密集型,我們可以將執行緒池數量設定得更多一些,比如CPU個數*2,
PS:我們可以通過
Runtime.getRuntime().availableProcessors()來獲取CPU的個數,
執行緒池監控
如果系統中大量用到了執行緒池,那么我們有必要對執行緒池進行監控,利用監控,我們能在問題出現前提前感知到,也可以根據監控資訊來定位可能出現的問題,
那么我們可以監控哪些資訊?又有哪些方法可用于我們的擴展支持呢?
首先,ThreadPoolExecutor自帶了一些方法,
long getTaskCount(),獲取已經執行或正在執行的任務數long getCompletedTaskCount(),獲取已經執行的任務數int getLargestPoolSize(),獲取執行緒池曾經創建過的最大執行緒數,根據這個引數,我們可以知道執行緒池是否滿過int getPoolSize(),獲取執行緒池執行緒數int getActiveCount(),獲取活躍執行緒數(正在執行任務的執行緒數)
其次,ThreadPoolExecutor留給我們自行處理的方法有3個,它在ThreadPoolExecutor中為空實作(也就是什么都不做),
protected void beforeExecute(Thread t, Runnable r)// 任務執行前被呼叫protected void afterExecute(Runnable r, Throwable t)// 任務執行后被呼叫protected void terminated()// 執行緒池結束后被呼叫
六、總結
- 盡量使用手動的方式創建執行緒池,避免使用
Executors工廠類 - 根據場景,合理設定執行緒池的各個引數,包括執行緒池數量、佇列、執行緒工廠和拒絕策略
喜歡本文就【??推薦??】一下,激勵我持續創作,這個Github同樣精彩,收到您的star我會很激動,本文歸檔在專題博客,視頻講解在B站,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/456976.html
標籤:Java
