點關注,不迷路!如果本文對你有幫助的話不要忘記點贊支持哦!

上述面試題答案都整理成檔案筆記, 也還整理了一些面試資料&最新2020收集的一些大廠的面試真題(都整理成檔案,小部分截圖),有需要的可以 點擊進入暗號:csdn ,
在了解Fork-Join之前,我們得先了解什么是并行計算,
并行計算
相對于串行計算,并行計算可以劃分成時間并行和空間并行,時間并行即指令流水化,也就是流水線技術,比如說生產一輛小汽車,有特定的輪子車間/發動機車間,同時進行各自的生產,空間并行是指使用多個處理器執行并發計算,
以程式和演算法設計人員的角度看,并行計算又可分為資料并行和任務并行,資料并行把大的任務化解成若干個相同的子任務,任務并行是指每一個執行緒執行一個分配到的任務,而這些執行緒則被分配(通常是作業系統內核)到該并行計算體系的各個計算節點中去,
簡單來說,并行計算是通過把大問題劃分為小問題,運用計算機資源并行的處理子問題,當需要得到大問題的結果時,將小問題的結果按順序合并起來得到最終結果,這種思想就是分治思想,小到歸并排序,大到大資料計算...
Fork-Join
Fork-Join框架是Doug Lea 大神在JDK7引入的,Fork就是把大問題拆分成小問題,也就是大任務拆成多個子任務,并行執行子任務,Join就是把任務的結果按順序合并起來,

假設我們需要求從 1-1億之間的數字和,按照Fork-Join的思想,可分為以下三步:
1.定義拆分子任務和合并子任務的規則
- 劃分子任務的規則
首先將任務拆為 1-5千萬 和 5千萬01 - 1億兩個子任務,直到每個子任務計算的數字范圍在1萬以內的時候,我們才計算這個任務的和,
- 合并子任務的規則
同一父任務的所有子任務的結果再相加,就是這一父任務的結果,
2.充分利用計算機資源,最大并行的執行子任務
3.充分利用計算機資源,執行合并所有子任務,獲得最終的結果
顯然一般人做不了后兩步,我們只需要把 怎么拆,怎么和 告訴Fork-Join框架,Fork-Join框架就幫我們做好 如何最大并行執行子任務 和 如何最有效合并子任務,
設計原理
如何充分利用計算機資源,最大并行執行子任務?一般小伙伴應該可以想到使用多執行緒,讓執行緒數等于CPU核數,此時可以充分利用CPU的計算資源,
我們來看一下JDK普通執行緒池是咋玩的,(不要說你不懂為啥池化 :)

任務都是丟到一個同步佇列BlockingQueue中的,如果你了解JDK BlockingQueue的實作,就知道有界的同步佇列都是用鎖阻塞的,有些push/poll操作還共用一把鎖,
問題1:并行的任務有必要共用一個阻塞佇列嗎?
問題2: 如果任務佇列中的任務存在依賴,worker執行緒只能被阻塞著,啥意思呢?
假設任務佇列中存在兩個任務task1和task2,task1的執行結果依賴于task2的結果,如果worker1先拉取到task1,結果發現此時task2還沒有被執行,則worker1只能阻塞等待別的worker拉取到task2,task2執行完了worker1才能繼續執行task1,
如果worker1當發現task1無法繼續執行下去時,能夠先把它放一邊,繼續拉取任務執行,這樣效率是比較高的,
Work?Stealing
Fork-Join框架通過Work?Stealing演算法解決上面兩個問題,

