目錄
- 前言
- 任務的描述
- FutureTask的設計與實作
- FutureTask狀態機
- FutureTask幾個關鍵方法
- ThreadPoolExecutor的設計與實作
- 簡介
- 類的描述與狀態
- ThreadPoolExecutor欄位描述
- ThreadPoolExecutor狀態描述
- Worker欄位描述
- Worker狀態描述
- 任務的提交與調度
- 作業執行緒的創建與執行
- 作業執行緒的創建
- 作業執行緒的執行
- 服務的關閉,任務的取消與執行緒的回收
- 服務的關閉
- 執行緒的回收
- 執行緒的中斷
- 執行緒池的使用
- 1 核心執行緒數與最大執行緒數
- 1.1 作業執行緒的大小設定
- 1.2 作業執行緒的回收
- 2 任務佇列
- 3 任務拒絕策略
- 4 作業執行緒工廠類
- JDK平臺提供的默認執行緒池
- 實際業務中的使用
- 1 核心執行緒數與最大執行緒數
- 總結
- 參考資料
前言
在我們實際作業程序中,往往會將大的任務劃分成幾個小的子任務,待所有子任務完成之后,再整合出大任務的結果.(例如: 新增直播課的場景),任務的性質通常是多種多樣的,這里列舉一些任務的常見性質.
從資源使用的角度:
- CPU密集型 (列舉素數)
- I/O密集型 (檔案上傳下載)
從執行程序的角度:
- 依賴其他有限資源(資料庫連接池,檔案描述符)/不依賴其他有限資源
- 沒有回傳值(寫日志,logService,MesageService)
- 有回傳值(計算結果)
- 處理程序中可能拋例外(例外要如何處理)
- 可取消的任務/不可取消的任務
從執行時間的角度:
- 執行時間短(列舉100以內的素數)
- 執行時間長(資料庫呼叫)
- 永遠無法結束(爬蟲任務)
- 限時任務,需要盡快回應(H5端介面,GUI點擊事件)
- 定時任務(Job)
任務是對現實問題的抽象,其對應程式中的某些方法,而方法的執行需要呼叫堆疊.
從Java記憶體模型圖中可以看出,Java執行緒為任務的執行提供了所需的方法呼叫堆疊,其中包括本地方法呼叫堆疊和Java方法呼叫堆疊,在32位系統中通常占0.5M,而在64位系統中通常占1M+10幾KB的內核資料結構.
而且有的作業系統也會對一個行程能創建的執行緒數量進行限制. 因此我們并不能無限制的創建執行緒,執行緒是一種共享資源,需要統一維護和調度,以便使資源利用率更加高效,這便有了執行緒池的概念.
Java記憶體模型圖:

注: 很多服務端的應用程式比如MySQL,Web服務器都使用了執行緒池技術.
然而也有例外,對于耗時較短的任務,比如僅有記憶體操作,導致執行緒的維護時間/任務的執行時間比值偏大,這類任務就不適合使用多執行緒技術,例如Redis服務.
或者對于需要確保執行緒安全性的,比如GUI開發包,也是使用單執行緒,例如Swing開發包,JavaScript等.
對于任務的執行,我們往往會有許多需求,例如觀察任務的執行狀態,獲取任務的執行結果或者執行程序中拋出的例外資訊,而對于長時間執行的任務,可能還會需要暫停任務,取消任務,限時等
jdk中有許多執行時間長的任務,例如,Thread.join, Object.wait, BlockingQueue.poll,Future.get,這些任務的介面設計也體現了這些需求例如lock介面:
public void lock(); //基于狀態的介面,當無法獲取鎖時,掛起當前執行緒
public void lockInterruptibly() throws InterruptedException; // 通過拋出中斷例外來回應中斷
public boolean tryLock();//用于快速判斷釋放可加鎖,用于輪詢
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException; //可限時的操作
下面列舉一些任務的執行需要考慮的問題.
任務的執行策略:
- 任務在什么執行緒中執行
- 任務的執行順序,FIFO還是按優先級
- 多少個任務可以并發執行
- 任務過多導致系統過載,選擇拒絕哪個任務,即任務的拒絕策略
- 如何通知應用程式有任務被拒絕
- 如何通知應用程式任務的執行結果,包括成功的結果和失敗的結果
為了應對這些繁雜的現實需求,jdk為我們提供了Executor框架.通過這個中間人,將任務的提交和實際執行策略解耦,并且提供了對生命周期的支持(ExecutorService),客戶端只需要關注任務的構建和任務的提交,由中間人來關注實際的執行策略,從而封裝了任務執行的復雜性,
本文主要介紹Java平臺提供的Executor執行框架,其中Runable表示任務,FutureTask表示任務的執行結果,ThreadPoolExecutor表示具體的任務執行策略.
任務的描述
Executor框架中,Runable介面表示任務,但是這個任務沒有回傳值且不能拋出例外,而Callable介面卻可以.所以Executors工具類提供了RunableAdapter配接器,通過callalbe(Runable)方法,將 runable轉為 callable.
| 任務 | 描述 |
|---|---|
| Runnable | 可執行的任務,無回傳值,不可拋出例外 |
| Callable | 可執行的任務,有回傳值,可以拋出例外 |
| FutureTask | 可執行的任務,可以管理任務的執行狀態和讀取任務的執行結果 |
為了對任務維護任務的運行狀態以及異步獲取任務的運行結果,Executor框架提供了Future類,該類表示一個異步任務的計算結果.提供了一些方法:
- 獲取任務執行結果 get(), get(long,TimeUnit)
- 取消任務 cancel(boolean)
- 判斷任務是否取消,判斷任務是否完成 isCancelled(),isDone()
同時也提供了記憶體一致性保證: Future.get()之前的操作, happen-before Future.get()之后的操作
FutureTask實作了Future介面和Runable介面,表示一個可取消的任務,AbstractExecutorService正是通過將Runable封裝成FutureTask,來管理和維護任務的狀態以及獲取任務的執行結果,下面介紹jdk1.8中FutureTask的實作.
AbstractExecutorService:
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
FutureTask的設計與實作
在jdk1.8之前的版本中為了簡介,依賴AQS來實作FutureTask,但在jdk1.8后,則放棄使用,通過WaitNode鏈表,來維護等待結果的執行緒.
FutureTask原始碼
public class FutureTask<V> implements RunnableFuture<V> {
/*
* FutureTask是一個并發安全類,有并發控制機制
* 先前版本為了簡潔,使用AQS來實作,jdk1.8后的并發控制使用 一個state 域,通過CAS操作state,同時通過一個簡單的stack來維護 waiting threads*/
private volatile int state;
// 實際的任務
private Callable<V> callable;
// 任務的執行結果或者拋出的例外,通過get()獲取這個欄位
// 不需要 volatile,不會有并發讀寫該欄位的情況
private Object outcome;
// 在run()的時候 會set runner
private volatile Thread runner;
// 一個簡單的等待佇列,為什么不用AQS了? AQS太重了
private volatile WaitNode waiters;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() {
thread = Thread.currentThread();
}
}
}
FutureTask狀態機
了解了一個類的狀態機,也就大致了解了類的作業程序,FutureTask的狀態機如下所示

