主頁 > 資料庫 > 面試大廠必問的ForkJoin框架剖析【建議收藏】

面試大廠必問的ForkJoin框架剖析【建議收藏】

2020-09-25 21:19:25 資料庫

點關注,不迷路!如果本文對你有幫助的話不要忘記點贊支持哦!

概述

image.png

和傳統的執行緒池使用AQS的實作邏輯不同,ForkJoin引入全新的結構來標識:

  • ForkJoinPool: 用于執行ForkJoinTask任務的執行池,不再是傳統執行池 Worker+Queue 的組合模式,而是維護了一個佇列陣列WorkQueue,這樣在提交任務和執行緒任務的時候大幅度的減少碰撞,
  • WorkQueue: 雙向串列,用于任務的有序執行,如果WorkQueue用于自己的執行執行緒Thread,執行緒默認將會從top端選取任務用來執行 - LIFO,因為只有owner的Thread才能從top端取任務,所以在設定變數時, int top; 不需要使用 volatile
  • ForkJoinWorkThread: 用于執行任務的執行緒,用于區別使用非ForkJoinWorkThread執行緒提交的task;啟動一個該Thread,會自動注冊一個WorkQueue到Pool,這里規定,擁有Thread的WorkQueue只能出現在WorkQueue陣列的奇數位
  • ForkJoinTask: 任務, 它比傳統的任務更加輕量,不再對是RUNNABLE的子類,提供fork/join方法用于分割任務以及聚合結果,
  • 為了充分施展并行運算,該框架實作了復雜的 worker steal演算法,當任務處于等待中,thread通過一定策略,不讓自己掛起,充分利用資源,當然,它比其他語言的協程要重一些,

ForkJoinPool變數基本說明

作為框架的提交入口,ForkJoinPool管理著執行緒池中執行緒和任務佇列,標識執行緒池是否還接收任務,顯示現在的執行緒運行狀態,本節,對這些控制量進行解釋,

如果讀者看過 類似 disrupter 這種高效率佇列的開源實作,大家肯定會對cache line記憶猶新,他們通常的做法自己設定偽變數來填充,jdk1.8�中官網為我們帶來了sun.misc.Contended,所以你如果閱讀ForkJoinPool原始碼可以發現該類也被sun.misc.Contended標識,
幾個重要變數:

  • runState: 標識Pool運行狀態,使用bit位來標識不同狀態,比如
        // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
      private static final int  RSLOCK     = 1;
      private static final int  RSIGNAL    = 1 << 1;
      private static final int  STARTED    = 1 << 2;
      private static final int  STOP       = 1 << 29;
      private static final int  TERMINATED = 1 << 30;
      private static final int  SHUTDOWN   = 1 << 31;
    
    如果執行 runState & RSLOCK ==0 就能直接說明,目前的運行狀態沒有被鎖住,其他情況一樣,
  • config:parallelism | mode
  • parallelism: 這個變數不是內部定義的變數,但是需要各位注意一下它的界限,因為后面的處理需要注意
    static final int MAX_CAP      = 0x7fff;        // max #workers - 1
    

也就是說他最大就占16位

  • ctl:ctl是Pool的控制變數,型別是long - 說明有64位,每個部分都有不同的作用,我們使用十六進制來標識ctl,依次說明不同部分的作用,
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    0x xxxx-1  xxxx-2  xxxx-3  xxxx-4
    
    我為每個部分使用了數字來標識 - 1,2,3,4,
    • 編號為1的16位: AC 表示現在獲取的執行緒數,這里的初始化比較有技巧,使用的是并行數的相反數,這樣如果active的執行緒數,還沒到達了我們設定的閾值的時候,ctl是個負數,我們可以根據ctl的正負直觀的知道現在的并行數達到閾值了么,
    • 編號為2的16位:TC 表示執行緒總量,初始值也是并行數的相反數,這里需要說明一下,這個編號1所表示的活躍的執行緒數的區別,我們雖然開啟了并行數等量的執行緒,但是可能在某些條件下,運行的thread不得不wait或者park,原因我們后面會提到,這個時候,雖然我們開啟的執行緒數量是和并行數相同,但是實際真正執行的卻不是這么多,TC 記錄了我們一共開啟了多少執行緒,而AC則記錄了沒有掛起的執行緒,
    • 編號為3的16位:后32位標識 idle workers 前面16位第一位標識是active的還是inactive的,其他為是版本標識,
    • 編號為4的16位:標識idle workersWorkQueue[]陣列中的index,這里需要說明的是,ctl的后32位其實只能表示一個idle workers,那么我們如果有很多個idle worker要怎么辦呢?老爺子使用的是stack的概念來保存這些資訊,后32位標識的是top的那個,我們能從top中的變數stackPred追蹤到下一個idle worker

