主頁 >  其他 > 面試大廠必問的ForkJoin框架剖析【建議收藏】

面試大廠必問的ForkJoin框架剖析【建議收藏】

2020-09-25 13:11:21 其他

點關注,不迷路!如果本文對你有幫助的話不要忘記點贊支持哦!

概述

image.png

和傳統的執行緒池使用AQS的實作邏輯不同,ForkJoin引入全新的結構來標識:

  • ForkJoinPool: 用于執行ForkJoinTask任務的執行池,不再是傳統執行池 Worker+Queue 的組合模式,而是維護了一個佇列陣列WorkQueue,這樣在提交任務和執行緒任務的時候大幅度的減少碰撞,
  • WorkQueue: 雙向串列,用于任務的有序執行,如果WorkQueue用于自己的執行執行緒Thread,執行緒默認將會從top端選取任務用來執行 - LIFO,因為只有owner的Thread才能從top端取任務,所以在設定變數時, int top; 不需要使用 volatile
  • ForkJoinWorkThread: 用于執行任務的執行緒,用于區別使用非ForkJoinWorkThread執行緒提交的task;啟動一個該Thread,會自動注冊一個WorkQueue到Pool,這里規定,擁有Thread的WorkQueue只能出現在WorkQueue陣列的奇數位
  • ForkJoinTask: 任務, 它比傳統的任務更加輕量,不再對是RUNNABLE的子類,提供fork/join方法用于分割任務以及聚合結果,
  • 為了充分施展并行運算,該框架實作了復雜的 worker steal演算法,當任務處于等待中,thread通過一定策略,不讓自己掛起,充分利用資源,當然,它比其他語言的協程要重一些,

ForkJoinPool變數基本說明

作為框架的提交入口,ForkJoinPool管理著執行緒池中執行緒和任務佇列,標識執行緒池是否還接收任務,顯示現在的執行緒運行狀態,本節,對這些控制量進行解釋,

如果讀者看過 類似 disrupter 這種高效率佇列的開源實作,大家肯定會對cache line記憶猶新,他們通常的做法自己設定偽變數來填充,jdk1.8�中官網為我們帶來了sun.misc.Contended,所以你如果閱讀ForkJoinPool原始碼可以發現該類也被sun.misc.Contended標識,
幾個重要變數:

  • runState: 標識Pool運行狀態,使用bit位來標識不同狀態,比如
        // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
      private static final int  RSLOCK     = 1;
      private static final int  RSIGNAL    = 1 << 1;
      private static final int  STARTED    = 1 << 2;
      private static final int  STOP       = 1 << 29;
      private static final int  TERMINATED = 1 << 30;
      private static final int  SHUTDOWN   = 1 << 31;
    
    如果執行 runState & RSLOCK ==0 就能直接說明,目前的運行狀態沒有被鎖住,其他情況一樣,
  • config:parallelism | mode
  • parallelism: 這個變數不是內部定義的變數,但是需要各位注意一下它的界限,因為后面的處理需要注意
    static final int MAX_CAP      = 0x7fff;        // max #workers - 1
    

也就是說他最大就占16位

  • ctl:ctl是Pool的控制變數,型別是long - 說明有64位,每個部分都有不同的作用,我們使用十六進制來標識ctl,依次說明不同部分的作用,
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    0x xxxx-1  xxxx-2  xxxx-3  xxxx-4
    
    我為每個部分使用了數字來標識 - 1,2,3,4,
    • 編號為1的16位: AC 表示現在獲取的執行緒數,這里的初始化比較有技巧,使用的是并行數的相反數,這樣如果active的執行緒數,還沒到達了我們設定的閾值的時候,ctl是個負數,我們可以根據ctl的正負直觀的知道現在的并行數達到閾值了么,
    • 編號為2的16位:TC 表示執行緒總量,初始值也是并行數的相反數,這里需要說明一下,這個編號1所表示的活躍的執行緒數的區別,我們雖然開啟了并行數等量的執行緒,但是可能在某些條件下,運行的thread不得不wait或者park,原因我們后面會提到,這個時候,雖然我們開啟的執行緒數量是和并行數相同,但是實際真正執行的卻不是這么多,TC 記錄了我們一共開啟了多少執行緒,而AC則記錄了沒有掛起的執行緒,
    • 編號為3的16位:后32位標識 idle workers 前面16位第一位標識是active的還是inactive的,其他為是版本標識,
    • 編號為4的16位:標識idle workersWorkQueue[]陣列中的index,這里需要說明的是,ctl的后32位其實只能表示一個idle workers,那么我們如果有很多個idle worker要怎么辦呢?老爺子使用的是stack的概念來保存這些資訊,后32位標識的是top的那個,我們能從top中的變數stackPred追蹤到下一個idle worker

