主頁 > 後端開發 > ThreadPoolExecutor原始碼分析-面試問爛了的Java執行緒池執行流程,如果要問你具體的執行細節,你還會嗎?

ThreadPoolExecutor原始碼分析-面試問爛了的Java執行緒池執行流程,如果要問你具體的執行細節,你還會嗎?

2020-11-22 13:35:49 後端開發

Java版本:8u261,

對于Java中的執行緒池,面試問的最多的就是執行緒池中各個引數的含義,又或者是執行緒池執行的流程,彷佛這已成為了固定的模式與套路,但是假如我是面試官,現在我想問一些更細致的問題,你還能答得上來嗎?比如:

  1. 執行緒池是如何實作執行緒復用的?
  2. 如果一個執行緒執行任務的時候拋出例外,那么這個任務是否會被丟棄?
  3. 當前執行緒池中有十個執行緒,其中一個執行緒正在執行任務,那么剩下的九個執行緒正在處于一種什么狀態呢?

相信如果沒有看過執行緒池的相關原始碼實作,這些問題是很難回答得完美的,同時這些問題往深了問還會引出Java中阻塞佇列以及AQS的實作,你都能接得住嗎?

1 簡介

因為執行緒是稀缺資源,如果在高并發的情況下被無限制地創建和銷毀,不僅會消耗系統資源,還會降低系統的穩定性,所以執行緒池的出現就是為了解決這些問題的,執行緒池通過重用已經存在的執行緒資源,減少執行緒創建和銷毀的次數,提高了性能,同時還可以進行統一的分配、調優和監控,

在Java中,可以通過Executors類中的newFixedThreadPool、newCachedThreadPool,newScheduledThreadPool或者其他方式來創建各種執行緒池,它們都會直接或間接地通過ThreadPoolExecutor來進行構建,通過傳入不同的引數來實作不同效果的執行緒池(newScheduledThreadPool比較特殊,它重寫了部分ThreadPoolExecutor的邏輯,后續我會寫一篇對ScheduledThreadPoolExecutor進行原始碼分析的文章),

1.1 執行緒池引數

在ThreadPoolExecutor中共有七個引數:

  • corePoolSize:核心執行緒數,核心執行緒會一直存活,即使沒有任務需要執行(除非allowCoreThreadTimeOut引數設定為true,這樣的話即使是核心執行緒也會被超時銷毀);

  • maximumPoolSize:執行緒池中允許的最大執行緒數;

  • keepAliveTime:維護作業執行緒所允許的空閑時間,如果作業執行緒等待的時間超過了keepAliveTime,則會被銷毀;

  • unit:指定keepAliveTime的單位,如TimeUnit.SECONDS;

  • workQueue:用來保存等待被執行任務的阻塞佇列,常用的有:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue和PriorityBlockingQueue等;

  • threadFactory:執行緒工廠,提供創建新執行緒的功能,默認的實作是Executors.defaultThreadFactory(),即通過new Thread的方式;

  • handler:如果當前阻塞佇列已滿,并且當前的執行緒數量已超過了最大執行緒數,則會執行相應的拒絕策略,具體有四種(也可以自己實作):

    • AbortPolicy:默認實作,會直接拋出RejectedExecutionException;
    • CallerRunsPolicy:用呼叫者所在的執行緒來執行任務;
    • DiscardPolicy:直接拋棄,任務不執行;
    • DiscardOldestPolicy:丟棄阻塞佇列中最靠前的任務,并執行當前任務,

這四種拒絕策略的實作很簡單,這里就不再過多展示說明了,讀者可自行查看,

1.2 運行程序

ThreadPoolExecutor的大致運行程序如下:

如果使用的是有界阻塞佇列:

有新的任務需要執行,并且當前執行緒池的執行緒數小于核心執行緒數,則創建一個核心執行緒來執行,如果當前執行緒數大于核心執行緒數,則會將除了核心執行緒處理的任務之外剩下的任務加入到阻塞佇列中等待執行,如果佇列已滿,則在當前執行緒數不大于最大執行緒數的前提下,創建新的非核心執行緒,處理完畢后等到達keepAliveTime空閑時間后會被直接銷毀(注意,不一定銷毀的就是這些非核心執行緒,核心執行緒也可能被銷毀,只要減到剩余執行緒數到達核心執行緒數就行,核心執行緒和非核心執行緒的區別僅在于判斷是否到達閾值時有區別:核心執行緒判斷的是核心執行緒數,而非核心執行緒判斷的是最大執行緒數,僅此一個區別,后面講原始碼時會再強調這一點),如果當前執行緒數大于最大執行緒數,則會執行相應的拒絕策略,

如果使用的是無界阻塞佇列:

