一、概述
1、問題
先看我們遇到的問題:我們創建執行緒的方式很簡單,new Thread(() -> {...}),就是因為這么簡單粗暴的方式,才帶來了致命的問題,首先執行緒的創建和銷毀都是很耗時很浪費性能的操作,你用執行緒為了什么?為了就是異步,為了就是提升性能,簡單的new三五個Thread還好,我需要一千個執行緒呢?你也for回圈new1000個Thread嗎?用完在銷毀掉,那這一千個執行緒的創建和銷毀的性能是很糟糕的!
2、解決
為了解決上述問題,執行緒池誕生了,執行緒池的核心思想就是:執行緒復用,也就是說執行緒用完后不銷毀,放到池子里等著新任務的到來,反復利用N個執行緒來執行所有新老任務,這帶來的開銷只會是那N個執行緒的創建,而不是每來一個請求都帶來一個執行緒的從生到死的程序,
二、執行緒池
1、概念
還說個雞兒,上面的問題解決方案已經很通俗易懂了,針對特級小白我在舉個生活的案例:
比如找作業面試,涉及到兩個角色:面試官、求職者,求職者成千上萬,每來一個求職者都要為其單獨新找一個面試官來面試嗎?顯然不是,公司都有面試官池子,比如:A、B、C你們三就是這公司的面試官了,有人來面試你們三輪流面就行了,可能不是很恰當,含義就是說我并不需要為每個請求(求職者)都單獨分配一個新的執行緒(面試官) ,而是我固定好幾個執行緒,由他們幾個來處理所有請求,不會反復創建銷毀,
2、引數
2.1、原始碼
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {}
2.2、解釋
corePoolSize:核心執行緒數
執行緒池在完成初始化之后,默認情況下,執行緒池中不會有任何執行緒,執行緒池會等有任務來的時候再去創建執行緒,核心執行緒創建出來后即使超出了執行緒保持的存活時間配置也不會銷毀,核心執行緒只要創建就永駐了,就等著新任務進來進行處理,
maximumPoolSize:最大執行緒數
核心執行緒忙不過來且任務存盤佇列滿了的情況下,還有新任務進來的話就會繼續開辟執行緒,但是也不是任意的開辟執行緒數量,執行緒數(包含核心執行緒)達到
maximumPoolSize后就不會產生新執行緒了,就會執行拒絕策略,
keepAliveTime:執行緒保持的存活時間
如果執行緒池當前的執行緒數多于
corePoolSize,那么如果多余的執行緒空閑時間超過keepAliveTime,那么這些多余的執行緒(超出核心執行緒數的那些執行緒)就會被回收,
unit:執行緒保持的存活時間單位
比如:
TimeUnit.MILLISECONDS、TimeUnit.SECONDS
workQueue:任務存盤佇列
核心執行緒數滿了后還有任務繼續提交到執行緒池的話,就先進入
workQueue,
workQueue通常情況下有如下選擇:
LinkedBlockingQueue:無界佇列,意味著無限制,其實是有限制,大小是int的最大值,也可以自定義大小,
ArrayBlockingQueue:有界佇列,可以自定義大小,到了閾值就開啟新執行緒(不會超過maximumPoolSize),
SynchronousQueue:Executors.newCachedThreadPool();默認使用的佇列,也不算是個佇列,他不沒有存盤元素的能力,一般都采取
LinkedBlockingQueue,因為他也可以設定大小,可以取代ArrayBlockingQueue有界佇列,
threadFactory:當執行緒池需要新的執行緒時,會用threadFactory來生成新的執行緒
默認采用的是
DefaultThreadFactory,主要負責創建執行緒,newThread()方法,創建出來的執行緒都在同一個執行緒組且優先級也是一樣的,
handler:拒絕策略,任務量超出執行緒池的配置限制或執行shutdown還在繼續提交任務的話,會執行handler的邏輯,
默認采用的是
AbortPolicy,遇到上面的情況,執行緒池將直接采取直接拒絕策略,也就是直接拋出例外,RejectedExecutionException
3、原理
3.1、原理
-
執行緒池剛啟動的時候核心執行緒數為0
-
丟任務給執行緒池的時候,執行緒池會新開啟執行緒來執行這個任務
-
如果執行緒數小于
corePoolSize,即使作業執行緒處于空閑狀態,也會創建一個新執行緒來執行新任務 -
如果執行緒數大于或等于
corePoolSize,則會將任務放到workQueue,也就是任務佇列 -
如果任務佇列滿了,且執行緒數小于
maximumPoolSize,則會創建一個新執行緒來運行任務 -
如果任務佇列滿了,且執行緒數大于或等于
maximumPoolSize,則直接采取拒絕策略
3.2、圖解

