Future模式
【1】Future模式是多執行緒開發中常見的設計模式,它的核心思想是異步呼叫,對于Future模式來說,它無法立即回傳你需要的資料,但是它會回傳一個契約,將來你可以憑借這個契約去獲取你需要的資訊,
【2】通俗一點就是生產者-消費者模型的擴展,經典“生產者-消費者”模型中訊息的生產者不關心消費者何時處理完該條訊息,也不關心處理結果,Future模式則可以讓訊息的生產者等待直到訊息處理結束,如果需要的話還可以取得處理結果,
java中是如何實作Future模式
【1】直接繼承Thread或者實作Runnable介面都可以創建執行緒,但是這兩種方法都有一個問題 就是:沒有回傳值,也就是不能獲取執行完的結果,
【2】因此java1.5就提供了Callable介面來實作這一場景,而Future和FutureTask就可以和Callable介面配合起來使用,【從而達到Future模式的效果】
Callable和Runnable的區別
【1】原始碼展示
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
【2】分析說明
Runnable 的缺陷:1.不能回傳一個回傳值 2.不能拋出 checked Exception,
Callable的call方法可以有回傳值,可以宣告拋出例外,
【3】疑問決議
1)為什么需要 Callable?
Callable 配合 Future 類 可以了解任務執行情況,或者取消任務的執行,還可獲取任務執行的結果,這些功能都是 Runnable 做不到的,因為它沒有回傳值,不能拋出例外,
了解Future介面
【1】介紹 :Future就是對于具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果, 必要時可以通過get方法獲取執行結果,該方法會阻塞直到任務回傳結果,
【2】原始碼展示
public interface Future<V> { // 取消任務的執行,引數指定是否立即中斷任務執行,或者等等任務結束 boolean cancel(boolean mayInterruptIfRunning); //任務是否已經取消,任務正常完成前將其取消,則回傳true boolean isCancelled(); //需要注意的是如果任務正常終止、例外或取消,都將回傳true boolean isDone(); //取得回傳物件 V get() throws InterruptedException, ExecutionException; //取得回傳對像,允許等待設定的時間范圍 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
了解FutureTask類(Future介面的實作類)
【1】介紹說明
1)該物件相當于是消費者和生產者的橋梁,消費者通過FutureTask存盤任務的處理結果,更新任務的狀態:未開始、正在處理、已完成等,而生產者拿到的 FutureTask被轉型為Future介面,可以阻塞式獲取任務的處理結果,非阻塞式獲取任務處理狀態,
2)FutureTask既可以被當做Runnable來執行,也可以被當做Future來獲取Callable的回傳結果,
【2】代碼展示
0)繼承關系
public class FutureTask<V> implements RunnableFuture<V> public interface RunnableFuture<V> extends Runnable, Future<V>
1)屬性值
// 表示當前任務的狀態 private volatile int state; // 表示當前任務的狀態是新創建的,尚未執行 private static final int NEW = 0; // 表示當前任務即將結束,還未完全結束,值還未寫,一種臨界狀態 private static final int COMPLETING = 1; // 表示當前任務正常結束 private static final int NORMAL = 2; // 表示當前任務執行程序中出現了例外,內部封裝的callable.call()向上拋出例外了 private static final int EXCEPTIONAL = 3; // 表示當前任務被取消 private static final int CANCELLED = 4; // 表示當前任務中斷中 private static final int INTERRUPTING = 5; // 表示當前任務已中斷 private static final int INTERRUPTED = 6; // 我們在使用FutureTask物件的時候,會傳入一個Callable實作類或Runnable實作類,這個callable存盤的就是 // 傳入的Callable實作類或Runnable實作類(Runnable會被使用修飾者設計模式偽裝為) private Callable<V> callable; // 正常情況下,outcome保存的是任務的回傳結果 // 不正常情況下,outcome保存的是任務拋出的例外 private Object outcome; // 保存的是當前任務執行期間,執行任務的執行緒的參考 private volatile Thread runner; // 因為會有很多執行緒去get結果,這里把執行緒封裝成WaitNode,一種資料結構:堆疊,頭插頭取 private volatile WaitNode waiters; static final class WaitNode { // 執行緒物件 volatile Thread thread; // 下一個WaitNode結點 volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
2)構造方法
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { //封裝成callable,但回傳值為傳入的值 this.callable = Executors.callable(runnable, result); this.state = NEW; }
3)核心方法
1.run()方法
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; // 只有當任務狀態為new并且runner舊值為null才會執行到這里 try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 呼叫callable.run()并回傳結果 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) // 內部設定outcome為callable執行的結果,并且更新任務的狀態為NORMAL(任務正常執行)并且喚醒阻塞的執行緒 set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) // 如果當前任務處于中斷中,則執行這個方法執行緒會不斷讓出cpu直到任務處于已中斷狀態 handlePossibleCancellationInterrupt(s); } }
2.set(V v)方法
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 設定outcome(結果)為callable.run()回傳的結果 outcome = v; //修改狀態 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state // 喚醒呼叫get()的所有等待的執行緒并清空堆疊 finishCompletion(); } } protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 設定outcome(結果)為callable.run()拋出的例外 outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
3.get()方法
public V get() throws InterruptedException, ExecutionException { int s = state; // 條件成立會呼叫awaitDone方法自旋等待直到任務完成 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } //這個方法是真正用來獲取任務的回傳結果的,這個方法在get()方法里面會被呼叫,如果該方法被呼叫,說明任務已經執行完了, private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
4.awaitDone(boolean timed, long nanos)方法
// 這個方法的作用是等待任務被完成(正常完成或出現例外完成都算完成),被中斷,或是被超時 private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 如果當前執行緒出現中斷例外,則將該執行緒代表的WaitNode結點移出堆疊并拋出中斷例外 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 如果當前任務狀態大于COMPLETING,說明當前任務已經有結果了(任務完成、中斷、取消),直接回傳任務狀態 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 當前任務處于臨界狀態,即將完成,則當前執行緒釋放cpu else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 第一次自旋,如果當前WitNode為null,new一個WaitNode結點 else if (q == null) q = new WaitNode(); // 第二次自旋,如果當前WaitNode節點沒有入隊,則嘗試入隊 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q); // 第三次自旋,到這里表示是否定義了超時時間 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } // 未超出時間,掛起當前執行緒一定時間 LockSupport.parkNanos(this, nanos); } else // 掛起當前執行緒,該線程會休眠(什么時候該執行緒會繼續執行呢?除非有其他執行緒呼叫unpark()或者中斷該執行緒) LockSupport.park(this); } }
5.finishCompletion()方法
//任務執行完成(正常結束和非正常結束都代表任務執行完成)會呼叫這個方法來喚醒所有因呼叫get()方法而陷入阻塞的執行緒, private void finishCompletion() { // 如果條件成立,說明當前有陷入阻塞的執行緒 for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; // 執行到這里說明還有因呼叫get()而陷入阻塞的執行緒,自旋接著喚醒 // 這里q.next設定為null幫助GC(垃圾回收) q.next = null; // unlink to help gc q = next; } break; } } //拓展方法 done(); // 將callable設定為null,方便GC callable = null; }
【3】注意事項
1)當 for 回圈批量獲取 Future 的結果時容易 block,get 方法呼叫時應使用 timeout限制
2)Future 的生命周期不能后退,一旦完成了任務,它就永久停在了“已完成”的狀態,不能從頭再來
3)FutureTask 一般是結合執行緒池使用,然后額外采用FutureTask獲取結果,
【4】Future的局限性
從本質上說,Future表示一個異步計算的結果,它提供了isDone()來檢測計算是否已經完成,并且在計算結束后,可以通過get()方法來獲取計算結果,在異步計算中,Future確實是個非常優秀的介面,但是,它的本身也確實存在著許多限制:
1)并發執行多任務:Future只提供了get()方法來獲取結果,并且是阻塞的,所以,除了等待你別無他法;
2)無法對多個任務進行鏈式呼叫:如果你希望在計算任務完成后執行特定動作,比如發郵件,但Future卻沒有提供這樣的能力;
3)無法組合多個任務:如果你運行了10個任務,并期望在它們全部執行結束后執行特定動作,那么在Future中這是無能為力的;
4)沒有例外處理:Future介面中沒有關于例外處理的方法;
了解CompletionService介面
【1】介紹
1)CompletionService 介面是一個獨立的介面,并沒有擴展 ExecutorService , 其默認實作類是ExecutorCompletionService;
2)介面CompletionService 的功能是:以異步的方式一邊執行未完成的任務,一邊記錄、處理已完成任務的結果,讓兩件事分開執行,任務之間不會互相阻塞,可以實作先執行完的先取結果,不再依賴任務順序了,
3)簡單來說,CompletionService 就是監視著 Executor執行緒池執行的任務,用 BlockingQueue 將完成的任務的結果存盤下來,(當然,這個也可以是程式員自己去實作,但是要不斷遍歷與每個任務關聯的 Future,然后不斷去輪詢,判斷任務是否已經完成,比較繁瑣);
【2】原始碼展示
public interface CompletionService<V> { //提交一個 Callable 任務;一旦完成,便可以由take()、poll()方法獲取 Future<V> submit(Callable<V> task); //提交一個 Runnable 任務,并指定計算結果; Future<V> submit(Runnable task, V result); //獲取并移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則等待, Future<V> take() throws InterruptedException; //獲取并移除表示下一個已完成任務的 Future,如果不存在這樣的任務,則回傳 null, Future<V> poll(); //獲取并移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則將等待指定的時間(如果有必要) Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; }
了解ExecutorCompletionService類(CompletionService介面的實作類)
【1】介紹
1)內部通過阻塞佇列+FutureTask,實作了任務先完成可優先獲取到,即結果按照完成先后順序排序,內部有一個先進先出的阻塞佇列,用于保存已經執行完成的Future,通過呼叫它的take方法或poll方法可以獲取到一個已經執行完成的Future,進而通過呼叫Future介面實作類的get方法獲取最終的結果,
【2】原始碼分析
1)屬性分析
//執行緒池 private final Executor executor; //判斷執行緒池是否繼承抽象類 private final AbstractExecutorService aes; //阻塞佇列 private final BlockingQueue<Future<V>> completionQueue;
2)構造方法
//對于執行緒池必須定義,而阻塞佇列會有默認的 //而默認的LinkedBlockingQueue對于并發編程來說是存在隱患的(依據阿里手冊來說,因為佇列的無盡性會導致OOM) //所以一般考慮要你自己去定義阻塞佇列 public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; //如果是繼承了抽象類的實作 this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
3)阻塞佇列元素的定義
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } //FutureTask里面的拓展方法,在run的時候會被呼叫,所以是做完任務了會自動提交到佇列里面 protected void done() { completionQueue.add(task); } private final Future<V> task; }
4)實作介面的方法
//采用newTaskFor來封裝非標準的取消 //因為傳入的Callable或Runnable,這種不是FutureTask,故需要封裝 private RunnableFuture<V> newTaskFor(Callable<V> task) { if (aes == null) return new FutureTask<V>(task); else return aes.newTaskFor(task); } private RunnableFuture<V> newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask<V>(task, result); else return aes.newTaskFor(task, result); } //下面是對介面定義的方法的實作 public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } public Future<V> submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; } public Future<V> take() throws InterruptedException { return completionQueue.take(); } public Future<V> poll() { return completionQueue.poll(); } public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
【3】匯總說明
1)說白了就是基于FutureTask 是單執行緒的任務,考慮可以等待獲取回傳結果,那么應該可以采用執行緒池的方法形成多任務并發的結果,
2)故定義了CompletionService介面作為規范,ExecutorCompletionService類作為具體的實作類【作為管理者】,不然每次采用執行緒池來做的話都要自己定義去管理,
3)當需要批量提交異步任務的時候建議你使用CompletionService,CompletionService將執行緒池Executor和阻塞佇列BlockingQueue的功能融合在了一起,能夠讓批量異步任務的管理更簡單,
4)CompletionService能夠讓異步任務的執行結果有序化,先執行完的先進入阻塞佇列,利用這個特性,你可以輕松實作后續處理的有序性,避免無謂的等待,同時還可以快速實作諸如Forking Cluster這樣的需求,
5)執行緒池隔離,CompletionService支持自己創建執行緒池,這種隔離性能避免幾個特別耗時的任務拖垮整個應用的風險,
【4】示例展示
1)示例代碼
public class CompletionServiceDemo { public static void main(String[] args) throws InterruptedException, ExecutionException { //創建執行緒池 ExecutorService executor = Executors.newFixedThreadPool(10); //創建CompletionService CompletionService<Integer> cs = new ExecutorCompletionService<>(executor); //異步向電商S1詢價 cs.submit(() -> getPriceByS1()); //異步向電商S2詢價 cs.submit(() -> getPriceByS2()); //異步向電商S3詢價 cs.submit(() -> getPriceByS3()); //將詢價結果異步保存到資料庫 for (int i = 0; i < 3; i++) { //從阻塞佇列獲取futureTask Integer r = cs.take().get(); executor.execute(() -> save(r)); } executor.shutdown(); } private static void save(Integer r) { System.out.println("保存詢價結果:{}"+r); } private static Integer getPriceByS1() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(5000); System.out.println("電商S1詢價資訊1200"); return 1200; } private static Integer getPriceByS2() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(8000); System.out.println("電商S2詢價資訊1000"); return 1000; } private static Integer getPriceByS3() throws InterruptedException { TimeUnit.MILLISECONDS.sleep(3000); System.out.println("電商S3詢價資訊800"); return 800; } }
了解CompletableFuture
【1】介紹
1)簡單的任務,用Future獲取結果還好,但我們并行提交的多個異步任務,往往并不是獨立的,很多時候業務邏輯處理存在串行[依賴]、并行、聚合的關系,如果要我們手動用 Fueture 實作,是非常麻煩的,
2)CompletableFuture是Future介面的擴展和增強,CompletableFuture實作了Future介面,并在此基礎上進行了豐富地擴展,完美地彌補了Future上述的種種問題,更為重要的是,CompletableFuture實作了對任務的編排能力,借助這項能力,我們可以輕松地組織不同任務的運行順序、規則以及方式,從某種程度上說,這項能力是它的核心能力,而在以往,雖然通過CountDownLatch等工具類也可以實作任務的編排,但需要復雜的邏輯處理,不僅耗費精力且難以維護,
3)CompletableFuture除了實作Future介面還實作了CompletionStage介面,
4)CompletionStage介面: 執行某一個階段,可向下執行后續階段,異步執行,默認執行緒池是ForkJoinPool.commonPool(),
【2】常用方法
1)描述依賴關系:
1.thenApply() 把前面異步任務的結果,交給后面的Function
2.thenCompose()用來連接兩個有依賴關系的任務,結果由第二個任務回傳
2)描述and聚合關系:
1.thenCombine:任務合并,有回傳值
2.thenAccepetBoth:兩個任務執行完成后,將結果交給thenAccepetBoth消耗,無回傳值,
3.runAfterBoth:兩個任務都執行完成后,執行下一步操作(Runnable),
3)描述or聚合關系:
1.applyToEither:兩個任務誰執行的快,就使用那一個結果,有回傳值,
2.acceptEither: 兩個任務誰執行的快,就消耗那一個結果,無回傳值,
3.runAfterEither: 任意一個任務執行完成,進行下一步操作(Runnable),
4)并行執行:
1.CompletableFuture類自己也提供了anyOf()和allOf()用于支持多個CompletableFuture并行執行
【3】創建異步操作
1)CompletableFuture 提供了四個靜態方法來創建一個異步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
2)這四個方法區別在于:
1.runAsync 方法以Runnable函式式介面型別為引數,沒有回傳結果,supplyAsync 方法Supplier函式式介面型別為引數,回傳結果型別為U;Supplier 介面的 get() 方法是有回傳值的(會阻塞),
2.沒有指定Executor的方法會使用ForkJoinPool.commonPool() 作為它的執行緒池執行異步代碼,如果指定執行緒池,則使用指定的執行緒池運行,
3.默認情況下 CompletableFuture 會使用公共的 ForkJoinPool 執行緒池,這個執行緒池默認創建的執行緒數是 CPU 的核數(也可以通過 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設定 ForkJoinPool 執行緒池的執行緒數),如果所有 CompletableFuture 共享一個執行緒池,那么一旦有任務執行一些很慢的 I/O 操作,就會導致執行緒池中所有執行緒都阻塞在 I/O 操作上,從而造成執行緒饑餓,進而影響整個系統的性能,所以,強烈建議你要根據不同的業務型別創建不同的執行緒池,以避免互相干擾,
3)supplyAsync的兩種獲取結果的方法join&get
1.join()和get()方法都是用來獲取CompletableFuture異步之后的回傳值,join()方法拋出的是uncheck例外(即未經檢查的例外),不會強制開發者拋出,get()方法拋出的是經過檢查的例外,ExecutionException, InterruptedException 需要用戶手動處理(拋出或者 try catch)
【3】常用方法的使用與介紹
1)結果處理
1.介紹:
//當CompletableFuture的計算結果完成,或者拋出例外的時候,我們可以執行特定的 Action,主要是下面的方法: public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) //Action的型別是BiConsumer<? super T,? super Throwable>,它可以處理正常的計算結果,或者例外情況, //方法不以Async結尾,意味著Action使用相同的執行緒執行,而Async可能會使用其它的執行緒去執行(如果使用相同的執行緒池,也可能會被同一個執行緒選中執行), //這幾個方法都會回傳CompletableFuture,當Action執行完畢后它的結果回傳原始的CompletableFuture的計算結果或者回傳例外
2.示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } if (new Random().nextInt(10) % 2 == 0) { int i = 12 / 0; } System.out.println("執行結束!"); return "test"; }); //whenComplete一般搭配exceptionally一起使用,一個處理結果,一個處理例外 future.whenComplete(new BiConsumer<String, Throwable>() { @Override public void accept(String t, Throwable action) { System.out.println(t+" 執行完成!"); } }); future.exceptionally(new Function<Throwable, String>() { @Override public String apply(Throwable t) { System.out.println("執行失敗:" + t.getMessage()); return "例外xxxx"; } });
2)結果轉換
1.介紹:所謂結果轉換,就是將上一段任務的執行結果作為下一階段任務的入參參與重新計算,產生新的結果,
2.方法列舉:
【1】thenApply
1.說明
//thenApply 接收一個函式作為引數,使用該函式處理上一個CompletableFuture 呼叫的結果,并回傳一個具有處理結果的Future物件, public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
2.示例
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int result = 100; System.out.println("一階段:" + result); return result; }).thenApply(number -> { int result = number * 3; System.out.println("二階段:" + result); return result; });
【2】thenCompose
1.說明
//thenCompose 的引數為一個回傳 CompletableFuture 實體的函式,該函式的引數是先前計算步驟的結果, public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
2.示例
CompletableFuture<Integer> future = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(30); System.out.println("第一階段:" + number); return number; } }) .thenCompose(new Function<Integer, CompletionStage<Integer>>() { @Override public CompletionStage<Integer> apply(Integer param) { return CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = param * 2; System.out.println("第二階段:" + number); return number; } }); } });
3.說明:
【1】thenApply 和 thenCompose的區別
1.thenApply 轉換的是泛型中的型別,回傳的是同一個CompletableFuture;
2.thenCompose 將內部的 CompletableFuture 呼叫展開來并使用上一個CompletableFutre 呼叫的結果在下一步的 CompletableFuture 呼叫中進行運算,是生成一個新的CompletableFuture,
3.示例
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> result1 = future.thenApply(param -> param + " World"); CompletableFuture<String> result2 = future .thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World")); System.out.println(result1.get());
3)結果消費
1.介紹:
【1】與結果處理和結果轉換系列函式回傳一個新的 CompletableFuture 不同,結果消費系列函式只對結果執行Action,而不回傳新的計算值,
【2】根據對結果的處理方式,結果消費函式又分為:
thenAccept系列:對單個結果進行消費
thenAcceptBoth系列:對兩個結果進行消費
thenRun系列:不關心結果,只對結果執行Action
2.方法列舉:
【1】thenAccept
1.說明
//通過觀察該系列函式的引數型別可知,它們是函式式介面Consumer,這個介面只有輸入,沒有回傳值, public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
2.示例
CompletableFuture<Void> future = CompletableFuture .supplyAsync(() -> { int number = new Random().nextInt(10); System.out.println("第一階段:" + number); return number; }).thenAccept(number -> System.out.println("第二階段:" + number * 5));
【2】thenAcceptBoth
1.說明
//thenAcceptBoth 函式的作用是,當兩個 CompletionStage 都正常完成計算的時候,就會執行提供的action消費兩個異步的結果, public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
2.示例
CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(3) + 1; try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一階段:" + number); return number; } }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(3) + 1; try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第二階段:" + number); return number; } }); futrue1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() { @Override public void accept(Integer x, Integer y) { System.out.println("最終結果:" + (x + y)); } });
【3】thenRun
1.說明
//thenRun 也是對執行緒任務結果的一種消費函式,與thenAccept不同的是,thenRun 會在上一階段 CompletableFuture 計算完成的時候執行一個Runnable,Runnable并不使用該 CompletableFuture 計算的結果, public CompletionStage<Void> thenRun(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action);
2.示例
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { int number = new Random().nextInt(10); System.out.println("第一階段:" + number); return number; }).thenRun(() -> System.out.println("thenRun 執行"));
4)結果組合
1.方法列舉:
【1】thenCombine
1.說明
//thenCombine 方法,合并兩個執行緒任務的結果,并進一步處理, public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
2.示例
CompletableFuture<Integer> future1 = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(10); System.out.println("第一階段:" + number); return number; } }); CompletableFuture<Integer> future2 = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(10); System.out.println("第二階段:" + number); return number; } }); CompletableFuture<Integer> result = future1 .thenCombine(future2, new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer x, Integer y) { return x + y; } });
5)任務互動
1.介紹:所謂執行緒互動,是指將兩個執行緒任務獲取結果的速度相比較,按一定的規則進行下一步處理,
2.方法列舉:
【1】applyToEither
1.說明
//兩個執行緒任務相比較,先獲得執行結果的,就對該結果進行下一步的轉化操作, public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
2.示例
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(10); System.out.println("第一階段start:" + number); try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一階段end:" + number); return number; } }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(10); System.out.println("第二階段start:" + number); try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第二階段end:" + number); return number; } }); future1.applyToEither(future2, new Function<Integer, Integer>() { @Override public Integer apply(Integer number) { System.out.println("最快結果:" + number); return number * 2; } });
【2】acceptEither
1.說明
//兩個執行緒任務相比較,先獲得執行結果的,就對該結果進行下一步的消費操作, public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
2.示例
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(10) + 1; try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一階段:" + number); return number; } }); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(10) + 1; try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第二階段:" + number); return number; } }); future1.acceptEither(future2, new Consumer<Integer>() { @Override public void accept(Integer number) { System.out.println("最快結果:" + number); } });
【3】runAfterEither
1.說明
//兩個執行緒任務相比較,有任何一個執行完成,就進行下一步操作,不關心運行結果, public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
2.示例
CompletableFuture<Integer> future1 = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(5); try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一階段:" + number); return number; } }); CompletableFuture<Integer> future2 = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { int number = new Random().nextInt(5); try { TimeUnit.SECONDS.sleep(number); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第二階段:" + number); return number; } }); future1.runAfterEither(future2, new Runnable() { @Override public void run() { System.out.println("已經有一個任務完成了"); } }).join();
【4】runAfterBoth
1.說明
//兩個執行緒任務相比較,兩個全部執行完成,才進行下一步操作,不關心運行結果, public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
2.示例
CompletableFuture<Integer> future1 = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一階段:1"); return 1; } }); CompletableFuture<Integer> future2 = CompletableFuture .supplyAsync(new Supplier<Integer>() { @Override public Integer get() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第二階段:2"); return 2; } }); future1.runAfterBoth(future2, new Runnable() { @Override public void run() { System.out.println("上面兩個任務都執行完成了,"); } });
【5】anyOf
1.說明
//anyOf 方法的引數是多個給定的 CompletableFuture,當其中的任何一個完成時,方法回傳這個 CompletableFuture, public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
2.示例
Random random = new Random(); CompletableFuture<String> future1 = CompletableFuture .supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(random.nextInt(5)); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }); CompletableFuture<String> future2 = CompletableFuture .supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(random.nextInt(1)); } catch (InterruptedException e) { e.printStackTrace(); } return "world"; }); CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
【6】allOf
1.說明
//allOf方法用來實作多 CompletableFuture 的同時回傳, public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
2.示例
CompletableFuture<String> future1 = CompletableFuture .supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("future1完成!"); return "future1完成!"; }); CompletableFuture<String> future2 = CompletableFuture .supplyAsync(() -> { System.out.println("future2完成!"); return "future2完成!"; }); CompletableFuture<Void> combindFuture = CompletableFuture .allOf(future1, future2); try { combindFuture.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
【4】CompletableFuture常用方法總結:

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/511901.html
標籤:Java
下一篇:python-繪圖與可視化
