1. 前言
執行緒池是JAVA開發中最常使用的池化技術之一,可以減少執行緒資源的重復創建與銷毀造成的開銷,
2. 靈魂拷問:怎么做到執行緒重復利用?
很多同學會聯想到連接池,理所當然的說:需要的時候從池中取出執行緒,執行完任務再放回去,
如何用代碼實作呢?
此時就會發現,呼叫執行緒的start方法后,生命周期就不由父執行緒直接控制了,執行緒的run方法執行完成就銷毀了,所謂的“取出”和“放回”只不過是想當然的操作,
這里先說答案:生產者消費者模型
3. ThreadPoolExecutor的實作
3.1 結構
首先看下ThreadPoolExecutor的繼承結構
頂級介面是Executor,定義execute方法
ExecutorService添加了submit方法,支持回傳future獲取執行結果,以及執行緒池運行狀態的相關方法
本文著重講執行緒池的執行流程,因此將暫時忽略執行緒池的狀態相關的代碼,也建議新手看原始碼時從核心流程看起,
3.2 核心方法:execute()
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 判斷是否小于核心執行緒數 if (workerCountOf(c) < corePoolSize) { //添加worker,添加成功則退出 if (addWorker(command, true)) return; c = ctl.get(); } // 核心執行緒數已用完則放入佇列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 雙重檢查,避免入隊完成后,所有執行緒已銷毀,導致沒有消費者消費當前任務 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 佇列已滿則開啟非核心執行緒,達到最大執行緒數則使用拒絕策略 else if (!addWorker(command, false)) reject(command); }
execute方法就是一個生產的程序,主要分為開啟執行緒和入隊
開啟執行緒會傳入command(即當前任務),開啟的執行緒會立即消費該任務
入隊的任務則會由Worker消費
主要關注corePoolSize,maximumPoolSize,queueSize(任務佇列長度),workerCount(當前worker數量)這幾個引數,可以總結為以下:
3.2 消費者:Worker
Worker類實作Runnable介面,繼承AQS,主要先關注thread和firstTask兩個屬性和run方法
Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
從Worker的構造方法可以看出,thread就是執行緒池中真正消費任務的執行緒,創建時會傳入this(即Worker物件),而Worker實作了Runnable,因此執行緒運行時就是執行了Worker的run方法,
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { // 重點關注 while (task != null || (task = getTask()) != null) { // ··· try { beforeExecute(wt, task); Throwable thrown = null; try { // 重點關注 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { // 重點關注 task = null; // ··· } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
再來看run方法,直接呼叫了ThreadPoolExecutor的runWorker方法,runWorker中有一個while回圈,回圈體執行了task.run方法
task首先會獲取Worker中的firstTask屬性,并將其置為null,因此firstTask只會執行一次,后續task將通過getTask方法獲取,
因此Worker的作業流程可以概括為:消費完Worker的firstTask后,回圈執行getTask獲取任務并消費,獲取不到task時,就退出回圈,執行緒銷毀,
此處便可以看出生產者消費者模型了,
private Runnable getTask() { boolean timedOut = false; for (;;) { int c = ctl.get(); // ··· int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) // 此處回傳null,runWorker將退出回圈 return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
runWorker方法退出回圈的條件是getTask回傳null,
觀察getTask,只有同時滿足以下情況時才會回傳null
回傳的task是通過workQueue.poll和workQueue.take得到的
兩者執行時執行緒均會掛起,直至workQueue中有新的任務
不同之處在于poll方法阻塞keepAliveTime時間后會自動喚醒并回傳null,此時timeOut置為true,即滿足條件1
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // ··· for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // ··· } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // ··· if (workerAdded) { t.start(); workerStarted = true; } } } finally { // ··· } return workerStarted; }
了解了Worker之后,再來看execute中呼叫的addWorker方法
方法有兩個引數,firstTask即為Worker的firstTask,core則標記需要新增的是否是核心執行緒
retry回圈與執行緒池狀態相關,內層for回圈則是重復嘗試cas增加執行緒,若大于corePoolSize或者maximumPoolSize則新增失敗,cas成功后,new一個Worker并start
3.3 總結
回到最初的問題,執行緒是如何做到重復利用的?
并不存在取出執行緒使用完再歸還的操作,執行緒啟動后進入回圈,主動獲取任務執行,退出回圈則執行緒銷毀,
execute方法控制新增Worker和任務入隊
附:手寫簡易執行緒池
public class MyThreadPool implements Executor { private int corePoolSize; private int maximumPoolSize; private BlockingQueue<Runnable> queue; //記錄當前作業執行緒數 private AtomicInteger count; private long keepAliveTime; private RejectHandler rejectHandler; public MyThreadPool(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> queue, long keepAliveTime, RejectHandler rejectHandler) { this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.queue = queue; this.keepAliveTime = keepAliveTime; this.rejectHandler = rejectHandler; count = new AtomicInteger(0); } @Override public void execute(Runnable task) { int ct = count.get(); //核心執行緒數未滿,嘗試增加核心執行緒 if (ct < corePoolSize && count.compareAndSet(ct, ct + 1)) { new Worker(task).start(); return; } //入隊 if (queue.offer(task)) { return; } //佇列已滿,嘗試增加非核心執行緒 if (ct < maximumPoolSize && count.compareAndSet(ct, ct + 1)) { new Worker(task).start(); return; } //已達最大執行緒數,拒絕 rejectHandler.reject(task); } class Worker extends Thread { Runnable firstTask; public Worker(Runnable firstTask) { this.firstTask = firstTask; } @Override public void run() { Runnable task = firstTask; firstTask = null; while (true) { try { //getTask會阻塞 if (task != null || (task = getTask()) != null) { task.run(); } else { //getTask超時才會進入,直接退出,執行緒銷毀 break; } } catch (InterruptedException e) { e.printStackTrace(); } finally { //置空,否則不能getTask task = null; } } } } Runnable getTask() throws InterruptedException { //標記是否超時過 boolean timedOut = false; while (true) { int ct = count.get(); //超出核心執行緒數才進入超時邏輯,即使timeOut由于執行緒poll超時過一次變成true,執行到這里如果不超出corePoolSize,可以再次進入take分支 if (ct > corePoolSize) { //超出核心執行緒數 if (timedOut) { //已超時過,嘗試減少作業執行緒數,失敗會continue,然后重新比較corePoolSize,重試減少執行緒數 if (count.compareAndSet(ct, ct - 1)) { return null; } else { continue; } } Runnable task = queue.poll(keepAliveTime, TimeUnit.MILLISECONDS); if (task == null) { //poll超時才進入 timedOut = true; continue; } return task; } else { //必然能獲取到task return queue.take(); } } } public static interface RejectHandler { void reject(Runnable r); } public static void main(String[] args) { MyThreadPool pool = new MyThreadPool(2, 5, new LinkedBlockingQueue<>(100), 2000, r -> { System.out.println(r + ": reject"); }); for (int i = 0; i < 3; i++) { final int x = i; new Thread(() -> { for (int j = 0; j < 5; j++) { final int y = j; pool.execute(() -> { try { Thread.sleep(3000L); } catch (InterruptedException e) { e.printStackTrace(); } LocalDateTime now = LocalDateTime.now(); System.out.println(String.format("執行緒i=%s, j=%s,執行結束: %s", x, y, now.format(DateTimeFormatter.ISO_DATE_TIME))); }); } }).start(); } } }
干貨來襲!阿里內部Java面試必備軟/硬實力+大廠面經全都有
有完整的Java初級,高級對應的學習路線和資料!專注于java開發,分享java基礎、原理性知識、JavaWeb實戰、spring全家桶、設計模式、分布式及面試資料、開源專案,助力開發者成長!
歡迎關注微信公眾號:碼邦主

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/266213.html
標籤:Java
上一篇:使用Groovy構建DSL