WorkQueue變數基本說明

WorkQueue是一個雙向串列,用于task的排隊,
幾個變數的定義說明:

  • scanState: // versioned, <0: inactive; odd:scanning 如果WorkQueue沒有屬于自己的owner(下標為偶數的都沒有),該值為 inactive 也就是一個負數,如果有自己的owner,該值的初始值為其在WorkQueue[]陣列中的下標,也肯定是個奇數,
    如果這個值,變成了偶數,說明該佇列所屬的Thread正在執行Task

        static final int SCANNING     = 1;             // false when running   tasks
        static final int INACTIVE     = 1 << 31;       // must be negative
    
  • stackPred: 記錄前任的 idle worker

  • config:index | mode, 如果下標為偶數的WorkQueue,則其mode是共享型別,如果有自己的owner 默認是 LIFO

    什么時候應該設定成 FIFO,注釋中這么給的建議:
    establishes local first-in-first-out scheduling mode for forked
    tasks that are never joined. This mode may be more appropriate
    than default locally stack-based mode in applications in which
    worker threads only process event-style asynchronous tasks.
    For default value, use {@code false}

  • qlock: 鎖標識,在多執行緒往佇列中添加資料,會有競爭,使用此標識搶占鎖,

  • base:worker steal的偏移量,因為其他的執行緒都可以偷該佇列的任務,所有base使用volatile標識,

  • top:owner執行任務的偏移量,

  • parker:如果 owner 掛起,則使用該變數做記錄,

  • currentJoin: 當前正在join等待結果的任務,

  • currentSteal:當前執行的任務是steal過來的任務,該變數做記錄,

ForkJoinTask變數基本說明

  • status: 標識任務目前的狀態,如果<0,表示任務處于結束狀態,
    ((s >>> 16) != 0)表示需要signal其他執行緒

任務提交程序剖析

ForkJoinPool提供的提交介面很多,不管提交的是CallableRunnableForkJoinTask最終都會轉換成ForkJoinTask型別的任務,呼叫方法externalPush(ForkJoinTask<?> task)來進行提交邏輯,讓我們來看看提交的程序:

  • 如果第一次提交(或者是hash之后的佇列還未初始化),呼叫externalSubmit

    • 第一遍回圈: (runState不是開始狀態): 1.lock; 2.創建陣列WorkQueue[n],這里的n是power of 2; 3. runState設定為開始狀態,
    • 第二遍回圈:(根據ThreadLocalRandom.getProbe()hash后的陣列中相應位置的WorkQueue未初始化): 初始化WorkQueue,通過這種方式創立的WorkQueue均是SHARED_QUEUE,scanStateINACTIVE
    • 第三遍回圈: 找到剛剛創建的WorkQueue,lock住佇列,將資料塞到arraytop位置,如果添加成功,就用呼叫接下來要攤開講的重要的方法signalWork
  • 如果hash之后的佇列已經存在

    • lock住佇列,將資料塞到top位置,如果該佇列任務很少(n <= 1)也會呼叫signalWork

signalWork

signalWork是fork/join框架中重要的方法之一,用于創建或者激活作業執行緒,本小節主要看它的原始碼實作,文章后面我們會總結它的使用場景,

    final void signalWork(WorkQueue[] ws, WorkQueue q) {
        long c; int sp, i; WorkQueue v; Thread p;
        while ((c = ctl) < 0L) {                       // too few active
            if ((sp = (int)c) == 0) {                  // no idle workers
                if ((c & ADD_WORKER) != 0L)            // too few workers
                    tryAddWorker(c);
                break;
            }
            if (ws == null)                            // unstarted/terminated
                break;
            if (ws.length <= (i = sp & SMASK))         // terminated
                break;
            if ((v = ws[i]) == null)                   // terminating
                break;
            int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
            int d = sp - v.scanState;                  // screen CAS
            long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
            if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;                      // activate v
                if ((p = v.parker) != null)
                    U.unpark(p);
                break;
            }
            if (q != null && q.base == q.top)          // no more work
                break;
        }
    }