| 狀態 | 描述 |
|---|---|
| NEW | 初始任務狀態 |
| COMPLETING | 任務已執行完,正在設定outcome |
| NORMAL | 任務正常執行完成 |
| EXCEPTIONAL | 任務執行程序中拋出例外,作業執行緒終止 |
| INTERRUPTED | 執行任務的作業執行緒收到中斷請求 |
思考一個問題,為什么要有一個COMLETING中間態?
為了維護復合操作的原子性:設定outcome的值和更新任務狀態需要原子操作
protected void set(V v) {
// 通過CAS先check下,確保狀態轉換是原子op,同時也確保outcome=v 和設定狀態的值這一對復合操作的原子性
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 進來后就是單執行緒環境了
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
FutureTask幾個關鍵方法
run(),cancel(),awaiDone()
public void run() {
// 先驗條件
if (state != NEW ||
// set runner
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
// 任務執行出錯
result = null;
ran = false;
setException(ex);
}
if (ran)
// 執行成功 set outcome
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// 確保不會丟失中斷信號
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
// cancel: new -> interrupting->interrupted,或者 new -> cancelled
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
// check then action, 復合操作, 如果失敗則說明此時任務的狀態不是 new了,回傳false,即取消失敗
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
// 通過給 執行該任務的執行緒 發送中斷信號來取消任務
if (t != null)
t.interrupt();
} finally { // final state
// 發送完后默認置為 interrupted, 表示 信號已發過去了,但任務不一定能停下來,需要任務自己判斷這個信號
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// 任務未完成前,通過LockSupport.park等待任務完成
s = awaitDone(false, 0L);
return report(s);
}
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V) x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable) x);
}
/**
* 可中斷的方法
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 可中斷方法的大部分實作,都是通過拋InterruptedException()來回應中斷,注意Thread.interrupted()會清除中斷信號
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 如果任務 達到了終態,即isDone()了, 即 S>COMPLETIOG,回傳 isDone() => s>competing
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
// 正在寫結果, 馬上就結束了
} else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
// 任務還未開始, 即 s=NEW 時,此時創建 等待執行緒節點,再過一次前面的操作 到下一步
q = new WaitNode();
else if (!queued)
// 新增的節點未入隊,將節點入隊,入隊成功后再過一次前面的操作 到下一步
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
// 如果過了等待時間了,不等了, 把前面構建的節點從等待佇列中洗掉,回傳 state
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 否則 限時阻塞當前執行緒,等待任務完成時被喚醒
LockSupport.parkNanos(this, nanos);
} else
// timed=false時, 永遠阻塞當前執行緒,等待任務完成時被喚醒
LockSupport.park(this);
}
}
ThreadPoolExecutor的設計與實作
簡介
ThreadPoolExecutor = ThreadPool + Executor
Executor: 其中僅有一個execute(Runable) 方法,作業模式為生產者-消費者模式,提交(submit)任務的客戶端相當于生成者,執行任務的執行緒(worker)則相當于消費者,
ThreadPool: 從字面意思來看是一個執行緒的容器,用于管理和維護作業者執行緒,執行緒池的作業與任務佇列(work queue)密切相關的,其中任務佇列保存了所有等待執行的任務,
ThreadPoolExecutor即以生產者-消費者為作業模型,基于執行緒池實作的執行器,
由簡介我們可以了解到,一般執行緒池的實作涉及兩個關鍵組件:work queue,workers,而在ThreadPoolExecutor的設計和實作中,其分別對應BlockingQueue介面,Worker類,
| 執行緒池的實作所需關鍵組件 | ThreadPoolExecutor |
|---|---|
| work queue | BlockingQueue |
| worker | final class Worker extends AbstractQueuedSynchronizer implements Runnable{} |
接下來將從四個方面入手,介紹ThreadPoolExecutor的設計與實作,體會大師們(Doug Lea與JCP Expert Group)是如何思考和解決執行緒池問題的,
- 類的結構與狀態
- 任務的提交與調度
- 執行緒的創建與執行
- 服務的關閉,任務的取消,執行緒的回收
類的描述與狀態
在開始介紹ThreadPoolExecutor前,有必要先了解下Executor框架的其他組成部分,

Executor介面:框架的核心介面,其中只包含一個execute方法
public interface Executor {
/**
* @throws RejectedExecutionException if this task cannot be accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
ExecutorService介面:執行緒池作為一個服務需要有服務的狀態維護等操作,這些操作被放到了這個介面,例如shutdown(),shutdownNow(),awaitTermination(),這里也給出了服務關閉方法,
public static void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
AbstractExecutorService類: 提供ExecutorService的基本實作,例如:通過將任務封裝成FutureTask,實作submit方法,任務的批量執行方法:invokeAll,invokeAny的通用實作等,
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
/** 注意這里已經說明了該介面可能會拋出此例外,但我們常常會忘記處理此例外而導致報錯,但我們卻記得NPE的處理
* @throws RejectedExecutionException
* @throws NullPointerException
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
ThreadPoolExecutor欄位描述
ThreadPoolExecutor欄位
``` public class ThreadPoolExecutor extends AbstractExecutorService { /** * ctl(the main pool control state):用于維護了以下兩個欄位的值 * workCount: 存活著的執行緒數 低29位,大概5億 * runState: 高3位 執行緒池服務的狀態: RUNNING(-1),SHUTDOWN(0),STOP(1),TIDYING(2),TERMINATED(3) */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; //如何快速獲取n個1? [(1 << n) - 1],CPACITY=29個1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 高3位 private static int runStateOf(int c) { return c & ~CAPACITY; } // 低29位 private static int workerCountOf(int c) { return c & CAPACITY; } // rs + wc => rs | wc; 加法換成位運算,更快一點 :) private static int ctlOf(int rs, int wc) { return rs | wc; } private volatile int corePoolSize; private volatile int maximumPoolSize; private volatile boolean allowCoreThreadTimeOut; /** * idle thread的定義: waiting for work * Timeout in nanoseconds for idle threads waiting for work. * 如果執行緒池中運行的執行緒比corePoolSize大,多余的執行緒會在沒有任務的時候的等待keep-alive times,如果在這個時間段內還是沒有任務執行,會回收這個執行緒直到corePoolSize數量(如何回收的? go processWorkerExit), * 注:(這個欄位只有當maximumPoolSize大于corePoolSize時有效) */ private volatile long keepAliveTime; // 任務佇列 private final BlockingQueueThreadPoolExecutor狀態描述
該類主要包含下面幾個狀態:
| 狀態 | 描述 |
|---|---|
| RUNNING | Accept new tasks and process queued tasks |
| SHUTDOWN | Don't accept new tasks, but process queued tasks |
| STOP | Don't accept new tasks, don't process queued tasks |
| TIDYING | All tasks have terminated, workerCount is zero,the thread transitioning to state TIDYING will run the terminated() hook method |
| TERMINATED | terminated() has completed |