與有界阻塞佇列相比,除非系統資源耗盡,否則無界的阻塞佇列不存在任務入隊失敗的情況,當有新任務到來,系統的執行緒數小于核心執行緒數時,則創建一個核心執行緒來執行,當達到核心執行緒數后,就不會繼續增加,若后續仍有新的任務加入,而沒有空閑的執行緒資源,則任務直接進入阻塞佇列中進行等待,如果任務創建和處理任務的速度差異很大,無界阻塞佇列會保持快速增長,直到耗盡系統記憶體,

img

1.3 執行緒池狀態

在ThreadPoolExecutor中存在五種狀態:

  • RUNNING:初始狀態,在此狀態下能夠接收新任務,以及對已經添加的任務進行處理;
  • SHUTDOWN:通過呼叫shutdown方法,執行緒池轉成SHUTDOWN狀態,此時不再接收新任務,但是能處理已經添加的任務;
  • STOP:通過呼叫shutdownNow方法,執行緒池轉成STOP狀態,此時不再接收新任務,不處理已經添加的任務,并且會中斷正在處理的任務;
  • TIDYING:當執行緒池中所有的任務已經終止了,任務數量為0并且阻塞佇列為空的時候,會進入到TIDYING狀態,此時會呼叫一個鉤子方法terminated,它是一個空的實作,可以供呼叫者覆寫;
  • TREMINATED:執行緒池徹底終止的狀態,當執行緒池處于TIDYING狀態時,執行完terminated方法后,就會進入到該狀態,

img

在ThreadPoolExecutor中狀態是通過ctl屬性中的高3位來表示的:

 1 //ctl中包含兩部分資訊:高3位表示運行狀態,低29位保存作業執行緒數量,初始狀態是RUNNING
 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 3 //29
 4 private static final int COUNT_BITS = Integer.SIZE - 3;
 5 //1左移29位后-1,也就是29個1,用來表示作業執行緒數量的最大值
 6 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
 7
 8 //ctl高3位為111(低29位都為0)
 9 private static final int RUNNING = -1 << COUNT_BITS;
10 //ctl高3位為000(低29位都為0)
11 private static final int SHUTDOWN = 0 << COUNT_BITS;
12 //ctl高3位為001(低29位都為0)
13 private static final int STOP = 1 << COUNT_BITS;
14 //ctl高3位為010(低29位都為0)
15 private static final int TIDYING = 2 << COUNT_BITS;
16 //ctl高3位為011(低29位都為0)
17 private static final int TERMINATED = 3 << COUNT_BITS;
18
19 //獲取ctl的高3位(低29位都為0)也就是獲取運行狀態
20 private static int runStateOf(int c) {
21     return c & ~CAPACITY;
22 }
23
24 //獲取ctl的低29位(高3位都為0)也就是獲取作業執行緒數量
25 private static int workerCountOf(int c) {
26     return c & CAPACITY;
27 }
28
29 //用來獲取運行狀態和作業執行緒數量拼接起來的值
30 private static int ctlOf(int rs, int wc) {
31     return rs | wc;
32 }
33
34 //判斷ctl是否小于s所代表狀態的值
35 private static boolean runStateLessThan(int c, int s) {
36     return c < s;
37 }
38
39 //判斷ctl是否大于等于s所代表狀態的值
40 private static boolean runStateAtLeast(int c, int s) {
41     return c >= s;
42 }
43
44 //判斷ctl此時是否是RUNNING狀態
45 private static boolean isRunning(int c) {
46     return c < SHUTDOWN;
47 }

1.4 Worker

Worker是ThreadPoolExecutor中的一個內部類,用來封裝作業執行緒:

 1 private final class Worker
 2         extends AbstractQueuedSynchronizer
 3         implements Runnable {
 4     //...
 5
 6     //正在運行Worker的執行緒
 7     final Thread thread;
 8     //傳入的任務
 9     Runnable firstTask;
10     //本Worker已完成的任務數,用于后續的統計與監控
11     volatile long completedTasks;
12
13     Worker(Runnable firstTask) {
14         /*
15         這里設定AQS的state初始為-1是為了將執行緒發生中斷的動作延遲到任務真正開始運行的時候,換句話說就是
16         禁止在執行任務前對執行緒進行中斷,在呼叫一些像shutdown和shutdownNow等方法中會去中斷執行緒,而在
17         中斷前會呼叫tryLock方法嘗試加鎖,而這里設定為-1后,tryLock方法就會回傳為false,所以就不能中斷了
18          */
19         setState(-1);
20         this.firstTask = firstTask;
21         this.thread = getThreadFactory().newThread(this);
22     }
23
24     //因為Worker類實作了Runnable介面,所以當呼叫thread.start方法時最侄訓呼叫到此處運行
25     public void run() {
26         runWorker(this);
27     }
28
29     //...
30 }