在上面的章節我們已經具體的分析了ForkJoinPool中的ctl的標識含義,我們知道當ctl<0意味著active的執行緒還沒有到達閾值,只有ctl<0我們才會去討論要不要創建或者激活新的執行緒,(int)ctl很巧妙的拿到了ctl的低16位

  • 我們知道ctl代表的是idle worker當低16位為0的時候,意味著此刻沒有已經啟動但是空閑的執行緒,如果在沒有空閑的執行緒的情況下

    (c & ADD_WORKER) != 0L
    private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
    

    意味著我們再增加一個執行緒,也不能使ac為非負數(只要ctl的最高位為1,表明ac仍是負數) ,我們呼叫方法tryAddWorker來創建作業執行緒,

    • 我們首先需要使用ctl來記錄我們增加的執行緒, ctl編號-1的16位和編號-2的16位均需要加1,表示active的worker加一,總的worker加一,成功后我們將呼叫createWorker
    • 我們使用ForkJoinWorkerThreadFactory來產生一個ForkJoinWorkerThread型別的執行緒,該執行緒將會把自己注冊到Pool上,怎么注冊的呢?實作在方法registerWorker,前文我們已經提及,擁有執行緒的WorkQueue只能出現在陣列的奇數下標處,所以執行緒 首先,創建一個新的WorkQueue,其次在陣列WorkQueue[]尋找奇數下標尚未初始化的位置,如果回圈的次數大于陣列長度,還可能需要對陣列進行擴容,然后,設定這個WorkQueue的 config 為 index | mode (下標和模式),scanState為 index (下標>0),最后啟動這個執行緒,執行緒的處理我們接下來的章節介紹,
  • 當我們發現我們還有idle worker(即(int)ctl!=0L),我們需要active其中的一個,

    • 我們上文說過ctl的編號-3的16位,標記inactive和版本控制,我們將編號-3設定為激活狀態并且版本加一,編號-4的16位我們之前也說過,放置了top的掛起的執行緒的index所以我們可以根據這個index拿到WorkQueue--- 意味著就是這個WorkQueueOwner執行緒被掛起了,
    • 我們將要把top的掛起執行緒喚醒,意味著我們要講下一個掛起的執行緒的資訊記錄到ctl上,前文也說在上一個掛起的執行緒的index資訊在這個掛起的執行緒的stackPred
      int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
             int d = sp - v.scanState;                  // screen CAS
             long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
             if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                 v.scanState = vs;                      // activate v
                 if ((p = v.parker) != null)
                     U.unpark(p);
                 break;
             }
      
      更新完ctl后,我們將喚醒之前掛起的執行緒,

    通過上述的簡單介紹,用戶的任務的提交所經歷的步驟就介紹完了,

ForkJoinWorkerThread運行程序剖析

ForkJoinWorkerThread啟動之后會呼叫poolrunWorker來獲取任務執行,

    final void runWorker(WorkQueue w) {
        w.growArray();                   // allocate queue
        int seed = w.hint;               // initially holds randomization hint
        int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
        for (ForkJoinTask<?> t;;) {
            if ((t = scan(w, r)) != null)
                w.runTask(t);
            else if (!awaitWork(w, r))
                break;
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }

代碼很短,但是我們很容易讀懂其中的意思,呼叫scan嘗試去偷取一個任務,然后呼叫runTask或者awaitWork,這里的scan是框架的重要的實作,我們將詳述上面的三個方法,

scan

代碼如下:

    private ForkJoinTask<?> scan(WorkQueue w, int r) {
        WorkQueue[] ws; int m;
        if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
            int ss = w.scanState;                     // initially non-negative
            for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                int b, n; long c;
                if ((q = ws[k]) != null) {
                    if ((n = (b = q.base) - q.top) < 0 &&
                        (a = q.array) != null) {      // non-empty
                        long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                        if ((t = ((ForkJoinTask<?>)
                                  U.getObjectVolatile(a, i))) != null &&
                            q.base == b) {
                            if (ss >= 0) {
                                if (U.compareAndSwapObject(a, i, t, null)) {
                                    q.base = b + 1;
                                    if (n < -1)       // signal others
                                        signalWork(ws, q);
                                    return t;
                                }
                            }
                            else if (oldSum == 0 &&   // try to activate
                                     w.scanState < 0)
                                tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                        }
                        if (ss < 0)                   // refresh
                            ss = w.scanState;
                        r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                        origin = k = r & m;           // move and rescan
                        oldSum = checkSum = 0;
                        continue;
                    }
                    checkSum += b;
                }
                if ((k = (k + 1) & m) == origin) {    // continue until stable
                    if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                        oldSum == (oldSum = checkSum)) {
                        if (ss < 0 || w.qlock < 0)    // already inactive
                            break;
                        int ns = ss | INACTIVE;       // try to inactivate
                        long nc = ((SP_MASK & ns) |
                                   (UC_MASK & ((c = ctl) - AC_UNIT)));
                        w.stackPred = (int)c;         // hold prev stack top
                        U.putInt(w, QSCANSTATE, ns);
                        if (U.compareAndSwapLong(this, CTL, c, nc))
                            ss = ns;
                        else
                            w.scanState = ss;         // back out
                    }
                    checkSum = 0;
                }
            }
        }
        return null;
    }