WorkQueue變數基本說明

WorkQueue是一個雙向串列,用于task的排隊,
幾個變數的定義說明:

  • scanState: // versioned, <0: inactive; odd:scanning 如果WorkQueue沒有屬于自己的owner(下標為偶數的都沒有),該值為 inactive 也就是一個負數,如果有自己的owner,該值的初始值為其在WorkQueue[]陣列中的下標,也肯定是個奇數,
    如果這個值,變成了偶數,說明該佇列所屬的Thread正在執行Task

        static final int SCANNING     = 1;             // false when running   tasks
        static final int INACTIVE     = 1 << 31;       // must be negative
    
  • stackPred: 記錄前任的 idle worker

  • config:index | mode, 如果下標為偶數的WorkQueue,則其mode是共享型別,如果有自己的owner 默認是 LIFO

    什么時候應該設定成 FIFO,注釋中這么給的建議:
    establishes local first-in-first-out scheduling mode for forked
    tasks that are never joined. This mode may be more appropriate
    than default locally stack-based mode in applications in which
    worker threads only process event-style asynchronous tasks.
    For default value, use {@code false}

  • qlock: 鎖標識,在多執行緒往佇列中添加資料,會有競爭,使用此標識搶占鎖,

  • base:worker steal的偏移量,因為其他的執行緒都可以偷該佇列的任務,所有base使用volatile標識,

  • top:owner執行任務的偏移量,

  • parker:如果 owner 掛起,則使用該變數做記錄,

  • currentJoin: 當前正在join等待結果的任務,

  • currentSteal:當前執行的任務是steal過來的任務,該變數做記錄,

ForkJoinTask變數基本說明

  • status: 標識任務目前的狀態,如果<0,表示任務處于結束狀態,
    ((s >>> 16) != 0)表示需要signal其他執行緒

任務提交程序剖析

ForkJoinPool提供的提交介面很多,不管提交的是CallableRunnableForkJoinTask最終都會轉換成ForkJoinTask型別的任務,呼叫方法externalPush(ForkJoinTask<?> task)來進行提交邏輯,讓我們來看看提交的程序:

  • 如果第一次提交(或者是hash之后的佇列還未初始化),呼叫externalSubmit

    • 第一遍回圈: (runState不是開始狀態): 1.lock; 2.創建陣列WorkQueue[n],這里的n是power of 2; 3. runState設定為開始狀態,
    • 第二遍回圈:(根據ThreadLocalRandom.getProbe()hash后的陣列中相應位置的WorkQueue未初始化): 初始化WorkQueue,通過這種方式創立的WorkQueue均是SHARED_QUEUE,scanStateINACTIVE
    • 第三遍回圈: 找到剛剛創建的WorkQueue,lock住佇列,將資料塞到arraytop位置,如果添加成功,就用呼叫接下來要攤開講的重要的方法signalWork
  • 如果hash之后的佇列已經存在

    • lock住佇列,將資料塞到top位置,如果該佇列任務很少(n <= 1)也會呼叫signalWork

signalWork

signalWork是fork/join框架中重要的方法之一,用于創建或者激活作業執行緒,本小節主要看它的原始碼實作,文章后面我們會總結它的使用場景,

    final void signalWork(WorkQueue[] ws, WorkQueue q) {
        long c; int sp, i; WorkQueue v; Thread p;
        while ((c = ctl) < 0L) {                       // too few active
            if ((sp = (int)c) == 0) {                  // no idle workers
                if ((c & ADD_WORKER) != 0L)            // too few workers
                    tryAddWorker(c);
                break;
            }
            if (ws == null)                            // unstarted/terminated
                break;
            if (ws.length <= (i = sp & SMASK))         // terminated
                break;
            if ((v = ws[i]) == null)                   // terminating
                break;
            int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
            int d = sp - v.scanState;                  // screen CAS
            long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
            if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;                      // activate v
                if ((p = v.parker) != null)
                    U.unpark(p);
                break;
            }
            if (q != null && q.base == q.top)          // no more work
                break;
        }
    }

