本文章歡迎轉載,但是轉載請標明出處,程式鋒子https://blog.csdn.net/l13591302862/article/details/112757053
一 前言
平常看到很多的原始碼中都使用到了 FutureTask 物件,例如 ThreadPoolExecutor 和 Spring MVC 以及 Dubbo,但是對 FutureTask 的學習還只是停留在了表面,今天進行了深入學習,對原始碼進行了決議,希望對大家有所幫助,

二 簡介
2.1 什么是 FutureTask
a 官方描述
A cancellable asynchronous computation. This class provides a base implementation of Future, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation.
The result can only be retrieved when the computation has completed; the get methods will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled (unless the computation is invoked using runAndReset).
A FutureTask can be used to wrap a Callable or Runnable object. Because FutureTask implements Runnable, a FutureTask can be submitted to an Executor for execution.
進行下簡單的翻譯:
-
可取消的異步計算,此類提供
Future的基本實作,其中包含開始和取消計算,查詢以查看計算是否完成以及檢索計算結果的方法, -
只有在計算完成后才能檢索結果,如果計算尚未完成,則
get方法將阻塞,一旦計算完成,就不能重新開始或取消計算(除非使用 runAndReset 呼叫計算), -
FutureTask可以用于包裝Callable或Runnable物件,由于FutureTask實作了Runnable, 因此可以將FutureTask提交給Executor以便執行, -
除了用作獨立類之外,此類還提供
protected方法,這些功能在創建自定義任務類時可能很有用,
b 個人理解
- 我覺得
FutureTask是一個將任務執行和結果回傳相分離的任務類,可以延遲計算,或者延遲獲取,并且保證并發安全,既可以單獨執行,也可以放入執行緒池中執行,方便進行并發異步執行,提高運行效率,
c FutureTask 類圖
同時看下 FutureTask 的類圖,我們可以發現,FutureTask 實作了 RunableFuture 介面,而 RunableFuture 又實作了 Runable 和 Future 介面,即 FutureTask 兼具 Future 和 Runnable 的能力,

