摘要:Java是如何實作和管理執行緒池的?
本文分享自華為云社區《JUC執行緒池: ThreadPoolExecutor詳解》,作者:龍哥手記 ,
帶著大廠的面試問題去理解
提示
請帶著這些問題繼續后文,會很大程度上幫助你更好的理解相關知識點,@pdai
- 為什么要有執行緒池?
- Java是實作和管理執行緒池有哪些方式? 請簡單舉例如何使用,
- 為什么很多公司不允許使用Executors去創建執行緒池? 那么推薦怎么使用呢?
- ThreadPoolExecutor有哪些核心的配置引數? 請簡要說明
- ThreadPoolExecutor可以創建哪是哪三種執行緒池呢?
- 當佇列滿了并且worker的數量達到maxSize的時候,會怎么樣?
- 說說ThreadPoolExecutor有哪些RejectedExecutionHandler策略? 默認是什么策略?
- 簡要說下執行緒池的任務執行機制? execute –> addWorker –>runworker (getTask)
- 執行緒池中任務是如何提交的?
- 執行緒池中任務是如何關閉的?
- 在配置執行緒池的時候需要考慮哪些配置因素?
- 如何監控執行緒池的狀態?
為什么要有執行緒池
執行緒池能夠對執行緒進行統一分配,調優和監控:
- 降低資源消耗(執行緒無限制地創建,然后使用完畢后銷毀)
- 提高回應速度(無須創建執行緒)
- 提高執行緒的可管理性
ThreadPoolExecutor例子
Java是如何實作和管理執行緒池的?
從JDK 5開始,把作業單元與執行機制分離開來,作業單元包括Runnable和Callable,而執行機制由Executor框架提供,
- WorkerThread
public class WorkerThread implements Runnable { private String command; public WorkerThread(String s){ this.command=s; } @Override public void run() { System.out.println(Thread.currentThread().getName()+" Start. Command = "+command); processCommand(); System.out.println(Thread.currentThread().getName()+" End."); } private void processCommand() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public String toString(){ return this.command; } }
- SimpleThreadPool
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class SimpleThreadPool { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(5); for (int i = 0; i < 10; i++) { Runnable worker = new WorkerThread("" + i); executor.execute(worker); } executor.shutdown(); // This will make the executor accept no new threads and finish all existing threads in the queue while (!executor.isTerminated()) { // Wait until all threads are finish,and also you can use "executor.awaitTermination();" to wait } System.out.println("Finished all threads"); } }
程式中我們創建了固定大小為五個作業執行緒的執行緒池,然后分配給執行緒池十個作業,因為執行緒池大小為五,它將啟動五個作業執行緒先處理五個作業,其他的作業則處于等待狀態,一旦有作業完成,空閑下來作業執行緒就會撿取等待佇列里的其他作業進行執行,
這里是以上程式的輸出,
pool-1-thread-2 Start. Command = 1 pool-1-thread-4 Start. Command = 3 pool-1-thread-1 Start. Command = 0 pool-1-thread-3 Start. Command = 2 pool-1-thread-5 Start. Command = 4 pool-1-thread-4 End. pool-1-thread-5 End. pool-1-thread-1 End. pool-1-thread-3 End. pool-1-thread-3 Start. Command = 8 pool-1-thread-2 End. pool-1-thread-2 Start. Command = 9 pool-1-thread-1 Start. Command = 7 pool-1-thread-5 Start. Command = 6 pool-1-thread-4 Start. Command = 5 pool-1-thread-2 End. pool-1-thread-4 End. pool-1-thread-3 End. pool-1-thread-5 End. pool-1-thread-1 End. Finished all threads
輸出表明執行緒池中至始至終只有五個名為 "pool-1-thread-1" 到 "pool-1-thread-5" 的五個執行緒,這五個執行緒不隨著作業的完成而消亡,會一直存在,并負責執行分配給執行緒池的任務,直到執行緒池消亡,
Executors 類提供了使用了 ThreadPoolExecutor 的簡單的 ExecutorService 實作,但是 ThreadPoolExecutor 提供的功能遠不止于此,我們可以在創建 ThreadPoolExecutor 實體時指定活動執行緒的數量,我們也可以限制執行緒池的大小并且創建我們自己的 RejectedExecutionHandler 實作來處理不能適應作業佇列的作業,
這里是我們自定義的 RejectedExecutionHandler 介面的實作,
- RejectedExecutionHandlerImpl.java
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString() + " is rejected"); } }
ThreadPoolExecutor 提供了一些方法,我們可以使用這些方法來查詢 executor 的當前狀態,執行緒池大小,活動執行緒數量以及任務數量,因此我是用來一個監控執行緒在特定的時間間隔內列印 executor 資訊,
- MyMonitorThread.java
import java.util.concurrent.ThreadPoolExecutor; public class MyMonitorThread implements Runnable { private ThreadPoolExecutor executor; private int seconds; private boolean run=true; public MyMonitorThread(ThreadPoolExecutor executor, int delay) { this.executor = executor; this.seconds=delay; } public void shutdown(){ this.run=false; } @Override public void run() { while(run){ System.out.println( String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s", this.executor.getPoolSize(), this.executor.getCorePoolSize(), this.executor.getActiveCount(), this.executor.getCompletedTaskCount(), this.executor.getTaskCount(), this.executor.isShutdown(), this.executor.isTerminated())); try { Thread.sleep(seconds*1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
這里是使用 ThreadPoolExecutor 的執行緒池實作例子,
- WorkerPool.java
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class WorkerPool { public static void main(String args[]) throws InterruptedException{ //RejectedExecutionHandler implementation RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl(); //Get the ThreadFactory implementation to use ThreadFactory threadFactory = Executors.defaultThreadFactory(); //creating the ThreadPoolExecutor ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler); //start the monitoring thread MyMonitorThread monitor = new MyMonitorThread(executorPool, 3); Thread monitorThread = new Thread(monitor); monitorThread.start(); //submit work to the thread pool for(int i=0; i<10; i++){ executorPool.execute(new WorkerThread("cmd"+i)); } Thread.sleep(30000); //shut down the pool executorPool.shutdown(); //shut down the monitor thread Thread.sleep(5000); monitor.shutdown(); } }
注意在初始化 ThreadPoolExecutor 時,我們保持初始池大小為 2,最大池大小為 4 而作業佇列大小為 2,因此如果已經有四個正在執行的任務而此時分配來更多任務的話,作業佇列將僅僅保留他們(新任務)中的兩個,其他的將會被 RejectedExecutionHandlerImpl 處理,
上面程式的輸出可以證實以上觀點,
pool-1-thread-1 Start. Command = cmd0 pool-1-thread-4 Start. Command = cmd5 cmd6 is rejected pool-1-thread-3 Start. Command = cmd4 pool-1-thread-2 Start. Command = cmd1 cmd7 is rejected cmd8 is rejected cmd9 is rejected [monitor] [0/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false [monitor] [4/2] Active: 4, Completed: 0, Task: 6, isShutdown: false, isTerminated: false pool-1-thread-4 End. pool-1-thread-1 End. pool-1-thread-2 End. pool-1-thread-3 End. pool-1-thread-1 Start. Command = cmd3 pool-1-thread-4 Start. Command = cmd2 [monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false [monitor] [4/2] Active: 2, Completed: 4, Task: 6, isShutdown: false, isTerminated: false pool-1-thread-1 End. pool-1-thread-4 End. [monitor] [4/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [2/2] Active: 0, Completed: 6, Task: 6, isShutdown: false, isTerminated: false [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true [monitor] [0/2] Active: 0, Completed: 6, Task: 6, isShutdown: true, isTerminated: true
注意 executor 的活動任務、完成任務以及所有完成任務,這些數量上的變化,我們可以呼叫 shutdown() 方法來結束所有提交的任務并終止執行緒池,
ThreadPoolExecutor使用詳解
其實java執行緒池的實作原理很簡單,說白了就是一個執行緒集合workerSet和一個阻塞佇列workQueue,當用戶向執行緒池提交一個任務(也就是執行緒)時,執行緒池會先將任務放入workQueue中,workerSet中的執行緒會不斷的從workQueue中獲取執行緒然后執行,當workQueue中沒有任務的時候,worker就會阻塞,直到佇列中有任務了就取出來繼續執行,
Execute原理
當一個任務提交至執行緒池之后:
- 執行緒池首先當前運行的執行緒數量是否少于corePoolSize,如果是,則創建一個新的作業執行緒來執行任務,如果都在執行任務,則進入2.
- 判斷BlockingQueue是否已經滿了,倘若還沒有滿,則將執行緒放入BlockingQueue,否則進入3.
- 如果創建一個新的作業執行緒將使當前運行的執行緒數量超過maximumPoolSize,則交給RejectedExecutionHandler來處理任務,
當ThreadPoolExecutor創建新執行緒時,通過CAS來更新執行緒池的狀態ctl.
引數
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
- corePoolSize 執行緒池中的核心執行緒數,當提交一個任務時,執行緒池創建一個新執行緒執行任務,直到當前執行緒數等于corePoolSize, 即使有其他空閑執行緒能夠執行新來的任務, 也會繼續創建執行緒;如果當前執行緒數為corePoolSize,繼續提交的任務被保存到阻塞佇列中,等待被執行;如果執行了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前創建并啟動所有核心執行緒,
- workQueue 用來保存等待被執行的任務的阻塞佇列. 在JDK中提供了如下阻塞佇列: 具體可以參考JUC 集合: BlockQueue詳解
- ArrayBlockingQueue: 基于陣列結構的有界阻塞佇列,按FIFO排序任務;
- LinkedBlockingQueue: 基于鏈表結構的阻塞佇列,按FIFO排序任務,吞吐量通常要高于ArrayBlockingQueue;
- SynchronousQueue: 一個不存盤元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處于阻塞狀態,吞吐量通常要高于LinkedBlockingQueue;
- PriorityBlockingQueue: 具有優先級的無界阻塞佇列;
LinkedBlockingQueue比ArrayBlockingQueue在插入洗掉節點性能方面更優,但是二者在put(), take()任務的時均需要加鎖,SynchronousQueue使用無鎖演算法,根據節點的狀態判斷執行,而不需要用到鎖,其核心是Transfer.transfer().
- maximumPoolSize 執行緒池中允許的最大執行緒數,如果當前阻塞佇列滿了,且繼續提交任務,則創建新的執行緒執行任務,前提是當前執行緒數小于maximumPoolSize;當阻塞佇列是無界佇列, 則maximumPoolSize則不起作用, 因為無法提交至核心執行緒池的執行緒會一直持續地放入workQueue.
- keepAliveTime 執行緒空閑時的存活時間,即當執行緒沒有任務執行時,該執行緒繼續存活的時間;默認情況下,該引數只在執行緒數大于corePoolSize時才有用, 超過這個時間的空閑執行緒將被終止;
- unit keepAliveTime的單位
- threadFactory 創建執行緒的工廠,通過自定義的執行緒工廠可以給每個新建的執行緒設定一個具有識別度的執行緒名,默認為DefaultThreadFactory
- handler 執行緒池的飽和策略,當阻塞佇列滿了,且沒有空閑的作業執行緒,如果繼續提交任務,必須采取一種策略處理該任務,執行緒池提供了4種策略:
- AbortPolicy: 直接拋出例外,默認策略;
- CallerRunsPolicy: 用呼叫者所在的執行緒來執行任務;
- DiscardOldestPolicy: 丟棄阻塞佇列中靠最前的任務,并執行當前任務;
- DiscardPolicy: 直接丟棄任務;
當然也可以根據應用場景實作RejectedExecutionHandler介面,自定義飽和策略,如記錄日志或持久化存盤不能處理的任務,
三種型別
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
執行緒池的執行緒數量達corePoolSize后,即使執行緒池沒有可執行任務時,也不會釋放執行緒,
FixedThreadPool的作業佇列為無界佇列LinkedBlockingQueue(佇列容量為Integer.MAX_VALUE), 這會導致以下問題:
- 執行緒池里的執行緒數量不超過corePoolSize,這導致了maximumPoolSize和keepAliveTime將會是個無用引數
- 由于使用了無界佇列, 所以FixedThreadPool永遠不會拒絕, 即飽和策略失效
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
初始化的執行緒池中只有一個執行緒,如果該執行緒例外結束,會重新創建一個新的執行緒繼續執行任務,唯一的執行緒可以保證所提交任務的順序執行.
由于使用了無界佇列, 所以SingleThreadPool永遠不會拒絕, 即飽和策略失效
newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
執行緒池的執行緒數可達到Integer.MAX_VALUE,即2147483647,內部使用SynchronousQueue作為阻塞佇列; 和newFixedThreadPool創建的執行緒池不同,newCachedThreadPool在沒有任務執行時,當執行緒的空閑時間超過keepAliveTime,會自動釋放執行緒資源,當提交新任務時,如果沒有空閑執行緒,則創建新執行緒執行任務,會導致一定的系統開銷; 執行程序與前兩種稍微不同:
- 主執行緒呼叫SynchronousQueue的offer()方法放入task, 倘若此時執行緒池中有空閑的執行緒嘗試讀取 SynchronousQueue的task, 即呼叫了SynchronousQueue的poll(), 那么主執行緒將該task交給空閑執行緒. 否則執行(2)
- 當執行緒池為慷訓者沒有空閑的執行緒, 則創建新的執行緒執行任務.
- 執行完任務的執行緒倘若在60s內仍空閑, 則會被終止. 因此長時間空閑的CachedThreadPool不會持有任何執行緒資源.
關閉執行緒池
遍歷執行緒池中的所有執行緒,然后逐個呼叫執行緒的interrupt方法來中斷執行緒.
關閉方式 - shutdown
將執行緒池里的執行緒狀態設定成SHUTDOWN狀態, 然后中斷所有沒有正在執行任務的執行緒.
關閉方式 - shutdownNow
將執行緒池里的執行緒狀態設定成STOP狀態, 然后停止所有正在執行或暫停任務的執行緒. 只要呼叫這兩個關閉方法中的任意一個, isShutDown() 回傳true. 當所有任務都成功關閉了, isTerminated()回傳true.
ThreadPoolExecutor原始碼詳解
幾個關鍵屬性
//這個屬性是用來存放 當前運行的worker數量以及執行緒池狀態的 //int是32位的,這里把int的高3位拿來充當執行緒池狀態的標志位,后29位拿來充當當前運行worker的數量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //存放任務的阻塞佇列 private final BlockingQueue<Runnable> workQueue; //worker的集合,用set來存放 private final HashSet<Worker> workers = new HashSet<Worker>(); //歷史達到的worker數最大值 private int largestPoolSize; //當佇列滿了并且worker的數量達到maxSize的時候,執行具體的拒絕策略 private volatile RejectedExecutionHandler handler; //超出coreSize的worker的生存時間 private volatile long keepAliveTime; //常駐worker的數量 private volatile int corePoolSize; //最大worker的數量,一般當workQueue滿了才會用到這個引數 private volatile int maximumPoolSize;
內部狀態
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
其中AtomicInteger變數ctl的功能非常強大: 利用低29位表示執行緒池中執行緒數,通過高3位表示執行緒池的運行狀態:
- RUNNING: -1 << COUNT_BITS,即高3位為111,該狀態的執行緒池會接收新任務,并處理阻塞佇列中的任務;
- SHUTDOWN: 0 << COUNT_BITS,即高3位為000,該狀態的執行緒池不會接收新任務,但會處理阻塞佇列中的任務;
- STOP : 1 << COUNT_BITS,即高3位為001,該狀態的執行緒不會接收新任務,也不會處理阻塞佇列中的任務,而且會中斷正在運行的任務;
- TIDYING : 2 << COUNT_BITS,即高3位為010, 所有的任務都已經終止;
- TERMINATED: 3 << COUNT_BITS,即高3位為011, terminated()方法已經執行完成
任務的執行
execute –> addWorker –>runworker (getTask)
執行緒池的作業執行緒通過Woker類實作,在ReentrantLock鎖的保證下,把Woker實體插入到HashSet后,并啟動Woker中的執行緒, 從Woker類的構造方法實作可以發現: 執行緒工廠在創建執行緒thread時,將Woker實體本身this作為引數傳入,當執行start方法啟動執行緒thread時,本質是執行了Worker的runWorker方法, firstTask執行完成之后,通過getTask方法從阻塞佇列中獲取等待的任務,如果佇列中沒有任務,getTask方法會被阻塞并掛起,不會占用cpu資源;
execute()方法
ThreadPoolExecutor.execute(task)實作了Executor.execute(task)
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //workerCountOf獲取執行緒池的當前執行緒數;小于corePoolSize,執行addWorker創建新執行緒執行command任務 if (addWorker(command, true)) return; c = ctl.get(); } // double check: c, recheck // 執行緒池處于RUNNING狀態,把提交的任務成功放入阻塞佇列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // recheck and if necessary 回滾到入隊操作前,即倘若執行緒池shutdown狀態,就remove(command) //如果執行緒池沒有RUNNING,成功從阻塞佇列中洗掉任務,執行reject方法處理任務 if (! isRunning(recheck) && remove(command)) reject(command); //執行緒池處于running狀態,但是沒有執行緒,則創建執行緒 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 往執行緒池中創建新的執行緒失敗,則reject任務 else if (!addWorker(command, false)) reject(command); }
- 為什么需要double check執行緒池的狀態?
在多執行緒環境下,執行緒池的狀態時刻在變化,而ctl.get()是非原子操作,很有可能付訓取了執行緒池狀態后執行緒池狀態就改變了,判斷是否將command加入workque是執行緒池之前的狀態,倘若沒有double check,萬一執行緒池處于非running狀態(在多執行緒環境下很有可能發生),那么command永遠不會執行,
addWorker方法
從方法execute的實作可以看出: addWorker主要負責創建新的執行緒并執行任務 執行緒池創建新執行緒執行任務時,需要 獲取全域鎖:
private final ReentrantLock mainLock = new ReentrantLock();
private boolean addWorker(Runnable firstTask, boolean core) { // CAS更新執行緒池數量 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 執行緒池重入鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // 執行緒啟動,執行任務(Worker.thread(firstTask).start()); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
Worker類的runworker方法
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); // 創建執行緒 } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // ... }
- 繼承了AQS類,可以方便的實作作業執行緒的中止操作;
- 實作了Runnable介面,可以將自身作為一個任務在作業執行緒中執行;
- 當前提交的任務firstTask作為引數傳入Worker的構造方法;
一些屬性還有構造方法:
//運行的執行緒,前面addWorker方法中就是直接通過啟動這個執行緒來啟動這個worker final Thread thread; //當一個worker剛創建的時候,就先嘗試執行這個任務 Runnable firstTask; //記錄完成任務的數量 volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //創建一個Thread,將自己設定給他,后面這個thread啟動的時候,也就是執行worker的run方法 this.thread = getThreadFactory().newThread(this); }
runWorker方法是執行緒池的核心:
- 執行緒啟動之后,通過unlock方法釋放鎖,設定AQS的state為0,表示運行可中斷;
- Worker執行firstTask或從workQueue中獲取任務:
- 進行加鎖操作,保證thread不被其他執行緒中斷(除非執行緒池被中斷)
- 檢查執行緒池狀態,倘若執行緒池處于中斷狀態,當前執行緒將中斷,
- 執行beforeExecute
- 執行任務的run方法
- 執行afterExecute方法
- 解鎖操作
通過getTask方法從阻塞佇列中獲取等待的任務,如果佇列中沒有任務,getTask方法會被阻塞并掛起,不會占用cpu資源;
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 先執行firstTask,再從workerQueue中取task(getTask()) while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
getTask方法
下面來看一下getTask()方法,這里面涉及到keepAliveTime的使用,從這個方法我們可以看出執行緒池是怎么讓超過corePoolSize的那部分worker銷毀的,
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
注意這里一段代碼是keepAliveTime起作用的關鍵:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
allowCoreThreadTimeOut為false,執行緒即使空閑也不會被銷毀;倘若為ture,在keepAliveTime內仍空閑則會被銷毀,
如果執行緒允許空閑等待而不被銷毀timed == false,workQueue.take任務: 如果阻塞佇列為空,當前執行緒會被掛起等待;當佇列中有任務加入時,執行緒被喚醒,take方法回傳任務,并執行;
如果執行緒不允許無休止空閑timed == true, workQueue.poll任務: 如果在keepAliveTime時間內,阻塞佇列還是沒有任務,則回傳null;
任務的提交

- submit任務,等待執行緒池execute
- 執行FutureTask類的get方法時,會把主執行緒封裝成WaitNode節點并保存在waiters鏈表中, 并阻塞等待運行結果;
- FutureTask任務執行完成后,通過UNSAFE設定waiters相應的waitNode為null,并通過LockSupport類unpark方法喚醒主執行緒;
public class Test{ public static void main(String[] args) { ExecutorService es = Executors.newCachedThreadPool(); Future<String> future = es.submit(new Callable<String>() { @Override public String call() throws Exception { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "future result"; } }); try { String result = future.get(); System.out.println(result); } catch (Exception e) { e.printStackTrace(); } } }
在實際業務場景中,Future和Callable基本是成對出現的,Callable負責產生結果,Future負責獲取結果,
- Callable介面類似于Runnable,只是Runnable沒有回傳值,
- Callable任務除了回傳正常結果之外,如果發生例外,該例外也會被回傳,即Future可以拿到異步執行任務各種結果;
- Future.get方法會導致主執行緒阻塞,直到Callable任務執行完成;
submit方法
AbstractExecutorService.submit()實作了ExecutorService.submit() 可以獲取執行完的回傳值, 而ThreadPoolExecutor 是AbstractExecutorService.submit()的子類,所以submit方法也是ThreadPoolExecutor`的方法,
// submit()在ExecutorService中的定義 <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); // submit方法在AbstractExecutorService中的實作 public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 通過submit方法提交的Callable任務會被封裝成了一個FutureTask物件, RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
通過submit方法提交的Callable任務會被封裝成了一個FutureTask物件,通過Executor.execute方法提交FutureTask到執行緒池中等待被執行,最終執行的是FutureTask的run方法;
FutureTask物件
public class FutureTask<V> implements RunnableFuture<V> 可以將FutureTask提交至執行緒池中等待被執行(通過FutureTask的run方法來執行)
- 內部狀態
/* The run state of this task, initially NEW. * ... * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
內部狀態的修改通過sun.misc.Unsafe修改
- get方法
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
內部通過awaitDone方法對主執行緒進行阻塞,具體實作如下:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
如果主執行緒被中斷,則拋出中斷例外;
- 判斷FutureTask當前的state,如果大于COMPLETING,說明任務已經執行完成,則直接回傳;
- 如果當前state等于COMPLETING,說明任務已經執行完,這時主執行緒只需通過yield方法讓出cpu資源,等待state變成NORMAL;
- 通過WaitNode類封裝當前執行緒,并通過UNSAFE添加到waiters鏈表;
- 最終通過LockSupport的park或parkNanos掛起執行緒;
run方法
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
FutureTask.run方法是在執行緒池中被執行的,而非主執行緒
- 通過執行Callable任務的call方法;
- 如果call執行成功,則通過set方法保存結果;
- 如果call執行有例外,則通過setException保存例外;
任務的關閉
shutdown方法會將執行緒池的狀態設定為SHUTDOWN,執行緒池進入這個狀態后,就拒絕再接受任務,然后會將剩余的任務全部執行完
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //檢查是否可以關閉執行緒 checkShutdownAccess(); //設定執行緒池狀態 advanceRunState(SHUTDOWN); //嘗試中斷worker interruptIdleWorkers(); //預留方法,留給子類實作 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); } private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍歷所有的worker for (Worker w : workers) { Thread t = w.thread; //先嘗試呼叫w.tryLock(),如果獲取到鎖,就說明worker是空閑的,就可以直接中斷它 //注意的是,worker自己本身實作了AQS同步框架,然后實作的類似鎖的功能 //它實作的鎖是不可重入的,所以如果worker在執行任務的時候,會先進行加鎖,這里tryLock()就會回傳false if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
shutdownNow做的比較絕,它先將執行緒池狀態設定為STOP,然后拒絕所有提交的任務,最后中斷左右正在運行中的worker,然后清空任務佇列,
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //檢測權限 advanceRunState(STOP); //中斷所有的worker interruptWorkers(); //清空任務佇列 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; } private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍歷所有worker,然后呼叫中斷方法 for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }
更深入理解
為什么執行緒池不允許使用Executors去創建? 推薦方式是什么?
執行緒池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確執行緒池的運行規則,規避資源耗盡的風險, 說明:Executors各個方法的弊端:
- newFixedThreadPool和newSingleThreadExecutor: 主要問題是堆積的請求處理佇列可能會耗費非常大的記憶體,甚至OOM,
- newCachedThreadPool和newScheduledThreadPool: 主要問題是執行緒數最大數是Integer.MAX_VALUE,可能會創建數量非常多的執行緒,甚至OOM,
推薦方式 1
首先引入:commons-lang3包
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());
推薦方式 2
首先引入:com.google.guava包
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(); //Common Thread Pool ExecutorService pool = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); // excute pool.execute(()-> System.out.println(Thread.currentThread().getName())); //gracefully shutdown pool.shutdown();
推薦方式 3
spring配置執行緒池方式:自定義執行緒工廠bean需要實作ThreadFactory,可參考該介面的其它默認實作類,使用方式直接注入bean呼叫execute(Runnable task)方法即可
<bean id="userThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value=https://www.cnblogs.com/huaweiyun/archive/2022/12/21/"10" /> <property name="maxPoolSize" value=https://www.cnblogs.com/huaweiyun/archive/2022/12/21/"100" /> <property name="queueCapacity" value=https://www.cnblogs.com/huaweiyun/archive/2022/12/21/"2000" /> <property name="threadFactory" value= https://www.cnblogs.com/huaweiyun/archive/2022/12/21/threadFactory />"rejectedExecutionHandler"> <ref local="rejectedExecutionHandler" /> </property> </bean> //in code userThreadPool.execute(thread);
配置執行緒池需要考慮因素
從任務的優先級,任務的執行時間長短,任務的性質(CPU密集/ IO密集),任務的依賴關系這四個角度來分析,并且近可能地使用有界的作業佇列,
性質不同的任務可用使用不同規模的執行緒池分開處理:
- CPU密集型: 盡可能少的執行緒,Ncpu+1
- IO密集型: 盡可能多的執行緒, Ncpu*2,比如資料庫連接池
- 混合型: CPU密集型的任務與IO密集型任務的執行時間差別較小,拆分為兩個執行緒池;否則沒有必要拆分,
監控執行緒池的狀態
可以使用ThreadPoolExecutor以下方法:
- 《Java并發編程藝術》
- https://www.jianshu.com/p/87bff5cc8d8c
- https://blog.csdn.net/programmer_at/article/details/79799267
- https://blog.csdn.net/u013332124/article/details/79587436
- https://www.journaldev.com/1069/threadpoolexecutor-java-thread-pool-example-executorservice
參考文章
- 《Java并發編程藝術》
- https://www.jianshu.com/p/87bff5cc8d8c
- https://blog.csdn.net/programmer_at/article/details/79799267
- https://blog.csdn.net/u013332124/article/details/79587436
- https://www.journaldev.com/1069/threadpoolexecutor-java-thread-pool-example-executorservice
點擊關注,第一時間了解華為云新鮮技術~
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/540454.html
標籤:其他
上一篇:java常用代碼整理
