主頁 > 後端開發 > 深入理解執行緒池——細致入微的講原始碼。

深入理解執行緒池——細致入微的講原始碼。

2021-02-15 11:32:35 後端開發

在上一篇博文《圖解執行緒池原理》中,大體上介紹了執行緒池的作業原理,

這一篇從原始碼層面,細致剖析,文章會很長,

如果上篇文章內容沒吸收,先看上篇,先易后難嘛,

本文原始碼是 java 1.8 版本

一、示例代碼


    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for(int i =1; i <= 10; i++){
            int index = i;
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    String currentThreadName = Thread.currentThread().getName();
                    log.info("第{}次任務結束,執行者:{}", index, currentThreadName);
                }
            });
        }
        pool.shutdown();
        System.out.println("All thread is over");
    }
    

類圖關系, Executor 是最頂級介面,只有一個 execute() 方法,

最終大數的實作方法,在 ThreadPoolExecutor 這個類中(就是今天講的類),

在這里插入圖片描述
創建執行緒池,最終呼叫的是 ThreadPoolExecutor 類中的方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

這里有7 個引數,功能如下

  1. corePoolSize 核心執行緒數
  2. maximumPoolSize 最大執行緒數
  3. keepAliveTime 執行緒空閑時,最大存活時間
  4. unit 存活時間的單位
  5. workQueue 阻塞佇列(存放任務)
  6. threadFactory 執行緒工廠(生產執行緒用)
  7. handler 拒絕策略

二、執行緒控制引數

先看這個,這個COUNT_BITS ,就是數字 29,下面會用到,

	private static final int COUNT_BITS = Integer.SIZE - 3;

CAPACITY 最大執行緒數,(二進制數就是 3 個 0, 29個1)

	private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

在這里插入圖片描述
執行緒池運行狀態


    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    

如圖,int 值的高三位代表執行緒運行狀態,后 29 位記錄作業執行緒數量
在這里插入圖片描述
執行緒池的五種狀態,轉化關系如圖所示
在這里插入圖片描述


	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	private static int ctlOf(int rs, int wc) { return rs | wc; }
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }

ctl 是原子遞增的一個數,這個數高3位,表示執行緒池的運行狀態,后29位是記錄運行的執行緒數,

在這里插入圖片描述
每增加一個執行緒,ctl 增加 1, 每銷毀1個執行緒,ctl 減小1

runStateOf 就是獲取執行緒池的運行狀態,即 ctl 的高三位

在這里插入圖片描述
圖片上,兩個數字,進行與運算,得到的 ctl 值的高三位,低29位一定都是0,

現在不明白,后文用到的時候就明白了,

同樣的,workerCountOf 是獲取正在運行的執行緒數量,即 ctl 的低29位,

在這里插入圖片描述
圖片上的兩個數字,進行與運算,得到的是 ctl 的低29位的值,(高 3 位一定是0啦)

圖中這種狀態,執行緒池是 RUNNING 狀態(看高3位),運行的執行緒數量是 13(低29位的值)

Doug Lea 真的是很牛,用一個數字,即控制了執行緒池的運行狀態,又記錄了執行緒的數量,

三、執行緒池的運行


	public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // 1、作業執行緒數量小于核心執行緒數,創建執行緒
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { // 2、將任務放入阻塞佇列
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) // 3、隊滿時,繼續創建執行緒,直至作業執行緒數量達到最大值,
            reject(command); // 4、執行拒絕策略
    }

先明確一點:

傳入進來的任務,要么被 addWorker() 方法接收了,要么被放到阻塞佇列里,要么被拒絕策略給收了,

拿上一篇的例子來說,病人進來,要么被醫生叫走了,要么呆在大廳里,等著被醫生叫走,要么被拒絕策略收走了,

第一步:

workerCountOf(c) < corePoolSize 這個判斷,前半部分是獲取運行的執行緒數,

這個不明白,往上翻看,看看那個圖,

如果該判斷成立,走 addWorker() 方法,

第二步:

作業執行緒數量達到了corePoolSize,那準備往阻塞佇列中放,當然在在執行緒池的處于 RUNNING 狀態,

// 方法很容易懂,ctl 是負數就可以了
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