由上可以看到Worker繼承了AQS(我之前寫過對AQS、ReentrantLock和阻塞佇列進行原始碼分析的文章,感興趣的可以查看AQS原始碼深入分析之獨占模式-ReentrantLock鎖特性詳解、AQS原始碼深入分析之條件佇列-你知道Java中的阻塞佇列是如何實作的嗎?),并實作了Runnable介面,之后在分析原始碼時將會看到:在運行Worker的時候之所以沒有用ReentrantLock作為獨占鎖來使用是因為這里是要求不可重入的,而ReentrantLock是可重入鎖,在像一些setCorePoolSize方法去手動更改核心執行緒數時,如果修改的值比原本的小,那么多余的執行緒會被中斷、會中斷正在運行著的的執行緒,所以使用自己實作的不可重入獨占鎖而不是使用ReentrantLock就是為了不想讓像setCorePoolSize這樣的方法來重新獲取到鎖資源,不想讓正在運行的執行緒發生自我中斷,其實上面所說的內容在Worker類的注釋中都已經解釋了:

img

2 構造器

 1 /**
 2  * ThreadPoolExecutor:
 3  * 全引數構造器,其他構造器最終都會呼叫到這里
 4  */
 5 public ThreadPoolExecutor(int corePoolSize,
 6                           int maximumPoolSize,
 7                           long keepAliveTime,
 8                           TimeUnit unit,
 9                           BlockingQueue<Runnable> workQueue,
10                           ThreadFactory threadFactory,
11                           RejectedExecutionHandler handler) {
12     //非法引數校驗
13     if (corePoolSize < 0 ||
14             maximumPoolSize <= 0 ||
15             maximumPoolSize < corePoolSize ||
16             keepAliveTime < 0)
17         throw new IllegalArgumentException();
18     //非空校驗
19     if (workQueue == null || threadFactory == null || handler == null)
20         throw new NullPointerException();
21     //如果安全管理器不為空,就進行權限訪問(本文不展開分析)
22     this.acc = System.getSecurityManager() == null ?
23             null :
24             AccessController.getContext();
25     this.corePoolSize = corePoolSize;
26     this.maximumPoolSize = maximumPoolSize;
27     this.workQueue = workQueue;
28     //將keepAliveTime轉換成納秒
29     this.keepAliveTime = unit.toNanos(keepAliveTime);
30     this.threadFactory = threadFactory;
31     this.handler = handler;
32 }

3 execute方法

 1 /**
 2  * ThreadPoolExecutor:
 3  */
 4 public void execute(Runnable command) {
 5     //非空校驗
 6     if (command == null)
 7         throw new NullPointerException();
 8     int c = ctl.get();
 9     //如果當前執行緒數小于核心執行緒數的話,就直接創建一個核心執行緒
10     if (workerCountOf(c) < corePoolSize) {
11         if (addWorker(command, true))
12             return;
13         /*
14         添加失敗(可能是執行緒池狀態是SHUTDOWN或以上的狀態(SHUTDOWN狀態下不再接收
15         新任務),也可能是執行緒數超過閾值了),就重新獲取一下ctl的值,走下面的邏輯
16          */
17         c = ctl.get();
18     }
19     /*
20     走到這里說明當前執行緒數大于等于核心執行緒數,又或者是上面添加核心執行緒失敗中解釋的情況
21     此時就判斷一下當前執行緒池是否是RUNNING狀態,如果是的話就往阻塞佇列入隊
22     這里offer跟put的區別是如果佇列已滿,offer不會被阻塞,而是立即回傳false
23      */
24     if (isRunning(c) && workQueue.offer(command)) {
25         int recheck = ctl.get();
26         /*
27         這里會再次檢查一次當前執行緒池是否是RUNNING狀態,可能此時執行緒池已經shutdown了
28         如果不是RUNNING狀態,就洗掉上面入隊的任務,并執行相應的拒絕策略
29          */
30         if (!isRunning(recheck) && remove(command))
31             reject(command);
32         /*
33         此時還會去判斷一下是否當前的作業執行緒數已經為0了(可能這些執行緒在上次workerCountOf
34         檢查后(第10行代碼處)被銷毀了(allowCoreThreadTimeOut設定為true)),如果是
35         的話就新創建一個空任務的非核心執行緒,注意,這里傳進addWorker方法的是空任務,因為任務
36         已經在阻塞佇列中存在了,所以這個Worker執行的時候,會直接從阻塞佇列中取出任務來執行
37         所以說這里的意義也就是要保證執行緒池在RUNNING狀態下必須要有一個執行緒來執行任務
38          */
39         else if (workerCountOf(recheck) == 0)
40             addWorker(null, false);
41     } else if (!addWorker(command, false))
42         /*
43         走到這里說明執行緒池不是RUNNING狀態,或者阻塞佇列已滿,此時創建一個非核心執行緒去執行
44         如果創建失敗,說明執行緒池的狀態已經不是RUNNING了,又或者當前執行緒數已經大于等于最大執行緒數了
45         那么就執行相應的拒絕策略
46          */
47         reject(command);
48 }
49
50 /**
51  * 第30行代碼處:
52  */
53 public boolean remove(Runnable task) {
54     //阻塞佇列中洗掉這個任務
55     boolean removed = workQueue.remove(task);
56     //根據執行緒池狀態來判斷是否應該結束執行緒池
57     tryTerminate(); 
58     return removed;
59 }
60
61 /**
62  * 第31行和第47行代碼處:
63  */
64 final void reject(Runnable command) {
65     //根據是哪種拒絕策略,來具體執行其中的邏輯(具體的四種拒絕策略的代碼這里就不再看了,都是很簡單的)
66     handler.rejectedExecution(command, this);
67 }

