點關注,不迷路!如果本文對你有幫助的話不要忘記點贊支持哦!
概述

image.png
和傳統的執行緒池使用AQS的實作邏輯不同,ForkJoin引入全新的結構來標識:
- ForkJoinPool: 用于執行
ForkJoinTask任務的執行池,不再是傳統執行池 Worker+Queue 的組合模式,而是維護了一個佇列陣列WorkQueue,這樣在提交任務和執行緒任務的時候大幅度的減少碰撞, - WorkQueue: 雙向串列,用于任務的有序執行,如果
WorkQueue用于自己的執行執行緒Thread,執行緒默認將會從top端選取任務用來執行 - LIFO,因為只有owner的Thread才能從top端取任務,所以在設定變數時,int top;不需要使用volatile, - ForkJoinWorkThread: 用于執行任務的執行緒,用于區別使用非ForkJoinWorkThread執行緒提交的task;啟動一個該Thread,會自動注冊一個WorkQueue到Pool,這里規定,擁有Thread的WorkQueue只能出現在WorkQueue陣列的奇數位
- ForkJoinTask: 任務, 它比傳統的任務更加輕量,不再對是RUNNABLE的子類,提供
fork/join方法用于分割任務以及聚合結果, - 為了充分施展并行運算,該框架實作了復雜的 worker steal演算法,當任務處于等待中,thread通過一定策略,不讓自己掛起,充分利用資源,當然,它比其他語言的協程要重一些,
ForkJoinPool變數基本說明
作為框架的提交入口,ForkJoinPool管理著執行緒池中執行緒和任務佇列,標識執行緒池是否還接收任務,顯示現在的執行緒運行狀態,本節,對這些控制量進行解釋,
如果讀者看過 類似 disrupter 這種高效率佇列的開源實作,大家肯定會對cache line記憶猶新,他們通常的做法自己設定偽變數來填充,jdk1.8�中官網為我們帶來了sun.misc.Contended,所以你如果閱讀ForkJoinPool原始碼可以發現該類也被sun.misc.Contended標識,
幾個重要變數:
- runState: 標識
Pool運行狀態,使用bit位來標識不同狀態,比如
如果執行// runState bits: SHUTDOWN must be negative, others arbitrary powers of two private static final int RSLOCK = 1; private static final int RSIGNAL = 1 << 1; private static final int STARTED = 1 << 2; private static final int STOP = 1 << 29; private static final int TERMINATED = 1 << 30; private static final int SHUTDOWN = 1 << 31;runState & RSLOCK ==0就能直接說明,目前的運行狀態沒有被鎖住,其他情況一樣, - config:parallelism | mode
- parallelism: 這個變數不是內部定義的變數,但是需要各位注意一下它的界限,因為后面的處理需要注意
static final int MAX_CAP = 0x7fff; // max #workers - 1
也就是說他最大就占16位
- ctl:ctl是
Pool的控制變數,型別是long - 說明有64位,每個部分都有不同的作用,我們使用十六進制來標識ctl,依次說明不同部分的作用,
我為每個部分使用了數字來標識 - 1,2,3,4,long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); 0x xxxx-1 xxxx-2 xxxx-3 xxxx-4- 編號為1的16位: AC 表示現在獲取的執行緒數,這里的初始化比較有技巧,使用的是并行數的相反數,這樣如果active的執行緒數,還沒到達了我們設定的閾值的時候,ctl是個負數,我們可以根據ctl的正負直觀的知道現在的并行數達到閾值了么,
- 編號為2的16位:TC 表示執行緒總量,初始值也是并行數的相反數,這里需要說明一下,這個編號1所表示的活躍的執行緒數的區別,我們雖然開啟了并行數等量的執行緒,但是可能在某些條件下,運行的thread不得不
wait或者park,原因我們后面會提到,這個時候,雖然我們開啟的執行緒數量是和并行數相同,但是實際真正執行的卻不是這么多,TC 記錄了我們一共開啟了多少執行緒,而AC則記錄了沒有掛起的執行緒, - 編號為3的16位:后32位標識 idle workers 前面16位第一位標識是
active的還是inactive的,其他為是版本標識, - 編號為4的16位:標識idle workers 在
WorkQueue[]陣列中的index,這里需要說明的是,ctl的后32位其實只能表示一個idle workers,那么我們如果有很多個idle worker要怎么辦呢?老爺子使用的是stack的概念來保存這些資訊,后32位標識的是top的那個,我們能從top中的變數stackPred追蹤到下一個idle worker
WorkQueue變數基本說明
WorkQueue是一個雙向串列,用于task的排隊,
幾個變數的定義說明:
-
scanState:
// versioned, <0: inactive; odd:scanning如果WorkQueue沒有屬于自己的owner(下標為偶數的都沒有),該值為 inactive 也就是一個負數,如果有自己的owner,該值的初始值為其在WorkQueue[]陣列中的下標,也肯定是個奇數,
如果這個值,變成了偶數,說明該佇列所屬的Thread正在執行Taskstatic final int SCANNING = 1; // false when running tasks static final int INACTIVE = 1 << 31; // must be negative -
stackPred: 記錄前任的
idle worker -
config:index | mode, 如果下標為偶數的
WorkQueue,則其mode是共享型別,如果有自己的owner默認是 LIFO什么時候應該設定成 FIFO,注釋中這么給的建議:
establishes local first-in-first-out scheduling mode for forked
tasks that are never joined. This mode may be more appropriate
than default locally stack-based mode in applications in which
worker threads only process event-style asynchronous tasks.
For default value, use {@code false} -
qlock: 鎖標識,在多執行緒往佇列中添加資料,會有競爭,使用此標識搶占鎖,
-
base:worker steal的偏移量,因為其他的執行緒都可以偷該佇列的任務,所有base使用
volatile標識, -
top:
owner執行任務的偏移量, -
parker:如果
owner掛起,則使用該變數做記錄, -
currentJoin: 當前正在join等待結果的任務,
-
currentSteal:當前執行的任務是steal過來的任務,該變數做記錄,
ForkJoinTask變數基本說明
- status: 標識任務目前的狀態,如果<0,表示任務處于結束狀態,
((s >>> 16) != 0)表示需要signal其他執行緒
任務提交程序剖析
ForkJoinPool提供的提交介面很多,不管提交的是Callable、Runnable、ForkJoinTask最終都會轉換成ForkJoinTask型別的任務,呼叫方法externalPush(ForkJoinTask<?> task)來進行提交邏輯,讓我們來看看提交的程序:
-
如果第一次提交(或者是hash之后的佇列還未初始化),呼叫
externalSubmit- 第一遍回圈: (runState不是開始狀態): 1.lock; 2.創建陣列
WorkQueue[n],這里的n是power of 2; 3. runState設定為開始狀態, - 第二遍回圈:(根據
ThreadLocalRandom.getProbe()hash后的陣列中相應位置的WorkQueue未初始化): 初始化WorkQueue,通過這種方式創立的WorkQueue均是SHARED_QUEUE,scanState為INACTIVE - 第三遍回圈: 找到剛剛創建的
WorkQueue,lock住佇列,將資料塞到arraytop位置,如果添加成功,就用呼叫接下來要攤開講的重要的方法signalWork,
- 第一遍回圈: (runState不是開始狀態): 1.lock; 2.創建陣列
-
如果hash之后的佇列已經存在
- lock住佇列,將資料塞到top位置,如果該佇列任務很少(n <= 1)也會呼叫
signalWork
- lock住佇列,將資料塞到top位置,如果該佇列任務很少(n <= 1)也會呼叫
signalWork
signalWork是fork/join框架中重要的方法之一,用于創建或者激活作業執行緒,本小節主要看它的原始碼實作,文章后面我們會總結它的使用場景,
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
while ((c = ctl) < 0L) { // too few active
if ((sp = (int)c) == 0) { // no idle workers
if ((c & ADD_WORKER) != 0L) // too few workers
tryAddWorker(c);
break;
}
if (ws == null) // unstarted/terminated
break;
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
if ((p = v.parker) != null)
U.unpark(p);
break;
}
if (q != null && q.base == q.top) // no more work
break;
}
}
在上面的章節我們已經具體的分析了ForkJoinPool中的ctl的標識含義,我們知道當ctl<0意味著active的執行緒還沒有到達閾值,只有ctl<0我們才會去討論要不要創建或者激活新的執行緒,(int)ctl很巧妙的拿到了ctl的低16位
-
我們知道
ctl代表的是idle worker當低16位為0的時候,意味著此刻沒有已經啟動但是空閑的執行緒,如果在沒有空閑的執行緒的情況下(c & ADD_WORKER) != 0L private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign意味著我們再增加一個執行緒,也不能使ac為非負數(只要ctl的最高位為1,表明ac仍是負數) ,我們呼叫方法
tryAddWorker來創建作業執行緒,- 我們首先需要使用
ctl來記錄我們增加的執行緒,ctl編號-1的16位和編號-2的16位均需要加1,表示active的worker加一,總的worker加一,成功后我們將呼叫createWorker, - 我們使用
ForkJoinWorkerThreadFactory來產生一個ForkJoinWorkerThread型別的執行緒,該執行緒將會把自己注冊到Pool上,怎么注冊的呢?實作在方法registerWorker,前文我們已經提及,擁有執行緒的WorkQueue只能出現在陣列的奇數下標處,所以執行緒 首先,創建一個新的WorkQueue,其次在陣列WorkQueue[]尋找奇數下標尚未初始化的位置,如果回圈的次數大于陣列長度,還可能需要對陣列進行擴容,然后,設定這個WorkQueue的 config 為 index | mode (下標和模式),scanState為 index (下標>0),最后啟動這個執行緒,執行緒的處理我們接下來的章節介紹,
- 我們首先需要使用
-
當我們發現我們還有
idle worker(即(int)ctl!=0L),我們需要active其中的一個,- 我們上文說過
ctl的編號-3的16位,標記inactive和版本控制,我們將編號-3設定為激活狀態并且版本加一,編號-4的16位我們之前也說過,放置了top的掛起的執行緒的index所以我們可以根據這個index拿到WorkQueue--- 意味著就是這個WorkQueue的Owner執行緒被掛起了, - 我們將要把top的掛起執行緒喚醒,意味著我們要講下一個掛起的執行緒的資訊記錄到ctl上,前文也說在上一個掛起的執行緒的
index資訊在這個掛起的執行緒的stackPred
更新完int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState int d = sp - v.scanState; // screen CAS long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { v.scanState = vs; // activate v if ((p = v.parker) != null) U.unpark(p); break; }ctl后,我們將喚醒之前掛起的執行緒,
通過上述的簡單介紹,用戶的任務的提交所經歷的步驟就介紹完了,
- 我們上文說過
ForkJoinWorkerThread運行程序剖析
ForkJoinWorkerThread啟動之后會呼叫pool的runWorker來獲取任務執行,
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
for (ForkJoinTask<?> t;;) {
if ((t = scan(w, r)) != null)
w.runTask(t);
else if (!awaitWork(w, r))
break;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
代碼很短,但是我們很容易讀懂其中的意思,呼叫scan嘗試去偷取一個任務,然后呼叫runTask或者awaitWork,這里的scan是框架的重要的實作,我們將詳述上面的三個方法,
scan
代碼如下:
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
int ss = w.scanState; // initially non-negative
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
if ((q = ws[k]) != null) {
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null &&
q.base == b) {
if (ss >= 0) {
if (U.compareAndSwapObject(a, i, t, null)) {
q.base = b + 1;
if (n < -1) // signal others
signalWork(ws, q);
return t;
}
}
else if (oldSum == 0 && // try to activate
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
if (ss < 0) // refresh
ss = w.scanState;
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
checkSum += b;
}
if ((k = (k + 1) & m) == origin) { // continue until stable
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
if (ss < 0 || w.qlock < 0) // already inactive
break;
int ns = ss | INACTIVE; // try to inactivate
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
w.stackPred = (int)c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
w.scanState = ss; // back out
}
checkSum = 0;
}
}
}
return null;
}
因為我們的WorkQueue是有owner執行緒的佇列,我們可以知道以下資訊:
- config = index | mode
- scanState = index > 0
我們首先通過random的r來找到一個我們準備偷取的佇列,
- 如果我們準備偷取的佇列剛好有任務在排隊(也有可能是owner自己的那個佇列);
- 從佇列的隊尾即
base位置取到任務回傳 - base + 1
- 從佇列的隊尾即
- 如果我們遍歷了一圈(
((k = (k + 1) & m) == origin))都沒有偷到,我們就認為當前的active 執行緒過剩了,我們準備將當前的執行緒(即owner)掛起,我們首先index | INACTIVE形成ctl的后32位;并行將ac減一,其次,將原來的掛起的top的index記錄到stackPred中, - 繼續遍歷如果仍然一無所獲,將跳出回圈;如果偷到了一個任務,我們將使用
tryRelease激活,
runTask
如果我們通過scan偷到了任務,這個時候我們將進入執行狀態:
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
scanState &= ~SCANNING; // mark as busy
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
execLocalTasks();
ForkJoinWorkerThread thread = owner;
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();
}
}
首先scanState &= ~SCANNING;標識該執行緒處于繁忙狀態,
- 執行偷取的Task,
- 呼叫
execLocalTasks對執行緒所屬的WorkQueue內的任務進行LIFO執行,
awaitWork
如果我們通過scan一無所獲,這個時候我們將進入執行狀態:
private boolean awaitWork(WorkQueue w, int r) {
if (w == null || w.qlock < 0) // w is terminating
return false;
for (int pred = w.stackPred, spins = SPINS, ss;;) {
if ((ss = w.scanState) >= 0)
break;
else if (spins > 0) {
r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
if (r >= 0 && --spins == 0) { // randomize spins
WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
if (pred != 0 && (ws = workQueues) != null &&
(j = pred & SMASK) < ws.length &&
(v = ws[j]) != null && // see if pred parking
(v.parker == null || v.scanState >= 0))
spins = SPINS; // continue spinning
}
}
else if (w.qlock < 0) // recheck after spins
return false;
else if (!Thread.interrupted()) {
long c, prevctl, parkTime, deadline;
int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
if ((ac <= 0 && tryTerminate(false, false)) ||
(runState & STOP) != 0) // pool terminating
return false;
if (ac <= 0 && ss == (int)c) { // is last waiter
prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
int t = (short)(c >>> TC_SHIFT); // shrink excess spares
if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
return false; // else use timed wait
parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
}
else
prevctl = parkTime = deadline = 0L;
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport
w.parker = wt;
if (w.scanState < 0 && ctl == c) // recheck before park
U.park(false, parkTime);
U.putOrderedObject(w, QPARKER, null);
U.putObject(wt, PARKBLOCKER, null);
if (w.scanState >= 0)
break;
if (parkTime != 0L && ctl == c &&
deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, c, prevctl))
return false; // shrink pool
}
}
return true;
}
- 如果ac還沒到達閾值,但是tc>2 說明現在仍然運行中的執行緒和掛起的執行緒加一起處于過剩狀態,我們將放棄該執行緒的掛起,直接讓它執行結束,不再回圈執行任務,
- 否則,我們計算一個掛起的時間,等到了時間之后(或者被外部喚醒),執行緒醒了之后,如果發現自己狀態是
active狀態(w.scanState >= 0),則執行緒繼續回去scan任務,如果發現自己是因為時間到了自動醒來,但是自己還是inactive狀態,也許,自己真的是多余的,執行緒也會執行結束,不再回圈執行任務,
ForkJoinTask執行程序剖析
從上節我們知道,我們獲取任務之后,將呼叫任務的doExec來具體執行任務:
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
我們以RecursiveTask為例:
protected final boolean exec() {
result = compute();
return true;
}
最終呼叫的是 compute,我們舉個example來看,
demo
求整數陣列所有元素之和
public class ForkJoinCalculator implements Calculator {
private ForkJoinPool pool;
private static class SumTask extends RecursiveTask<Long> {
private long[] numbers;
private int from;
private int to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
@Override
protected Long compute() {
// 當需要計算的數字小于6時,直接計算結果
if (to - from < 6) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
// 否則,把任務一分為二,遞回計算
} else {
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle+1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}
public ForkJoinCalculator() {
// 也可以使用公用的 ForkJoinPool:
// pool = ForkJoinPool.commonPool()
pool = new ForkJoinPool();
}
@Override
public long sumUp(long[] numbers) {
return pool.invoke(new SumTask(numbers, 0, numbers.length-1));
}
}
這里需要注意的是其中的compute
- 如果任務很小,直接計算結果,
- 如果任務很大,一分為二,呼叫
fork方法,獲取join方法的回傳值,
很明顯,這里任務的 fork、join就是我們框架的核心,那么,它們都做了什么呢?
fork
fork的邏輯很簡單,
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
- 如果當前執行緒是作業執行緒,直接
push到自己所擁有的佇列的top位置, - 如果是非作業執行緒,就是一個提交到Pool的程序,
join
join是一個等待結果的方法:
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
- 如果得到的結果例外,則拋出例外;
- 如果得到的正常,則獲取回傳值,
那么,執行緒在doJoin做了什么呢?
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
- 如果當前執行緒不是作業執行緒,則呼叫
externalAwaitDone,首先,設定任務的status為signal狀態,這樣該任務執行結束之后會呼叫notifyAll來喚醒自己;其次,阻塞自己,知道任務執行完成后把自己喚醒, - 如果需要join的任務已經完成,直接回傳運行結果;
- 如果需要join的任務剛剛好是當前執行緒所擁有的佇列的top位置,這意味著當前作業執行緒下一個就將執行到它,則執行它,
- 如果該任務不在top位置,則呼叫
awaitJoin方法:- 設定
currentJoin表明自己正在等待該任務; - 如果發現
w.base == w.top或者tryRemoveAndExec回傳 true說明自己所屬的佇列為空,也說明我們通過fork將本執行緒的任務已經被別的執行緒偷走,該執行緒也不會閑著,將會helpStealer幫助幫助自己執行任務的執行緒執行任務(互惠互利,你來我往) - 如果
tryCompensate為 true,則阻塞本執行緒,等待任務執行結束的喚醒
- 設定
tryRemoveAndExec
如果join的任務還沒有被執行,我們去自己的佇列中去查找,看看任務是否不在top位置但是還是在佇列中
while ((n = (s = top) - (b = base)) > 0) {
for (ForkJoinTask<?> t;;) { // traverse from s to b
long j = ((--s & m) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
return s + 1 == top; // shorter than expected
else if (t == task) {
boolean removed = false;
if (s + 1 == top) { // pop
if (U.compareAndSwapObject(a, j, task, null)) {
U.putOrderedInt(this, QTOP, s);
removed = true;
}
}
else if (base == b) // replace with proxy
removed = U.compareAndSwapObject(
a, j, task, new EmptyTask());
if (removed)
task.doExec();
break;
}
else if (t.status < 0 && s + 1 == top) {
if (U.compareAndSwapObject(a, j, t, null))
U.putOrderedInt(this, QTOP, s);
break; // was cancelled
}
if (--n == 0)
return false;
}
if (task.status < 0)
return false;
}
- 如果剛好在top位置,pop出來執行,
- 如果在佇列中間,則使用
EmptyTask來占位,將任務取出來執行, - 如果執行的任務還沒結束,則不進行
helpStealer,
helpStealer
helpStealer的原則是你幫助我執行任務,我也幫你執行任務,
- 遍歷
奇數下標,如果發現佇列物件currentSteal放置的剛好是自己要找的任務,則說明自己的任務被該佇列A的owner執行緒偷來執行 - 如果佇列A佇列中有任務,則從隊尾(
base)取出執行; - 如果發現佇列A佇列為空,則根據它正在join的任務,在拓撲找到相關的佇列B去偷取任務執行,
在執行的程序中要注意,我們應該完整的把任務完成
do {
U.putOrderedObject(w, QCURRENTSTEAL, t);
t.doExec(); // clear local tasks too
} while (task.status >= 0 &&
w.top != top &&
(t = w.pop()) != null);
這是什么意思呢? 因為你在執行這個任務的時候,這個任務也可能fork出什么子任務push到當前執行緒,我們應該記錄原來佇列top的位置,然后在執行結束后,還回到top原來的位置,
- 幫忙執行任務完成后,如果發現自己的佇列有任務了(
w.base != w.top),在不再幫助執行任務了, - 否則在等待自己的join的那個任務結束之前,可以不斷的偷取任務執行,
tryCompensate
如果自己等待的任務被偷走執行還沒結束,自己的佇列還有任務,我們需要做一些補償
private boolean tryCompensate(WorkQueue w) {
boolean canBlock;
WorkQueue[] ws; long c; int m, pc, sp;
if (w == null || w.qlock < 0 || // caller terminating
(ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
(pc = config & SMASK) == 0) // parallelism disabled
canBlock = false;
else if ((sp = (int)(c = ctl)) != 0) // release idle worker
canBlock = tryRelease(c, ws[sp & m], 0L);
else {
int ac = (int)(c >> AC_SHIFT) + pc;
int tc = (short)(c >> TC_SHIFT) + pc;
int nbusy = 0; // validate saturation
for (int i = 0; i <= m; ++i) { // two passes of odd indices
WorkQueue v;
if ((v = ws[((i << 1) | 1) & m]) != null) {
if ((v.scanState & SCANNING) != 0)
break;
++nbusy;
}
}
if (nbusy != (tc << 1) || ctl != c)
canBlock = false; // unstable or stale
else if (tc >= pc && ac > 1 && w.isEmpty()) {
long nc = ((AC_MASK & (c - AC_UNIT)) |
(~AC_MASK & c)); // uncompensated
canBlock = U.compareAndSwapLong(this, CTL, c, nc);
}
else if (tc >= MAX_CAP ||
(this == common && tc >= pc + commonMaxSpares))
throw new RejectedExecutionException(
"Thread limit exceeded replacing blocked worker");
else { // similar to tryAddWorker
boolean add = false; int rs; // CAS within lock
long nc = ((AC_MASK & c) |
(TC_MASK & (c + TC_UNIT)));
if (((rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
canBlock = add && createWorker(); // throws on exception
}
}
return canBlock;
}
- 如果
((sp = (int)(c = ctl)) != 0)說明還有idle worker則可以選擇喚醒一個執行緒替代自己,掛起自己等待任務來喚醒自己, - 如果沒有
idle worker則額外創建一個新的作業執行緒替代自己,掛起自己等待任務來喚醒自己,
總結
在了解了 Fork/Join Framework 的作業原理之后,相信很多使用上的注意事項就可以從原理中找到原因
最后:
針對最近很多人都在面試,我這邊也整理了相當多的面試專題資料(spring、mybatis、jvm,,,帶多了可以看附上的圖片)和多家公司的面試真題,
上述面試題答案都整理成檔案筆記, 也還整理了一些面試資料&最新2020收集的一些大廠的面試真題(都整理成檔案,小部分截圖),有需要的可以 點擊進入 查看領取資料,
有人可能會說了:面試真題你全部放上來就好了,,,
你知道我最近整理了多少嗎?
希望對大家有所幫助,有用的話點贊給我支持!

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/125768.html
標籤:其他