如果將任務成功放入阻塞佇列了,緊接著做了一個判斷 ! isRunning(recheck) && remove(command)

這是執行緒池不是運行狀態,就從阻塞佇列中洗掉任務,

仔細想想,前后判斷了兩次運行狀態,這里是做了極端情況,并發處理,不明說了,

第二個判斷 workerCountOf(recheck) == 0,這個是干啥的?開始我也沒想清楚,

后來明白了,這個是所有醫生都休息了,然后佇列有任務,派個醫生穿上防護服,去等著病人,

這個具體后面再分析,大概知道下,不明白沒關系,后面再解釋,

第三步:

任務放到阻塞佇列失敗了,那就創建作業執行緒,

第四步:

前面幾步都失敗了,執行拒絕策略,

總體上這個方法就講完了,沒有鎖、沒有CAS,不怕有并發么?

鎖控制是在addWorker() 方法中,這是盡可能減小鎖的粒度,提高性能,

然后講重頭戲,addWorker() ,先打下預防針,不太好理解,

  • addWorker() 方法決議
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

先說這一段


            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
                

看到這一大串,正常都會暈,我也暈,即某些情況下,addWorker() 方法直接回傳 false

我看了很多次,這段代碼與下面的同一個意思,


            if (rs > SHUTDOWN) return false;
            if (rs = SHUTDOWN && firstTask != null) return false;
            if (rs = SHUTDOWN && workQueue.isEmpty()) return false;
            

也就是說,執行緒池是 RUNNING 可以執行 addWorker() 方法,

執行緒池是 SHUTDOWN 且佇列不為空,且是空任務時,可以執行 addWorker() 方法,

其他情況一律不得執行,為什么第二個條件,那么復雜,后面講到再說,

看下 for 回圈


	retry:
	for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c); // 執行緒池運行狀態
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize)) // 控制執行緒數量
                    return false;
                if (compareAndIncrementWorkerCount(c)) // 執行緒數量+1
                    break retry; // 跳出外層 for 回圈,執行下面的操作
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs) 
                    continue retry; // 如果執行緒運行狀態變化,從外層回圈開始,重新執行,否則從內層回圈開始,重新執行,
            }
        }


    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    

這個雙層 for 回圈,就是做了一件事,在符合的條件時,ctl 增加1,

這里 break retrycontinue retry 的作用,都寫了注釋,語法不熟悉的,得問度娘,

然后再看下半段


		boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); // 創建執行緒
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock(); // 獲取全域鎖
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) { // 前面說過addWorker 執行的條件,RUNNING, 或者 SHUTDOWN 且 空任務
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); // 核心,將worker放入集合中
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start(); // 啟動執行緒
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w); // 啟動失敗的執行緒,做相應處理,
        }
        return workerStarted;

這段代碼中 new Worker(firstTask); 是一個重點要說的, 這是 Worker 這個類的繼承關系圖
在這里插入圖片描述
看下這個類的簡化代碼


    private final class Worker extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        final Thread thread; // 執行緒
        Runnable firstTask; // 任務
        volatile long completedTasks; // 已完成的任務數
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        public void run() {
            runWorker(this);
        }
    }

Worker 繼承了 AbstractQueuedSynchronizer,就有了可重入鎖的功能,
實作了 Runnable,就重寫了 run() 方法,

new Worker(firstTask) 時,將任務賦值了,創建了執行緒,并且加了鎖,