4 addWorker方法

在上面添加任務時會呼叫到addWorker方法:

  1 /**
  2  * ThreadPoolExecutor:
  3  */
  4 private boolean addWorker(Runnable firstTask, boolean core) {
  5     retry:
  6     for (; ; ) {
  7         int c = ctl.get();
  8         //獲取當前執行緒池的運行狀態
  9         int rs = runStateOf(c);
 10 
 11         /*
 12         如果當前執行緒池狀態大于SHUTDOWN,就直接回傳false,表示不再添加新的Worker
 13         如果當前執行緒池的狀態是SHUTDOWN(此時不再接收新的任務,但是還是會繼續處理
 14         阻塞佇列中的任務),但是firstTask不為null(相當于新的任務)或者阻塞佇列為空
 15         (為空說明也沒有必要去創建Worker了)的話,也直接回傳false,不再添加新的Worker
 16          */
 17         if (rs >= SHUTDOWN &&
 18                 !(rs == SHUTDOWN &&
 19                         firstTask == null &&
 20                         !workQueue.isEmpty()))
 21             return false;
 22 
 23         for (; ; ) {
 24             //重新獲取當前執行緒池的作業執行緒數
 25             int wc = workerCountOf(c);
 26             /*
 27             <1>如果當前執行緒數大于等于最大值;
 28             <2.1>如果是核心執行緒,當前執行緒數大于等于核心執行緒數;
 29             <2.2>如果是非核心執行緒,當前執行緒數大于等于最大執行緒數
 30             以上兩個條件任意一個滿足,就說明當前執行緒數已經達到閾值了,
 31             也直接回傳false,不再添加新的任務
 32              */
 33             if (wc >= CAPACITY ||
 34                     wc >= (core ? corePoolSize : maximumPoolSize))
 35                 return false;
 36             /*
 37             CAS嘗試對ctl+1,也就是作業執行緒數量+1,如果成功了,就跳出死回圈,
 38             從第58行代碼處繼續往下執行
 39              */
 40             if (compareAndIncrementWorkerCount(c))
 41                 break retry;
 42             //如果CAS+1失敗了,重新讀此時ctl的最新值
 43             c = ctl.get();
 44             /*
 45             如果發現此時的運行狀態和之前剛進入該方法時的運行狀態不相等,
 46             說明在此期間發生了狀態的改變,那么就從頭開始重試
 47              */
 48             if (runStateOf(c) != rs)
 49                 continue retry;
 50             /*
 51             走到這里說明狀態沒有發生改變,但是之前ctl+1的CAS操作失敗了,那么重新從第25
 52             行代碼處繼續往下執行
 53              */
 54         }
 55     }
 56
 57     //上面的死回圈主要是為了對ctl做+1的操作,而下面是為了創建Worker
 58     boolean workerStarted = false;
 59     boolean workerAdded = false;
 60     Worker w = null;
 61     try {
 62         //根據firstTask來創建一個Worker(如上面所說,AQS中的state初始值為-1,防止被中斷)
 63         w = new Worker(firstTask);
 64         //每一個Worker都會創建一個Thread
 65         final Thread t = w.thread;
 66         if (t != null) {
 67             final ReentrantLock mainLock = this.mainLock;
 68             //上鎖
 69             mainLock.lock();
 70             try {
 71                 //重新獲取當前執行緒池的運行狀態
 72                 int rs = runStateOf(ctl.get());
 73 
 74                 /*
 75                 如果執行緒池當前狀態是RUNNING狀態,或者是SHUTDOWN狀態并且firstTask
 76                 為空(意味著不去處理新任務而是去處理阻塞佇列中的任務),才能將創建的
 77                 新Worker添加到workers集合中
 78                  */
 79                 if (rs < SHUTDOWN ||
 80                         (rs == SHUTDOWN && firstTask == null)) {
 81                     /*
 82                     此時執行緒還沒有start,但是isAlive方法回傳true,說明這個執行緒是有問題的,
 83                     直接拋出例外
 84                      */
 85                     if (t.isAlive())
 86                         throw new IllegalThreadStateException();
 87                     //在workers集合(HashSet,因為已經加鎖了,所以HashSet就行)里面添加本Worker
 88                     workers.add(w);
 89                     int s = workers.size();
 90                     /*
 91                     如果當前執行緒池中執行緒數量超過了largestPoolSize,就更新一下largestPoolSize為
 92                     當前執行緒數量,即largestPoolSize中保存著執行緒池中出現過的最大執行緒數,用于統計監控
 93                      */
 94                     if (s > largestPoolSize)
 95                         largestPoolSize = s;
 96                     //創建Worker成功
 97                     workerAdded = true;
 98                 }
 99             } finally {
100                 //釋放鎖
101                 mainLock.unlock();
102             }
103             if (workerAdded) {
104                 //如果上面workers集合添加Worker成功,就用Worker中的thread來啟動執行緒
105                 t.start();
106                 workerStarted = true;
107             }
108         }
109     } finally {
110         if (!workerStarted)
111             //如果沒添加成功,就執行失敗處理
112             addWorkerFailed(w);
113     }
114     return workerStarted;
115 }
116
117 private void addWorkerFailed(Worker w) {
118     final ReentrantLock mainLock = this.mainLock;
119     //上鎖
120     mainLock.lock();
121     try {
122         //如果之前創建Worker成功了,就從workers集合中洗掉它
123         if (w != null)
124             workers.remove(w);
125         //將ctl-1,里面使用了死回圈確保CAS操作一定成功
126         decrementWorkerCount();
127         //根據執行緒池狀態來判斷是否應該結束執行緒池
128         tryTerminate();
129     } finally {
130         //釋放鎖
131         mainLock.unlock();
132     }
133 }