-
每個
執行緒擁有自己的任務佇列,并且是雙端佇列, -
執行緒操作
自己的任務佇列是LIFO(Last in First out)模式, -
執行緒還可以偷取
別的執行緒任務佇列中的任務,模式為FIFO(First in First out),
顯然 每個執行緒擁有自己的任務佇列可以提高獲取佇列的并行度,
雙端任務佇列將所屬的自己執行緒的push/pop操作 和 其他執行緒的steal操作通過不同的模式區分開,這樣只有當Base==Top-1時,pop操作和steal操作才會有沖突,
如何才能準確及時知道Base==Top-1呢,Fork-Join框架的牛逼之處也在于對任務的調度是輕量級的,
steal操作
考慮steal操作,是多個其他執行緒的同步操作,需要保證:偷到Base處的任務和Base++的原子性,同時Base的值一旦改變,其他執行緒應該能夠馬上可見,聰明的小伙伴是不是想到 鎖和volatile 了:)
//steal操作 就是 poll()方法
final ForkJoinTask<?> poll() {
ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
//array就是雙端佇列,實際用陣列實作,
//base是將要偷的任務下標,base是用volatile修飾的,保證可見性
//top是將要push進去的任務下標,可參考上面示意圖
while ((b = base) - top < 0 && (a = array) != null) {
//說明經過while條件初步判斷任務佇列不為空
//獲取base處的任務在任務佇列中的偏移量
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
//用volatile load 語意取出base處的任務t,可以簡單理解為一定是最新修改版本的任務
t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
//再次讀取base,判斷此時t是否被別的執行緒偷走
if (base == b) {
if (t != null) {
//如果多次讀判斷都沒啥問題,CAS修改base處的任務t為null
if (U.compareAndSwapObject(a, j, t, null)) {
//如果上面修改成功,表示這個任務被該執行緒偷到了
//此時就將base指標向前移一位,注意這一步是原子操作,base++就不是了
base = b + 1;
return t;
}
}
else if (b + 1 == top)
// 如果t==null && b + 1 == top,此時任務佇列為空
break;
}
}
return null;
}
簡單來說,有任務可偷時,通過CAS偷任務保證只有一個執行緒能偷成功,偷成功的這個執行緒接著修改volatile base指標,使得馬上對其他執行緒可見,同時通過前面的多次讀判斷減少后期CAS并發的沖突概率,
沒任務可偷時,通過CAS偷任務失敗可以判斷出來,
請小伙伴一句句看上面的代碼,阿姨都注釋出來了,雖然上面并沒有鎖,,但是小伙伴想想鎖其實是悲觀控制并發的思想,是不是可以拆成多次讀判斷 + CAS原子修改的樂觀思想來控制并發,只要最終保證只有一個能修改成功就可以了,
push操作
考慮push操作,是任務佇列所屬的執行緒才能操作,天生執行緒安全:不需要通過CAS或鎖來保證同步,只需要原子的修改top處任務 和 top向前移一位 就可以了,
同理,top也不需要用volatile修飾,
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
//更新雙端佇列array的top處任務為task,直接原子更新,非CAS操作
//因為這個方法只會被array所屬的執行緒呼叫,所以這里是執行緒安全的
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
//top指標向前移一位
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
//說明未push前佇列中最多有一個任務
if ((p = pool) != null)
//此時喚醒其他等待的執行緒,表示整體pool中有事情可以做了,,
p.signalWork(p.workQueues, this);
}
else if (n >= m)
//佇列擴容
growArray();
}
}
小伙伴思考下,這里Base和Top指標會存在任務沖突嗎?其實不會哦,因為兩個指標都在往前沖,Base永遠追趕不上Top,這個方法額外需要做的事情 是 喚醒空閑執行緒 表示有任務進來了, 判斷佇列是否需要擴容就好,
pop操作
考慮pop操作,雖然任務佇列所屬的執行緒才能操作,但是當任務佇列只有一個任務時,存在steal操作和pop操作的任務競爭,原理就和steal操作一致了,當CAS修改top-1處任務為空 成功時,再更新top值為top-1,
final ForkJoinTask<?> pop() {
ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
if ((a = array) != null && (m = a.length - 1) >= 0) {
for (int s; (s = top - 1) - base >= 0;) {
long j = ((m & s) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
break;
if (U.compareAndSwapObject(a, j, t, null)) {
U.putOrderedInt(this, QTOP, s);
return t;
}
}
}
return null;
}
注意這個pop操作并沒有steal操作那么多次預讀避免并發競爭,小姐姐yy是因為pop操作只有在任務佇列中只有一個任務時,才會存在和Steal操作的競爭問題,而Steal操作也時時可能存在多個其他執行緒的競爭問題的,
通過上面三個任務調度方法的分析,你有沒有感受到一絲絲FJ的調度輕量級呢?
總結一下:Fork-Join框架通過將共享的任務佇列拆分成執行緒獨有的雙端任務佇列,多執行緒steal操作通過多次讀和CAS保證同步,steal操作和pop操作 通過CAS 保證同步,push操作執行緒安全,不需要同步,
問題是什么時候執行緒消費自己的任務佇列中的任務,什么時候會去偷別的執行緒的任務,一個任務在Fork-Join框架中的生命周期是怎樣的,又是怎么流轉的?
Fork-Join框架使用
要能回答上面的問題,我們先看一下如何使用Fork-Join框架,上面這三個方法并不是我們能直接呼叫的,這三個方法是Fork-Join自己在合適的時機自己呼叫的,像最開始所說,使用者只需要:定義好拆分子任務和合并子任務的規則的大任務,并且把任務丟給ForkJoinPool就好
求 1-1億之間的數字和
Step1.定義一個求和的任務類
- 繼承
RecursiveTask類,重寫其compute()方法:
RecursiveTask如其名,是一個歸并任務,compute()方法是具體如何拆分,如何歸并的實作,fork()方法就是在確定拆分子任務規則時呼叫的,該方法會把子任務push到當前執行緒自己的任務佇列中;join()方法就是在確定合并子任務的規則時呼叫的,該方法會等待直到回傳子任務的結果,
public class SumTask extends RecursiveTask<Long> {
private long[] numbers;
private int from;
private int to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
@Override
protected Long compute() {
//拆分子任務的規則:
// 1.當需要計算的數字小于6時,直接計算結果
if (to - from < 6) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
// 2.否則,把任務一分為二,遞回計算
} else {
int middle = (from + to) / 2;
//構造子任務
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle+1, to);
//將子任務添加到任務佇列,這一步我們還是要做了
taskLeft.fork();
taskRight.fork();
//合并所有子任務的規則:所有子任務的結果相加
return taskLeft.join() + taskRight.join();
}
}
}
在等待子任務結果的時候,執行緒被阻塞了嗎?
(當然沒有,這段時間其實就會偷任務來做,后面我們再分析:)
Step2.構造一個Fork-Join執行緒池,把上面的求和大任務SumTask丟進去
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
SumTask sumTask = new SumTask(numbers, 0, numbers.length-1)
long result = pool.invoke(sumTask);
System.out.println(result);
}
從這里,我們可以看到任務丟進執行緒池是呼叫的pool.invoke(sumTask)
( 熟悉JDK執行緒池實作的小伙伴可以結合上面ForkJoin框架的原理想想任務該如何流轉,小姐姐開始了:)
一個歸并任務的流轉
Step1.任務提交到任務佇列
包括invoke等所有任務提交方法最終都會呼叫ForkJoinPool.externalPush方法,
這里面需要考慮將任務提交到哪個佇列?
如果提交到ForkJoinWorkerThread自己的雙端任務佇列中:
不管提交到頭還是尾,都會和我們上面分析的三個操作發生任務沖突,
而且如何選擇負載最小的執行緒來提交也會增加問題復雜性,
ForkJoinPool中雙端任務佇列是用陣列(volatile WorkQueue[] workQueues)實作的,其中奇數下標存放的是可激活的任務佇列,偶數下標存放的是不可激活的任務佇列,激活指的是這個佇列是否是某個ForkJoin執行緒的任務佇列,