在上面的章節我們已經具體的分析了ForkJoinPool中的ctl的標識含義,我們知道當ctl<0意味著active的執行緒還沒有到達閾值,只有ctl<0我們才會去討論要不要創建或者激活新的執行緒,(int)ctl很巧妙的拿到了ctl的低16位

  • 我們知道ctl代表的是idle worker當低16位為0的時候,意味著此刻沒有已經啟動但是空閑的執行緒,如果在沒有空閑的執行緒的情況下

    (c & ADD_WORKER) != 0L
    private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
    

    意味著我們再增加一個執行緒,也不能使ac為非負數(只要ctl的最高位為1,表明ac仍是負數) ,我們呼叫方法tryAddWorker來創建作業執行緒,

    • 我們首先需要使用ctl來記錄我們增加的執行緒, ctl編號-1的16位和編號-2的16位均需要加1,表示active的worker加一,總的worker加一,成功后我們將呼叫createWorker
    • 我們使用ForkJoinWorkerThreadFactory來產生一個ForkJoinWorkerThread型別的執行緒,該執行緒將會把自己注冊到Pool上,怎么注冊的呢?實作在方法registerWorker,前文我們已經提及,擁有執行緒的WorkQueue只能出現在陣列的奇數下標處,所以執行緒 首先,創建一個新的WorkQueue,其次在陣列WorkQueue[]尋找奇數下標尚未初始化的位置,如果回圈的次數大于陣列長度,還可能需要對陣列進行擴容,然后,設定這個WorkQueue的 config 為 index | mode (下標和模式),scanState為 index (下標>0),最后啟動這個執行緒,執行緒的處理我們接下來的章節介紹,
  • 當我們發現我們還有idle worker(即(int)ctl!=0L),我們需要active其中的一個,

    • 我們上文說過ctl的編號-3的16位,標記inactive和版本控制,我們將編號-3設定為激活狀態并且版本加一,編號-4的16位我們之前也說過,放置了top的掛起的執行緒的index所以我們可以根據這個index拿到WorkQueue--- 意味著就是這個WorkQueueOwner執行緒被掛起了,
    • 我們將要把top的掛起執行緒喚醒,意味著我們要講下一個掛起的執行緒的資訊記錄到ctl上,前文也說在上一個掛起的執行緒的index資訊在這個掛起的執行緒的stackPred
      int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
             int d = sp - v.scanState;                  // screen CAS
             long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
             if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                 v.scanState = vs;                      // activate v
                 if ((p = v.parker) != null)
                     U.unpark(p);
                 break;
             }
      
      更新完ctl后,我們將喚醒之前掛起的執行緒,

    通過上述的簡單介紹,用戶的任務的提交所經歷的步驟就介紹完了,

ForkJoinWorkerThread運行程序剖析

ForkJoinWorkerThread啟動之后會呼叫poolrunWorker來獲取任務執行,

    final void runWorker(WorkQueue w) {
        w.growArray();                   // allocate queue
        int seed = w.hint;               // initially holds randomization hint
        int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
        for (ForkJoinTask<?> t;;) {
            if ((t = scan(w, r)) != null)
                w.runTask(t);
            else if (!awaitWork(w, r))
                break;
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }

代碼很短,但是我們很容易讀懂其中的意思,呼叫scan嘗試去偷取一個任務,然后呼叫runTask或者awaitWork,這里的scan是框架的重要的實作,我們將詳述上面的三個方法,

scan

代碼如下:

    private ForkJoinTask<?> scan(WorkQueue w, int r) {
        WorkQueue[] ws; int m;
        if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
            int ss = w.scanState;                     // initially non-negative
            for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                int b, n; long c;
                if ((q = ws[k]) != null) {
                    if ((n = (b = q.base) - q.top) < 0 &&
                        (a = q.array) != null) {      // non-empty
                        long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                        if ((t = ((ForkJoinTask<?>)
                                  U.getObjectVolatile(a, i))) != null &&
                            q.base == b) {
                            if (ss >= 0) {
                                if (U.compareAndSwapObject(a, i, t, null)) {
                                    q.base = b + 1;
                                    if (n < -1)       // signal others
                                        signalWork(ws, q);
                                    return t;
                                }
                            }
                            else if (oldSum == 0 &&   // try to activate
                                     w.scanState < 0)
                                tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                        }
                        if (ss < 0)                   // refresh
                            ss = w.scanState;
                        r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                        origin = k = r & m;           // move and rescan
                        oldSum = checkSum = 0;
                        continue;
                    }
                    checkSum += b;
                }
                if ((k = (k + 1) & m) == origin) {    // continue until stable
                    if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                        oldSum == (oldSum = checkSum)) {
                        if (ss < 0 || w.qlock < 0)    // already inactive
                            break;
                        int ns = ss | INACTIVE;       // try to inactivate
                        long nc = ((SP_MASK & ns) |
                                   (UC_MASK & ((c = ctl) - AC_UNIT)));
                        w.stackPred = (int)c;         // hold prev stack top
                        U.putInt(w, QSCANSTATE, ns);
                        if (U.compareAndSwapLong(this, CTL, c, nc))
                            ss = ns;
                        else
                            w.scanState = ss;         // back out
                    }
                    checkSum = 0;
                }
            }
        }
        return null;
    }