5 runWorker方法

因為Worker類實作了Runnable介面,所以當呼叫thread.start方法時最侄訓呼叫到Worker的run方法處:

  1 /**
  2  * ThreadPoolExecutor:
  3  * 當呼叫t.start()方法時最侄訓呼叫到此處
  4  */
  5 public void run() {
  6     runWorker(this);
  7 }
  8
  9 final void runWorker(Worker w) {
 10     //獲取當前執行緒(當前執行緒也就是在Worker中的thread)
 11     Thread wt = Thread.currentThread();
 12     Runnable task = w.firstTask;
 13     //把Worker中的firstTask清空,因為下面要執行它了
 14     w.firstTask = null;
 15     /*
 16     因為之前創建Worker的時候將AQS的state初始為-1,是為了防止執行緒被中斷
 17     而這里unlock方法是把state重置為0,意思就是已經進入到runWorker方法
 18     中,可以允許中斷了
 19      */
 20     w.unlock();
 21     boolean completedAbruptly = true;
 22     try {
 23         //如果task不為空,或者從阻塞佇列中拿取到任務了
 24         while (task != null || (task = getTask()) != null) {
 25             /*
 26             上鎖(注意,這里是用Worker而不是ReentrantLock來加鎖的,為了確保
 27             以下的代碼不會被同一執行緒所重入,同時可以做到不同執行緒可以并發執行)
 28              */
 29             w.lock();
 30             /*
 31             如果當前執行緒池狀態大于等于STOP,確保當前執行緒也是需要中斷的(因為這個時候要
 32             結束執行緒池了,不能再添加新的執行緒);否則如果在上面這個判斷不滿足之后呼叫了shutdownNow
 33             方法的時候(注意,shutdownNow方法是ReentrantLock上鎖,而代碼走到
 34             這里是當前Worker上鎖,兩者上的不是同一個鎖,所以可以并發執行),
 35             之前的狀態要么是RUNNING要么是SHUTDOWN,在走完第一個runStateAtLeast
 36             判斷條件發現不滿足后,現在執行了shutdownNow方法將狀態改為了STOP,
 37             同時設定Worker中斷位,那么此時在該處的第二個判斷Thread.interrupted()回傳true,
 38             同時執行緒池的狀態此時已經改為了STOP,那么也會去中斷這個執行緒(注意,這里說的
 39             乃至整個ThreadPoolExecutor中我說的中斷執行緒并不是會去真的中斷,
 40             wt.interrupt()只是會設定一個中斷標志位,需要使用者在run方法中首先
 41             通過isInterrupted方法去進行判斷,是否應該執行接下來的業務代碼)
 42              */
 43             if ((runStateAtLeast(ctl.get(), STOP) ||
 44                     (Thread.interrupted() &&
 45                             runStateAtLeast(ctl.get(), STOP))) &&
 46                     !wt.isInterrupted())
 47                 wt.interrupt();
 48             try {
 49                 //鉤子方法,空實作
 50                 beforeExecute(wt, task);
 51                 Throwable thrown = null;
 52                 try {
 53                     //這里就是在具體執行執行緒的任務了(也就是使用者具體寫的任務)
 54                     task.run();
 55                 } catch (RuntimeException x) {
 56                     thrown = x;
 57                     throw x;
 58                 } catch (Error x) {
 59                     thrown = x;
 60                     throw x;
 61                 } catch (Throwable x) {
 62                     thrown = x;
 63                     throw new Error(x);
 64                 } finally {
 65                     //鉤子方法,空實作
 66                     afterExecute(task, thrown);
 67                 }
 68             } finally {
 69                 //這里將task置為null,下次回圈的時候就會在阻塞佇列中拿取下一個任務了
 70                 task = null;
 71                 //完成的任務數+1
 72                 w.completedTasks++;
 73                 //釋放鎖
 74                 w.unlock();
 75             }
 76         }
 77         //回圈執行上面的while回圈來拿取任務,而走到這里說明Worker和阻塞佇列中都已經沒有了任務
 78         completedAbruptly = false;
 79     } finally {
 80         //最后對Worker做收尾作業
 81         processWorkerExit(w, completedAbruptly);
 82     }
 83 }
 84
 85 private void processWorkerExit(Worker w, boolean completedAbruptly) {
 86     /*
 87     completedAbruptly為true表示在runWorker方法中的while回圈中拋出了例外,那么此時
 88     作業執行緒是沒有-1的,需要-1(正常情況下在while回圈最后一次呼叫getTask方法中會-1)
 89      */
 90     if (completedAbruptly)
 91         decrementWorkerCount();
 92
 93     final ReentrantLock mainLock = this.mainLock;
 94     //上鎖
 95     mainLock.lock();
 96     try {
 97         //累加所有Worker已經完成的任務數,用于統計監控
 98         completedTaskCount += w.completedTasks;
 99         /*
100         把當前Worker(也就是當前執行緒)剔除出workers集合中,等待GC
101         注意,能走到這里,說明在getTask方法中的timed標志位肯定為true(為false的話就會在getTask方法中的
102         take方法中一直被阻塞,中斷喚醒也不可能,因為這種情況下還是會繼續在getTask方法中回圈),那么無外乎兩種情況,
103         要么是空閑的核心執行緒超時需要被銷毀,要么是空閑的非核心執行緒超時需要被銷毀,不管屬于哪一種,當前執行緒都是
104         要被銷毀的
105          */
106         workers.remove(w);
107     } finally {
108         //釋放鎖
109         mainLock.unlock();
110     }
111
112     //根據執行緒池狀態來判斷是否應該結束執行緒池
113     tryTerminate();
114 
115     int c = ctl.get();
116     //如果當前執行緒池處在RUNNING或SHUTDOWN狀態
117     if (runStateLessThan(c, STOP)) {
118         //通過之前的分析,如果completedAbruptly為false,表明此時已經沒有任務可以執行了
119         if (!completedAbruptly) {
120             //如果allowCoreThreadTimeOut為true,min就為0,否則為核心執行緒數
121             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
122             /*
123             如果阻塞佇列不為空(可能代碼執行到這里阻塞佇列中又有資料了),并且allowCoreThreadTimeOut
124             為true,就將min改為1
125              */
126             if (min == 0 && !workQueue.isEmpty())
127                 min = 1;
128             /*
129             兩種情況:
130             <1>如果阻塞佇列不為空,并且allowCoreThreadTimeOut為true,就判斷一下當前作業執行緒數是否大于等于1,
131             如果是的話就直接回傳,不是的話說明當前沒有作業執行緒了,就添加一個非核心執行緒去執行阻塞佇列中的任務
132             <2>如果allowCoreThreadTimeOut為false,就判斷一下下當前作業執行緒數是否大于等于核心執行緒數,如果是
133             的話就直接回傳,不是的話說明當前作業執行緒數小于核心執行緒數,那么也去添加一個非核心執行緒
134              */
135             if (workerCountOf(c) >= min)
136                 return;
137         }
138         /*
139         上面已經分析了在completedAbruptly為false時的兩種情況,下面來分析第三種情況,也就是completedAbruptly為
140         true的時候,completedAbruptly為true表示在runWorker方法中的while回圈中拋出了例外,那么也去添加一個
141         非核心執行緒(雖然之前那個報錯的任務是會在finally子句中被清空的,但是在這之前使用者可以覆寫afterExecute
142         鉤子方法,在其中保存這個執行失敗的任務,以此來進行后續的處理,從這個角度上來說,添加一個非核心執行緒還是
143         有意義的,另外,如之前的分析,在addWorker方法中的第34行代碼處,核心執行緒和非核心執行緒的區別僅在于閾值的判斷上,
144         其他都是一樣的,所以這里添加一個非核心執行緒也是可以的,反正沒達到閾值)
145          */
146         addWorker(null, false);
147     }
148 }

