【1】為什么要使用執行緒池?
示例演示:
//設定業務模擬 class MyRunnable implements Runnable { private int count; public MyRunnable(int count) { this.count = count; } public int getCount() { return count; } @Override public void run() { for (int i = 0; i < 100000; i++) { count += i; } System.out.println("結果:"+count); } } //模擬執行緒池復用執行緒執行業務 public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); int count =0; ExecutorService executorService = Executors.newSingleThreadExecutor(); MyRunnable myRunnable = new MyRunnable(count); for (int i = 0; i < 1000; i++) { executorService.execute(myRunnable); } executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); System.out.println("時間:"+(System.currentTimeMillis() - start)); } //模擬每次執行業務都開一個執行緒 public static void main(String[] args) throws InterruptedException { Long start = System.currentTimeMillis(); int count =0; MyRunnable myRunnable = new MyRunnable(count); for (int i = 0; i < 1000; i++) { Thread thread = new Thread(myRunnable); thread.start(); thread.join(); } System.out.println("時間:" + (System.currentTimeMillis() - start)); }
示例結果:
采用每次都開一個執行緒的結果是292毫秒,而執行緒池的是69毫秒,(隨著業務次數的增多這個數值的差距會越大)
示例說明:
如果每個請求到達就創建一個新執行緒,開銷是相當大的,在實際使用中,服務器在創建和銷毀執行緒上花費的時間和消耗的系統資源都相當大,甚至可能要比在處理實際的用戶請求的時間和資源要多的多,除了創建和銷毀執行緒的開銷之外,活動的執行緒也需要消耗系統資源,
如果并發的請求數量非常多,但每個執行緒執行的時間很短,這樣就會頻繁的創建和銷毀執行緒,如此一來會大大降低系統的效率,可能出現服務器在為每個請求創建新執行緒和銷毀執行緒上花費的時間和消耗的系統資源要比處理實際的用戶請求的時間和資源更多,(說明了我們什么時候使用執行緒池:1.單個任務處理時間比較短;2.需要處理的任務數量很大;)
執行緒池主要用來解決執行緒生命周期開銷問題和資源不足問題,通過對多個任務重復使用執行緒,執行緒創建的開銷就被分攤到了多個任務上了,而且由于在請求到達時執行緒已經存在,所以消除了執行緒創建所帶來的延遲,這樣,就可以立即為請求服務,使用應用程式回應更快,另外,通過適當的調整執行緒中的執行緒數目可以防止出現資源不足的情況,
【2】執行緒池的介紹
(1)執行緒池優勢
1.重用存在的執行緒,減少執行緒創建,消亡的開銷,提高性能
2.提高回應速度,當任務到達時,任務可以不需要的等到執行緒創建就能立即執行,
3.提高執行緒的可管理性,執行緒是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用執行緒池可以進行統一的分配,調優和監控,
(2)常見執行緒池
1.newSingleThreadExecutor :單個執行緒的執行緒池,即執行緒池中每次只有一個執行緒作業,單執行緒串行執行任務
2.newFixedThreadExecutor(n) :固定數量的執行緒池,每提交一個任務就是一個執行緒,直到達到執行緒池的最大數量,然后后面進入等待佇列直到前面的任務完成才繼續執行
3.newCacheThreadExecutor(推薦使用) :可快取執行緒池, 當執行緒池大小超過了處理任務所需的執行緒,那么就會回收部分空閑(一般是60秒無執行)的執行緒,當有任務來時,又智能的添加新執行緒來執行,
4.newScheduleThreadExecutor :大小無限制的執行緒池,支持定時和周期性的執行執行緒
5.常見執行緒池的說明
在阿里的開發手冊中其實不推薦我們使用默認的執行緒池,為什么?
【1】Executors 回傳的執行緒池物件的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允許的請求佇列長度為 Integer.MAX_VALUE,可能會堆積大量的請求,從而導致 OOM,
2)CachedThreadPool 和 ScheduledThreadPool:
允許的創建執行緒數量為 Integer.MAX_VALUE,可能會創建大量的執行緒,從而導致 OOM,
【2】其次newCacheThreadExecutor,沒有核心執行緒數,且非核心執行緒數是最大值,不斷創建執行緒容易出現CPU100%的問題,
(3)默認執行緒池
1.ThreadPoolExecutor
1)說明
實際上不管是newSingleThreadExecutor,newFixedThreadExecutor還是newCacheThreadExecutor,他們都是使用ThreadPoolExecutor去生成的,
只不過由于引數不同導致產生的執行緒池的不同,因此,我們常使用是ThreadPoolExecutor去自建自己想要的執行緒池,
2)引數決議
1.corePoolSize
執行緒池中的核心執行緒數,當提交一個任務時,執行緒池創建一個新執行緒執行任務,直到當前執行緒數等于corePoolSize;如果當前執行緒數為corePoolSize,繼續提交的任務被保存到 阻塞佇列中,等待被執行;如果執行了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前創建并啟動所有核心執行緒,
2.maximumPoolSize
執行緒池中允許的最大執行緒數,如果當前阻塞佇列滿了,且繼續提交任務,則創建新的執行緒執行任務,前提是當前執行緒數小于maximumPoolSize;
3.keepAliveTime
執行緒池維護執行緒所允許的空閑時間,當執行緒池中的執行緒數量大于corePoolSize的時候,如果這時沒有新的任務提交,核心執行緒外的執行緒不會立即銷毀,而是會等待,直到等待的時間超過了keepAliveTime;
4.unit
keepAliveTime的單位;
5.workQueue
用來保存等待被執行的任務的阻塞佇列,且任務必須實作Runable介面,在JDK中提供了如下阻塞佇列:
1、ArrayBlockingQueue:基于陣列結構的有界阻塞佇列,按FIFO排序任務;
2、LinkedBlockingQuene:基于鏈表結構的阻塞佇列,按FIFO排序任務,吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一個不存盤元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處于阻塞狀態,吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有優先級的無界阻塞佇列;
6.threadFactory
它是ThreadFactory型別的變數,用來創建新執行緒,默認使用Executors.defaultThreadFactory() 來創建執行緒,使用默認的ThreadFactory來創建執行緒時,會使新創建的執行緒具有相同的NORM_PRIORITY優先級并且是非守護執行緒,同時也設定了執行緒的名稱,
7.handler
執行緒池的飽和策略,當阻塞佇列滿了,且沒有空閑的作業執行緒,如果繼續提交任務,必須采取一種策略處理該任務,執行緒池提供了4種策略:
1、AbortPolicy:直接拋出例外,默認策略;
2、CallerRunsPolicy:用呼叫者所在的執行緒來執行任務;
3、DiscardOldestPolicy:丟棄阻塞佇列中靠最前的任務,并執行當前任務;
4、DiscardPolicy:直接丟棄任務;
上面的4種策略都是ThreadPoolExecutor的內部類,
當然也可以根據應用場景實作RejectedExecutionHandler介面,自定義飽和策略,如記錄日志或持久化存盤不能處理的任務,(自定義的才是最常用的)
【3】執行緒池相關的類分析
1.ExecutorService介面與Executor介面
//定義了一個用于執行Runnable的execute方法 public interface Executor { void execute(Runnable command); } /** * 介面ExecutorService,其中定義了執行緒池的具體行為 * 1,execute(Runnable command):履行Ruannable型別的任務, * 2,submit(task):可用來提交Callable或Runnable任務,并回傳代表此任務的Future 物件 * 3,shutdown():在完成已提交的任務后封閉辦事,不再接管新任務, * 4,shutdownNow():停止所有正在履行的任務并封閉辦事, * 5,isTerminated():測驗是否所有任務都履行完畢了, * 6,isShutdown():測驗是否該ExecutorService已被關閉, */ public interface ExecutorService extends Executor { // 停止執行緒池 void shutdown(); // 立即停止執行緒池,回傳尚未執行的任務串列 List<Runnable> shutdownNow(); // 執行緒池是否停止 boolean isShutdown(); // 執行緒池是否終結 boolean isTerminated(); // 等待執行緒池終結 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交Callable型別任務 <T> Future<T> submit(Callable<T> task); // 提交Runnable型別任務,預先知道回傳值 <T> Future<T> submit(Runnable task, T result); // 提交Runnable型別任務,對回傳值無感知 Future<?> submit(Runnable task); // 永久阻塞 - 提交和執行一個任務串列的所有任務 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; // 帶超時阻塞 - 提交和執行一個任務串列的所有任務 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; // 永久阻塞 - 提交和執行一個任務串列的某一個任務 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; // 帶超時阻塞 - 提交和執行一個任務串列的某一個任務 <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
2.抽象類AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } .... }
3.ThreadPoolExecutor類
public class ThreadPoolExecutor extends AbstractExecutorService { ... public void execute(Runnable command) { if (command == null) int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } ... }
4.ScheduledThreadPoolExecutor類
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { ... public void execute(Runnable command) { schedule(command, 0, NANOSECONDS); } public Future<?> submit(Runnable task) { return schedule(task, 0, NANOSECONDS); } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; } private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } } ... }
5.問題點
1)execute方法與submit方法的區別?
【1】最明顯的就是 :
void execute() //提交任務無回傳值
Future<?> submit() //任務執行完成后有回傳值
【2】另外一個不明顯的就是佇列的提交方法(add【ScheduledThreadPoolExecutor類中使用】與offer【ThreadPoolExecutor類中使用】)
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
明顯當佇列滿了的時候,add方法會拋出例外,而offer不會,
【4】執行緒池的狀態分析
1.執行緒池存在5種狀態
1)RUNNING = ‐1 << COUNT_BITS; //高3位為111 運行狀態
2)SHUTDOWN = 0 << COUNT_BITS; //高3位為000 關閉狀態
3)STOP = 1 << COUNT_BITS; //高3位為001 停止狀態
4)TIDYING = 2 << COUNT_BITS; //高3位為010 整理狀態
5)TERMINATED = 3 << COUNT_BITS; //高3位為011 銷毀狀態
2.狀態說明
1、RUNNING
(1) 狀態說明:執行緒池處在RUNNING狀態時,能夠接收新任務,以及對已添加的任務進行處理,
(02) 狀態切換:執行緒池的初始化狀態是RUNNING,換句話說,執行緒池被一旦被創建,就處于RUNNING狀態,并且執行緒池中的任務數為0!
2、 SHUTDOWN
(1)狀態說明:執行緒池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務,
(2)狀態切換:呼叫執行緒池的shutdown()介面時,執行緒池由RUNNING -> SHUTDOWN,
3、STOP
(1)狀態說明:執行緒池處在STOP狀態時,不接收新任務,不處理已添加的任務,并且會中斷正在處理的任務,
(2)狀態切換:呼叫執行緒池的shutdownNow()介面時,執行緒池由(RUNNING or SHUTDOWN ) -> STOP,
4、TIDYING
(1)狀態說明:當所有的任務已終止,ctl記錄的”任務數量”為0,執行緒池會變為TIDYING 狀態,當執行緒池變為TIDYING狀態時,會執行鉤子函式terminated(),terminated()在ThreadPoolExecutor類中是空的,若用戶想在執行緒池變為TIDYING時,進行相應的處理; 可以通過多載terminated()函式來實作,
(2)狀態切換:當執行緒池在SHUTDOWN狀態下,阻塞佇列為空并且執行緒池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING, 當執行緒池在STOP狀態下,執行緒池中執行的任務為空時,就會由STOP -> TIDYING,
5、 TERMINATED
(1)狀態說明:執行緒池徹底終止,就變成TERMINATED狀態,
(2)狀態切換:執行緒池處在TIDYING狀態時,執行完terminated()之后,就會由 TIDYING -> TERMINATED,
進入TERMINATED的條件如下:
執行緒池不是RUNNING狀態;
執行緒池狀態不是TIDYING狀態或TERMINATED狀態;
如果執行緒池狀態是SHUTDOWN并且workerQueue為空;
workerCount為0;
設定TIDYING狀態成功,
3.匯總
默認情況下,如果不呼叫關閉方法,執行緒池會一直處于 RUNNING 狀態,而執行緒池狀態的轉移有兩個路徑:當呼叫 shutdown() 方法時,執行緒池的狀態會從 RUNNING 到 SHUTDOWN,再到 TIDYING,最后到 TERMENATED 銷毀狀態;當呼叫 shutdownNow() 方法時,執行緒池的狀態會從 RUNNING 到 STOP,再到 TIDYING,最后到 TERMENATED 銷毀狀態,
4.圖示