ForkJoinPool.externalPush只能將任務提交到不可激活任務佇列,該方法的主要邏輯為:
當提交的任務是pool的第一個任務時,會初始化workQueues,ForkJoinWorkerThread等資源,通過hash演算法選擇一個偶數下標的workQueue,在TOP處放入任務,同時喚醒ForkJoinWorkerThread開始拉取任務作業,
當提交的任務不是第一個任務,此時workQueues等資源已初始化好,同樣需要選擇一個偶數下標的workQueue存放任務,如果選中的workQueue只有這一個任務,說明之前執行緒資源大概率是閑置的狀態,會嘗試 喚醒(signalWork方法) 一個空閑的ForkJoinWorkerThread開始拉取任務作業,
Step2.ForkJoinWorkerThread的運行
我們先看一下任務的生產和消費模式:

可激活的workQueue自己所屬ForkJoinWorkerThread的任務模式是LIFO(Last In First Out)
不可激活的workQueue的任務模式是FIFO(First In First Out)
ForkJoinWorkerThread剛開始運行時會呼叫ForkJoinWorkerThread.scan方法隨機選取一個佇列從Base處撈取任務.撈取到任務會呼叫WorkQueue.runTask方法執行任務,最終對于RecursiveTask任務執行的是RecursiveTask.exec方法,
protected final boolean exec() {
//我們一開始定義SumTask的實作方法:compute
result = compute();
return true;
}
里面呼叫的就是我們一開始定義SumTask的實作方法:compute方法,
fork所做的事情就是將我們切分的子任務添加到當前ForkJoinWorkerThread自己的workQueue中
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
join所做的事情就是等待子任務的回傳結果
public final V join() {
int s;
//doJoin會回傳執行結果
if ((s = doJoin() & DONE_MASK) != NORMAL)
//結果有例外,拋出例外資訊
reportException(s);
//結果無例外,回傳正常結果
return getRawResult();
}
講原理的時候我們提到了當呼叫join獲取任務結果時,ForkJoinWorkerThread會根據當前任務的情況,做出最正確的執行判斷,而不是單純的阻塞等待結果,
Step3.join時執行任務的判斷
結合上面求和的例子,我們來看一下求1-10之間的數字和的求和任務的可能join程序:
case1:任務未被偷