6 getTask方法

由上所示,在第24行代碼處,當本Worker中的task任務為空時,就會從阻塞佇列中拿取任務,也就是呼叫到getTask方法:

 1 /**
 2  * ThreadPoolExecutor:
 3  */
 4 private Runnable getTask() {
 5     //timedOut標志位用來判斷poll方法拿取任務是否超時了
 6     boolean timedOut = false;
 7 
 8     for (; ; ) {
 9         int c = ctl.get();
10         //重新獲取當前執行緒池的運行狀態
11         int rs = runStateOf(c);
12 
13         /*
14         如果當前執行緒池是SHUTDOWN狀態,并且阻塞佇列為空的時候;或者當前執行緒池的狀態大于等于STOP
15         以上兩種情況都會將作業執行緒-1,直接回傳null,因為這兩種情況下不需要
16         獲取任務了,作業執行緒-1后,后續會在processWorkerExit方法中從workers集合中剔除掉這個Worker等待GC的
17          */
18         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
19             decrementWorkerCount();
20             return null;
21         }
22
23         /*
24         走到這里說明當前執行緒池要么是RUNNING狀態,要么是SHUTDOWN狀態但是阻塞佇列不為空(SHUTDOWN狀態還是要
25         處理阻塞佇列中的任務的)
26 
27         重新獲取當前執行緒池的作業執行緒數
28          */
29         int wc = workerCountOf(c);
30 
31         /*
32         timed標志位表示作業執行緒是否需要超時銷毀
33         如果allowCoreThreadTimeOut設定為true(表示空閑的核心執行緒也是要超時銷毀的),或者當前執行緒數大于
34         核心執行緒數(這個條件代表的是空閑的非核心執行緒是要被銷毀的,如果allowCoreThreadTimeOut為false,
35         那么執行緒池中最多保留“傳進執行緒池中的核心執行緒數”個執行緒),就將timed置為true
36          */
37         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
38
39         /*
40         如果當前作業執行緒數大于最大執行緒數,可能是呼叫了setMaximumPoolSize方法,把最大執行緒數改小了(走到這里
41         說明addWorker方法運行成功,而在addWorker方法中的第34行代碼處已經判斷了大于最大執行緒數的情況);
42         timedOut為true說明當前已經不是第一次回圈了,在上次回圈中已經發生了poll的超時,所以總結來說這個if條件的意思是:
43         <1.1>如果當前作業執行緒數大于最大執行緒數
44         <1.2>或者當前執行緒處于空閑狀態并且是需要被銷毀的
45         <2.1>并且當前作業執行緒要有多于一個
46         <2.2>或者當前阻塞佇列是空的
47         滿足上面兩個條件,就將作業執行緒-1,去掉當前這個多余的執行緒,然后直接回傳
48          */
49         if ((wc > maximumPoolSize || (timed && timedOut))
50                 && (wc > 1 || workQueue.isEmpty())) {
51             //這里的方法和decrementWorkerCount方法的區別是不會死回圈去一直CAS嘗試,如果失敗了就直接回傳false
52             if (compareAndDecrementWorkerCount(c))
53                 return null;
54             //如果CAS-1失敗了,就進入到下次回圈中繼續判斷即可
55             continue;
56         }
57
58         try {
59             /*
60             如果timed為true,則通過poll方法進行限時拿取(超過keepAliveTime時間沒有拿取到,就直接回傳null),
61             否則通過take方法進行拿取(如果阻塞佇列為空,take方法在此時就會被阻塞住,也就是本執行緒會被阻塞住,直到
62             阻塞佇列中有資料了,也就是說如果timed為false的話,這些作業執行緒會一直被阻塞在這里)
63              */
64             Runnable r = timed ?
65                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
66                     workQueue.take();
67             if (r != null)
68                 //如果拿取到任務了,就直接回傳給Worker處理
69                 return r;
70             /*
71             走到這里說明發生了poll超時,那么將timedOut標志位置為true,進入到下一次回圈中重試
72             (大概率會走到第53行代碼處回傳null)
73              */
74             timedOut = true;
75         } catch (InterruptedException retry) {
76             //如果在阻塞的程序中發生了中斷,那么將timedOut置為false,也進入到下一次回圈中重試
77             timedOut = false;
78         }
79     }
80     /*
81     以上的邏輯說明了:核心執行緒和非核心執行緒的區別并不是在Worker中有個表示是否是核心執行緒的屬性,Worker是無狀態的,
82     每個Worker都是一樣的,而區分是通過判斷當前作業執行緒數是否大于核心執行緒數來進行的(因為只有阻塞佇列滿了的時候
83     才會去創建新的非核心執行緒,也就會使作業執行緒數大于核心執行緒數),如果大于,那么不管之前這個執行緒到底是核心執行緒
84     還是非核心執行緒,現在我就認定當前這個執行緒就是“非核心執行緒“,那么等這個“非核心執行緒”空閑時間超過keepAliveTime后,
85     就會被銷毀
86      */
87 }