d 核心方法
public class FutureTask<V> implements RunnableFuture<V> {
// 獲取結果,計算未完成,當前執行緒會被阻塞
public V get() throws InterruptedException, ExecutionException {...}
// 獲取結果,計算未完成,當前執行緒會被阻塞,但是有時間限制
// 如果超時還未完成,拋出 TimeoutException
public V get(long timeout, TimeUnit unit){...}
// 進行計算,運行 Callable 的 call 方法,然后對結果進行賦值
public void run(){...}
// 進行計算,運行 Callable 的 call 方法,然后將 state 重置為 NEW
// 不對結果進行賦值,可以重復多次運行,只有子類可以呼叫,用來進行擴展,例如 ScheduledFutureTask
protected void runAndReset(){...}
// 取消任務,引數 mayInterruptIfRunning 表示取消程序中是否中斷當前執行緒
public boolean cancel(boolean mayInterruptIfRunning){...}
// 查看任務是否被取消
public boolean isCancelled(){...}
// 查看任務是否完成
public boolean isDone(){...}
}
2.2 FutureTask 的使用案例
a 單獨使用
/**
* @author CodderFengzi
* @date 2021/1/16
**/
public class FutureTaskDemo {
private Response response = new Response("無回應");
/**
* 獲得回應
*
* @param delay 延遲
* @param timeUnit 時間單位
* @return {@link Response}
*/
private Response getResponse(long delay, TimeUnit timeUnit) {
FutureTask<Response> ft = new FutureTask<>(new Task());
try {
response = ft.get(delay, timeUnit);
} catch (InterruptedException e) {
response = new Response("中斷");
} catch (ExecutionException e) {
response = new Response("錯誤");
} catch (CancellationException e) {
response = new Response("被取消");
} catch (TimeoutException e) {
response = new Response("請求超時");
ft.cancel(true);
}
return response;
}
private static class Response {
private final String content;
private Response(String content) {
this.content = content;
}
@Override
public String toString() {
return "Response{" + "content='" + content + '\'' +
'}';
}
}
private static class Task implements Callable<Response> {
@Override
public Response call() throws Exception {
// 模擬網路傳輸時間消耗
Thread.sleep(2000L);
return new Response("正確的回應");
}
}
public static void main(String[] args) {
FutureTaskDemo0 taskDemo = new FutureTaskDemo0();
Response timeoutResponse = taskDemo.getResponse(1000L, TimeUnit.MILLISECONDS);
Response normalResponse = taskDemo.getResponse(3000L, TimeUnit.MILLISECONDS);
// 列印結果:Response{content='請求超時'}
System.out.println(timeoutResponse);
// 列印結果:Response{content='正確的回應'}
System.out.println(normalResponse);
}
}
b 在執行緒池中使用
以下提供一個簡單的例子,創建 FutureTask 物件,并放入 ThreadPoolExecutor 執行緒池中執行,可以設定超時時間,如果超過超時時間任務未執行完,則拋出 TimeoutException,
/**
* @author CodderFengzi
* @date 2021/1/16
**/
public class FutureTaskDemo1 {
private Response response = new Response("無回應");
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
4,
4,
0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("CoderFengzi"));
/**
* 獲得回應
*
* @param delay 延遲
* @param timeUnit 時間單位
* @return {@link Response}
*/
private Response getResponse(long delay, TimeUnit timeUnit) {
FutureTask<Response> ft = new FutureTask<>(new Task());
executor.execute(ft);
try {
response = ft.get(delay, timeUnit);
} catch (InterruptedException e) {
response = new Response("中斷");
} catch (ExecutionException e) {
response = new Response("錯誤");
} catch (CancellationException e) {
response = new Response("被取消");
} catch (TimeoutException e) {
response = new Response("請求超時");
ft.cancel(true);
}
return response;
}
private static class Response {
private final String content;
private Response(String content) {
this.content = content;
}
@Override
public String toString() {
return "Response{" + "content='" + content + '\'' +
'}';
}
}
private static class Task implements Callable<Response> {
@Override
public Response call() throws Exception {
// 模擬網路傳輸時間消耗
Thread.sleep(2000L);
return new Response("正確的回應");
}
}
private static class NamedThreadFactory implements ThreadFactory {
private final static AtomicInteger COUNT = new AtomicInteger(1);
private final String name;
private NamedThreadFactory(String name) {
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
return new Thread(r, name + "-" + COUNT.getAndIncrement());
}
}
public static void main(String[] args) {
FutureTaskDemo taskDemo = new FutureTaskDemo();
Response timeoutResponse = taskDemo.getResponse(1000L, TimeUnit.MILLISECONDS);
Response normalResponse = taskDemo.getResponse(3000L, TimeUnit.MILLISECONDS);
// 列印結果:Response{content='請求超時'}
System.out.println(timeoutResponse);
// 列印結果:Response{content='正確的回應'}
System.out.println(normalResponse);
}
}
c 用作快取的值
同時也有一個將 FutureTask 用作快取的例子,但是這個只是簡單的使用,沒有書寫快取淘汰的邏輯,
/**
* @author CodderFengzi
* @date 2021/1/16
**/
public class FutureTaskDemo2 {
private final ConcurrentHashMap<Integer, Future<String>> map = new ConcurrentHashMap<>();
private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
4,
4,
0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("CoderFengzi"));
/**
* 獲得結果
*
* @param key 鍵
* @return {@link String}
*/
private String getResult(int key) throws InterruptedException, ExecutionException {
Future<String> f;
if ((f = map.get(key)) == null) {
// 創建任務
FutureTask<String> fu = new FutureTask<>(new Task(key));
// 如果 put 成功,回傳 null,失敗回傳之前的插入的結果,并發條件下,僅有一個能成功
f = map.putIfAbsent(key, fu);
if (f == null) {
f = fu;
fu.run();
}
}
try {
return f.get();
} catch (InterruptedException e) {
e.printStackTrace();
map.remove(key);
throw e;
} catch (ExecutionException e) {
// 移除任務
map.remove(key);
throw e;
}
}
private static class Task implements Callable<String> {
private final int number;
public Task(int number) {
this.number = number;
}
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + " 進行計算");
return "CoderFengzi" + number;
}
}
private static class NamedThreadFactory implements ThreadFactory {
private final static AtomicInteger COUNT = new AtomicInteger(1);
private final String name;
private NamedThreadFactory(String name) {
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
return new Thread(r, name + "-" + COUNT.getAndIncrement());
}
}
public static void main(String[] args) throws InterruptedException {
final FutureTaskDemo2 taskDemo = new FutureTaskDemo2();
final CountDownLatch cdl = new CountDownLatch(1);
final int count = 1000;
for (int i = 0; i < count; i++) {
EXECUTOR.execute(new Runnable() {
@Override
public void run() {
try {
// 模擬多個執行緒同一時刻并發執行
cdl.await();
String result = taskDemo.getResult(666);
System.out.println(Thread.currentThread().getName() + " result = " + result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
System.out.println("執行錯誤");
}
}
});
}
Thread.sleep(5000L);
cdl.countDown();
// 運行結果:只進行了一次計算,其他的都直接從快取獲取
// CoderFengzi-1 進行計算
// CoderFengzi-3 result = CoderFengzi666
// CoderFengzi-4 result = CoderFengzi666
// CoderFengzi-3 result = CoderFengzi666
// CoderFengzi-2 result = CoderFengzi666
// CoderFengzi-1 result = CoderFengzi666
// CoderFengzi-2 result = CoderFengzi666
}
}
d 自定義子類進行擴展
如果我們想要對 FutureTask 進行擴展,那么可以實作 FutureTask 創建自定義的子類,以下實作了一個自定義的子類,可以進行重復多次運行
/**
* @author CoderFengzi
* @date 2021/1/17
**/
public class RepeatableFutureTask<V> extends FutureTask<V> {
private static final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(
4,
4,
0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("CoderFengzi"));
/**
* 運行次數
*/
private final int count;
public RepeatableFutureTask(Callable<V> callable, int count) {
super(callable);
this.count = count;
}
@Override
protected void done() {
super.done();
System.out.println("完成了賦值,并且釋放了所有執行緒");
}
@Override
public void run() {
runAndReset(count);
}
/**
* 重復運行
*
* @param count 運行次數
* @return boolean
*/
public synchronized boolean runAndReset(int count) {
if (count < 0) {
throw new IllegalArgumentException("count must be positive");
}
for (int i = 0; i < count; i++) {
if (!super.runAndReset()) {
break;
}
}
return true;
}
private static class Task implements Callable<Void> {
public static final String CODER_FENGZI = " CoderFengzi 666";
@Override
public Void call() throws Exception {
System.out.println(Thread.currentThread().getName() + CODER_FENGZI);
return null;
}
}
private static class NamedThreadFactory implements ThreadFactory {
private final static AtomicInteger COUNT = new AtomicInteger(1);
private final String name;
private NamedThreadFactory(String name) {
this.name = name;
}
@Override
public Thread newThread(Runnable r) {
return new Thread(r, name + "-" + COUNT.getAndIncrement());
}
}
public static void main(String[] args) throws InterruptedException {
final RepeatableFutureTask<Void> rfu = new RepeatableFutureTask<>(new Task(), 3);
final int num = 4;
for (int i = 0; i < num; i++) {
EXECUTOR.execute(rfu);
}
// 運行結果:
// CoderFengzi-1 CoderFengzi 666
// CoderFengzi-1 CoderFengzi 666
// CoderFengzi-1 CoderFengzi 666
// CoderFengzi-3 CoderFengzi 666
// CoderFengzi-3 CoderFengzi 666
// CoderFengzi-3 CoderFengzi 666
// CoderFengzi-2 CoderFengzi 666
// CoderFengzi-2 CoderFengzi 666
// CoderFengzi-2 CoderFengzi 666
// CoderFengzi-4 CoderFengzi 666
// CoderFengzi-4 CoderFengzi 666
// CoderFengzi-4 CoderFengzi 666
}
}
三 原始碼決議
3.1 核心變數
在講解原始碼之前,我們先來了解下 FutureTask 的一些重要的變數,
a 狀態講解
state 是 FutureTask 用來表示狀態的變數,一共有 NEW(新建)、COMPLETING(計算中)、NORMAL(成功)、EXCEPTIONAL(例外)、CANCELLED(取消)、INTERRUPTING(中斷中)、INTERRUPTED(已中斷)這 7 種狀態,分別用 0 ~ 6 這 7 個數字表示,初始狀態是 NEW,而COMPLETING 和 INTERRUPTING 都是中間狀態,NORMAL、EXCEPTIONAL 和 INTERRUPTED 都是最終狀態,無法回退,
/**
* 此任務的運行狀態,最初為 NEW,運行狀態僅在 set,setException 和 cancel 方法中轉換為終端狀態,
* 在完成期間,狀態可能會采用 COMPLETING(正在設定結果時)或 INTERRUPTING(僅在中斷跑步者滿足cancel(true)時)的瞬間值,
* 從這些中間狀態到最終狀態的轉換使用更高效的有序寫入,值是遞增的,因為值是唯一的,無法進一步修改,
* 為了保記憶體證可見性,為 state 加 volatile 修飾符
*
* 這里應該是用了狀態模式,可能的狀態轉換:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
// 新建狀態,FutureTask 被創建時的初始狀態
private static final int NEW = 0;
// 執行完 Callable 的 call 方法之后的狀態,但是還未進行賦值
private static final int COMPLETING = 1;
// 正常回傳結果的狀態,即獲取結果,并且賦值成功
private static final int NORMAL = 2;
// 執行例外狀態,即呼叫 Callable 的 call 方法時出現例外,會將例外賦值給結果
private static final int EXCEPTIONAL = 3;
// 取消狀態,NEW 狀態下呼叫 cancel(false) 方法后的狀態
private static final int CANCELLED = 4;
// 正在中斷的狀態,NEW 狀態下呼叫 cancel(true) 方法后,但是還未終止當前執行緒時的狀態
private static final int INTERRUPTING = 5;
// 已中斷的狀態,INTERRUPTING 狀態下呼叫執行緒的 interrupt() 方法后的狀態
private static final int INTERRUPTED = 6;
FutureTask 其實用到了狀態模式,我們用一張圖來表示這幾種狀態的轉換:

b 其他變數
/**
* The underlying callable; nulled out after running
* 需要執行的 Callable 物件
*/
private Callable<V> callable;
/**
* The result to return or exception to throw from get()
* FurtureTask 的結果,可以通過 get() 方法獲得,因為 outcome 是在 state 變數被操作后才進行賦值操作,
* 而 state 有 volatile 修飾,因此 outcome 不加 volatile 修改也可保證可見性,
* 這里可以參考知乎的 Forest Wang 的解答來理解上面的話,https://www.zhihu.com/question/41016480/answer/551056899
*/
private Object outcome; // non-volatile, protected by state reads/writes
/**
* The thread running the callable; CASed during run()
* 用來執行 callable 物件的執行緒,該執行緒是成功進行 CAS 操作,將 runner 參考指向自己的執行緒,
* 同時為了保證記憶體可見性,加上了 volatile 修飾符
*/
private volatile Thread runner;
/**
* Treiber stack of waiting threads
* 用于存盤等待執行緒的 Treiber stack,一種基于 CAS 來保證并發安全的無鎖堆疊結構,可以 https://segmentfault.com/a/1190000012463330
*/
private volatile WaitNode waiters;
c 核心資料結構 - Treiber stack
FutureTask 的并發安全是基于 Treiber stack 的,主要是用來存盤呼叫 get 方法后被阻塞住的執行緒,后面的 get 方法將會進行決議,這里我們先簡單了解下什么是 Treiber stack,并且小伙伴們看完后也可以思考下,FutureTask 是如何使用 Treiber stack?下面開始講解 Treiber stack:
-
Treiber stack是一個支持并發操作的無鎖堆疊,基于CAS實作并發,即每次入堆疊和出堆疊都需要進行CAS操作, -
入堆疊時,獲取堆疊頂,然后將新創建的節點的
next指向堆疊頂,然后再次獲取堆疊頂和原堆疊頂進行比較,如果相等則將堆疊頂修改為新創建的節點,否則重新進行CAS操作,直到成功為止, -
出堆疊時,獲取堆疊頂,如果堆疊頂為
null,直接回傳,否則獲取堆疊頂的next,然后再次獲取堆疊頂和原堆疊頂進行比較,如果相等則將把堆疊頂修改為next,并回傳原堆疊頂,否則重新進行CAS操作,直到成功為止,
下面是 Treiber stack 的簡單實作,摘自《JAVA并發編程實踐(Doug Lea)》 一書,
public class ConcurrentStack<E> {
private AtomicReference<Node<E>> top = new AtomicReference<>();
public void push(E item) {
Node<E> newHead = new Node<>(item);
Node<E> oldHead;
do {
oldHead = top.get();
newHead.next = oldHead;
} while (!top.compareAndSet(oldHead, newHead));
}
public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = top.get();
if (oldHead == null)
return null;
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));
return oldHead.item;
}
private static class Node<E> {
public final E item;
public Node<E> next;
public Node(E item) {
this.item = item;
}
}
}
以上的代碼如果覺得不太好理解,那么可以結合下面的圖去理解:

3.2 get 方法
java.util.concurrent.FutureTask#get
該方法被執行緒執行時,如果計算完成,outcome 引數不為 null,那么獲取結果,出現執行例外時,該結果為例外,有兩個版本,一個有時間限制,一個沒有時間限制,有時間限制的超時會拋出 TimeoutException,
// 獲取結果,計算未完成,當前執行緒會被阻塞
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果是 COMPLETING(含)以下的狀態
// 即 NEW 或者 COMPLETING 狀態
if (s <= COMPLETING)
// 進行等待,我們下面會進行講解
s = awaitDone(false, 0L);
// 此時的狀態可能是 NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTING、INTERRUPTED
// 獲取結果
return report(s);
}
// 獲取結果,計算未完成,當前執行緒會被阻塞,但是有時間限制
// 如果超時還未完成,拋出 TimeoutException
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
// 如果是 COMPLETING(含)以下的狀態
// 即 NEW 或者 COMPLETING 狀態,那么安裝時間引數進行等待
// 如果時間到了,還是 COMPLETING 以下的狀態,表示超時了
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
// 超時拋出超時例外
throw new TimeoutException();
// 此時的狀態可能是 NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTING、INTERRUPTED
// 獲取結果
return report(s);
}
java.util.concurrent.FutureTask#awaitDone
該方法用來讓執行緒進行阻塞,以等待結果的回傳,并且會將執行緒包裝成 WaitNode 物件,同時嘗試通過 CAS 操作置換 waiters,
// 當前執行緒進行阻塞,等待 run() 方法運行完成,或者在中斷或超時時中止,
// timed 表示是否有時間限制,nanos 表示等待時間
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
// 回圈
for (;;) {
// Thread.interrupted 靜態方法會判定當前執行緒是否中斷,并且重置中斷狀態
// 如果當前執行緒中斷
if (Thread.interrupted()) {
// 進入這里表明執行緒被中斷了
// 移除中斷或超時的節點,下面進行講解,此方法正是使用了 Treiber stack
removeWaiter(q);
// 拋出中斷例外
throw new InterruptedException();
}
int s = state;
// 如果 state 為 COMPLETING(3) 以上的狀態,表示已經結束
if (s > COMPLETING) {
if (q != null)
// 將節點的執行緒設 null,原因如下:
// 如果不為空,后序的 finishCompletion 方法會對該執行緒呼叫 LockSupport.unpark 方法
// 此處為了減少不必要的 unpark
q.thread = null;
// 直接回傳 state
return s;
}
// 如果正在完成
else if (s == COMPLETING) // cannot time out yet
// 那么當前執行緒先稍等下其他執行緒
// 如果其他執行緒能完成的話,當前執行緒后面就沒必要進入等待堆疊和被阻塞
Thread.yield();
else if (q == null)
// 創建等待節點,節點的 thread 變數為當前執行緒
q = new WaitNode();
// 如果排隊失敗,繼續自旋
else if (!queued)
// 此處也使用了 Treiber stack,進行 CAS 自旋操作,嘗試入堆疊,即嘗試將節點放入堆疊頂
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 以上如果都不是,即當前還處于 NEW 狀態,那么下面要阻塞該執行緒
// 如果有時間限制
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
// 移除中斷或超時的節點,此處是表明執行超時
removeWaiter(q);
return state;
}
// 在 nanos 時間內,進行阻塞,后面自行恢復
LockSupport.parkNanos(this, nanos);
}
else
// 進行阻塞
// 之后如果其他執行緒呼叫 run 方法對 outcome 賦值成功
// 那么又會呼叫 finishCompletion 方法完成對所有執行緒的 unpark 操作
LockSupport.park(this);
}
}
// 簡單的鏈表,用來記錄等待的執行緒,
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
// 將 thread 設定為當前執行緒
WaitNode() { thread = Thread.currentThread(); }
}
java.util.concurrent.FutureTask#removeWaiter
該方法用來洗掉超時和執行緒中斷的等待節點,此處正是用到了 Treiber stack,利用 CAS 進行出堆疊,洗掉等待節點,
/**
* 嘗試洗掉超時或中斷的等待節點,以避免積累垃圾,
* 內部節點在沒有 CAS 的情況下,其資料不會發生,因此無論如何遍歷它們,都是無害的,
* 為了避免在洗掉等待節點的時候,其他執行緒修改了頭節點,在出現明顯競爭的情況下將重新遍歷該串列,
*/
private void removeWaiter(WaitNode node) {
// 只有在超時和中斷的情況下,才進入該方法,用來洗掉超時和中斷的等待節點
// 如果等待節點不為空
if (node != null) {
// 將等待節點執行緒設定為 null
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
// 進行遍歷
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
// s 用于記錄下個節點
s = q.next;
// 如果等待節點的 thread 變數不為空,表明該執行緒是處于阻塞狀態
if (q.thread != null)
// pred 用于記錄前個節點
pred = q;
// 如果 pred 不為空
else if (pred != null) {
// 如果 q.thread 為 null,此操作為洗掉 q 節點,否則沒有任何變化
// 即從 pred -> q -> s 變成 pred -> s
// 因為此處是內部節點很安全,因此可以進行非并發地洗掉
pred.next = s;
// 如果 pred 的 thread 為 null,即節點的資料被其他執行緒修改了
if (pred.thread == null) // check for race
// 防止出現錯誤,重新進行外層回圈,即重復獲取 waiters,重新遍歷
continue retry;
}
// 此處表明等待節點的執行緒為空,因此使用 CAS 操作進行節點洗掉
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
// 洗掉失敗,重新進行外層回圈,即重復獲取 waiters,重新遍歷
continue retry;
}
// 如果前面的操作在洗掉節點的程序中,節點沒有被其他執行緒修改
// 即正常洗掉,那么就會走到這里,然后回傳
break;
}
}
}
3.3 run 方法
java.util.concurrent.FutureTask#run
該方法用來呼叫 Callable 物件的 call 方法,并將回傳結果賦值給 outcome 變數,如果出現執行例外,就將例外賦值給 outcome,
// 除非已取消,否則為此 Future 的 outcome 設定為其運行結果,
public void run() {
// 如果 state 不是 NEW,或者 state 為 NEW 但是 CAS 失敗時,直接回傳
// 意味著只能有一個執行緒能,其他執行緒直接回傳
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 如果 callable 不為空,且 state 為 NEW
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 呼叫 call 方法獲取 result
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// outcome 設定為 ex
setException(ex);
}
// 如果 ran 為 true,即執行成功,沒有例外
if (ran)
// outcome 設定為 result
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 如果處于 INTERRUPTING 或者 INTERRUPTED 狀態
if (s >= INTERRUPTING)
// 使用 Thread.yield() 方法,嘗試讓出執行緒執行權,直到結果不為 INTERRUPTING
handlePossibleCancellationInterrupt(s);
}
}
// 設定回傳結果
protected void set(V v) {
// CAS 將 NEW 狀態轉換成 COMPLETING 狀態
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 將回傳結果賦值給 outcome
outcome = v;
// 結果賦值成功,將狀態變成 NORMAL 狀態
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
// 設定例外
protected void setException(Throwable t) {
// CAS 將 NEW 狀態轉換成 COMPLETING 狀態
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 將例外賦值給 outcome
outcome = t;
// 結果賦值成功,將狀態變成 EXCEPTIONAL 狀態
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 釋放所有的阻塞執行緒,運行 done 方法
finishCompletion();
}
}
java.util.concurrent.FutureTask#finishCompletion
該方法用來釋放所有等待回傳結果的執行緒,并且清除所有 waiters 上的執行緒,
// 洗掉 waiters 的所有的節點上的等待執行緒,并且釋放所有等待的執行緒
// 同時呼叫 done(),并使 callable 無效(置空),
private void finishCompletion() {
// assert state > COMPLETING;
// 如果 waiters 不為空就一直回圈,防止阻塞鏈表上的執行緒沒有被釋放
// 因為是多執行緒運行,即在此期間,仍然可能有執行緒進行 waiters 佇列
for (WaitNode q; (q = waiters) != null;) {
// 將 waiters 通過 CAS 賦值為 null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 從頭節點開始遍歷
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 釋放阻塞執行緒
LockSupport.unpark(t);
}
// 繼續向下遍歷鏈表
WaitNode next = q.next;
if (next == null)
// 遍歷結束,跳出回圈
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 空方法,擴展點,由子類實作
done();
// 任務置空,幫助 gc
callable = null; // to reduce footprint
}
四 注意事項
FutureTask的狀態是不斷向前的,不會回退,即只能運行一次計算,即一旦進入終端的狀態即NORMAL、EXCEPTIONAL、INTERRUPTED狀態,那么就無法發生改變,- 如果我們想要能夠重復運行,可以實作
FutureTask,呼叫runAndReset方法,該方法可以將狀態重新變為NEW,但是注意此方法不會對outcome賦值,詳見FutureTask的子類ScheduledFutureTask, - 呼叫完
get()方法,一定要記得呼叫 run() 方法,否則執行緒將會一直被阻塞住,因此也建議最好使用get(long, TimeUnit)方法, - 如果我們想要進行擴展,可以實作
FutureTask,其留有擴展點done方法,以及有許多的protected方法,
五 參考文章
- Treiber Stack簡單分析
- 玩轉Java并發工具,精通JUC,成為并發多面手
如果有興趣可以微信搜一搜
程式鋒子,關注本人的微信公眾號
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/250206.html
標籤:java
上一篇:最安全的單例模式-列舉

