本文部分摘自《Java 并發編程的藝術》
Excutor 框架
1. 兩級調度模型
在 HotSpot VM 的執行緒模型中,Java 執行緒被一對一映射為本地作業系統執行緒,在上層,Java 多執行緒程式通常應用分解成若干個任務,然后使用用戶級的調度器(Executor)將這些任務映射為固定數量的執行緒;在底層,作業系統內核將這些執行緒映射到硬體處理器,這種兩級調度模型的示意圖如圖所示:

從圖中可以看出,應用程式通過 Executor 框架控制上層調度,下層的調度則由作業系統內核控制
2. 框架結構
Executor 框架主要由三大部分組成:
-
任務
包括被執行任務需要實作的介面:Runnable 介面或 Callable 介面
-
任務的執行
包括任務執行機制的核心介面 Executor,以及繼承自 Executor 的 ExecutorService 介面,Executor 框架有兩個關鍵類實作了 ExecutorService 介面,分別是 ThreadPoolExecutor 和 ScheduleThreadPoolExecutor,它們都是執行緒池的實作類,可以執行被提交的任務
-
異步計算的結果
包括介面 Future 和實作 Future 介面的 FutureTask 類
3. 執行程序
主執行緒首先要創建實作 Runnable 或 Callable 介面的任務物件,可以使用工具類 Executors 把一個 Runnable 物件封裝為一個 Callable 物件
// 回傳結果為 null
Executors.callable(Runnable task);
// 回傳結果為 result
Executors.callable(Runnable task, T result);
然后把 Runnable 物件直接交給 ExecutorService 執行
ExecutorService.execute(Runnable command);
或者把 Runnable 物件或 Callbale 物件提交給 ExecutorService 執行
ExecutorService.submit(Runnable task);
ExecutorService.submit(Callable<T> task);
如果執行 ExecutorService.submit 方法,將會回傳一個實作 Future 介面的物件 FutureTask,最后,主執行緒可以執行 FutureTask.get() 方法來等待任務執行完成,也可以執行 FutureTask.cancel(boolean mayInterruptIfRunning) 來取消此任務的執行
ThreadPoolExecutor
Executor 框架最核心的類是 ThreadPoolExecutor,它是執行緒池的實作類,有關介紹可以參考之前寫過的一篇文章
下面分別介紹三種 ThreadPoolExecutor
1. FixedThreadPool
FixedThreadPool 被稱為可重用固定執行緒數的執行緒池,下面是 FixedThreadPool 的源代碼實作
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被設定為創建 FixedThreadPool 時指定的引數 nThreads,當執行緒池中的執行緒數大于 corePoolSize 時,keepAliveTime 為多余的空閑執行緒等待新任務的最長時間,超過這個時間后多余的執行緒將被終止,這里把 keepAliveTime 設定為 0L,意味著多余的空閑執行緒會被立即終止
FixedThreadPool 的 execute() 運行示意圖如下所示

對上圖說明如下:
- 如果當前運行的執行緒少于 corePoolSize,則創建新執行緒來執行任務
- 執行緒池完成預熱之后(當前運行的執行緒數等于 corePoolSize),將任務加入 LinkedBlockingQueue
- 執行緒執行完 1 中的任務后,會在回圈中反復從 LinkedBlockingQueue 獲取任務來執行
FixedThreadPool 使用無界佇列 LinkedBlockingQueue 作為執行緒池的作業佇列(佇列的容量為 Integer.MAX_VALUE),使用無界佇列作為作業佇列會對執行緒池帶來如下影響:當執行緒池中的執行緒數達到 corePoolSize 后,新任務將在無界佇列中等待,而無界佇列幾乎可以容納無限多的新任務,因此執行緒池中的執行緒數永遠不會超過 corePoolSize,因此 maximumPoolSize 就成了無效引數,keepAliveTime 也是無效引數,運行中的 FixThreadPool 不會拒絕任務
2. SingleThreadExecutor
SingleThreadExecutor 是使用單個 worker 執行緒的 Executor,下面是 SingleThreadExecutor 的源代碼實作
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor 的 corePoolSize 和 maximumPoolSize 被設定為 1,其他引數與 FixedThreadPool 相同,SingleThreadExecutor 使用無界佇列 LinkedBlockingQueue 作為執行緒池的作業佇列,其帶來的影響與 FixedThreadPool 相同,這里就不再贅述了