因為我們的WorkQueue是有owner執行緒的佇列,我們可以知道以下資訊:

  • config = index | mode
  • scanState = index > 0

我們首先通過random的r來找到一個我們準備偷取的佇列,

  • 如果我們準備偷取的佇列剛好有任務在排隊(也有可能是owner自己的那個佇列);
    • 從佇列的隊尾即base位置取到任務回傳
    • base + 1
  • 如果我們遍歷了一圈(((k = (k + 1) & m) == origin))都沒有偷到,我們就認為當前的active 執行緒過剩了,我們準備將當前的執行緒(即owner)掛起,我們首先 index | INACTIVE 形成 ctl的后32位;并行將ac減一,其次,將原來的掛起的top的index記錄到stackPred中,
  • 繼續遍歷如果仍然一無所獲,將跳出回圈;如果偷到了一個任務,我們將使用tryRelease激活,

runTask

如果我們通過scan偷到了任務,這個時候我們將進入執行狀態:

        final void runTask(ForkJoinTask<?> task) {
            if (task != null) {
                scanState &= ~SCANNING; // mark as busy
                (currentSteal = task).doExec();
                U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
                execLocalTasks();
                ForkJoinWorkerThread thread = owner;
                if (++nsteals < 0)      // collect on overflow
                    transferStealCount(pool);
                scanState |= SCANNING;
                if (thread != null)
                    thread.afterTopLevelExec();
            }
        }

首先scanState &= ~SCANNING;標識該執行緒處于繁忙狀態,

  • 執行偷取的Task,
  • 呼叫execLocalTasks對執行緒所屬的WorkQueue內的任務進行LIFO執行,

awaitWork

如果我們通過scan一無所獲,這個時候我們將進入執行狀態:

    private boolean awaitWork(WorkQueue w, int r) {
        if (w == null || w.qlock < 0)                 // w is terminating
            return false;
        for (int pred = w.stackPred, spins = SPINS, ss;;) {
            if ((ss = w.scanState) >= 0)
                break;
            else if (spins > 0) {
                r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
                if (r >= 0 && --spins == 0) {         // randomize spins
                    WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
                    if (pred != 0 && (ws = workQueues) != null &&
                        (j = pred & SMASK) < ws.length &&
                        (v = ws[j]) != null &&        // see if pred parking
                        (v.parker == null || v.scanState >= 0))
                        spins = SPINS;                // continue spinning
                }
            }
            else if (w.qlock < 0)                     // recheck after spins
                return false;
            else if (!Thread.interrupted()) {
                long c, prevctl, parkTime, deadline;
                int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
                if ((ac <= 0 && tryTerminate(false, false)) ||
                    (runState & STOP) != 0)           // pool terminating
                    return false;
                if (ac <= 0 && ss == (int)c) {        // is last waiter
                    prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
                    int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
                    if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
                        return false;                 // else use timed wait
                    parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
                    deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
                }
                else
                    prevctl = parkTime = deadline = 0L;
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
                w.parker = wt;
                if (w.scanState < 0 && ctl == c)      // recheck before park
                    U.park(false, parkTime);
                U.putOrderedObject(w, QPARKER, null);
                U.putObject(wt, PARKBLOCKER, null);
                if (w.scanState >= 0)
                    break;
                if (parkTime != 0L && ctl == c &&
                    deadline - System.nanoTime() <= 0L &&
                    U.compareAndSwapLong(this, CTL, c, prevctl))
                    return false;                     // shrink pool
            }
        }
        return true;
    }
  • 如果ac還沒到達閾值,但是tc>2 說明現在仍然運行中的執行緒和掛起的執行緒加一起處于過剩狀態,我們將放棄該執行緒的掛起,直接讓它執行結束,不再回圈執行任務,
  • 否則,我們計算一個掛起的時間,等到了時間之后(或者被外部喚醒),執行緒醒了之后,如果發現自己狀態是active狀態(w.scanState >= 0),則執行緒繼續回去scan任務,如果發現自己是因為時間到了自動醒來,但是自己還是inactive狀態,也許,自己真的是多余的,執行緒也會執行結束,不再回圈執行任務,