因為我們的WorkQueue是有owner執行緒的佇列,我們可以知道以下資訊:

  • config = index | mode
  • scanState = index > 0

我們首先通過random的r來找到一個我們準備偷取的佇列,

  • 如果我們準備偷取的佇列剛好有任務在排隊(也有可能是owner自己的那個佇列);
    • 從佇列的隊尾即base位置取到任務回傳
    • base + 1
  • 如果我們遍歷了一圈(((k = (k + 1) & m) == origin))都沒有偷到,我們就認為當前的active 執行緒過剩了,我們準備將當前的執行緒(即owner)掛起,我們首先 index | INACTIVE 形成 ctl的后32位;并行將ac減一,其次,將原來的掛起的top的index記錄到stackPred中,
  • 繼續遍歷如果仍然一無所獲,將跳出回圈;如果偷到了一個任務,我們將使用tryRelease激活,

runTask

如果我們通過scan偷到了任務,這個時候我們將進入執行狀態:

        final void runTask(ForkJoinTask<?> task) {
            if (task != null) {
                scanState &= ~SCANNING; // mark as busy
                (currentSteal = task).doExec();
                U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
                execLocalTasks();
                ForkJoinWorkerThread thread = owner;
                if (++nsteals < 0)      // collect on overflow
                    transferStealCount(pool);
                scanState |= SCANNING;
                if (thread != null)
                    thread.afterTopLevelExec();
            }
        }

首先scanState &= ~SCANNING;標識該執行緒處于繁忙狀態,

  • 執行偷取的Task,
  • 呼叫execLocalTasks對執行緒所屬的WorkQueue內的任務進行LIFO執行,

awaitWork

如果我們通過scan一無所獲,這個時候我們將進入執行狀態:

    private boolean awaitWork(WorkQueue w, int r) {
        if (w == null || w.qlock < 0)                 // w is terminating
            return false;
        for (int pred = w.stackPred, spins = SPINS, ss;;) {
            if ((ss = w.scanState) >= 0)
                break;
            else if (spins > 0) {
                r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
                if (r >= 0 && --spins == 0) {         // randomize spins
                    WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
                    if (pred != 0 && (ws = workQueues) != null &&
                        (j = pred & SMASK) < ws.length &&
                        (v = ws[j]) != null &&        // see if pred parking
                        (v.parker == null || v.scanState >= 0))
                        spins = SPINS;                // continue spinning
                }
            }
            else if (w.qlock < 0)                     // recheck after spins
                return false;
            else if (!Thread.interrupted()) {
                long c, prevctl, parkTime, deadline;
                int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
                if ((ac <= 0 && tryTerminate(false, false)) ||
                    (runState & STOP) != 0)           // pool terminating
                    return false;
                if (ac <= 0 && ss == (int)c) {        // is last waiter
                    prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
                    int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
                    if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
                        return false;                 // else use timed wait
                    parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
                    deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
                }
                else
                    prevctl = parkTime = deadline = 0L;
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
                w.parker = wt;
                if (w.scanState < 0 && ctl == c)      // recheck before park
                    U.park(false, parkTime);
                U.putOrderedObject(w, QPARKER, null);
                U.putObject(wt, PARKBLOCKER, null);
                if (w.scanState >= 0)
                    break;
                if (parkTime != 0L && ctl == c &&
                    deadline - System.nanoTime() <= 0L &&
                    U.compareAndSwapLong(this, CTL, c, prevctl))
                    return false;                     // shrink pool
            }
        }
        return true;
    }
  • 如果ac還沒到達閾值,但是tc>2 說明現在仍然運行中的執行緒和掛起的執行緒加一起處于過剩狀態,我們將放棄該執行緒的掛起,直接讓它執行結束,不再回圈執行任務,
  • 否則,我們計算一個掛起的時間,等到了時間之后(或者被外部喚醒),執行緒醒了之后,如果發現自己狀態是active狀態(w.scanState >= 0),則執行緒繼續回去scan任務,如果發現自己是因為時間到了自動醒來,但是自己還是inactive狀態,也許,自己真的是多余的,執行緒也會執行結束,不再回圈執行任務,