3.3、舉例
執行緒池引數配置:核心執行緒5個,最大執行緒數10個,佇列長度為100,
那么執行緒池啟動的時候不會創建任何執行緒,假設請求進來6個,則會創建5個核心執行緒來處理五個請求,另一個沒被處理到的進入到佇列,這時候有進來99個請求,執行緒池發現核心執行緒滿了,佇列還在空著99個位置,所以會進入到佇列里99個,加上剛才的1個正好100個,這時候再次進來5個請求,執行緒池會再次開辟五個非核心執行緒來處理這五個請求,目前的情況是執行緒池里執行緒數是10個RUNNING狀態的,佇列里100個也滿了,如果這時候又進來1個請求,則直接走拒絕策略,
3.4、原始碼
public void execute(Runnable command) { int c = ctl.get(); // workerCountOf(c):作業執行緒數 // worker數量比核心執行緒數小,直接創建worker執行任務 if (workerCountOf(c) < corePoolSize) { // addWorker里面負責創建執行緒且執行任務 if (addWorker(command, true)) return; c = ctl.get(); } // worker數量超過核心執行緒數,任務直接進入佇列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 執行緒池狀態不是RUNNING狀態,說明執行過shutdown命令,需要對新加入的任務執行reject()操作, // 這兒為什么需要recheck,是因為任務入佇列前后,執行緒池的狀態可能會發生變化, if (! isRunning(recheck) && remove(command)) reject(command); // 這兒為什么需要判斷0值,主要是在執行緒池構造方法中,核心執行緒數允許為0 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果執行緒池不是運行狀態,或者任務進入佇列失敗,則嘗試創建worker執行任務, // 這兒有3點需要注意: // 1. 執行緒池不是運行狀態時,addWorker內部會判斷執行緒池狀態 // 2. addWorker第2個引數表示是否創建核心執行緒 // 3. addWorker回傳false,則說明任務執行失敗,需要執行reject操作 else if (!addWorker(command, false)) reject(command); }
4、Executors
4.1、概念
首先這不是一個執行緒池,這是執行緒池的工具類,他能方便的為我們創建執行緒,但是阿里巴巴開發手冊上說明不推薦用Executors創建執行緒池,推薦自己定義執行緒池,這是因為Executors創建的任何一種執行緒池都可能引發血案,具體是什么問題下面會說,
4.2、固定執行緒數
4.2.1、描述
核心執行緒數和最大執行緒數是一樣的,所以稱之為固定執行緒數,
其他引數配置默認為:永不超時(0ms),無界佇列(LinkedBlockingQueue)、默認執行緒工廠(DefaultThreadFactory)、直接拒絕策略(AbortPolicy),
4.2.2、api
Executors.newFixedThreadPool(n);
4.2.3、demo
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Description: 創建2個執行緒來執行10個任務, * * @author TongWei.Chen 2020-07-09 21:28:34 */ public class ThreadPoolTest { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2); for (int i = 0; i < 10; i++) { // 從結果中可以發現執行緒name永遠都是兩個,不會有第三個, executorService.execute(() -> System.out.println(Thread.currentThread().getName())); } } }
4.2.4、問題
問題就在于它是無界佇列,佇列里能放int的最大值個任務,并發巨高的情況下極大可能直接OOM了然后任務還在堆積,畢竟直接用的是jvm記憶體,所以建議自定義執行緒池,自己按照需求指定合適的佇列大小,自定義拒絕策略將超出佇列大小的任務放到對外記憶體做補償,比如Redis,別把業務系統壓垮就行,
4.2.5、原始碼
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor( // 核心執行緒數和最大執行緒數都是nThreads nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, // 無界佇列!!!致命問題的關鍵所在, new LinkedBlockingQueue<Runnable>()); }
4.3、單個執行緒
4.3.1、描述
核心執行緒數和最大執行緒數是1,內部默認的,不可更改,所以稱之為單執行緒數的執行緒池,
類似于Executors.newFixedThreadPool(1);
其他引數配置默認為:永不超時(0ms),無界佇列(LinkedBlockingQueue)、默認執行緒工廠(DefaultThreadFactory)、直接拒絕策略(AbortPolicy),
4.3.2、api
Executors.newSingleThreadExecutor();
4.3.3、demo
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Description: 創建1個執行緒來執行10個任務, * * @author TongWei.Chen 2020-07-09 21:28:34 */ public class ThreadPoolTest { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { // 從結果中可以發現執行緒name永遠都是pool-1-thread-1,不會有第二個出現, executorService.execute(() -> System.out.println(Thread.currentThread().getName())); } } }
4.3.4、問題
同【4.2、固定執行緒數】的問題,都是無界佇列惹的禍,
4.3.5、原始碼
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor( // 核心執行緒數和最大執行緒數都是1,寫死的,客戶端不可更改, 1, 1, 0L, TimeUnit.MILLISECONDS, // 無界佇列!!!致命問題的關鍵所在, new LinkedBlockingQueue<Runnable>())); }
4.4、帶快取的執行緒池
4.4.1、描述
他的功能是來個任務我就開辟個執行緒去處理,不會進入佇列,SynchronousQueue佇列也不帶存盤元素的功能,那這意味著來一億個請求就會開辟一億個執行緒去處理,keepAliveTime為60S,意味著執行緒空閑時間超過60S就會被殺死;這就叫帶快取功能的執行緒池,
核心執行緒數是0,最大執行緒數是int的最大值,內部默認的,不可更改,
其他引數配置默認為:1min超時(60s),SynchronousQueue佇列、默認執行緒工廠(DefaultThreadFactory)、直接拒絕策略(AbortPolicy),
4.4.2、api
Executors.newCachedThreadPool();
4.4.3、demo
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Description: 創建個帶快取功能的執行緒池來執行10個任務, * * @author TongWei.Chen 2020-07-09 21:28:34 */ public class ThreadPoolTest { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { // 從結果中可以發現執行緒name有10個,也就是有幾個任務就會開辟幾個執行緒, executorService.execute(() -> System.out.println(Thread.currentThread().getName())); } } }
4.4.4、問題
問題就在于他的最大執行緒數是int的最大值,因為他內部采取的佇列是SynchronousQueue,這個佇列沒有容納元素的能力,這將意味著只要來請求我就開啟執行緒去作業,巔峰期能創建二十幾億個執行緒出來作業,你自己想想多么可怕!!!
4.4.5、原始碼
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor( // 核心執行緒數是0,最大執行緒數都是Integer.MAX_VALUE,這個可致命了!!! 0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
4.5、有調度功能的執行緒池
4.5.1、描述
RocketMQ內部大量采用了此種執行緒池來做心跳等任務,
核心執行緒數手動傳進來,最大執行緒數是Integer.MAX_VALUE,最大執行緒數是內部默認的,不可更改,
其他引數配置默認為:永不超時(0ns),帶延遲功能的佇列(DelayedWorkQueue)、默認執行緒工廠(DefaultThreadFactory)、直接拒絕策略(AbortPolicy),
4.5.2、api
Executors.newScheduledThreadPool(n);
4.5.3、demo
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Description: 創建個帶調度功能的執行緒池來執行任務, * * @author TongWei.Chen 2020-07-09 21:28:34 */ public class ThreadPoolTest { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); // 五秒一次 scheduledExecutorService.schedule(() -> System.out.println(Thread.currentThread().getName()), 5, TimeUnit.SECONDS); // 首次五秒后執行,其次每隔1s執行一次 scheduledExecutorService.scheduleAtFixedRate(() -> System.out.println(Thread.currentThread().getName()), 5, 1, TimeUnit.SECONDS); } }
4.5.4、問題
【同4.4、帶快取的執行緒池的問題】
問題就在于他的最大執行緒數是int的最大值,這將意味海量并發期能創建二十幾億個執行緒出來作業,你自己想想多么可怕!!!
4.5.5、原始碼
public ScheduledThreadPoolExecutor(int corePoolSize) { // 致命的問題跟newCachedThreadPool一樣,最大執行緒數能開到幾十億(Integer.MAX_VALUE)!!! super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
4.6、停止執行緒
4.6.1、shutdown
平緩的結束執行緒池,比如當前執行緒池還在執行任務,還沒執行完,這時候執行了shutdown的話,執行緒池并不會立即停止作業,而是會等待執行緒池中的任務都執行完成后才會shutdown掉,但是如果執行shutdown了,外界還在繼續提交任務到執行緒池,那么執行緒池會直接采取拒絕策略,
4.6.2、isShutdown
判斷執行緒池是否已經shutdown,
4.6.3、shutdownNow
暴力結束執行緒池,不管你當前執行緒池有沒有任務在執行,佇列里有沒有堆積訊息,我都直接讓執行緒池掛掉,但是他的回傳值是佇列里那些未被執行的任務,有需要的可以記錄下log啥的,
4.7、疑問
這幾種執行緒池為什么要采取不一樣的佇列?比如newFixedThreadPool為什么采取LinkedBlockingQueue,而newCachedThreadPool又為什么采取SynchronousQueue?
因為newFixedThreadPool執行緒數量有限,他又不想丟失任務,只能采取無界佇列,而newCachedThreadPool的話本身自帶int最大值個執行緒數,所以沒必要用無界佇列,他的宗旨就是我有執行緒能處理,不需要佇列,
5、總結幾個問題
1、執行緒池的狀態
RUNNING:接受新任務并處理排隊任務,SHUTDOWN:不接受新任務,但是會處理排隊任務,【見:停止執行緒的4.6.1、shutdown】STOP:不接受新任務,也不處理排隊任務,并中端正在進行的任務,TIDYING:所有任務都已經完事,作業執行緒為0的時候 ,執行緒會進入這個狀態并執行terminate()鉤子方法,TERMINATED:terminate()鉤子方法運行完成,
2、執行緒池自動創建還是手動?
那肯定是手動了,因為Executors自動創建的那些執行緒池都存在致命的問題,手動創建執行緒池我們能自己控制執行緒數大小以及佇列大小,還可以指定組名稱等等個性化配置,重點不會出現致命問題,風險都把控在我們手里,
3、執行緒數多少合適?
- CPU密集型(比如加密、各種復雜計算等):建議設定為CPU核數+1,
- 耗時IO操作(比如讀寫資料庫,壓縮解壓縮大檔案等等):一般會設定CPU核數的2倍,當然也有個很牛X的計算公式:執行緒數=CPU核數 *(1+平均等待時間/平均作業時間)
4、before&after
在執行緒執行前后可以通過兩個方法來進行列印log或其他作業,
原始碼如下:
// 執行前的before beforeExecute(wt, task); Throwable thrown = null; try { // 真正執行 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 執行完成后after afterExecute(task, thrown); }
6、核心原始碼(全)
1、常用變數的解釋
// 1. `ctl`,可以看做一個int型別的數字,高3位表示執行緒池狀態,低29位表示worker數量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 2. `COUNT_BITS`,`Integer.SIZE`為32,所以`COUNT_BITS`為29 private static final int COUNT_BITS = Integer.SIZE - 3; // 3. `CAPACITY`,執行緒池允許的最大執行緒數,1左移29位,然后減1,即為 2^29 - 1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 4. 執行緒池有5種狀態,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl // 5. `runStateOf()`,獲取執行緒池狀態,通過按位與操作,低29位將全部變成0 private static int runStateOf(int c) { return c & ~CAPACITY; } // 6. `workerCountOf()`,獲取執行緒池worker數量,通過按位與操作,高3位將全部變成0 private static int workerCountOf(int c) { return c & CAPACITY; } // 7. `ctlOf()`,根據執行緒池狀態和執行緒池worker數量,生成ctl值 private static int ctlOf(int rs, int wc) { return rs | wc; } /* * Bit field accessors that don't require unpacking ctl. * These depend on the bit layout and on workerCount being never negative. */ // 8. `runStateLessThan()`,執行緒池狀態小于xx private static boolean runStateLessThan(int c, int s) { return c < s; } // 9. `runStateAtLeast()`,執行緒池狀態大于等于xx private static boolean runStateAtLeast(int c, int s) { return c >= s; }
2、構造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 基本型別引數校驗 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); // 空指標校驗 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; // 根據傳入引數`unit`和`keepAliveTime`,將存活時間轉換為納秒存到變數`keepAliveTime `中 this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
3、提交執行task的程序
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); // worker數量比核心執行緒數小,直接創建worker執行任務 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // worker數量超過核心執行緒數,任務直接進入佇列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 執行緒池狀態不是RUNNING狀態,說明執行過shutdown命令,需要對新加入的任務執行reject()操作, // 這兒為什么需要recheck,是因為任務入佇列前后,執行緒池的狀態可能會發生變化, if (! isRunning(recheck) && remove(command)) reject(command); // 這兒為什么需要判斷0值,主要是在執行緒池構造方法中,核心執行緒數允許為0 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果執行緒池不是運行狀態,或者任務進入佇列失敗,則嘗試創建worker執行任務, // 這兒有3點需要注意: // 1. 執行緒池不是運行狀態時,addWorker內部會判斷執行緒池狀態 // 2. addWorker第2個引數表示是否創建核心執行緒 // 3. addWorker回傳false,則說明任務執行失敗,需要執行reject操作 else if (!addWorker(command, false)) reject(command); }
4、addworker原始碼決議
private boolean addWorker(Runnable firstTask, boolean core) { retry: // 外層自旋 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 這個條件寫得比較難懂,我對其進行了調整,和下面的條件等價 // (rs > SHUTDOWN) || // (rs == SHUTDOWN && firstTask != null) || // (rs == SHUTDOWN && workQueue.isEmpty()) // 1. 執行緒池狀態大于SHUTDOWN時,直接回傳false // 2. 執行緒池狀態等于SHUTDOWN,且firstTask不為null,直接回傳false // 3. 執行緒池狀態等于SHUTDOWN,且佇列為空,直接回傳false // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 內層自旋 for (;;) { int wc = workerCountOf(c); // worker數量超過容量,直接回傳false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 使用CAS的方式增加worker數量, // 若增加成功,則直接跳出外層回圈進入到第二部分 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 執行緒池狀態發生變化,對外層回圈進行自旋 if (runStateOf(c) != rs) continue retry; // 其他情況,直接內層回圈進行自旋即可 // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // worker的添加必須是串行的,因此需要加鎖 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 這兒需要重新檢查執行緒池狀態 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // worker已經呼叫過了start()方法,則不再創建worker if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // worker創建并添加到workers成功 workers.add(w); // 更新`largestPoolSize`變數 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 啟動worker執行緒 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // worker執行緒啟動失敗,說明執行緒池狀態發生了變化(關閉操作被執行),需要進行shutdown相關操作 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
5、執行緒池worker任務單元
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 這兒是Worker的關鍵所在,使用了執行緒工廠創建了一個執行緒,傳入的引數為當前worker this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // 省略代碼... }
6、核心執行緒執行邏輯-runworker
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 呼叫unlock()是為了讓外部可以中斷 w.unlock(); // allow interrupts // 這個變數用于判斷是否進入過自旋(while回圈) boolean completedAbruptly = true; try { // 這兒是自旋 // 1. 如果firstTask不為null,則執行firstTask; // 2. 如果firstTask為null,則呼叫getTask()從佇列獲取任務, // 3. 阻塞佇列的特性就是:當佇列為空時,當前執行緒會被阻塞等待 while (task != null || (task = getTask()) != null) { // 這兒對worker進行加鎖,是為了達到下面的目的 // 1. 降低鎖范圍,提升性能 // 2. 保證每個worker執行的任務是串行的 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 如果執行緒池正在停止,則對當前執行緒進行中斷操作 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); // 執行任務,且在執行前后通過`beforeExecute()`和`afterExecute()`來擴展其功能, // 這兩個方法在當前類里面為空實作, try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { // 幫助gc task = null; // 已完成任務數加一 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 自旋操作被退出,說明執行緒池正在結束 processWorkerExit(w, completedAbruptly); } }
7、自建執行緒池注意點
- 阻塞任務佇列數
- 執行緒池的名字,最好跟業務相關
- 核心執行緒池大小,看業務實際情況,可以參考【執行緒數多少合適?】
- 最大執行緒池大小,看業務實際情況,可以參考【執行緒數多少合適?】
- 拒絕策略,我個人一般都是記錄log,如果主要的業務我會根據log做補償,
比如:
ThreadPoolExecutor executor = new ThreadPoolExecutor(CPU核數 + 1, 2 * CPU核數 + 1, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), // 執行緒池名字pay-account new DefaultThreadFactory("pay-account"), (r1, executor) -> { // 記錄log 重新入佇列做補償 });
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/144564.html
標籤:Java
上一篇:c語言陣列長度