ForkJoinTask執行程序剖析

從上節我們知道,我們獲取任務之后,將呼叫任務的doExec來具體執行任務:

    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }

我們以RecursiveTask為例:

   protected final boolean exec() {
        result = compute();
        return true;
    }

最終呼叫的是 compute,我們舉個example來看,

demo

求整數陣列所有元素之和

public class ForkJoinCalculator implements Calculator {
    private ForkJoinPool pool;

    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // 當需要計算的數字小于6時,直接計算結果
            if (to - from < 6) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                }
                return total;
            // 否則,把任務一分為二,遞回計算
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle+1, to);
                taskLeft.fork();
                taskRight.fork();
                return taskLeft.join() + taskRight.join();
            }
        }
    }

    public ForkJoinCalculator() {
        // 也可以使用公用的 ForkJoinPool:
        // pool = ForkJoinPool.commonPool()
        pool = new ForkJoinPool();
    }

    @Override
    public long sumUp(long[] numbers) {
        return pool.invoke(new SumTask(numbers, 0, numbers.length-1));
    }
}

這里需要注意的是其中的compute

  • 如果任務很小,直接計算結果,
  • 如果任務很大,一分為二,呼叫fork方法,獲取join方法的回傳值,

很明顯,這里任務的 forkjoin就是我們框架的核心,那么,它們都做了什么呢?

fork

fork的邏輯很簡單,

   public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }
  • 如果當前執行緒是作業執行緒,直接push到自己所擁有的佇列的top位置,
  • 如果是非作業執行緒,就是一個提交到Pool的程序,

join

join是一個等待結果的方法:

   public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
  • 如果得到的結果例外,則拋出例外;
  • 如果得到的正常,則獲取回傳值,

那么,執行緒在doJoin做了什么呢?

   private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }
  • 如果當前執行緒不是作業執行緒,則呼叫externalAwaitDone,首先,設定任務的status為signal狀態,這樣該任務執行結束之后會呼叫notifyAll來喚醒自己;其次,阻塞自己,知道任務執行完成后把自己喚醒,
  • 如果需要join的任務已經完成,直接回傳運行結果;
  • 如果需要join的任務剛剛好是當前執行緒所擁有的佇列的top位置,這意味著當前作業執行緒下一個就將執行到它,則執行它,
  • 如果該任務不在top位置,則呼叫awaitJoin方法:
    • 設定currentJoin表明自己正在等待該任務;
    • 如果發現 w.base == w.top 或者 tryRemoveAndExec回傳 true說明自己所屬的佇列為空,也說明我們通過fork將本執行緒的任務已經被別的執行緒偷走,該執行緒也不會閑著,將會helpStealer幫助幫助自己執行任務的執行緒執行任務(互惠互利,你來我往)
    • 如果tryCompensate為 true,則阻塞本執行緒,等待任務執行結束的喚醒

tryRemoveAndExec

如果join的任務還沒有被執行,我們去自己的佇列中去查找,看看任務是否不在top位置但是還是在佇列中

        while ((n = (s = top) - (b = base)) > 0) {
                    for (ForkJoinTask<?> t;;) {      // traverse from s to b
                        long j = ((--s & m) << ASHIFT) + ABASE;
                        if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                            return s + 1 == top;     // shorter than expected
                        else if (t == task) {
                            boolean removed = false;
                            if (s + 1 == top) {      // pop
                                if (U.compareAndSwapObject(a, j, task, null)) {
                                    U.putOrderedInt(this, QTOP, s);
                                    removed = true;
                                }
                            }
                            else if (base == b)      // replace with proxy
                                removed = U.compareAndSwapObject(
                                    a, j, task, new EmptyTask());
                            if (removed)
                                task.doExec();
                            break;
                        }
                        else if (t.status < 0 && s + 1 == top) {
                            if (U.compareAndSwapObject(a, j, t, null))
                                U.putOrderedInt(this, QTOP, s);
                            break;                  // was cancelled
                        }
                        if (--n == 0)
                            return false;
                    }
                    if (task.status < 0)
                        return false;
                }
  • 如果剛好在top位置,pop出來執行,
  • 如果在佇列中間,則使用EmptyTask來占位,將任務取出來執行,
  • 如果執行的任務還沒結束,則不進行helpStealer,