狀態間的操作:
| 操作 | 描述 |
|---|---|
| shutdown() | 僅會給idle Worker發送中斷信號,會緩慢的結束執行緒池服務:將queue中的任務都執行完 |
| shutdownNow() | 會給所有Worker發送中斷信號,會快速的結束執行緒池服務(不安全): 嘗試中斷正在執行任務的執行緒,同時回傳queue中的任務串列 |
| awaitTermination() | 是一個基于狀態的方法,將在狀態達到TERMINATED時回傳,可以用在需要同步判斷執行緒池關閉的場景 |
| 其余方法 | 從圖可以看出,任何可以使的queue和pool為空的操作比如:addWorkerFailed,processWorkerExit,shutdown,shutdownNow等都有可能使得狀態轉為TERMINATE,所以這些方法都會呼叫tryTerminate(),以確保服務在正確的狀態 |
Worker欄位描述
點擊查看代碼
**
* 實際的作業者執行緒,主要維護執行緒的中斷狀態
* 這個類為了簡化在運行任務的時候對鎖的獲取和釋放,設計成了 extends AQS
* 當shutdown的時候,會通過tryLock判斷執行緒是否正在執行任務,如果為false,表示執行緒不在執行任務,而是在等待新的任務,通過發送中斷信號,中斷這些執行緒的等待,這些執行緒被中斷后會判斷是由于什么原因喚醒的,如果這個時候執行緒池狀態為SHUTDOWN,那么這些執行緒就會被回收.
* 注:(執行緒被喚醒的原因可能是被中斷了,也有可能是有任務了,也有可能是時間到了,喚醒后需要二次判斷go getTask())
* 注:(執行緒由于沒有任務掛起(poll()), 掛起期間可能有新的任務過來了(offer())被喚醒,也有可能被中斷信號通知關閉而被喚醒.)
*/
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/**
* Worker繼承了AQS,也就是說Worker還有一個state屬性欄位,這個欄位是有必要分析下的:
* -1: 剛初始化
* 0: 剛呼叫runWorker或者沒任務了
* 1: 正在執行任務,正是通過這個state欄位,來判斷執行緒是否正在執行任務(tryLock)
*/
final Thread thread;
// 在Worker初始化時,firstTask可能有值
Runnable firstTask;
// 每個作業者執行緒完成的任務數,任務性質可以不同,即執行緒是可以復用的
volatile long completedTasks;
Worker(Runnable firstTask) {
// 只有在worker執行緒已開始的時候中斷才有意義,所以在初始化worker的時候state=-1,這個時候不會被中斷go isLocked()
setState(-1);
this.firstTask = firstTask;
// 初始化Workder的時候 通過 threadFactory創建執行緒,最終通過系統呼叫,由OS創建內核執行緒
this.thread = getThreadFactory().newThread(this);
}
// runWorker實際實作主執行回圈,接下來就是重點了,任務執行緒初始化時,拿到了firstTask(有的話),以及一個新的執行緒,接下來就開始真正地執行任務了
public void run() {
runWorker(this);
}
// 設計worker類的主要目的,用來中斷執行緒
void interruptIfStarted() {
Thread t;
// 只有在worker執行緒已開始的時候中斷才有意義, 所以在初始化worker的時候state=-1,這個時候不會被中斷
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker狀態描述
Worker主要有3個狀態
| 狀態 | 描述 |
|---|---|
| INIT(-1) | 初始Worker狀態 |
| WAINTING TASK(0) | 等待任務到達 |
| RUNING | 正在執行任務 |
Worker狀態機如圖

任務的提交與調度
在介紹完具體的ThreadPoolExecutor與Worker的描述以及狀態機后,我們先來大致看下ThreadPoolExecutor的作業流程,有助于理解后續的操作步驟.

從圖中我們可以看出,一個正常執行完成的任務其主要經過submit() -> addWorker()->worker.thread.start()->worker.run()->runWorker()→task.run()等步驟,下面我們具體介紹下這些步驟.
任務調度方法主要在execute,具體原始碼注釋如下:
點擊查看代碼
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 執行緒池小于core
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 新增任務失敗,可能在addWorker的時候執行緒數達到了corePoolSize的水平,此時放到workQueue
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 判斷如果執行緒池正在shutdown,拒絕任務
if (!isRunning(recheck) && remove(command))
reject(command);
// 確保任務佇列中的任務可以被執行緒執行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
// 作業佇列滿了,再嘗試增加worker,執行緒個數判斷使用 maxvalue
} else if (!addWorker(command, false))
reject(command);
}
作業執行緒的創建與執行
作業執行緒的創建主要根據執行緒池狀態,core和maximum引數判斷是否可以新增作業執行緒,如果新增成功,則開始執行任務.
作業執行緒的創建
點擊查看代碼
private boolean addWorker(Runnable firstTask, boolean core) {
retry: for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
// 當shutdown且 佇列是空的時候就沒必要加worker了
return false;
for (;;) {
int wc = workerCountOf(c);
// 達到限制數量了也回傳false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 非shutdown,或者 是shutdown但是firstTask==null的時候,可以新增執行緒
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
// 新增worker的時候更新largestPoolSize
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// start() -> runWorker()->task.run()
// 新增成功后 呼叫start(),如果start()失敗了,比如ntive stack申請失敗,也回傳false
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
作業執行緒的執行
作業者執行緒的執行原理上比較簡單,既不斷從任務佇列中取出任務,執行任務,然后回傳執行緒池并等待下一個任務,
// 典型的執行緒池作業者執行緒結構
public void run() {
Throwable thrown = null;
try {
while(!isInterrupted())
runTask(getTaskFromWorkQueue());
}catch(Throwable e) {
throw = e;
}finaly {
threadExited(this,thrown);
}
}
}
下面是ThreadPoolExecutor實際的作業者執行緒的任務執行,其中會涉及到執行緒的回收,任務的取消等實作.
點擊查看代碼
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// state: -1 => 0 , unlock -> release -> tryRelease -> state=0
// 這個時候任務執行緒開始作業,可以被中斷
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// getTask從佇列中拿任務
while (task != null || (task = getTask()) != null) {
// 作業的時候將state置為1,表示正在作業,這個操作一定會成功(正常來說lock是一個基于狀態的方法,可能會阻塞呼叫執行緒),因為不會有其他地方呼叫w.lock
// 注:(state: 0 => 1 lock -> acquire -> tryAcquire -> state=1)
w.lock();
// 執行緒當且僅當池子stopping(shutdown,shutdownNow的時候)的時候才會interrupted,且一定要interrupted
// 注:(worker執行緒是由執行緒池服務來維護的,只有執行緒池服務有權對worker執行緒進行中斷操作)
if ((runStateAtLeast(ctl.get(), STOP) ||
// 注:(Thread.interrupted會清除interrupted標記)
// 這里表明worker執行緒只能在STOPING(STOP,TIDING,TERMINATED)時中斷信號有效,其他形式的中斷信號(例如在任務中中斷)會被清除
(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP)))
&&
!wt.isInterrupted())
wt.interrupt();
try {
// hooc 函式
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
// 保存例外 thrown 到1326處理(給客戶端提供的鉤子函式,afterExecute,使客戶端可以感知到任務失敗并進行特定的處理),同時拋出例外到
// 1330 處理(執行緒池自身對任務例外的處理)
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// 將任務執行程序中的例外傳入到hooc函式
afterExecute(task, thrown);
}
} finally {
// beforeExecutehooc函式出錯或者任務出錯了的話,task=null,從而跳到1336,completedAbruptly=true,從而回收執行緒,即使執行緒并沒有完成任何作業
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 處理task=null的場景或者任務拋處例外時的場景,釋放執行緒,什么時候task會為null ,go getTask
processWorkerExit(w, completedAbruptly);
}
}
服務的關閉,任務的取消與執行緒的回收
服務的關閉
通過呼叫shutdown或者shutdownNow給作業執行緒發送中斷信號嘗試取消任務,并回收執行緒,繼而關閉服務
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 狀態至為SHUTDOWN
advanceRunState(SHUTDOWN);
// 給每個idle作業執行緒(已啟動且沒任務的)執行緒發送中斷信號
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 狀態置為 STOP
advanceRunState(STOP);
// 給每個作業執行緒發送中斷信號
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
執行緒的回收
執行緒回收流程圖:

觸發執行緒的回收主要有下面幾種情況
- 由于setMaximumPoolSize,導致currentSize > maximumPoolSize時,getTask()回傳null
- 執行緒池狀態為stop時,即呼叫shutdownNow()時,getTask()回傳null
- 執行緒池狀態為shutdown,即呼叫shutdown(),執行緒池給idle執行緒發送中斷信號,如果此時任務佇列為空時,getTask()回傳null
- 執行緒等待任務超時,getTask()回傳null
- 任務執行失敗,拋出運行時例外,導致task=null
當getTask()回傳null或者task=null時,runWorker()跳到processWorkExit()處理執行緒回收,此時會新增執行緒來替換由于任務例外而被終止的執行緒
點擊查看代碼
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// stopped 或者 shutdown 且 workQueue.isEmpty 回傳null 2,3
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// allowCoreThreadTimeOut等價于 wc> corePooSize
// allowCoreThreadTimeOut, wc>corePoolSize, 一起表示 當任務執行緒獲取任務超時時,被要求中斷(subject to termination)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 1 wc > maxPoolSize 或者 4 獲取任務超時且 要求獲取任務超時的行程被中斷(timed && timedOut)
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果沒有任務,則阻塞在這里, workQueue.offer后繼續運行
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
// r==null,poll回傳null,表示timedOut,下次 go 1210
timedOut = true;
} catch (InterruptedException retry) {
// 忽略中斷信號
timedOut = false;
}
}
}
執行緒池通過workers.remove()操作來釋放worker的參考,從而由垃圾回收器回收執行緒,如果執行緒是由于任務執行例外而導致的終止,則會新增作業執行緒來替換它.
點擊查看代碼
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// getTask時會decrementWorkerCount
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 回收執行緒前先將執行緒執行的任務數加一下
completedTaskCount += w.completedTasks;
// 通過釋放worker參考來釋放執行緒
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
// 如果不是由于任務忽然中斷且執行緒數符合最小值的要求,那么無需addWorker替換
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 如果任務執行緒是由于任務執行例外退出的 或者 執行緒池中的數量小于min,addWorker
addWorker(null, false);
}
}
由上文我們了解到,無論是任務的取消,還是執行緒池服務的關閉,其中都是通過執行緒的中斷來實作的,理解了執行緒中斷我們就能夠理解任務的取消以及服務關閉的具體含義,
執行緒的中斷
中斷機制是一種Java執行緒間的通信機制,每個Java執行緒都有一個boolean型別的中斷狀態,當呼叫Thread.interrupt(),并不意味著立即停止目標執行緒正在執行的任務,只是傳遞一個中斷請求,將中斷狀態置為true,至于什么時候讀這個狀態,以及基于這個狀態做什么操作,則完全由任務自身去控制,(早期的jdk庫提供了Thread.stop(),Thread.suspend(),Thread.resume()來允許用戶暴力終止,暫停,恢復一個執行緒,在jdk1.2后這些方法就被置為deprecated了,因為這樣操作是不安全的,stop()會強制釋放掉執行緒持有的鎖,拋出ThreadDeathException,導致資料處于不一致的狀態,從而造成未知的后果)例如:
public class Factorizer {
private BigInteger lastNumber;
private BigInteger[] lastFactors;
public synchronized BigInteger[] cal(BigInteger number) {
if(Objects.equal(number,lastNumber)) {
return lastFactors;
}else {
//這兩步是復合操作,需要原子性,我們不會在這兩步之間判斷Thread.currentThread().isInterrupted()
lastFactors = factor(i);
lastNmuber=i;
return lastFactors;
}
}
}
jdk中有許多長時任務都是通過中斷機制取消任務的,它們對中斷的回應通常是:清除中斷狀態(Thread.interrupted()),然后拋出一個例外(InterruptedException),表示長時任務操作由于中斷而提前結束,(wait,join,sleep,FutureTask.get(),CoundownLatch.await,lockInterrrupted(),BlockQueue.poll()等)
在撰寫任務的時候,基于這個狀態做什么請求或者不做請求,例如重試或者忽略,都是可以的,只要滿足自身任務的需要即可,但設計糟糕的任務可能會屏蔽中斷請求,從而導致其他方法呼叫該任務的時候無法對中斷進行回應,例如:
不安全的中斷示例
public static void main(String[] args) {
Thread calPrimeTask = new Thread(InterruptedTest::calPrime);
calPrimeTask.start();
ThreadUtil.sleep(1000);
// 嘗試終止calPrimeTask
calPrimeTask.interrupt();
}
public static void calPrime() {
while(!Thread.currentThread().isInterrupted()) {
ThreadUtil.sleep(50);
log();
System.out.println("一個耗時50ms的任務完成!");
}
}
public static void log() {
/**
* 假設有一段代碼呼叫了jdk中的某個可能拋出InterruptedException的介面,這段代碼捕獲到這個例外后本意是不會處理這個例外,但是如果它沒有再
* Thread.currentThread().interrupt(),就會影響其他使用到這個方法的函式,例如calPrime();
*/
ArrayBlockingQueue<Integer> que = new ArrayBlockingQueue<>(12);
try {
System.out.println("do other thing");
que.poll(30, TimeUnit.MILLISECONDS);
}catch (InterruptedException e) {
e.printStackTrace();
// poll拋出 InterruptedException后,會清空 interrupted標記,這里回傳false
System.out.println(Thread.currentThread().isInterrupted());
// 如果這里不重新設定interrupted標記的話,這回使得calPrimary任務無法取消,我們不知道呼叫堆疊的其他地方是否會用到中斷信號,所以必須把中斷信號設定回去
Thread.currentThread().interrupt();
}
}
// 不支持取消但仍可以呼叫可中斷阻塞方法,忽略中斷信號并重試
public Task getNextTask(BlockingQueue<Task> queue) {
boolean interrupted = false;
try {
while(true) {
try {
return queue.take();
}catch(InterruptedException e) {
interrupted = true;
// 忽略并重試
// 如果我們在這里呼叫Thread.currentThread().interrupt()的話會引起死回圈
}
}finally {
if(interrupted) {
// 避免屏蔽中斷信號,其他方法可能需要
Thread.currentThread().interrupt();
}
}
}
}
執行緒池的使用
在實際生產生活中,由于任務性質的多種多樣,我們往往會自定義符合各自應用場景的執行緒池來執行任務,不同的執行緒池引數設定意味著不同的任務執行策略(避免雞蛋放在一個籃子里),
// 不自定義執行緒池的危害:可能造成無法預知的死鎖情況,次要的任務的執行影響重要的任務
public void deadLock() throws Exception{
CountDownLatch countDownLatch = new CountDownLatch(1);
Callable<String> task1 = () -> {
ThreadUtil.sleep(2000);
countDownLatch.countDown();
return "task1 finished";
};
// task2 依賴 task1
Callable<String> task2 = () -> {
countDownLatch.await();
ThreadUtil.sleep(1000);
return "task2 finished";
};
ExecutorService executorService = Executors.newFixedThreadPool(1);
// 假如 task2先于task1調度,就會發生死鎖,因為只有一個執行緒,task1在任務佇列里依賴task2的完成
Future<String> result2 = executorService.submit(task2);
Future<String> result1 = executorService.submit(task1);
System.out.println(result2.get() + result1.get());
};
那么,執行緒池引數的選擇就顯得尤為重要,以下是一些ThreadPoolExecutor引數的介紹以及使用建議,
1 核心執行緒數與最大執行緒數
從上文的任務執行流程我們大致可以了解到,執行緒池主要通過這兩個引數控制作業執行緒的數量,在設定這兩個引數的時候需要注意以下兩個問題,
1.1 作業執行緒的大小設定
大小設定主要影響系統的吞吐量,如果設定過小造成資源利用率低,人為地降低了系統的吞吐量,如果設定過大會造成執行緒競爭加劇,使得消耗更多的的計算資源在執行緒背景關系切換上而不是執行任務上,最終也會導致系統的吞吐量降低,
設定執行緒池大小主要以下幾種策略:
策略一
coreSize = 2×Ncpu,maxSize = 25×Ncpu
實際使用中我們往往會定義許多執行緒池,如果每個執行緒池的大小會導致核心執行緒越來越多,會使得競爭加劇,甚至達到作業系統限制的執行緒池數量
策略二
W/C: wait time/ compute time
Ucpu: 目標CPU利用率
Ncpu: Runtime.getRuntime().availableProcessors()
Nthreads = Ncpu * Ucpu * (1 + W/C)
I/O任務大部分時間都在等待I/O完成,這個值會比較大,上線前不好估計此值,而且執行緒池中的任務型別不一定都是一致的
策略三
QPS:每秒任務數 999線:單個任務的花銷
1s內一個執行緒能夠執行的任務數: 1/999線
1s內n個執行緒能執行的任務數:n * 1/999線
即 QPS=n/999線 ==> n = QPS*999線
核心執行緒:corePoolSize = QPS * 999線(單位:s)
timeout: 能容忍介面的最大回應時間
佇列大小:queueCapacity = corePoolSize/999線 * timeout = QPS * timeout
最大執行緒:maxPoolSize = (QPS峰值- queueCapacity) * 999線
此策略考慮了實際生產環境的任務使用情況,也是假定執行緒池中的任務是同型別的,
如果執行緒池中的任務不是服務間呼叫而是單獨的函式或者sql呼叫,那么QPS和999線就不好估計了,
策略四
使用動態執行緒池,可以動態調整執行緒池引數,以應對不同的使用場景變化,且可以通過cat監控執行緒池的使用情況,
實際生產業務使用中建議使用動態執行緒池,動態調整任務執行策略,同時為避免執行緒資源浪費,搭配下文提到的allowCoreThreadTimeOut一起使用,
public static ThreadPoolExecutor getExecutor(String name,boolean allowCoreThreadTimeOut) {
ThreadPoolExecutor result = null;
try {
result = DynamicThreadPoolManager.getInstance().getThreadPoolExecutor(name);
}catch (PoseidonException e) {
log.error("ExecutorCase.getExecutor Error:",e);
}
if(Objects.isNull(result)) {
return ExecutorUtil.getExecutor(name,Runtime.getRuntime().availableProcessors());
}
// 任務完成后不需要留有核心執行緒可關閉
result.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
return result;
}
1.2 作業執行緒的回收
執行緒本質上是兩個方法呼叫堆疊,是一個共享資源,當一個執行緒池處理QPS較低的任務時(eg:boss后臺的介面,或者執行周期長的定時任務),我們往往會想當無任務執行的時候執行緒池可以自動回收執行緒資源,于是將coreSize設定成0,
假如我們將coreSize=0,但卻使用的是有界佇列,比如new ArrayBlockingQueue<>(),按照上文的執行流程,那么只有當任務塞滿任務佇列的時候,執行緒池才會正式開始執行任務,
為了解決這個問題,jdk1.6版本后的ThreadPoolExecutor提供了allowCoreThreadTimeOut欄位,將這個欄位置為ture后,我們不用設定coreSize=0,就可以讓執行緒在無任務的時候等待keepAliveTime時間,將coreThread回收,具體實作可以查看:getTask,processWorkerExit方法,
2 任務佇列
ThreadPoolExecutor使用的是BlockingQueue作為任務佇列,即任何阻塞佇列都可以用于任務的存盤和轉發,下面介紹3種常見任務佇列的選擇策略
策略一
無界佇列(Unbounded queues)
例如LinkedBlockingQueue,使用無界佇列主要適用于任務執行時間很短且確定的任務,例如找出某個自然數的因數,這種任務一定能夠快速執行完成,但是實際業務場景中的任務執行時間通常是不確定的,需要遠程呼叫介面,有許多I/O操作,這樣就找出了任務消費很慢,如果此時有任務提交過來會找出OOM,從而影響整個服務的穩定,所以不建議使用,
策略二
有界佇列(Bounded queues)
例如ArrayBlockingQueue,用于限制資源的使用量,避免出現OOM,
調整任務佇列長度時往往也要調整最大執行緒數(maxmiumSize),
| case | 問題 |
|---|---|
| 很大的queue.size,很小的maxmiumSize | 一方面降低CPU了使用率和執行緒切換頻率,避免過度競爭,從而導致人為的降低了吞吐量(可以是優點也可以是缺點) |
| 很小的queue.size,很大的maxmiumSize | 會導致CPU使用率增高,這也會導致吞吐量降低 |
實際使用的時候大部分任務都是i/o密集型的,所以其實可以并發執行比我們想的更多的任務,適用于不緊急但希望盡可能快的任務,例如定時job任務或者匯出任務,這種任務的執行我們希望在不影響其他服務的情況下盡可能快的執行,
策略三
直接處理任務(Direct handoffs)
例如:synchoronousQueue,當客戶端提交任務時,在有合適的執行緒執行此任務才回傳,否則阻塞客戶端,
使用這個佇列少了入隊和出隊操作,效率更好,適用于需要盡快回應的任務,例如h5端的介面,
這種方式通常需要 unbounded maxmiumPoolSize, 即無限制的執行緒數,但是如果當客戶端不停地提交任務且消費不過來地時候,會有執行緒數瘋狂飆升,造成系統不穩定的風險,所以實際使用中還是會限制 maxmiumSize的值,可以通過使用下文中提到的CallerRunRejectPolicy來緩慢降低客戶端提交任務的速度,從而將異步降級為同步執行,
3 任務拒絕策略
在執行緒池關閉(shutdown()),或者任務佇列滿了,作業執行緒也滿了的時候會執行RejectedExecutionHandler.rejectedExecution(),來拒絕任務,jdk為我們提供了以下四種拒絕策略,我們也可以自己定義合適的拒絕策略.
| 拒絕策略 | 描述 |
|---|---|
| ThreadPoolExecutor.AbortPolify | 默認拒絕策略,拋出RejectedExecutionException例外來通知客戶端有任務被拒絕,客戶端經常會忽略這個例外的處理導致發生線上問題 |
| ThreadPoolExecutor.DiscardPolify | 丟棄最新提交的任務,一般沒有哪個任務是可以丟棄的,不建議使用, |
| ThreadPoolExecutor.DiscardOldestPolify | 丟棄最先提交的任務,這里注意如果使用的是優先級佇列的話,會拋棄最高優先級的任務,隨意得謹慎使用 |
| ThreadPoolExecutor.CallerRunPolify | 由客戶端執行緒執行任務,即 客戶端代碼 -> submit() -> task.run() ->客戶端代碼,可以降低任務的提交速度,使得由異步執行降級為同步執行 |
4 作業執行緒工廠類
執行緒池使用執行緒工廠類來生成作業執行緒,我們可以自定義或者使用guava提供的ThreadFactoryBuilder()來創建執行緒工廠類
public class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory(String threadPoolName) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = threadPoolName + "-" + poolNumber.getAndIncrement() + "-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
// guava
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat(name + "-pool-%d").build();
JDK平臺提供的默認執行緒池
Executors作為Executor介面的伴生類,提供了一些默認的執行緒池供我們使用.當實際很少使用他們,因為Executors創建的執行緒池,要么是無界執行緒池(CachedThreadPoolExecutor), 要么是無界任務佇列(FixedThreadPoolExecutor),兩者都有資源耗盡的風險,會影響到整個服務器.SingletonThreadPoolExecutor又不適合需要多個任務并發執行的場景,所以最好是自定義適合各自業務場景的執行緒池.
Executors.newSingleThreadExecutor:
new ThreadPoolExecutor(1, 1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
僅有一個作業執行緒的執行緒池,使用無界佇列,主要用于任務按順序執行;
Executors.newCachedThreadPool:
new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
無界執行緒池,任務無入隊和出隊操作,效率更好,適用于任務執行時間短且確定的場景,但當消費不過來時,有資源耗盡的風險;
Executors.newFixedThreadPool():
new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
有界執行緒池,但是使用的是無界佇列,避免由于執行緒過多而造成的資源耗盡風險,但當消費不過來時,由于使用的無界佇列,也會有OOM的風險;
Executors.ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
};
使用延遲佇列實作,用于執行定時任務;
實際業務中的使用
ThreadPoolExecutor僅是Executor介面的實作,在實際業務中我們可能會有批量任務執行的需求,雖然AbstractExecutorSerivice提供了invokeAll方法用于批量執行任務,當我們往往會使用ExecutorCompletionService來實作這個目的,因為后者相比前者,客戶端可以更快地拿到批量任務的執行結果(無論是例外還是正常執行),而前者只是按照任務的提交順序回傳任務執行結果.(考慮假如第一個任務就執行時間很長,第二個任務執行時間很短,使用ExecutorCompletionService可以先拿到第二個任務地執行結果)
// 實際使用中通過poseidon平臺提供的動態執行緒池來維護執行緒池引數.
public static ThreadPoolExecutor getExecutor(String name,boolean allowCoreThreadTimeOut) {
ThreadPoolExecutor result = null;
try {
result = DynamicThreadPoolManager.getInstance().getThreadPoolExecutor(name);
}catch (PoseidonException e) {
log.error("ExecutorCase.getExecutor Error:",e);
}
if(Objects.isNull(result)) {
return ExecutorUtil.getExecutor(name,Runtime.getRuntime().availableProcessors());
}
// 任務完成后不需要留有核心執行緒可關閉
result.allowCoreThreadTimeOut(allowCoreThreadTimeOut);
return result;
}
// 基于ExecutorCompletionService實作的invokeAll
public static boolean invokeAll(CompletionService<Boolean> executorService, List<Callable<Boolean>> taskList) {
if (Objects.isNull(executorService) || CollectionUtil.isNullOrEmpty(taskList)) {
return true;
}
List<Boolean> results = batchExecutor(executorService, taskList, false);
return results.stream().allMatch(e -> Objects.equals(e, Boolean.TRUE));
}
public static <T> List<T> batchExecutor(CompletionService<T> completionService,
List<Callable<T>> taskList, boolean allowError) {
List<T> resultList = Lists.newArrayList();
taskList.forEach(completionService::submit);
for (int i = 0; i < taskList.size(); i++) {
Future<T> resultFuture = null;
try {
resultFuture = completionService.take();
T result = resultFuture.get();
resultList.add(result);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// 原始例外
Throwable cause = e.getCause();
log.error("batchExecutor執行例外: ", cause);
if (!allowError) {
// 重新拋出可處理的例外
throw new BizException(BizErrorCodeEnum.OPERATION_FAILED, cause.getMessage());
}
} finally {
if (Objects.nonNull(resultFuture)) {
resultFuture.cancel(true);
}
}
}
return resultList;
}
// boss后臺介面,資料匯出,同步,定時等任務,此類任務一般執行頻率較低,所以可以在任務的時候回收核心執行緒,同時為避免客戶端忘記處理RejectedExecutionException例外,使用CallerRunsPolicy拒絕策略,當任務佇列滿了的時候,通過使用呼叫者執行緒執行,來減慢任務提交的速度,這樣并發執行就自然降級為同步執行
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, 8, 20,
// 默認callerRunsPolicy,佇列滿后,降為串行執行
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(200), namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
// 任務完成后不需要留有核心執行緒可關閉
threadPoolExecutor.allowCoreThreadTimeOut(true);
// h5端介面,此類任務一般執行頻率較高,可以不用回收核心執行緒,同時希望任務盡可能快的處理,所以任務佇列可以選擇使用SynchronizeQueue(),避免任務的入隊和出隊消耗,最大執行緒數可以設定地相對大點,這樣有利于更多任務并發執行,拒絕策略同樣選擇使用CallerRunsPolicy
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, 15, 20,
// 默認callerRunsPolicy,佇列滿后,降為串行執行
TimeUnit.MILLISECONDS, new SynchronousQueue<>(), namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
// 使用信號量限制任務提交速度
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command) throw InterruptedException{
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
}finally {
semaphore.release();
}
}
});
}catch(RejectedExecutionException e) {
semaphore.release();
}
}
}
總結
本篇文章介紹了Executor框架的大體實作思路和在實際業務中的主要使用場景,專案中引入了執行緒池使用的好的話可以提高服務的回應速率,增大資源的利用率,但同時也引入了不穩定的因素,例如使用不當造成OOM或者拋出RejectedExecuteException等,所以對于執行緒池的監控就顯地尤為重要,本篇文章并沒有涉及執行緒地監控方面地知識,
參考資料
- [1] 《Java并發編程實踐》
- [2] JDK 1.8原始碼
- [3] Java執行緒池實作原理及其在美團業務中的實踐
- [4] Thread.interrupt()相關原始碼分析
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/547276.html
標籤:其他