這里還要說明下,ThreadPoolExecutor 類中的一個全域變數 workers

 	/**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();
    

原始碼的注釋寫的很清楚,workers 是存放 worker 物件的集合,

且只有拿到鎖的worker物件,才會被放到該集合中,

后半段的核心邏輯是創立執行緒,即 new 一個 Worker物件,把物件放到集合中,然后啟動執行緒,

至此,execute() 方法講的差不多了,除了拒絕策略沒說,

四、回圈執行任務

  • addWorker 方法決議

addWorker 方法中會呼叫 Thread 類中的 start() 方法,然后JVM 會在合適的時間呼叫 run() 方法,


        public void run() {
            runWorker(this);
        }


    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

runWorker 的本質是執行 task.run(),而 task 獲取 是將 阻塞佇列取空為止,

順便說一句,第一次執行時,task 是 worker 在初始化時的那個任務,之后是從阻塞佇列中取任務,

這種設計,實作了一個執行緒執行了多個任務,

說細節:

w.unlock(); // allow interrupts 這行代碼,是不是和我一樣,剛開始覺得很奇怪,

怎么就好端端的來一個 釋放鎖呢? 這是因為在 new Worker物件時,加了鎖

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker:執行runWorker 不會被中斷
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

關于 AQS 的加鎖與解鎖,這里不詳細解釋,不理解的可以看我之前的博文《ReentrtantLock 分析》

while (task != null || (task = getTask()) != null) 這是回圈取任務,等下詳細說,

下面這一段看著都會頭暈,誰看誰頭暈,


                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                    

原始碼注釋說的很明確,

當執行緒池狀態,大于等于 STOP 時,保證作業執行緒都有中斷標志,

當執行緒池狀態,小于STOP時,保證作業執行緒都沒有中斷標志,

這里詳細解釋下: 先看前半部分

runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))



    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

runStateAtLeast(ctl.get(), STOP) = A

那 偽代碼就是 A || (Thread.interrupted() && A)

情況1:A 為 true,那后面不會執行了,整體為true

情況2:A 為false, 執行 Thread.interrupted() ,即清除執行緒的中斷標志,
(Thread.interrupted() && A) 這個條件是 false, 整體結果為false,

前半部分說完了,說后半部分&& !wt.isInterrupted()

上文 情況2時,不會執行后半部分,即判斷結束了,A 為 false,清除執行緒中斷標志,

這也就實作了,執行緒池狀態小于 STOP,作業執行緒保證沒有中斷標志,

情況1時,執行 wt.isInterrupted(), 如果有中斷,回傳true,取反后,整體是false,

如果沒中斷,回傳false,取反后,整體是 true,那會執行 wt.interrupt(),即打個中斷標志,

不管怎樣,最終執行緒會有中斷標志,

也就實作了,執行緒池狀態大于 等于STOP,作業執行緒保證有中斷標志,

有時候,代碼不好理解,真不是讀代碼的人笨,也不是寫代碼的人,寫的不好

.
.

  • processWorkerExit 方法決議

繼續說 runWorker 方法,當阻塞佇列中取出的任務是null時,會跳出 while 回圈,執行 processWorkerExit()

這個方法是在 finally 中執行的,正常情況下,是從阻塞佇列中取任務,取到的是null,此時 completedAbruptly是 false,

當然非正常情況,某個環節拋出了例外,執行了這個方法,此時completedAbruptly 是 true.

具體是哪個環節會拋出例外,后面再說,


private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w); // 洗掉執行緒
        } finally {
            mainLock.unlock();
        }

        tryTerminate(); // 優雅的嘗試關閉執行緒池

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

如果是出了例外,才執行這個方法的,那作業執行緒的數量減一,這個好理解,

不論是否出例外,都會在加鎖的情況下,洗掉 worker,也就是洗掉了執行緒,

當然,在洗掉執行緒之前,會統計個執行緒完成了多少個任務,

tryTerminate() 這個方法,等到 shutDown() 時再詳細說,這里只要知道,這個方法是有可能關閉執行緒池的,

繼續說這個方法:processWorkerExit ,在最后,當執行緒池狀態是 RUNNING、SHUTDOWN 時,有可能會再次呼叫 addWorker 方法,只是傳入的任務是null,

首先是 completedAbruptly 為 true 時,呼叫 addWorker,這個好理解,上個方法例外退出了,那再起一個執行緒,繼續消費任務,

其次是completedAbruptly 為 false 時,即真的是從阻塞佇列中取的任務是null,作業執行緒的數量小于 min, 這時會呼叫 addWorker,

順便說一句,當作業執行緒數量,小于或等于 corePoolSize,正常不會進到這個方法的,為啥,等會兒說這個,

int min = allowCoreThreadTimeOut ? 0 : corePoolSize 這里 allowCoreThreadTimeOut 默認是false,這里 min 是可以等于corePoolSize的,這種情況的出現,想不明白一定不是作者寫錯了,

corePoolSize 是可以重置的, setCorePoolSize(int corePoolSize) ,特定時間的重置 corePoolSize ,這里的代碼判斷就有必要了,

總之這個方法會銷毀執行緒,也會在必要的情況下創建執行緒,保證作業執行緒不小于corePoolSize ,
.
.

  • getTask() 方法決議

現在退回來,說 runWorker 方法中的這一段 while (task != null || (task = getTask()) != null)

getTask() 方法回傳 null 時,runWorker 會執行結束,進入 processWorkerExit 方法中,銷毀執行緒,

我曾經錯誤的以為,當阻塞佇列中取不到任務了,進入processWorkerExit 這個方法中,

銷毀作業執行緒,但同時維護corePoolSize ,還會創建執行緒,

那不就有可能,前面創建,后面銷毀,回圈不停啦!

后來我發現我錯了,原因就在 getTadk() 的原始碼中,

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

英文好的,可以先看下原始碼的注釋

這又是一個無限回圈,先看這段執行緒池運行狀態的判斷,看著會暈

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

這個的意思就是

rs = SHUTDOWN && workQueue.isEmpty() 執行 大括號里的代碼

rs > SHUTDOWN 時,執行大括號里的代碼

也就是說,這兩種情況下,回傳null,進入 processWorkerExit 方法銷毀執行緒了,

其它情況(rs = RUNNING,或者 rs = SHUTDOWN 并且 阻塞佇列不為空),要嘗試回傳任務,

接著看這一行

	boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

timed 這個引數,你可以理解為,是否支持超時機制

false 不支持,從后文的代碼可看出,取任務時,呼叫 workQueue.take()阻塞,代碼就停到這里了,一直等到,其它執行緒往佇列中放了任務,阻塞被喚醒,繼續執行,

true 支持, 取任務時,呼叫 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),即在一定時間內,取不到任務,直接回傳 null,

allowCoreThreadTimeOut 引數的默認值是 false, 那咱們就只討論 wc > corePoolSize,

當 wc > corePoolSize 時,timed 為 true,即支持超時,一定時間取不到任務,回傳 null,

結合前面所講,可以總結這么兩句話:

當作業執行緒大于 corePoolSize,取任務時,沒有任務,操作會超時,回傳null,進入下一次回圈,

當作業執行緒 不大于 corePoolSize,取任務時,沒有任務,會被阻塞,即代碼停止執行,直到有新的任務進來,

再看下面這一段 ,可以理解為 是否具備銷毀執行緒的條件


            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            

timedOut 這個引數,可以理解為:是否經歷過取任務超時,false 沒有經歷過, true 經歷過,

條件1: wc > maximumPoolSize || (timed && timedOut)

條件2: wc > 1 || workQueue.isEmpty()

wc > maximumPoolSize 條件1就滿足了,什么時候出現呢? 當重置 maximumPoolSize時會出現這種情況,

那條件1,可以用文字描述為:要么作業執行緒數大于 最大執行緒數,要么是 經歷過取任務超時(支持超時機制,才有可能經歷過取任務超時),才可能去銷毀執行緒,

條件1滿足的情況下,才會判斷條件2

wc > 1 這個說明當前作業執行緒,至少是 2 個,可以銷毀 1 個,

workQueue.isEmpty() 這個說明阻塞佇列中沒有任務了,可以放心銷毀執行緒,


    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
    

如果 compareAndDecrementWorkerCount 執行成功了,那就直接回傳null,

如果執行失敗了,那就從頭開始,重新判斷,這個不難理解,并發控制的真好

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }

前面說的理解好了,這里就容易理解了,

支持超時等待,就呼叫 非阻塞poll 方法,不支持的呼叫 阻塞 方法 take()

創建執行緒池時,指定的引數 最大執行緒存活時間 keepAliveTime, 就是在這里發揮功效的,

至此,getTask() 方法講完了,籠統的概括:

當作業執行緒 小于等于 corePoolSize,取任務會呼叫 阻塞方法 take()

即佇列中沒有任務會被阻塞,也就是說,執行緒不會被銷毀,

當作業執行緒 大于 corePoolSize, 取任務會呼叫 非阻塞的 poll() 方法,

即佇列中沒有任務時,會超時,在下個回圈中,回傳 null 進入銷毀執行緒的流程,
.
.

五、執行緒池的關閉

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess(); // 檢查有沒有權限關閉執行緒池
            advanceRunState(SHUTDOWN); // 更改執行緒池的狀態
            interruptIdleWorkers(); // 中斷執行緒
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

這里的代碼很簡潔,來一個一個分析

  • 1、checkShutdownAccess() 方法決議

    private void checkShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            } finally {
                mainLock.unlock();
            }
        }
    }

這個方法邏輯很清楚,先檢查有沒有權限關閉執行緒池,

如果有,再檢查是否可以中斷每個作業執行緒,

沒有相關權限,會拋出例外,
.

  • 2、advanceRunState() 方法決議

這個方法,最終是把執行緒池的運行狀態,設定為一個大于等于0 的狀態,即大于等于 SHUTDOWN


    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }
    
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

入參是 SHUTDOWN,如果執行緒池的運行狀態 大于等于 SHUTDOWN,直接跳出,

否則,執行 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))) 這一段

workerCountOf(c) 是作業執行緒的數量,執行這它的時候,執行緒池的狀是 RUNNING,

workerCountOf(c) 是大于等于0的,所以 ctlOf(targetState, workerCountOf(c))) 計算出來也是大于0的,

ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) 執行的結果就是,執行緒池的狀態設定為大于等于 0,

.

  • 3、interruptIdleWorkers() 方法決議

這個方法,是中斷空閑執行緒,

    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }


    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

首先 mainLock.lock() 表明,整個程序是加鎖進行的,且是拿到主鎖才執行,

!t.isInterrupted() && w.tryLock() 這個條件,

若 t.isInterrupted() 回傳 true,即執行緒本身有中斷標志,取反后是false,后面的就不再執行了,

順便說一句,在 決議 runWorker() 方法時,分析過,

當執行緒池狀態,大于等于 STOP 時,保證作業執行緒都有中斷標志,

當執行緒池狀態,小于STOP時,保證作業執行緒都沒有中斷標志,

作業執行緒沒有中斷標志的情況下,會執行 w.tryLock() 方法,

有沒有疑問,這是什么意思呀?

前面說過,Worker 物件 繼承了 AQS,也就是說,它可以實作加鎖,

回頭可以再看下 runWorker() 方法,在執行這個方法之前,不會被中斷,

在執行任務時,是加鎖的,也不會被中斷,

在執行 getTask() 方法時,是可以被中斷的,


		public boolean tryLock()  { return tryAcquire(1); }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        

這段加鎖的代碼很容易理解,接著往下說,

在執行任務的執行緒不會被中斷,這也就實作了,即使執行緒池關閉了,正在執行任務的執行緒,依然會把任務給執行完,

拿到鎖之后,會執行 t.interrupt(), 即給執行緒打上中斷標志,

本例中,傳入的引數是 false,那會對所有的空閑執行緒打上中斷標志,
.

那下一個問題,執行緒被打上中斷標志后,有啥影響呢?


結論:在執行 getTask() 方法時,會拋出例外,跳出這個方法,進入processWorkerExit() 銷毀執行緒,

而且此時,執行緒池的狀態若是等于SHUTDOWN,就會會呼叫 addWorker(null, false) ,

前面分析 addWorker() 方法時,我們知道只有兩種情況下,會創建一個新的 Worker,

不記得的話,翻回去看下, 結合現在的場景,只有佇列不為空的時候,分創建新的 Worker,

若佇列為空,不會創建新的Worker,

想想,佇列不為空,說明還有任務存在,銷毀了一個執行緒,再創建一個,也正常,

由此,把前面分析過的幾個方法都串了起來,

如果說,上面的分析你理解了,那下面這句話你會有新的認識:

當執行緒池是SHUTDOWN 狀態時,佇列不為空時,是可以提交空任務,

當執行緒池是STOP 狀態時,不可以提交任務,
.

咱們還有一個問題沒有說,中斷的執行緒怎么就拋出例外了?

getTask() 方法中, 取任務的那行代碼

       Runnable r = timed ?
           workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
           workQueue.take();

在 BlockQueue這個介面中,定義了這兩個方法,是會拋出例外的,


    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    E take() throws InterruptedException;
    

以 LinkedBlockingQueue的實作為例子,詳細資訊可以看了之前的博文《LinkedBlockingQueue原始碼圖解》


    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly(); // 執行緒有中斷,直接拋出例外
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

下面是 ReentrantLocklockInterruptibly() 方法的實作


    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

至此為止,interruptIdleWorkers() 這個方法決議完了,總結下大概就是

執行緒池狀態大于等于SHUTDOWN 時,會呼叫該方法,將所有空閑的執行緒打個中斷標志,

被打中斷標志的執行緒,會在 getTask() 方法中拋出例外,從而在后一個方法中銷毀執行緒,

若銷毀執行緒后,阻塞佇列中還有任務,還是可以新建 Worker,繼續消費佇列中的任務,
.

  • 4、tryTerminate() 方法決議

onShutdown() 這個方法,在 ThreadPoolExecutor 類中是一個空方法,這里就不講了,

tryTerminate() 這個方法,在 processWorkerExit() 里也呼叫過,當時沒講,放在這里決議


    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

先看這一段,判斷執行緒池狀態的

       if (isRunning(c) ||
           runStateAtLeast(c, TIDYING) ||
           (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
           return;

執行緒池處于運行狀態,不處理,這個好理解,

執行緒池狀態 大于等于TIDYING,不處理,這個說明已經在成功執行過 tryTerminate 方法了,

執行緒池狀態 是SHUTDOWN 且 阻塞佇列不為空,不處理,因為有任務,得讓它干活,不能中斷執行緒,

除此之外,就兩種狀態需要處理了:

執行緒池狀態是STOP,可以關閉執行緒池,

執行緒池狀態是SHUTDOWN 且阻塞佇列為空,


        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE); // 中斷一個空閑執行緒,
            return;
        }
        

這段代碼的意思我懂,當作業執行緒大于0時, 中斷一個空閑的執行緒,

interruptIdleWorkers 這個方法,前面講過了,只是引數變成了true,只中斷一個空閑執行緒,

shutdown() 這個方法中,所有空閑執行緒都被中斷了,簡化的考慮,這段沒用,

但是在 processWorkerExit() 方法中,呼叫 tryTerminate() 是有意義的,

它可以讓阻塞的執行緒,從 getTask() 方法中,跳出來,進入processWorkerExit() 被銷毀掉,

同時會觸發再次中斷一個空閑執行緒,使其從getTask() 方法中,跳出來,進入processWorkerExit() 被銷毀掉,

這就形成了一個回圈,把空閑的執行緒一個一個處理掉了,

那么回過頭來想想, shutdown() 呼叫 interruptIdleWorkers(true),也是有意義的,

因為正在執行任務的執行緒,執行完任務,是可能變成空閑執行緒的,(大概知道就行了,不深入解釋)

再往后,獲取主鎖,修改執行緒池的狀態,

先是設定為 TIDYING狀態,再執行 terminated(),最后執行緒池狀態設定為 TERMINATED,

其中 terminated()ThreadPoolExecutor 這個類中,是一個空方法,不用講了,

最后 termination.signalAll(), 這個是喚醒阻塞的執行緒,這個方法本文的方法中未曾涉及,不講了,

至此執行緒池原始碼決議,完畢,

六、總結

本文從原始碼層面,詳細分析了,執行緒池的創建、運行、關閉,

對應的就是 execute(), addWorker(), runWorker(), shutDown() 這幾個方法,

同時與簡單梳理了,這幾個方法之間相互關聯的地方,

上篇博文《圖解執行緒池原理》,從大方向上介紹了執行緒池的原理 ,本篇深入原始碼,詳細剖析了這幾個方法,

關于執行緒池的四種拒絕策略,單獨寫了一篇《執行緒池四種拒絕策略》,

本文不再分析,拒絕策略本身沒有復雜的邏輯,代碼也不難理解,

還有一些方法,比如 shutdownNow()awaitTermination() 等 不再詳細分析,文章已經夠長啦!

能把本文所講的東西說出來,面試官基本該滿意了吧,

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/259712.html

標籤:java

上一篇:為什么要配置path環境變數

下一篇:springboot啟動原始碼之SpringApplication

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more