對上圖說明如下:
- 如果當前運行的執行緒數少于 corePoolSize(即執行緒池中無運行的執行緒),則創建一個新執行緒來執行任務
- 在執行緒池完成預熱之后(當前執行緒池中有一個運行的執行緒),將任務加入 LinkedBlockingQueue
- 執行緒執行完 1 中的任務后,會在一個無限回圈中反復從 LinkedBlockingQueue 獲取任務來執行
3. CachedThreadPool
CachedThreadPool 是一個會根據需要創建新執行緒的執行緒池,下面是創建 CachedThreadPool 的源代碼
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool 的 corePoolSize 被設定為 0,即 corePool 為空,maximumPoolSize 被設定為 Integer.MAX_VALUE,即 maximumPool 是無界的,這里把 keepAliveTime 設定為 60L,意味著 CachedThreadPool 中的空閑執行緒等待新任務的最長時間為 60 秒,空閑執行緒超過 60 秒后將會被終止
CachedThreadPool 使用沒有容量的 SynchronousQueue 作為執行緒池的作業佇列,但 CachedThreadPool 的 maximumPool 是無界的,這意味著,如果主執行緒提交任務的速度高于 maximumPool 中執行緒處理任務的速度,CachedThreadPool 會不斷創建新執行緒,極端情況下,CachedThreadPool 會因為創建過多執行緒而耗盡 CPU 和記憶體資源

ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor 會把待調度的任務(ScheduledFutureTask)放到一個 DelayQueue 中,ScheduledFutureTask 主要包含三個成員變數
- long 型成員變數 time,表示這個任務將要被執行的具體時間
- long 型成員變數 sequenceNumber,表示這個任務被添加到 ScheduledThreadPoolExecutor 中的序號
- long 型成員變數 period,表示任務執行的間隔周期
DelayQueue 封裝了一個 PriorityQueue,這個 PriorityQueue 會對佇列中的 ScheduledFutureTask 進行排序,排序時,time 小的排在前面(時間早的任務將被先執行),如果兩個 ScheduledFutureTask 的 time 相同,就比較 sequenceNumber,sequenceNumber 小的排在前面(如果兩個任務的執行時間相同,先提交的任務先執行)
下圖是 ScheduledThreadPoolExecutor 中的執行緒執行周期任務的程序

- 執行緒 1 從 DelayQueue 獲取已到期的 ScheduledFutureTask,到期任務是指 ScheduledFutureTask 的 time 大于等于當前時間
- 執行緒 1 執行這個 ScheduledFutureTask
- 執行緒 1 修改 ScheduledFutureTask 的 time 變數為下次將要被執行的時間
- 執行緒 1 把修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中
接下來我們看一下上圖中執行緒獲取任務的程序,源代碼如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll();
return x;
}
}
}
} finally {
lock.unlock();
}
}
獲取任務分為三大步驟:
- 獲取 Lock
- 獲取周期任務
- 如果 PriorityQueue 為空,當前執行緒到等待佇列中等待,否則執行下面的步驟
- 如果 PriorityQueue 的頭元素的 time 時間比當前時間大,到等待佇列等待 time 時間,否則執行下面的步驟
- 獲取 PriorityQueue 的頭元素,如果 PriorityQueue 不為空,則喚醒在等待佇列中等待的所有執行緒
- 釋放 Lock
ScheduledThreadPoolExecutor 在一個回圈中執行步驟二,直到執行緒從 PriorityQueue 獲取到一個元素之后才會退出無限回圈
最后我們再看把任務放入 DelayQueue 的程序,下面是原始碼實作
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
q.offer(e);
if (first == null || e.compareTo(first) < 0) {
available.signalAll();
}
return true;
} finally {
lock.unlock();
}
}
添加任務分為三大步驟:
- 獲取 Lock
- 添加任務
- 向 PriorityQueue 添加任務
- 如果添加的任務是 PriorityQueue 的頭元素,喚醒在等待佇列中等待的所有執行緒
- 釋放 Lock
FutureTask
1. 簡介
Future 介面和實作 Future 介面的 FutureTask 類,代表異步計算的結果,FutureTask 除了實作 Future 介面外,還實作了 Runnable 介面,因此,FutureTask 可以交給 Executor 執行,也可以由呼叫執行緒直接執行 FutureTask.run(),根據 FutureTask.run() 方法被執行的時機,FutureTask可以處于下面三種狀態:
-
未啟動
FutureTask.run() 方法還沒有被執行之前,FutureTask 處于未啟動狀態,當創建一個 FutureTask,且沒有執行 FutureTask.run() 方法之前,這個 FutureTask 處于未啟動狀態
-
已啟動
FutureTask.run() 方法被執行的程序中,FutureTask 處于已啟動狀態
-
已完成
FutureTask.run() 方法執行完后正常結束,或被取消 FutureTask.cancel(…),或執行 FutureTask.run() 方法時拋出例外而結束,FutureTask 處于已完成狀態
下圖是 FutureTask 的狀態遷移圖

下圖是 get 方法和 cancel 方法的執行示意圖