假設求和 1-10任務被Thread1執行,fork出兩個子任務:1-5 和 6-10,只要Thread1能判斷出來要join的任務在自己的任務佇列中,那當前join哪個子任務就把它取出來執行就可以,
case2:任務被偷,此時自己的任務佇列為空,可以幫助小偷執行它未完成的任務

假設求和 1-10任務被Thread1執行,fork出兩個子任務:1-5 和 6-10,6-10已成功執行完成,join回傳了結果,但此時發現1-5被Thread2偷走了,自己的任務佇列中已經沒有任務可以執行了,此時Thread1可以找到小偷Thread2,并偷取Thread2的10-20任務來幫助它執行,
case3:任務被偷,此時自己的任務佇列不為空

假設求和 1-10任務被Thread1執行,fork出兩個子任務:1-5 和 6-10,要join 1-5時發現已經被Thread2偷走了,而自己佇列中還有6-10等待join執行,不好意思幫不了小偷了,
只好嘗試掛起自己等待1-5的執行結果通知,并嘗試喚醒空閑執行緒或者創建新的執行緒替代自己執行任務佇列中的6-10任務,
上述三種情況代碼均在ForkJoinPool.awaitJoin方法中,整體思路是:
當任務還在自己的佇列:
- 自己執行,獲取結果,
當被別人偷走阻塞了:
- 自己又沒任務執行,就幫助小偷執行任務,
- 自己有任務要執行,就嘗試掛起自己等待小偷的反饋結果,同時找隊友幫助自己執行,
這里任務模式有意思的是:
scan/steal操作都是從Base處獲取任務,那么更容易獲取到大的任務執行,從而使得整體執行緒的資源分配更加均衡,
任務佇列所屬的執行緒是LIFO的任務生產消費模式,剛好符合遞回任務的執行順序,
至此你有沒有對ForkJoin框架的輕量級調度和Work?Stealing演算法有一些了解呀:)
到此這篇關于文章就結束了!
點關注,不迷路!如果本文對你有幫助的話不要忘記點贊支持哦!

上述面試題答案都整理成檔案筆記, 也還整理了一些面試資料&最新2020收集的一些大廠的面試真題(都整理成檔案,小部分截圖),有需要的可以 點擊進入暗號:csdn ,
希望對大家有所幫助,有用的話點贊給我支持!

轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/37324.html
標籤:其他