【5】執行緒池的原始碼決議
1.針對自定義執行緒池的運行分析
1)示例代碼:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10));//自定義執行緒 for (int i = 1; i <= 100; i++) { threadPoolExecutor.execute(new MyTask(i)); }
2)示例結果:

3)示例疑問:
輸出的順序并不是預想的1-5,6-10,11-15,16-20,反而是1-5,16-20,6-10,11-15,(深入原始碼查探原因)
2.針對自定義執行緒池ThreadPoolExecutor類的運行分析
1)ThreadPoolExecutor類重要屬性 private final AtomicInteger ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0))? //默認值-536870912 private static final int COUNT_BITS = Integer.SIZE - 3? //默認值29,轉為2進制11101 private static final int CAPACITY = (1 << COUNT_BITS)-1? //默認值536870911,轉為2進制11111111111111111111111111111 private static final int RUNNING = -1 << COUNT_BITS; //-536870912 private static final int SHUTDOWN = 0 << COUNT_BITS; //0 private static final int STOP = 1 << COUNT_BITS; //536870912 private static final int TIDYING = 2 << COUNT_BITS; //1073741824 private static final int TERMINATED = 3 << COUNT_BITS; //1610612736 //ctl相關方法 private static int runStateOf(int c) { return c & ~CAPACITY? } //runStateOf:獲取運行狀態;//~x=-(x+1) //默認值0 private static int workerCountOf(int c) { return c & CAPACITY? } //workerCountOf:獲取活動執行緒數; //默認值0,當執行緒數+1是值也會+1 private static int ctlOf(int rs, int wc) { return rs | wc? } //ctlOf:獲取運行狀態和活動執行緒數的值,//默認值-536870912 說明: ctl 是對執行緒池的運行狀態和執行緒池中有效執行緒的數量進行控制的一個欄位, 它包含兩部分的資訊: 執行緒池的運行狀態 (runState) 和執行緒池內有效執行緒的數量 (workerCount),
可以看到,使用了Integer型別來保存,高3位保存runState,低29位保存workerCount,COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常量表示workerCount的上限值,大約是5億, PS: 1.&和&&的區別 相同點: 最終得到的boolean值結果一樣,都是“并且and”的意思 不同點: &既是邏輯運算子也是位運算子;&&只是邏輯運算子 &不具有短路效果,即左邊false,右邊還會執行;&&具有短路效果,左邊為false,右邊則不執行 2.| 和 || 的區別 相同點: 最終得到的boolean值結果一樣,都是“或者or”的意思 不同點: | 既是邏輯運算子也是位運算子;|| 只是邏輯運算子 | 不具有短路效果,即左邊true,右邊還會執行;|| 具有短路效果,左邊為true,右邊則不執行
2)ThreadPoolExecutor類#execute方法【這里涉及到一個概念,提交優先級: 核心執行緒>佇列>非核心執行緒】
展示
public void execute(Runnable command) { if (command == null) //不能提交空任務 throw new NullPointerException(); int c = ctl.get(); //獲取運行的執行緒數 //核心執行緒數不滿 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) //在addWorker中創建作業執行緒執行任務 return; c = ctl.get(); } //執行緒還在運行,且核心數滿了,放入執行緒池佇列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command))//執行緒池是否處于運行狀態,如果不是,則剛塞入的任務要移除 reject(command); //走拒絕策略 //這一步其實沒有很大意義,除非出現執行緒池所有執行緒完蛋了,但是佇列還有任務的情況,(一般是進入時時運行態,然后遇到狀態變更的情況) else if (workerCountOf(recheck) == 0) addWorker(null, false); } //插入佇列不成功,且當前執行緒數數量小于最大執行緒池數量,此時則創建新執行緒執行任務,創建失敗拋出例外 else if (!addWorker(command, false)) reject(command); //走拒絕策略 }
說明
在正常運行狀態下,執行緒池:核心執行緒執行任務-》塞入佇列-》非核心執行緒執行任務,
體現了在并發不激烈的情況下,盡量減少創建執行緒的操作,用已有的執行緒,而且核心執行緒數并不是提前創建的,而是用到的時候才會創建,而且核心執行緒數不滿,優先以創建執行緒來執行任務,
邏輯展示

3)ThreadPoolExecutor類#addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //獲取執行緒池的狀態 int c = ctl.get(); int rs = runStateOf(c); //如果是非運行狀態(因為只有運行狀態是負數) if (rs >= SHUTDOWN && ! //判斷是不是關閉狀態,不接收新任務,但能處理已添加的任務 //任務是不是空任務,佇列是不是空(這一步說明了關閉狀態不接受任務) (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //獲取活動執行緒數 int wc = workerCountOf(c); //檢驗執行緒數是否大于容量值【這是避免設定的非核心執行緒數沒有限制大小】 //根據傳入引數判斷核心執行緒數與非核心執行緒數是否達到了最大值 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //嘗試增加workerCount數量【也就是活躍執行緒數+1】,如果成功,則跳出第一個for回圈 if (compareAndIncrementWorkerCount(c)) break retry; // 如果增加workerCount失敗,則重新獲取ctl的值 c = ctl.get(); // 如果當前的運行狀態不等于rs,說明狀態已被改變,回傳第一個for回圈繼續執行 if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; //執行緒啟動標志 boolean workerAdded = false; //執行緒添加標志 Worker w = null; try { //根據firstTask來創建Worker物件,每一個Worker物件都會創建一個執行緒 w = new Worker(firstTask); //【呼叫1】 final Thread t = w.thread; //如果過執行緒不為空,則試著將執行緒加入作業佇列中 if (t != null) { final ReentrantLock mainLock = this.mainLock; //加重入鎖 mainLock.lock(); try { // 重新獲取執行緒的狀態 int rs = runStateOf(ctl.get()); //是否執行緒池正處于運行狀態 if (rs < SHUTDOWN || //執行緒池是否處于關閉狀態 且 傳入的任務為空(說明關閉狀態還是能添加作業者,但是不允許添加任務) (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) //判斷執行緒是否存活 throw new IllegalThreadStateException(); //workers是一個HashSet,將該worker物件添加其中 workers.add(w); //記錄執行緒作業者的值 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; //修改添加標記 workerAdded = true; } } finally { //解鎖 mainLock.unlock(); } //如果添加成功,則啟動執行緒 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w);//【呼叫2】 } return workerStarted; }
//呼叫1 Worker(Runnable firstTask) { setState(-1); // 創建時不允許中斷 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } //呼叫2:添加作業者失敗方法 private void addWorkerFailed(Worker w) { //加重入鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //移除作業者 if (w != null) workers.remove(w); //任務數量減一 decrementWorkerCount(); //進入整理狀態() tryTerminate(); } finally { mainLock.unlock(); } }
說明
Worker繼承了AQS,使用AQS來實作獨占鎖的功能,為什么不使用ReentrantLock來實作呢?
可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的:
1)lock方法一旦獲取了獨占鎖,表示當前執行緒正在執行任務中;
2)如果正在執行任務,則不應該中斷執行緒;
3)如果該執行緒現在不是獨占鎖的狀態,也就是空閑的狀態,說明它沒有在處理任務,這時可以對該執行緒進行中斷;
4)執行緒池在執行shutdown方法或tryTerminate方法時會呼叫interruptIdleWorkers方法來中斷空閑的執行緒,interruptIdleWorkers方法會使用tryLock方法來判斷執行緒池中的執行緒是否是空閑狀態;
5)之所以設定為不可重入,是因為我們不希望任務在呼叫像setCorePoolSize這樣的執行緒池控制方法時重新獲取鎖,如果使用ReentrantLock,它是可重入的,這樣如果在任務中呼叫了如setCorePoolSize這類執行緒池控制的方法,會中斷正在運行的執行緒,
所以,Worker繼承自AQS(AbstractQueuedSynchronizer類),用于判斷執行緒是否空閑以及是否可以被中斷,
此外,在構造方法中執行了setState(-1);,把state變數設定為-1,為什么這么做呢?是因為AQS中默認的state是0,如果剛創建了一個Worker物件,還沒有執行任務時,這時就不應該被中斷,tryAcquire方法是根據state是否是0來判斷的,所以,setState(-1);將state設定為-1是為了禁止在執行任務前對執行緒進行中斷,正因為如此,在runWorker方法中會先呼叫Worker物件的unlock方法將state設定為0.
4)ThreadPoolExecutor類#runWorker方法【這里有涉及到一個概念,執行優先級: 核心執行緒>非核心執行緒>佇列】
代碼展示
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; //取出任務 w.firstTask = null; //將作業者持有任務清空 w.unlock(); //將執行緒置為可中斷,因為創建時候設定不可中斷 boolean completedAbruptly = true; // 是否因為例外退出回圈 try { //當沒有任務的時候,優先從佇列里面獲取(自旋方式) while (task != null || (task = getTask()) != null) { w.lock(); //如果執行緒池正在停止,那么要保證當前執行緒是中斷狀態; if ((runStateAtLeast(ctl.get(), STOP) || // 如果不是的話,則要保證當前執行緒不是中斷狀態;(這里要考慮在執行該if陳述句期間可能也執行了shutdownNow方法,shutdownNow方法會把狀態設定為STOP,STOP狀態要中斷執行緒池中的所有執行緒,而這里使用Thread.interrupted()來判斷是否中斷是為了確保在RUNNING或者SHUTDOWN狀態時執行緒是非中斷狀態的,因為Thread.interrupted()方法會復位中斷的狀態,) (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //如果不是中斷狀態,則呼叫task.run()執行任務 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //completedAbruptly變數來表示在執行任務程序中是否出現了例外,在processWorkerExit方法中會對該變數的值進行判斷, processWorkerExit(w, completedAbruptly); } }
匯總說明
總結一下runWorker方法的執行程序:
1)while回圈不斷地通過getTask()方法獲取任務;
2)getTask()方法從阻塞佇列中取任務;
3)如果執行緒池正在停止,那么要保證當前執行緒是中斷狀態,否則要保證當前執行緒不是中斷狀態;呼叫task.run()執行任務;
4)如果task為null則跳出回圈,執行processWorkerExit()方法;
5)ThreadPoolExecutor類#getTask方法
代碼展示
private Runnable getTask() { // timeOut變數的值表示上次從阻塞佇列中取任務時是否超時 boolean timedOut = false; for (;;) { //獲取執行緒池狀態 int c = ctl.get(); int rs = runStateOf(c); //是否是非運行狀態(因為如果當前執行緒池狀態的值是SHUTDOWN或以上時,不允許再向阻塞佇列中添加任務,) //如果是非運行狀態:是不是STOP,TIDYING,TERMINATED ,三種狀態之一;或者佇列為空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //如果以上條件滿足,則將workerCount減1并回傳null decrementWorkerCount(); //CAS執行緒數減一 return null; } //重新獲取執行緒的數量 int wc = workerCountOf(c); //allowCoreThreadTimeOut默認是false,也就是核心執行緒不允許進行超時; //wc > corePoolSize,表示當前執行緒池中的執行緒數量大于核心執行緒數量;對于超過核心執行緒數量的這些執行緒,需要進行超時控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//有非核心執行緒必定為true //wc > maximumPoolSize的情況是因為可能在此方法執行階段同時執行了setMaximumPoolSize方法; //timed && timedOut 如果為true,表示當前操作需要進行超時控制,并且上次從阻塞佇列中獲取任務發生了超時, if ((wc > maximumPoolSize || (timed && timedOut)) //接下來判斷,如果有效執行緒數量大于1,或者阻塞佇列是空的,那么嘗試將workerCount減1; //如果wc == 1時,也就說明當前執行緒是執行緒池中唯一的一個執行緒了, && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; //如果減1失敗,則回傳重試, continue; } try { //根據timed來判斷,如果為true(大概率是有非核心執行緒),則通過阻塞佇列的poll方法進行超時控制,如果在keepAliveTime時間內沒有獲取到任務,則回傳null; //否則通過take方法,如果這時佇列為空,則take方法會阻塞直到佇列不為空, Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; //判斷任務超時 } catch (InterruptedException retry) { // 如果獲取任務時當前執行緒發生了中斷,則設定timedOut為false并回傳回圈重試 timedOut = false; } } }
匯總說明
運行狀態下(這種情況下會把超出核心執行緒數的部分進入回收,也有一定概率回收核心執行緒):
情況1:當有非核心執行緒數的時候,timed為true,導致呼叫poll方法,這時候如果沒有任務且超時,timedOut變為true,第二次進入自旋,timed還是true,進入判斷會走compareAndDecrementWorkerCount,執行緒數減一,并回傳null,(這種情況存在極端情況就是,全部執行緒走到同一邏輯去減,導致全部執行緒數都被減完了【即時有著wc > 1的判斷,因為多執行緒并發情況,你懂得】)
情況2:沒有非核心執行緒數,timed為false,導致呼叫take方法,執行緒一致阻塞直至,拿到任務,(這時候不存在減少執行緒)
非運行狀態下(這種情況下是執行緒都會進入回收):
情況3:如果執行緒狀態是STOP,TIDYING,TERMINATED,那么呼叫decrementWorkerCount,執行緒數減一,回傳null,
情況4:如果執行緒狀態是SHUTDOWN,佇列不為空,則繼續任務,如果佇列為空,那么呼叫decrementWorkerCount,執行緒數減一,回傳null,
所以,綜上所述,非核心執行緒和核心執行緒其實都存在被回收的概率,
6)ThreadPoolExecutor類#processWorkerExit方法
代碼展示
//主要用于執行緒的清理作業 private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly值為true,則說明執行緒執行時出現了例外,需要將workerCount減1; // 如果執行緒執行時沒有出現例外,說明在getTask()方法中已經已經對workerCount進行了減1操作,這里就不必再減了, if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //統計完成的任務數 completedTaskCount += w.completedTasks; //從workers中移除,也就表示著從執行緒池中移除了一個作業執行緒 workers.remove(w); } finally { mainLock.unlock(); } //根據執行緒池狀態進行判斷是否結束執行緒池 tryTerminate(); int c = ctl.get(); //當執行緒池是RUNNING或SHUTDOWN狀態時,如果worker是例外結束,那么會直接addWorker;如果是其他三種,就不會去補Worker, if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //如果allowCoreThreadTimeOut=true(可設定),并且等待佇列有任務,至少保留一個worker; if (min == 0 && ! workQueue.isEmpty()) min = 1; //如果allowCoreThreadTimeOut=false(默認值),workerCount不少于corePoolSize,【靠后面的addWorker】 if (workerCountOf(c) >= min) return; } addWorker(null, false); } }
代碼說明
通過設定allowCoreThreadTimeOut引數,我們可以選擇核心執行緒的回收,在不用的時候保留一個worker,(這種更適用于某時間段高并發,其余時間段作業量不足的情況)
7)ThreadPoolExecutor類#tryTerminate方法
final void tryTerminate() { for (;;) { int c = ctl.get(); /** * 當前執行緒池的狀態為以下幾種情況時,直接回傳: * 1. RUNNING,因為還在運行中,不能停止; * 2. TIDYING或TERMINATED,因為執行緒池中已經沒有正在運行的執行緒了; * 3. SHUTDOWN并且等待佇列非空,這時要執行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 如果執行緒數量不為0,則中斷一個空閑的作業執行緒,并回傳 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 這里嘗試設定狀態為TIDYING,如果設定成功,則呼叫terminated方法 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { // 設定狀態為TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
8)ThreadPoolExecutor類#shutdown方法
//shutdown方法要將執行緒池切換到SHUTDOWN狀態,并呼叫 interruptIdleWorkers方法請求中斷所有空閑的worker,最后呼叫tryTerminate嘗試結束執行緒池, public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 安全策略判斷 checkShutdownAccess(); // 切換狀態為SHUTDOWN advanceRunState(SHUTDOWN); // 中斷空閑執行緒 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 嘗試結束執行緒池 tryTerminate(); }
9)ThreadPoolExecutor類#interruptIdleWorkers方法
private void interruptIdleWorkers() { interruptIdleWorkers(false); }
//interruptIdleWorkers遍歷workers中所有的作業執行緒,若執行緒沒有被中斷tryLock成功,就中斷該執行緒,
//為什么需要持有mainLock?因為workers是HashSet型別的,不能保證執行緒安全,private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
10)ThreadPoolExecutor類#hutdownNow方法
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 中斷所有作業執行緒,無論是否空閑 interruptWorkers(); // 取出佇列中沒有被執行的任務 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
11)問題思考
1.在runWorker方法中,執行任務時對Worker物件w進行了lock操作,為什么要在執行任務的時候對每個作業執行緒都加鎖呢?
(1)在getTask方法中,如果這時執行緒池的狀態是SHUTDOWN并且workQueue為空,那么就應該回傳null來結束這個作業執行緒,而使執行緒池進入SHUTDOWN狀態需要呼叫shutdown方法;
(2)shutdown方法會呼叫interruptIdleWorkers來中斷空閑的執行緒,interruptIdleWorkers持有mainLock,會遍歷workers來逐個判斷作業執行緒是否空閑,但getTask方法中沒有mainLock;
(3)在getTask中,如果判斷當前執行緒池狀態是RUNNING,并且阻塞佇列為空,那么會呼叫workQueue.take()進行阻塞;
(4)如果在判斷當前執行緒池狀態是RUNNING后,這時呼叫了shutdown方法把狀態改為了SHUTDOWN,這時如果不進行中斷,那么當前的作業執行緒在呼叫了workQueue.take()后會一直阻塞而不會被銷毀,因為在SHUTDOWN狀態下不允許再有新的任務添加到workQueue中,這樣一來執行緒池永遠都關閉不了;
(5)由上可知,shutdown方法與getTask方法(從佇列中獲取任務時)存在競態條件;
(6)解決這一問題就需要用到執行緒的中斷,也就是為什么要用interruptIdleWorkers方法,在呼叫workQueue.take()時,如果發現當前執行緒在執行之前或者執行期間是中斷狀態,則會拋出InterruptedException,解除阻塞的狀態;
(7)但是要中斷作業執行緒,還要判斷作業執行緒是否是空閑的,如果作業執行緒正在處理任務,就不應該發生中斷;
(8)所以Worker繼承自AQS,在作業執行緒處理任務時會進行lock,interruptIdleWorkers在進行中斷時會使用tryLock來判斷該作業執行緒是否正在處理任務,如果tryLock回傳true,說明該作業執行緒當前未執行任務,這時才可以被中斷,
【6】額外拓展
(1)有關阻塞佇列部分(可查看 java原生阻塞佇列詳解索引)
(2)有關Future和Callable的部分(可查看 針對Future部分的詳解)
tryTerminate
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/509584.html
標籤:其他
下一篇:資料批處理速度慢?不妨試試這個