ForkJoinTask執行程序剖析

從上節我們知道,我們獲取任務之后,將呼叫任務的doExec來具體執行任務:

    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }

我們以RecursiveTask為例:

   protected final boolean exec() {
        result = compute();
        return true;
    }

最終呼叫的是 compute,我們舉個example來看,

demo

求整數陣列所有元素之和

public class ForkJoinCalculator implements Calculator {
    private ForkJoinPool pool;

    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // 當需要計算的數字小于6時,直接計算結果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            // 否則,把任務一分為二,遞回計算
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle+1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public ForkJoinCalculator() {
        // 也可以使用公用的 ForkJoinPool:
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    @Override
    public long sumUp(long[] numbers) {
        return pool.invoke(new SumTask(numbers, 0, numbers.length-1));
    }
}

這里需要注意的是其中的compute

  • 如果任務很小,直接計算結果,
  • 如果任務很大,一分為二,呼叫fork方法,獲取join方法的回傳值,

很明顯,這里任務的 forkjoin就是我們框架的核心,那么,它們都做了什么呢?

fork

fork的邏輯很簡單,

   public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }
  • 如果當前執行緒是作業執行緒,直接push到自己所擁有的佇列的top位置,
  • 如果是非作業執行緒,就是一個提交到Pool的程序,

join

join是一個等待結果的方法:

   public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
  • 如果得到的結果例外,則拋出例外;
  • 如果得到的正常,則獲取回傳值,

那么,執行緒在doJoin做了什么呢?

   private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }
  • 如果當前執行緒不是作業執行緒,則呼叫externalAwaitDone,首先,設定任務的status為signal狀態,這樣該任務執行結束之后會呼叫notifyAll來喚醒自己;其次,阻塞自己,知道任務執行完成后把自己喚醒,
  • 如果需要join的任務已經完成,直接回傳運行結果;
  • 如果需要join的任務剛剛好是當前執行緒所擁有的佇列的top位置,這意味著當前作業執行緒下一個就將執行到它,則執行它,
  • 如果該任務不在top位置,則呼叫awaitJoin方法:
    • 設定currentJoin表明自己正在等待該任務;
    • 如果發現 w.base == w.top 或者 tryRemoveAndExec回傳 true說明自己所屬的佇列為空,也說明我們通過fork將本執行緒的任務已經被別的執行緒偷走,該執行緒也不會閑著,將會helpStealer幫助幫助自己執行任務的執行緒執行任務(互惠互利,你來我往)
    • 如果tryCompensate為 true,則阻塞本執行緒,等待任務執行結束的喚醒

tryRemoveAndExec

如果join的任務還沒有被執行,我們去自己的佇列中去查找,看看任務是否不在top位置但是還是在佇列中

        while ((n = (s = top) - (b = base)) > 0) {
                    for (ForkJoinTask<?> t;;) {      // traverse from s to b
                        long j = ((--s & m) << ASHIFT) + ABASE;
                        if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                            return s + 1 == top;     // shorter than expected
                        else if (t == task) {
                            boolean removed = false;
                            if (s + 1 == top) {      // pop
                                if (U.compareAndSwapObject(a, j, task, null)) {
                                    U.putOrderedInt(this, QTOP, s);
                                    removed = true;
                                }
                            }
                            else if (base == b)      // replace with proxy
                                removed = U.compareAndSwapObject(
                                    a, j, task, new EmptyTask());
                            if (removed)
                                task.doExec();
                            break;
                        }
                        else if (t.status < 0 && s + 1 == top) {
                            if (U.compareAndSwapObject(a, j, t, null))
                                U.putOrderedInt(this, QTOP, s);
                            break;                  // was cancelled
                        }
                        if (--n == 0)
                            return false;
                    }
                    if (task.status < 0)
                        return false;
                }
  • 如果剛好在top位置,pop出來執行,
  • 如果在佇列中間,則使用EmptyTask來占位,將任務取出來執行,
  • 如果執行的任務還沒結束,則不進行helpStealer,