7 shutdown方法

關閉執行緒池時一般呼叫的是shutdown方法,而不是shutdownNow方法:

 1 /**
 2  * ThreadPoolExecutor:
 3  */
 4 public void shutdown() {
 5     final ReentrantLock mainLock = this.mainLock;
 6     //上鎖
 7     mainLock.lock();
 8     try {
 9         //如果有安全管理器,確保呼叫者有權限關閉執行緒池(本文不展開分析)
10         checkShutdownAccess();
11         //將執行緒池狀態改為SHUTDOWN,里面使用了死回圈確保CAS操作一定成功
12         advanceRunState(SHUTDOWN);
13         interruptIdleWorkers();
14         //鉤子方法,空實作
15         onShutdown();
16     } finally {
17         //釋放鎖
18         mainLock.unlock();
19     }
20     //根據執行緒池狀態來判斷是否應該結束執行緒池
21     tryTerminate();
22 }
23
24 /**
25  * 第13行代碼處:
26  */
27 private void interruptIdleWorkers() {
28     //中斷所有的空閑執行緒
29     interruptIdleWorkers(false);
30 }
31
32 private void interruptIdleWorkers(boolean onlyOne) {
33     final ReentrantLock mainLock = this.mainLock;
34     //上鎖
35     mainLock.lock();
36     try {
37         for (Worker w : workers) {
38             Thread t = w.thread;
39             if (!t.isInterrupted() && w.tryLock()) {
40                 try {
41                     /*
42                     如果當前Worker中的執行緒沒有被中斷過,且嘗試加鎖成功,就將
43                     中斷標志位重新置為true,意思就是說要中斷這個空閑的Worker
44                      */
45                     t.interrupt();
46                 } catch (SecurityException ignore) {
47                 } finally {
48                     //將AQS中的state復位為0,恢復為tryLock之前的狀態
49                     w.unlock();
50                 }
51             }
52             if (onlyOne)
53                 //如果onlyOne為true,就只嘗試中斷一次
54                 break;
55         }
56     } finally {
57         //釋放鎖
58         mainLock.unlock();
59     }
60 }

