執行緒池關系
Executors創建執行緒池
Executors 實作了幾種常用的執行緒池,
-
newFixedThreadPool固定執行緒數的執行緒池,例子:
ExecutorService executorService = Executors.newFixedThreadPool(2); for (int i = 0; i < 5; i++) { executorService.execute(() -> System.out.println(UUID.randomUUID())); }運行結果:
f953f063-965d-4f3b-bc23-5d49c65a7d94 7de3f854-3415-4b05-8956-08f289db0817 ad595410-bed6-43f9-b3ad-70fa1a17698c cccc009e-b706-4c6c-8d87-1b45bf00c83f 563645ca-d29a-46fe-93c9-ec3164eddc59 -
newSingleThreadExecutor單執行緒的執行緒池,ExecutorService executorService = Executors.newSingleThreadExecutor(); -
newCachedThreadPool可以快取執行緒的執行緒池,ExecutorService executorService = Executors.newCachedThreadPool(); -
newScheduledThreadPool可以定時執行任務的執行緒池,例子:
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.schedule(() -> System.out.println(UUID.randomUUID()), 1, TimeUnit.SECONDS);結果:
7ca82ded-2253-4de2-bc37-ac83d9aa4b9fschedule方法是在創建任務一段時間后再執行的一次性方法,ScheduledExecutorService介面中還提供了其他定時、周期執行的方法,
ThreadPoolExecutor
Executors 的各種實作方式本質是用 ThreadPoolExecutor 實作的,理解 ThreadPoolExecutor 才能準確的使用執行緒池,
引數
建構式:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//忽略...
}
| 引數名 | 解釋 |
|---|---|
| corePoolSize | 核心執行緒數,不會銷毀,除非設定了 allowCoreThreadTimeOut |
| maximumPoolSize | 執行緒池允許的最大執行緒數 |
| keepAliveTime | 除核心執行緒以外的多余空閑執行緒等待新任務的最大時間,超過就會銷毀 |
| unit | keepAliveTime 的時間單位 |
| workQueue | 存放未執行任務的佇列,它只保存由 execute 方法提交的任務 |
| threadFactory | 生產執行緒的工廠 |
| handler | 被阻塞,也就是執行緒和佇列都已滿時的處理策略 |
核心執行緒數
-
corePoolSize可以為零,但不能為負,默認情況下,核心執行緒也是在任務到達時才創建, -
以核心執行緒數為界限,大于核心執行緒數執行器會優先把任務添加到佇列;小于核心執行緒數執行器會優先創建執行緒執行任務,
-
執行緒數實際最大值為
(2^29)-1,而不是Integer.MAX_VALUE的(2^31)-1,因為執行緒池中的實際執行緒數是和執行緒池運行狀態共同存盤在型別為AtomicInteger名為ctl的值中(它保存了執行緒池的狀態),其中執行緒池運行狀態有 5 種,因此二進制下需要最少 3 位才能保存下這 5 種狀態,結果就是下面這樣,
作業佇列
ThreadPoolExecutor 原始碼中推薦使用 SynchronousQueue、LinkedBlockingQueue 和 ArrayBlockingQueue ,Blocking 表示是阻塞的,Queue 表示是單向的,ps:Deque 表示是雙向的,
SynchronousQueue不持有佇列,直接將任務分配給執行緒,如果沒有空閑執行緒就創建執行緒,達到maximumPoolSize時就會拒絕任務,所以需要設定maximumPoolSize為很大的值,反過來,當處理任務的時間比新任務到達的時間慢就會導致執行緒數無限增加,它可以防止任務之前相互依賴的問題,LinkedBlockingQueue無界佇列,當核心執行緒已滿且都在作業時新任務會被添加到佇列中,因為是無界的,所以maximumPoolSize引數沒有意義,與SynchronousQueue相同,如果處理任務的時間比新任務到達的時間慢就會導致佇列無限增長,它適合相互之間獨立的任務且平滑短暫的請求,ArrayBlockingQueue有界佇列,選擇小佇列大執行緒池可能遇到系統資源使用增加和背景關系切換導致吞吐量減少的問題;選擇大佇列小執行緒池可能遇到任務被阻塞(I/O)導致執行緒調度時間增加的問題,它防止了使用有限的maximumPoolSize時系統資源被耗盡的問題,比較均衡,所以更難調優,
拒接策略
新任務既不能添加到佇列中,也不能創建執行緒去執行,就會使用設定好的拒絕策略處理它,ThreadPoolExecutor 提供了一下四種,
AbortPolicy中止策略,拒絕時拋出RejectedExecutionException例外,這是默認的拒絕策略,CallerRunsPolicy運行方呼叫策略,呼叫提交者本身的執行緒執行該任務,DiscardPolicy丟棄策略,無法執行的任務將直接丟棄,DiscardOldestPolicy丟棄老任務策略,拋棄最新進入佇列的任務,再添加新任務到佇列中,如果失敗則一直重試,
運行狀態
| 狀態名 | 說明 | 值 |
|---|---|---|
| RUNNING | 接收新任務并處理佇列中的任務, | -1 |
| SHUTDOWN | 不接受新任務,但處理排隊中的任務, | 0 |
| STOP | 不接受新任務,不處理佇列中的任務,終止正在運行的任務, | 1 |
| TIDYING | 所有任務都已終止,執行緒池中沒有執行緒 | 2 |
| TERMINATED | terminated()已運行完 | 3 |
提交任務
提交任務有兩種方式,一個是 execute,有回傳值,另一個是 submit,沒有回傳值,
execute原始碼如下:
public void execute(Runnable command) {
if (command == null)
//任務為慷訓傳空指標
throw new NullPointerException();
//獲取執行緒池狀態
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//當前執行緒數小于核心執行緒數,創建新執行緒去執行這個任務
if (addWorker(command, true))
//成功了就結束方法
return;
//沒成功,更新狀態
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
//執行緒池運行狀態是 RUNNING,添加任務到佇列中,再次獲取執行緒池狀態
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//執行緒池的運行狀態不是 RUNNING,移除該任務,使用拒絕策略拒絕任務,結束運行
reject(command);
else if (workerCountOf(recheck) == 0)
//沒執行緒了,創建一個執行緒去執行佇列中的任務
addWorker(null, false);
}
else if (!addWorker(command, false))
//執行緒池運行狀態不是 RUNNING 或者既不能添加新執行緒,也不能添加到佇列中,所以執行拒絕策略
reject(command);
}
其中 addWorker 方法通過檢查執行緒池的運行狀態和邊界(corePoolSize 或 maximumPoolSize)來添加新執行緒的,它有 firstTask 和 core 兩個引數,型別分別為 Runnanle 和 boolean,firstTask 是創建的新執行緒立即執行的任務,為空時不執行,core 為 true 時邊界為 corePoolSize,為 false 時邊界為 maximumPoolSize,
submit是由AbstractExecutorService提供的,它多載了 3 個方法,如下:
public Future<?> submit(Runnable task) {//...}
public <T> Future<T> submit(Runnable task, T result) {//...}
public <T> Future<T> submit(Callable<T> task) {//...}
大致就是把任務封裝下,呼叫前面的 execute 方法執行,最后回傳 Future,
提交任務的流程如下圖:
注意:進入第三個判斷,也就是執行緒數小于最大數,創建的新執行緒是不立即執行任務,
執行任務
執行緒池中的執行緒在 ThreadPoolExecutor 中就是 Worker,Worker 實作了 Runnable 介面的 run 方法,它在 run 方法中呼叫了 runWorker 方法執行任務,通過 getTask 方法獲取阻塞或定時任務,
參考、學習資料
深入理解Java執行緒池:ScheduledThreadPoolExecutor
ThreadPoolExecutor 引數詳解
Java并發編程:Callable、Future和FutureTask
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/141401.html
標籤:Java
上一篇:shiro原始碼分析-憑證匹配器(密碼匹配器)原始碼分析
下一篇:Java比較器