helpStealer

helpStealer的原則是你幫助我執行任務,我也幫你執行任務,

  • 遍歷奇數下標,如果發現佇列物件currentSteal放置的剛好是自己要找的任務,則說明自己的任務被該佇列A的owner執行緒偷來執行
  • 如果佇列A佇列中有任務,則從隊尾(base)取出執行;
  • 如果發現佇列A佇列為空,則根據它正在join的任務,在拓撲找到相關的佇列B去偷取任務執行,
    在執行的程序中要注意,我們應該完整的把任務完成

  do {
      U.putOrderedObject(w, QCURRENTSTEAL, t);
      t.doExec();        // clear local tasks too
        } while (task.status >= 0 &&
                                         w.top != top &&
                                         (t = w.pop()) != null);

這是什么意思呢? 因為你在執行這個任務的時候,這個任務也可能fork出什么子任務push到當前執行緒,我們應該記錄原來佇列top的位置,然后在執行結束后,還回到top原來的位置,

  • 幫忙執行任務完成后,如果發現自己的佇列有任務了(w.base != w.top),在不再幫助執行任務了,
  • 否則在等待自己的join的那個任務結束之前,可以不斷的偷取任務執行,

tryCompensate

如果自己等待的任務被偷走執行還沒結束,自己的佇列還有任務,我們需要做一些補償

    private boolean tryCompensate(WorkQueue w) {
        boolean canBlock;
        WorkQueue[] ws; long c; int m, pc, sp;
        if (w == null || w.qlock < 0 ||           // caller terminating
            (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
            (pc = config & SMASK) == 0)           // parallelism disabled
            canBlock = false;
        else if ((sp = (int)(c = ctl)) != 0)      // release idle worker
            canBlock = tryRelease(c, ws[sp & m], 0L);
        else {
            int ac = (int)(c >> AC_SHIFT) + pc;
            int tc = (short)(c >> TC_SHIFT) + pc;
            int nbusy = 0;                        // validate saturation
            for (int i = 0; i <= m; ++i) {        // two passes of odd indices
                WorkQueue v;
                if ((v = ws[((i << 1) | 1) & m]) != null) {
                    if ((v.scanState & SCANNING) != 0)
                        break;
                    ++nbusy;
                }
            }
            if (nbusy != (tc << 1) || ctl != c)
                canBlock = false;                 // unstable or stale
            else if (tc >= pc && ac > 1 && w.isEmpty()) {
                long nc = ((AC_MASK & (c - AC_UNIT)) |
                           (~AC_MASK & c));       // uncompensated
                canBlock = U.compareAndSwapLong(this, CTL, c, nc);
            }
            else if (tc >= MAX_CAP ||
                     (this == common && tc >= pc + commonMaxSpares))
                throw new RejectedExecutionException(
                    "Thread limit exceeded replacing blocked worker");
            else {                                // similar to tryAddWorker
                boolean add = false; int rs;      // CAS within lock
                long nc = ((AC_MASK & c) |
                           (TC_MASK & (c + TC_UNIT)));
                if (((rs = lockRunState()) & STOP) == 0)
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK);
                canBlock = add && createWorker(); // throws on exception
            }
        }
        return canBlock;
    }
  • 如果 ((sp = (int)(c = ctl)) != 0) 說明還有 idle worker則可以選擇喚醒一個執行緒替代自己,掛起自己等待任務來喚醒自己,
  • 如果沒有idle worker 則額外創建一個新的作業執行緒替代自己,掛起自己等待任務來喚醒自己,

總結

在了解了 Fork/Join Framework 的作業原理之后,相信很多使用上的注意事項就可以從原理中找到原因

最后:

針對最近很多人都在面試,我這邊也整理了相當多的面試專題資料(spring、mybatis、jvm,,,帶多了可以看附上的圖片)和多家公司的面試真題,

上述面試題答案都整理成檔案筆記, 也還整理了一些面試資料&最新2020收集的一些大廠的面試真題(都整理成檔案,小部分截圖),有需要的可以 點擊進入 查看領取資料,

有人可能會說了:面試真題你全部放上來就好了,,,

你知道我最近整理了多少嗎?

希望對大家有所幫助,有用的話點贊給我支持!

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/128206.html

標籤:其他

上一篇:LintCode 181. 將整數A轉換為B JavaScript演算法

下一篇:渣渣本科的2021屆秋招總結-淚目

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more