- 當 FutureTask 處于未啟動或已啟動狀態時,執行 FutureTask.get() 方法將導致呼叫執行緒阻塞
- 當 FutureTask 處于已完成狀態時,執行 FutureTask.get() 方法將導致呼叫執行緒立即回傳結果或拋出例外
- 當 FutureTask 處于未啟動狀態時,執行 FutureTask.cancel() 方法將導致此任務永遠不會被執行
- 當 FutureTask 處于已啟動狀態時,執行 FutureTask.cancel(true) 方法將以中斷執行此任務執行緒的方式來試圖停止任務
- 當 FutureTask 處于已啟動狀態時,執行 FutureTask.cancel(false) 方法將不會對正在執行此任務的執行緒產生影響(讓正在執行的任務運行完成)
- 當 FutureTask 處于已完成狀態時,執行 FutureTask.cancel(…) 方法將回傳 false
2. 使用
可以把 FutureTask 交給 Executor 執行,也可以通過 ExecutorService.submit(...) 方法回傳一個 FutureTask,然后執行 FutureTask.get() 方法或 FutureTask.cancel(...) 方法,還可以單獨使用 FutureTask
當一個執行緒需要等待另一個執行緒把某個任務執行完后它才能繼續執行,此時可以使用 FutureTask,假設有多個執行緒執行若干任務,每個任務最多只能被執行一次,當多個執行緒試圖同時執行同一個任務時,只允許一個執行緒執行任務,其他執行緒需要等待這個任務執行完后才能繼續執行
private final ConcurrentMap<Object, Future<String>> taskCache = new ConcurrentHashMap<>();
private String executionTask(final String taskName)
throws ExecutionException, InterruptedException {
while (true) {
Future<String> future = taskCache.get(taskName); // 1.1, 2.1
if (future == null) {
Callable<String> task = new Callable<String>() {
@Override
public String call() throws InterruptedException {
return taskName;
}
};
FutureTask<String> futureTask = new FutureTask<String>(task);
future = taskCache.putIfAbsent(taskName, futureTask); // 1.3
if (future == null) {
future = futureTask;
futureTask.run(); // 1.4 執行任務
}
}
try {
return future.get(); // 1.5, 2.2
} catch (CancellationException e) {
taskCache.remove(taskName, future);
}
}
}
上述代碼的執行示意圖如圖所示:

- 兩個執行緒試圖同時執行同一個任務,這里使用了執行緒安全的 ConcurrentHashMap 作為任務快取可能到了注釋
- 兩個執行緒都執行到
// 1.1, 2.1這行時,假設執行緒一首先得到 future,根據接下來的代碼可得知,執行緒一創建任務放入快取,并執行,而執行緒二獲取執行緒一創建的任務,不需創建 - 兩個執行緒都在
// 1.5, 2.2處等待結果,只有執行緒一執行完任務后,執行緒二才能從 future.get() 回傳
3. 實作
FutureTask 的實作基于 AbstractQueuedSynchronizer(AQS)
FutureTask 宣告了一個內部私有的繼承 AQS 的子類 Sync,對 FutureTask 所有公有方法的呼叫都會委托給這個內部子類,FutureTask 的設計示意圖如下所示:

FutureTask.get() 方法會呼叫 AQS.acquireSharedInterruptibly(int arg) 方法,這個方法的執行程序如下:
- 呼叫 AQS.acquireSharedInterruptibly(int arg) 方法,該方法會回呼在子類 Sync 中實作的 tryAcquireShared() 方法來判斷 acquire 操作是否可以成功,acquire 操作可以成功的條件為:state 為執行完成狀態 RAN 或已取消狀態 CANCELLED,且 runner 不為 null
- 如果成功,get() 方法立即回傳,否則執行緒等待佇列中去等待其他執行緒執行 release 操作
- 當其他執行緒執行 release 操作(FutureTask.run() 或 FutureTask.cancel(…))喚醒當前執行緒后,當前執行緒再次執行 tryAcquireShared() 將回傳正值 1,當前執行緒將離開執行緒等待佇列并喚醒它的后繼執行緒
- 最后回傳計算的結果或拋出例外
FutureTask.run() 的執行程序如下:
- 執行在建構式中指定的任務
- 以原子方式來更新同步狀態(呼叫 AQS.compareAndSetState(int expect,int update),設定 state 為執行完成狀態 RAN),如果這個原子操作成功,就設定代表計算結果的變數 result 的值為 Callable.call() 的回傳值,然后呼叫 AQS.releaseShared(int arg)
- AQS.releaseShared(int arg) 首先會回呼在子類 Sync 中實作的 tryReleaseShared(arg) 來執行 release 操作(設定運行任務的執行緒 runner 為 null,然會回傳 true),然后喚醒執行緒等待佇列中的第一個執行緒
- 呼叫 FutureTask.done()
當執行 FutureTask.get() 方法時,如果 FutureTask 不是處于執行完成狀態 RAN 或已取消狀態 CANCELLED,當前執行執行緒將到 AQS 的執行緒等待佇列中等待(見下圖的執行緒 A、B、C、D),當某個執行緒執行 FutureTask.run() 方法或 FutureTask.cancel(...) 方法時,會喚醒執行緒等待佇列的第一個執行緒

假設開始時 FutureTask 處于未啟動狀態或已啟動狀態,等待佇列中已經有3個執行緒(A、B、C)在等待,此時,執行緒 D 執行 get() 方法將導致執行緒 D 也到等待佇列中去等待
當執行緒 E 執行 run() 方法時,會喚醒佇列中的第一個執行緒 A,執行緒 A 被喚醒后,首先把自己從佇列中洗掉,然后喚醒它的后繼執行緒 B,最后執行緒 A 從 get() 方法回傳,執行緒 B、C、D 重復 A 執行緒的處理流程,最終,在佇列中等待的所有執行緒都被級聯喚醒并從 get() 方法回傳
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/272422.html
標籤:Java