8 tryTerminate方法

在上面的實作中可以看到有多處呼叫到了tryTerminate方法,以此來判斷當前執行緒池是否應該結束:

 1 /**
 2  * ThreadPoolExecutor:
 3  * (注:該方法放在最后再看比較好)
 4  */
 5 final void tryTerminate() {
 6     for (; ; ) {
 7         int c = ctl.get();
 8         /*
 9         <1>如果當前執行緒池是RUNNING狀態,就直接回傳,因為這時候不需要結束執行緒池
10         <2>如果當前執行緒池是TIDYING或TERMINATED狀態,也直接回傳,這時候就等著
11         修改狀態的那個執行緒把terminated方法執行完畢就行了
12         <3>如果當前執行緒池是SHUTDOWN狀態并且阻塞佇列不為空,也直接回傳,因為這時
13         候還是要去執行阻塞佇列中的任務的,不能改變執行緒池狀態
14          */
15         if (isRunning(c) ||
16                 runStateAtLeast(c, TIDYING) ||
17                 (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
18             return;
19         /*
20         走到這里說明有兩種情況,要么當前執行緒池是STOP狀態,要么當前執行緒池是SHUTDOWN狀態并且阻塞佇列為空
21         這個時候是否可以結束執行緒池還要查看一下當前的作業執行緒數,如果不為0,說明當前執行緒不是最后一個執行任務
22         的執行緒(因為如果當前要銷毀的執行緒是空閑狀態,會最終在getTask方法中完成-1的動作(執行時拋出例外會
23         在processWorkerExit方法中完成-1),也就是說每個應該要銷毀的空閑執行緒在最后拿取不到任務時都會-1的,
24         所以如果發現當前作業執行緒數沒有減到0的話,就說明當前執行緒不是最后一個執行執行緒),那么就不會結束執行緒池
25         (結束執行緒池的任務交給最后一個執行緒來做),這里ONLY_ONE永遠為true,也就是說如果當前執行緒不是最后一個
26         執行任務的執行緒的話,那么就只是中斷一個空閑的執行緒而已(相當于中斷自己),然后就直接回傳就行了
27          */
28         if (workerCountOf(c) != 0) {
29             interruptIdleWorkers(ONLY_ONE);
30             return;
31         }
32
33         /*
34         走到這里說明當前作業執行緒數已經為0了,也就是說當前執行緒是最后一個執行任務的執行緒,
35         此時需要完成結束執行緒池的動作
36          */
37         final ReentrantLock mainLock = this.mainLock;
38         //上鎖
39         mainLock.lock();
40         try {
41             //CAS將ctl狀態改為TIDYING
42             if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
43                 try {
44                     //鉤子方法,空實作
45                     terminated();
46                 } finally {
47                     //在執行完terminated方法后,將執行緒池狀態置為TERMINATED
48                     ctl.set(ctlOf(TERMINATED, 0));
49                     /*
50                     可能在此之前某執行緒呼叫了awaitTermination方法,一直處在阻塞中,
51                     并且沒有超時,也沒有發生中斷,那么在結束執行緒池的此時就需要喚醒這些執行緒了
52                      */
53                     termination.signalAll();
54                 }
55                 return;
56             }
57         } finally {
58             //釋放鎖
59             mainLock.unlock();
60         }
61         //走到這里說明之前的CAS將狀態改為TIDYING失敗了,那么就從頭開始重試
62     }
63 }

更多內容請關注微信公眾號:奇客時間

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/226352.html

標籤:Java

上一篇:volatile

下一篇:volatile

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more