helpStealer

helpStealer的原則是你幫助我執行任務,我也幫你執行任務,

  • 遍歷奇數下標,如果發現佇列物件currentSteal放置的剛好是自己要找的任務,則說明自己的任務被該佇列A的owner執行緒偷來執行
  • 如果佇列A佇列中有任務,則從隊尾(base)取出執行;
  • 如果發現佇列A佇列為空,則根據它正在join的任務,在拓撲找到相關的佇列B去偷取任務執行,
    在執行的程序中要注意,我們應該完整的把任務完成

  do {
      U.putOrderedObject(w, QCURRENTSTEAL, t);
      t.doExec();        // clear local tasks too
        } while (task.status >= 0 &&
                                         w.top != top &&
                                         (t = w.pop()) != null);

這是什么意思呢? 因為你在執行這個任務的時候,這個任務也可能fork出什么子任務push到當前執行緒,我們應該記錄原來佇列top的位置,然后在執行結束后,還回到top原來的位置,

  • 幫忙執行任務完成后,如果發現自己的佇列有任務了(w.base != w.top),在不再幫助執行任務了,
  • 否則在等待自己的join的那個任務結束之前,可以不斷的偷取任務執行,

tryCompensate

如果自己等待的任務被偷走執行還沒結束,自己的佇列還有任務,我們需要做一些補償

    private boolean tryCompensate(WorkQueue w) {
        boolean canBlock;
        WorkQueue[] ws; long c; int m, pc, sp;
        if (w == null || w.qlock < 0 ||           // caller terminating
            (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
            (pc = config & SMASK) == 0)           // parallelism disabled
            canBlock = false;
        else if ((sp = (int)(c = ctl)) != 0)      // release idle worker
            canBlock = tryRelease(c, ws[sp & m], 0L);
        else {
            int ac = (int)(c >> AC_SHIFT) + pc;
            int tc = (short)(c >> TC_SHIFT) + pc;
            int nbusy = 0;                        // validate saturation
            for (int i = 0; i <= m; ++i) {        // two passes of odd indices
                WorkQueue v;
                if ((v = ws[((i << 1) | 1) & m]) != null) {
                    if ((v.scanState & SCANNING) != 0)
                        break;
                    ++nbusy;
                }
            }
            if (nbusy != (tc << 1) || ctl != c)
                canBlock = false;                 // unstable or stale
            else if (tc >= pc && ac > 1 && w.isEmpty()) {
                long nc = ((AC_MASK & (c - AC_UNIT)) |
                           (~AC_MASK & c));       // uncompensated
                canBlock = U.compareAndSwapLong(this, CTL, c, nc);
            }
            else if (tc >= MAX_CAP ||
                     (this == common && tc >= pc + commonMaxSpares))
                throw new RejectedExecutionException(
                    "Thread limit exceeded replacing blocked worker");
            else {                                // similar to tryAddWorker
                boolean add = false; int rs;      // CAS within lock
                long nc = ((AC_MASK & c) |
                           (TC_MASK & (c + TC_UNIT)));
                if (((rs = lockRunState()) & STOP) == 0)
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK);
                canBlock = add && createWorker(); // throws on exception
            }
        }
        return canBlock;
    }
  • 如果 ((sp = (int)(c = ctl)) != 0) 說明還有 idle worker則可以選擇喚醒一個執行緒替代自己,掛起自己等待任務來喚醒自己,
  • 如果沒有idle worker 則額外創建一個新的作業執行緒替代自己,掛起自己等待任務來喚醒自己,

總結

在了解了 Fork/Join Framework 的作業原理之后,相信很多使用上的注意事項就可以從原理中找到原因

最后:

針對最近很多人都在面試,我這邊也整理了相當多的面試專題資料(spring、mybatis、jvm,,,帶多了可以看附上的圖片)和多家公司的面試真題,

上述面試題答案都整理成檔案筆記, 也還整理了一些面試資料&最新2020收集的一些大廠的面試真題(都整理成檔案,小部分截圖),有需要的可以 點擊進入 查看領取資料,

有人可能會說了:面試真題你全部放上來就好了,,,

你知道我最近整理了多少嗎?

希望對大家有所幫助,有用的話點贊給我支持!

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/125768.html

標籤:其他

上一篇:群集技術———LVS(Linux Virtual Server,Linux虛擬服務器)負載均衡

下一篇:渣渣本科的2021屆秋招總結-淚目

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more