目錄
一、引言
二、JDK提供的原生執行緒池
三、深入原始碼剖析執行緒池作業原理
execute:
addWoker:
四、深入原始碼分析執行緒池執行緒復用原理
五、自定義執行緒池實戰
五、執行緒池引數合理配置
六、參考
一、引言
-
一般在開發程序中,一個功能是運行時長太久了,一般是通過什么方式去優化的?
異步/多執行緒,對于一個業務方法而言,如果其中的呼叫鏈太長勢必會引起程式運行時間延長,導致整個系統吞吐來量下降,而我們使用多執行緒方式來對該方法的呼叫鏈進行優化,對于一些耦合度不是特別高的呼叫關系可以直接通過多執行緒來走異步的方式進行處理,大大的縮短了程式的運行時長,但是如果我們的多執行緒創建方式是通過new Thread();這種方式去進行顯式創建的話它真的可以嗎?答案是不可以,Why?答案如下: -
如果在生產環境使用
new Thread();這種方式去進行顯式創建執行緒會帶來什么后果?- 1. OOM: 如果當前方法突遇高并發情況,假設此時來了1000個請求,而按傳統的網路模型是BIO,此時服務器會開1000個執行緒來處理這1000個請求(不考慮WEB容器的最大執行緒數配置),當1000個請求執行時又會發現此方法中存在
new Thread();創建執行緒,此時每個執行請求的執行緒又會創建一個執行緒,此時就會出現1000*2=2000個執行緒的情況出現,而在一個程式中創建執行緒是需要向JVM申請記憶體分配的,但是此時大量執行緒在同一瞬間向JVM申請分配記憶體,此時會很容易造成記憶體溢位(OOM)的情況發生, - 2. 資源開銷與耗時: Java物件的生命周期大致包括三個階段:物件的創建,物件的使用,物件的清除,因此,物件的生命周期長度可用如下的運算式表示:Object = O1 + O2 +O3,其中O1表示物件的創建時間,O2表示物件的使用時間,而O3則表示其清除(垃圾回收)時間,由此,我們可以看出,只有O2是真正有效的時間,而O1、O3則是物件本身的開銷,當我們去創建一個執行緒時也是一樣,因為執行緒在Java中其實也是一個Thread類的實體,所以對于執行緒而言,其實它的創建(申請記憶體分配、JVM向OS提交執行緒映射行程申請、OS真實執行緒映射)和銷毀對資源是開銷非常大的并且非常耗時的,
- 3. 不可管理性: 對于
new Thread();的顯示創建出來的執行緒是無法管理的,一旦CPU調度成功,此執行緒的可管理性幾乎為零,
- 1. OOM: 如果當前方法突遇高并發情況,假設此時來了1000個請求,而按傳統的網路模型是BIO,此時服務器會開1000個執行緒來處理這1000個請求(不考慮WEB容器的最大執行緒數配置),當1000個請求執行時又會發現此方法中存在
-
那么我們使用執行緒池能給我們帶來什么好處?
-
- 降低資源消耗:通過重用已經創建的執行緒來降低執行緒創建和銷毀的消耗,
-
- 提高回應速度:任務到達時不需要等待執行緒創建就可以立即執行,
-
- 提高執行緒的可管理性:執行緒池可以統一管理、分配、調優和監控,
-
而在Java中為我們提供四種原生執行緒池,它們都是基于ThreadPoolExecutor類實作的,所以ThreadPoolExecutor類這也是我們待會兒分析執行緒池原理時的重點~

二、JDK提供的原生執行緒池
在Java中,JDK通過Executors類為我們提供了四種封裝好的執行緒池型別(ForkJoinPool不在本章探討范圍之內),原始碼如下:
//創建一個定長的執行緒池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
//創建一個單執行緒的執行緒池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
//創建一個可快取支持靈活回收的執行緒池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
//創建一個支持周期執行任務的執行緒池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
在上面的原始碼中,其實我們通過觀察發現JDK為我們提供的四種執行緒池內部都是通過封裝ThreadPoolExecutor類的建構式來進行執行緒池的初始化的,所以我們先來理清楚執行緒池“家族”體系,

從上圖中我們可以得知,執行緒池的最上層介面是Executor,而這個介面定義了一個核心方法execute(Runnable command),當我們使用它時,需要傳遞一個Runnable型別的異步任務作為引數,我們看一下Executor介面的定義:
public interface Executor {
// 提交任務到執行緒池并執行的方法
void execute(Runnable command);
}
而Executor介面是一個函式式介面,其中只定義了一個方法,但是我們在使用執行緒池的時候為什么能夠呼叫的方法卻會有那么多呢?因為還有一個ExecutorService介面,它繼承了Executor介面作為Executor介面的子介面,為Executor介面提供了很多拓展方法,我們接著看ExecutorService介面的實作:
```java
public interface ExecutorService extends Executor {
// 等待執行緒池執行完成已接收的任何后關閉執行緒池,將執行緒池置為SHUNTDOWM狀態
void shutdown();
// 嘗試主動終止執行緒池中的所有正在執行的任務并回傳未執行的任務串列,
// 將執行緒池置為STOP狀態
List<Runnable> shutdownNow();
// 判斷執行緒池是否已關閉:執行緒池呼叫過shutdown或者shutdownNow后回傳true
boolean isShutdown();
// 判斷執行緒池中的子執行緒是否已全部終止
// 當呼叫shutdown后全部任務執行完成回傳true或呼叫shutdownNow成功后回傳true
boolean isTerminated();
// 配合shutdown使用,在呼叫shutdown后呼叫該方法,讓執行緒池在指定時間內關閉,
// 不管任務是否執行完成,在指定時間內還在執行任務則拋出例外中斷執行緒
// 注意:有時能夠關閉執行緒池單并不能完全保證執行緒池中子執行緒停止執行
// 比如子執行緒中用到 BufferedReader,那么需要配合shutdownNow主動中斷所有子執行緒
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 向執行緒池提交一個Callable型別的異步任務,當執行緒池執行后回傳執行結果
<T> Future<T> submit(Callable<T> task);
// 向執行緒池提交一個Runnable型別的異步任務,執行緒池執行完成后將回傳指定型別的執行結果
<T> Future<T> submit(Runnable task, T result);
// 向執行緒池提交一個Runnable型別的異步任務,執行緒池執行完成后執行的結果
Future<?> submit(Runnable task);
// 傳入一個Collection型別的異步任務集合,批量執行并回傳執行結果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
// 傳入一個Collection型別的異步任務集合,在指定的時間內批量執行并回傳執行
// 結果,如果超時則拋出例外中斷執行緒
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 傳入一個Collection型別的異步任務集合,回傳第一個執行完成的結果并終止其他執行緒
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
// 傳入一個Collection型別的異步任務集合,在指定的時間內回傳第一個執行完成的結果
// 并終止其他執行緒,如果超時則拋出例外中斷執行緒
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
通過上面的代碼我們會發現ExecutorService的確繼承了Executor介面,作為Executor拓展介面提供了很多其他的方法以便于開發人員使用執行緒池,而Executor和ExecutorService介面中的方法實作全部都是由ThreadPoolExecutor類來完成的,而ThreadPoolExecutor繼承了AbstractExecutorService,我們來看一下AbstractExecutorService的實作:
public abstract class AbstractExecutorService implements ExecutorService {
// 將異步任務包裝為Future,傳遞Runnable型別異步任務,宣告回傳型別,回傳一個RunnableFuture
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
// 將異步任務包裝為Future,傳遞Callable型別異步任務,回傳一個RunnableFuture
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
// 在指定的時間內執行傳入的異步任務集合,回傳最后一個任務執行
//執行集合tasks結果是最后一個執行結束的任務結果
//可以設定超時 timed為true并且nanos是未來的一個時間
//任何一個任務完成都將會回傳結果
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
//傳入的任務集合不能為null
if (tasks == null)
throw new NullPointerException();
//傳入的任務數不能是0
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
//滿足上面的校驗后將任務分裝到一個ArrayList中
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
//并且創建一個執行器傳入this
//這里簡單講述他的執行原理,傳入this會使用傳入的this(型別為Executor)作為執行器用于執行任務,當submit提交任務的時候回將任務
//封裝為一個內部的Future并且重寫他的done而此方法就是在future完成的時候呼叫的,而他的寫法則是將當前完成的future添加到esc
//維護的結果佇列中
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
//創建一個執行例外,以便后面拋出
ExecutionException ee = null;
//如果開啟了超時則計算死線時間如果時間是0則代表沒有開啟執行超時
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//獲取任務的迭代器
Iterator<? extends Callable<T>> it = tasks.iterator();
//先獲取迭代器中的第一個任務提交給前面創建的ecs執行器
futures.add(ecs.submit(it.next()));
//前面記錄的任務數減一
--ntasks;
//當前激活數為1
int active = 1;
//進入死回圈
for (;;) {
//獲取剛才提價的任務是否完成如果完成則f不是null否則為null
Future<T> f = ecs.poll();
//如果為null則代表任務還在繼續
if (f == null) {
//如果當前任務大于0 說明除了剛才的任務還有別的任務存在
if (ntasks > 0) {
//則任務數減一
--ntasks;
//并且再次提交新的任務
futures.add(ecs.submit(it.next()));
//當前的存活的執行任務加一
++active;
}
//如果當前存活任務數是0則代表沒有任務在執行了從而跳出回圈
else if (active == 0)
break;
//如果當前任務執行設定了超時時間
else if (timed) {
//則設定指定的超時時間獲取
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
//等待執行超時還沒有獲取到則拋出超時例外
if (f == null)
throw new TimeoutException();
//否則使用當前時間計算剩下的超時時間用于下一個回圈使用
nanos = deadline - System.nanoTime();
}
//如果沒有設定超時則直接獲取任務
else
f = ecs.take();
}
//如果獲取到了任務結果f!=null
if (f != null) {
//激活數減一
--active;
try {
//回傳獲取到的結果
return f.get();
//如果獲取結果出錯則包裝例外
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
//如果例外不是null則拋出如果是則創建一個
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
//其他任務則設定取消
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout,
TimeUnit unit) throws InterruptedException {
};
}
(Executor介面有一個子介面ExecutorService,而AbstracExecutorService類又實作了ExecutorService介面,而ThreadPoolExcutor正是AbstrcExecutorService的子類)
到這里,大家應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關系了,
Executor是一個頂層介面,在它里面只宣告了一個方法execute(Runnable),回傳值為void,引數為Runnable型別,從字面意思可以理解,就是用來執行傳進去的任務的;
然后ExecutorService介面繼承了Executor介面,并宣告了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService實作了ExecutorService介面,基本實作了ExecutorService中宣告的所有方法;
然后ThreadPoolExecutor繼承了類AbstractExecutorService,
在ThreadPoolExecutor類中有幾個非常重要的方法:execute()submit()shutdown()shutdownNow()
execute()方法實際上是Executor中宣告的方法,在ThreadPoolExecutor進行了具體的實作,這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向執行緒池提交一個任務,交由執行緒池去執行,
submit()方法是在ExecutorService中宣告的方法,在AbstractExecutorService就已經有了具體的實作,在ThreadPoolExecutor中并沒有對其進行重寫,這個方法也是用來向執行緒池提交任務的,但是它和execute()方法不同,它能夠回傳任務執行的結果,去看submit()方法的實作,會發現它實際上還是呼叫的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在以后章節講述),
shutdown()和shutdownNow()是用來關閉執行緒池的,
還有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與執行緒池相關屬性的方法,有興趣的朋友可以自行查閱API,
而Executor介面最終被ThreadPoolExecutor類實作,而且ThreadPoolExecutor是執行緒池體系的核心類,此類的構造方法如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
ThreadPoolExecutor類總共為我們提供了四個構造方法,前面三個構造方法都是呼叫最后一個全參的建構式來完成作業的,最后一個全參的構造方法需要我們傳遞7個引數,這七個引數的具體含義如下:
- 建構式引數串列:
- corePoolSize: 核心執行緒池的大小,如果核心執行緒池有空閑位置,這時新的任務就會被核心執行緒池新建一個執行緒執行,執行完畢后不會銷毀執行緒,執行緒會進入快取佇列等待再次被運行,
- maximunPoolSize: 執行緒池能創建最大的執行緒數量,如果核心執行緒池和快取佇列都已經滿了,新的任務進來就會創建新的執行緒來執行,但是數量不能超過maximunPoolSize,否側會采取拒絕接受任務策略,我們下面會具體分析,
- keepAliveTime: 非核心執行緒能夠空閑的最長時間,超過時間,執行緒終止,這個引數默認只有在執行緒數量超過核心執行緒池大小時才會起作用,只要執行緒數量不超過核心執行緒大小,就不會起作用(當然如果設定了allowCoreThreadTimeOut(true)執行緒池中的核心執行緒也受該引數的影響),
- unit: 時間單位,和keepAliveTime配合使用,可選擇項如下:
- TimeUnit.DAYS:天
- TimeUnit.HOURS:小時
- TimeUnit.MINUTES:分鐘
- TimeUnit.SECONDS:秒
- TimeUnit.MILLISECONDS:毫秒
- TimeUnit.MICROSECONDS:微妙
- TimeUnit.NANOSECONDS:納秒
- workQueue: 任務佇列,用來存放等待被執行的任務,一般為阻塞佇列(BlockingQueue)三種常用為:(可自定義阻塞佇列),
- ArrayBlockingQueue:基于陣列的先進先出佇列,此佇列創建時必須指定大小;
- LinkedBlockingQueue:基于鏈表的先進先出佇列,如果創建時沒有指定此佇列大小,則默認為Integer.MAX_VALUE;
- SynchronousQueue:這個佇列比較特殊,它不會保存提交的任務,而是將直接新建一個執行緒來執行新來的任務,
- threadFactory: 執行緒工廠,用來創建執行緒,一般有三種選擇策略(可自定義),
- handler: 任務拒絕策略,執行緒數量大于最大執行緒數就會采用拒絕處理策略,ThreadPoolExecutor中為我們提供了四種默認策略可選擇(可自定義):
- ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException例外,
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出例外,
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然后重新嘗試執行任務(重復此程序)
- ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
而當我們需要使用執行緒池時,我們可以通過呼叫Executors中為我們封裝好的方法創建執行緒池,也可以通過自己對于ThreadPoolExecutor的構造方法進行封裝自定義執行緒池(后面會詳細談到),示例如下:
public class ThreadPoolDemo {
public static void main(String[] args) {
/*
* 創建可快取的執行緒池
* 優點:當執行緒池中執行緒執行完任務后會將執行緒快取起來,默認60s后空閑執行緒會自動回收
* 缺點:任然存在由于并發過高導致瞬間創建大量執行緒產生的OOM
*/
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
cachedThreadPool.execute(() -> {
System.out.println("我是遞交到cachedThreadPool的異步任務....竹子....");
});
/*
* 創建定長的執行緒池
* 優點:可以避免由于并發過高導致瞬間創建大量執行緒產生的OOM
* 缺點:
* 1. 執行緒創建后永不釋放執行緒資源
* 2. 任務佇列最大長度為Integer.MAX_VALUE,并發時會創建大量的任務導致OOM
*/
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
Future<?> futureResult = fixedThreadPool.submit(() -> {
System.out.println("我是遞交到fixedThreadPool的異步任務....竹子...");
return "竹子";
});
try {
// 得到執行后回傳結果
String str = (String) futureResult.get();
System.out.println("我是遞交到fixedThreadPool的異步任務執行完成后的回傳結果:" + str);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
/*
* 創建定長可支持周期調度的執行緒池
* 優點:可以支持按時調度執行任務
* 缺點:
* 1. 執行緒創建后永不釋放執行緒資源
* 2. 任務佇列最大長度為Integer.MAX_VALUE,并發時會創建大量的任務導致OOM
*/
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(()->{
System.out.println("我是遞交到scheduledThreadPool十秒鐘之后執行的異步任務....熊貓...");
},10,TimeUnit.SECONDS);
/*
* 創建單執行緒的執行緒池
* 優點:可以支持執行緒池任務的執行按照遞交的順序先進先出(FIFO)
* 缺點:單執行緒效率比不上前面的三種執行緒池(前面的執行緒池都存在多執行緒并行執行任務)
*/
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
List<Callable<String>> callables = new ArrayList<>();
callables.add(()->{
System.out.println("我是遞交到singleThreadExecutor的異步任務...熊貓1號...");
return "熊貓一號";
});
callables.add(()->{
System.out.println("我是遞交到singleThreadExecutor的異步任務...熊貓2號...");
return "熊貓二號";
});
callables.add(()->{
System.out.println("我是遞交到singleThreadExecutor的異步任務...熊貓3號...");
return "熊貓三號";
});
try {
// 接收批量執行后的結果
List<Future<String>> futures = singleThreadExecutor.invokeAll(callables);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.shutdown();
fixedThreadPool.shutdown();
scheduledThreadPool.shutdown();
singleThreadExecutor.shutdown();
/* 執行結果:
* 我是遞交到cachedThreadPool的異步任務....竹子....
*
* 我是遞交到fixedThreadPool的異步任務....竹子...
* 我是遞交到fixedThreadPool的異步任務執行完成后的回傳結果:竹子
*
* 我是遞交到singleThreadExecutor的異步任務...熊貓1號...
* 我是遞交到singleThreadExecutor的異步任務...熊貓2號...
* 我是遞交到singleThreadExecutor的異步任務...熊貓3號...
*
* 我是遞交到scheduledThreadPool十秒鐘之后執行的異步任務....熊貓...
*/
}
}
在上面的案例中我們使用到了execute()、schedule()、submit()、invokeAll()等方法向執行緒池中遞交任務,但是當我們跟進原始碼分析會發現,執行緒池遞交任務的核心就是Executor介面定義的核心方法execute(Runnabel command),所以我們如果要分析執行緒池原理的重點就在此方法,

三、深入原始碼剖析執行緒池作業原理
在上一節我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入決議一下執行緒池的具體實作原理,將從下面幾個方面講解:
-
1. 執行緒池狀態控制引數ctl
要了解執行緒池,我們首先要了解的執行緒池里面的狀態控制的引數 ctl,這個執行緒池的狀態控制引數是一個原子操作的 AtomicInteger,這個ctl包含兩個引數 :runState:當前執行緒池的狀態workerCount:激活(作業)的執行緒數
-
它的低29位用于存放當前的執行緒數, 因此一個執行緒池在理論上最大的執行緒數是 536870911; 高 3 位是用于表示當前執行緒池的狀態, 其中高三位的值和狀態對應如下:
111: RUNNING:執行緒池初始化(創建出來之后)處于此狀態,能夠接收新任務,以及對已添加的任務進行處理,000: SHUTDOWN:當呼叫shutdown()方法時改為此狀態,在此狀態時,不接收新任務,但能處理已添加的任務,001: STOP:呼叫shutdownNow()方法時處于此狀態,在此狀態時,不接收新任務,不處理已添加的任務,并且會嘗試中斷正在處理的任務,010: TIDYING:當執行緒池在SHUTDOWN狀態下,阻塞佇列為空并且執行緒池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING,|| 當所有的任務已終止,ctl記錄的”任務數量”為0,執行緒池會變為TIDYING狀態,當執行緒池變為TIDYING狀態時,會執行鉤子函式terminated(),terminated()在ThreadPoolExecutor類中是空的,若用戶想在執行緒池變為TIDYING時,進行相應的處理;可以通過多載terminated()函式來實作,110: TERMINATED:執行緒池處在TIDYING狀態時,執行完terminated()之后,就會由 TIDYING -> TERMINATED,執行緒池徹底終止,就變成TERMINATED狀態,
-
為了能夠使用 ctl 執行緒池提供了三個方法:
// 獲取執行緒池的狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 獲取執行緒池的作業執行緒數
private static int workerCountOf(int c) { return c & CAPACITY; }
// 根據作業執行緒數和執行緒池狀態獲取 ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
- 2. 任務的執行
如果想使用執行緒池就必須通過 execute 這個方法來向執行緒池提交任務,而這個方法也是執行緒池的核心,所以我們來看代碼:
execute:
public void execute(Runnable command) {
//如果傳遞的任務為空則拋出空指標例外
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果作業執行緒數小于核心執行緒數,
if (workerCountOf(c) < corePoolSize) {
//執行addWork,提交為核心執行緒,提交成功return,提交失敗重新獲取ctl
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果作業執行緒數大于核心執行緒數,則檢查執行緒池狀態是否是正在運行,且將新執行緒向阻塞佇列提交,
if (isRunning(c) && workQueue.offer(command)) {
//recheck 需要再次檢查,主要目的是判斷加入到阻塞隊里中的執行緒是否可以被執行
int recheck = ctl.get();
//如果執行緒池狀態不為running,將任務從阻塞佇列里面移除,啟用拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果執行緒池的作業執行緒為零,則呼叫addWoker提交任務
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//添加非核心執行緒失敗,拒絕
else if (!addWorker(command, false))
reject(command);
}

addWoker:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//獲取執行緒池狀態
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 判斷是否可以添加任務,
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//獲取作業執行緒數量
int wc = workerCountOf(c);
//是否大于執行緒池上限,是否大于核心執行緒數,或者最大執行緒數
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS 增加作業執行緒數
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//如果執行緒池狀態改變,回到開始重新來
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
//上面的邏輯是考慮是否能夠添加執行緒,如果可以就cas的增加作業執行緒數量
//下面正式啟動執行緒
try {
//新建worker
w = new Worker(firstTask);
//獲取當前執行緒
final Thread t = w.thread;
if (t != null) {
//獲取可重入鎖
final ReentrantLock mainLock = this.mainLock;
//鎖住
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN ==> 執行緒處于RUNNING狀態
// 或者執行緒處于SHUTDOWN狀態,且firstTask == null(可能是workQueue中仍有未執行完成的任務,創建沒有初始任務的worker執行緒執行)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 當前執行緒已經啟動,拋出例外
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//workers 是一個 HashSet 必須在 lock的情況下操作,
workers.add(w);
int s = workers.size();
//設定 largeestPoolSize 標記workAdded
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果添加成功,啟動執行緒
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//啟動執行緒失敗,回滾,
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

先看看 addWork()的兩個引數,第一個是需要提交的執行緒Runnable firstTask,第二個引數是 boolean 型別,表示是否為核心執行緒,execute() 中有三處呼叫了 addWork()我們逐一分析,
- 第一次,條件
if (workerCountOf(c) < corePoolSize)這個很好理解,作業執行緒數少于核心執行緒數,提交任務,所以addWorker(command, true), - 第二次,如果
workerCountOf(recheck) == 0如果worker的數量為0,那就addWorker(null,false),為什么這里是 null ?之前已經把command提交到阻塞佇列了workQueue.offer(command),所以提交一個空執行緒,直接從阻塞佇列里面取就可以了, - 第三次,如果執行緒池沒有
RUNNING或者offer阻塞佇列失敗,addWorker(command,false),很好理解,對應的就是,阻塞佇列滿了,將任務提交到,非核心執行緒池,與最大執行緒池比較,
至此,重新歸納execute()的邏輯應該是:
如果當前運行的執行緒,少于corePoolSize,則創建一個新的執行緒來執行任務,
如果運行的執行緒等于或多于corePoolSize,將任務加入BlockingQueue,
如果加入BlockingQueue成功,需要二次檢查執行緒池的狀態如果執行緒池沒有處于Running,則從BlockingQueue移除任務,啟動拒絕策略,
如果執行緒池處于Running狀態,則檢查作業執行緒(worker)是否為0,如果為0,則創建新的執行緒來處理任務,如果啟動執行緒數大于maximumPoolSize,任務將被拒絕策略拒絕,
如果加入BlockingQueue,失敗,則創建新的執行緒來處理任務,
如果啟動執行緒數大于maximumPoolSize,任務將被拒絕策略拒絕,
3. 執行緒池中的執行緒初始化
默認情況下,創建執行緒池之后,執行緒池中是沒有執行緒的,需要提交任務之后才會創建執行緒,
在實際中如果需要執行緒池創建之后立即創建執行緒,可以通過以下兩個方法辦到:
prestartCoreThread():初始化一個核心執行緒;
prestartAllCoreThreads():初始化所有核心執行緒;
下面是這2個方法的實作:
public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null); //注意傳進去的引數是null
}
public int prestartAllCoreThreads() {
int n = 0;
while (addIfUnderCorePoolSize(null))//注意傳進去的引數是null
++n;
return n;
}
注意上面傳進去的引數是null,根據第2小節的分析可知如果傳進去的引數為null,則最后執行執行緒會阻塞在getTask方法中的r = workQueue.take();即等待任務佇列中有任務,
4. 任務快取佇列及排隊策略
見執行緒池引數,在選擇執行緒池任務佇列時的阻塞時佇列就決定了這個執行緒池的任務快取及排隊策略,
5. 任務拒絕策略
當執行緒池的任務快取佇列已滿并且執行緒池中的執行緒數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略,具體拒絕策略參考執行緒池引數串列,
6. 執行緒池的關閉
ThreadPoolExecutor提供了兩個方法,用于執行緒池的關閉,分別是shutdown()和shutdownNow(),其中:
shutdown():不會立即終止執行緒池,而是要等所有任務快取佇列中的任務都執行完后才終止,但再也不會接受新的任務;
shutdownNow():立即終止執行緒池,并嘗試打斷正在執行的任務,并且清空任務快取佇列,回傳尚未執行的任務;
7. 執行緒池容量的動態調整
ThreadPoolExecutor提供了動態調整執行緒池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),setCorePoolSize:設定核心池大小setMaximumPoolSize:設定執行緒池最大能創建的執行緒數目大小
當上述引數從小變大時,ThreadPoolExecutor進行執行緒賦值,還可能立即創建新的執行緒來執行任務,

四、深入原始碼分析執行緒池執行緒復用原理
通過前面分析執行緒池的作業原理我們可以得知一個結論:在執行緒池內部關于執行緒的調度執行都是被封裝成一個Worker物件來操作的,而當我們使用Worker.thread.start()啟動執行緒時,JVM會呼叫Worker中重寫的run()方法執行,而Worker.run()方法原始碼如下:
/** Delegates main run loop to outer runWorker */
// 將執行緒運行主邏輯交給外部 Worker.runWorker()
public void run() {runWorker(this);}
我們進一步跟進Worker.runWorker()原始碼:
// 執行緒執行邏輯:執行回圈并反復從佇列獲取任務并執行
final void runWorker(Worker w) {
// 獲取當前執行執行緒
Thread wt = Thread.currentThread();
// 獲取當前傳遞進執行緒池的方法
Runnable task = w.firstTask;
// 將Worker.firstTask 置為空
w.firstTask = null;
// 允許發生執行緒中斷
w.unlock(); // allow interrupts
// 突然執行完成標志:是否因為例外跳出回圈
boolean completedAbruptly = true;
try {
// 1. 如果執行緒池外部傳遞了任務則直接執行外部傳遞的任務
// 2. 如果沒有獲取到外部傳遞進來的任務則呼叫getTask()去佇列中獲取任務并執行
// 2.1. 如果在任務佇列中獲取到了任務則直接執行已經獲取的任務
// 2.2. 如果任務佇列為空,沒有任務則反復執行慷訓圈阻塞當前執行緒死亡
while (task != null || (task = getTask()) != null) {
// 禁止執行緒中斷(防止執行緒在執行程序中中斷導致不可恢復的錯誤)
w.lock();
// 二次確認執行緒池以及當前作業執行緒狀態:
// 如果執行緒池停止,確保當前執行緒被中斷
// If pool is stopping, ensure thread is interrupted;
// 如果執行緒池為停止,請確保當前執行緒未被中斷
// if not, ensure thread is not interrupted. This
// 如果是第二種情況則需要重新檢測并且清除中斷
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 鉤子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 呼叫任務的run方法,而不是start()方法,因為Worker本身就是一個執行緒類
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 鉤子方法
afterExecute(task, thrown);
}
} finally {
// 執行完成后將獲取的任務置空
task = null;
// 執行完成后自增當前作業執行緒執行的任務數量
w.completedTasks++;
// 釋放Worker中自實作的鎖
w.unlock();
}
}
// 如果執行緒能夠執行到最后一行代表執行緒執行程序中沒有由于發生例外導致跳出回圈,將 突然結束 標志改為false
completedAbruptly = false;
} finally {
// 執行回收作業執行緒的邏輯
processWorkerExit(w, completedAbruptly);
}
}
如上就是關于執行緒池復用的原理,簡單來說就是通過一個死回圈讓當前執行緒一直處于運行狀態,阻止OS將當前作業執行緒回收,從而做到執行緒的復用,而關于死回圈的條件則比較簡單,判斷task是否為空,在呼叫方法執行的時候會先獲取外部傳遞的任務,如果沒有獲取到外部傳遞的任務則呼叫getTask()方法獲取任務佇列中的任務并執行:
// 如果回傳null,在runWorker方法中會執行processWorkerExit,即關閉該執行緒,
private Runnable getTask() {
// 表示上次從佇列獲取任務是否超時
boolean timedOut = false; // Did the last poll() time out?
// 死回圈標志位
retry:
for (;;) {
int c = ctl.get(); // 獲取ctl
int rs = runStateOf(c); // 決議ctl獲取當前執行緒池運行狀態
// Check if queue empty only if necessary.
// 如果rs >= STOP,或者 rs=SHUTDOWN且佇列為空,此時不再接收新任務,將WorkerCount遞減并回傳null,
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // 自旋CAS遞減workerCount直到成功
return null;
}
// timed用于判斷是否需要重試控制
boolean timed; // Are workers subject to culling?
for (;;) {
// allowCoreThreadTimeOut默認是false,核心執行緒不進行超時控制,
// 當執行緒數量大于corePoolSize時需要進行超時控制
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果wc <= maximumPoolSize ,且上次從佇列獲取任務超時或本次需要進行超時控制,
// 則跳出內層回圈,
// timedOut=true表示上次從佇列獲取元素超時,說明佇列在上次獲取的keepAliveTime時間內是空的,
// timed=true說明執行緒數量大于corePoolSize,
// 所以timedOut=true和timed=true同時滿足則說明當前執行緒已經空閑了keepAliveTime時間,
// 并且執行緒池的數量大于corePoolSize,這時就需要關閉多余的空閑執行緒
//(即compareAndDecrementWorkerCount并回傳null),
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
// 如果執行緒數量大于maximumPoolSize,或者上次從佇列獲取任務超時且本次需要進行
// 超時控制,需要遞減WorkerCount,如果遞減成功則回傳null
if (compareAndDecrementWorkerCount(c))
return null;
//檢查執行緒池運行狀態是否改變,如果改變,那么繼續外層回圈,如果未改變,那么繼續內層回圈,
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//超時方式獲取,注意keepAliveTime為超出corePoolSize大小的執行緒的空閑存活時間
workQueue.take(); //阻塞方式獲取,如果佇列為空阻塞當前執行緒
if (r != null)
return r;
timedOut = true; //如果超時,繼續回圈,
} catch (InterruptedException retry) {
//如果發生中斷,則將timedOut置為false,繼續回圈
timedOut = false;
}
}
}
在getTask()方法中的邏輯也比較簡單,前期效驗執行緒池狀態,一切正常時開始任務的獲取邏輯,但是值得注意的是這里使用的是阻塞時獲取方式,也就代表如果任務佇列中沒有任務,當前執行緒會阻塞等待,直到任務佇列中有新的任務時才會獲取并回傳執行,不過如果執行緒池設定了存活時間,那么當前執行緒會阻塞到存活時間的閾值,如果超出存活時間會回傳null,而如果回傳null,則在runWorker方法中會執行processWorkerExit,即關閉該作業執行緒,從而實作了執行緒池的另一個功能: 執行緒池內執行緒空閑時間超過給定的存活時間時自動回收該執行緒資源,
下面我們再來看看processWorkerExit方法的實作:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly=false,說明是由getTask回傳null導致的,WorkerCount遞減的操作已經執行
// 如果completedAbruptly=true,說明是由執行任務的程序中發生例外導致,需要進行WorkerCount遞減的操作
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 從workers中洗掉當前worker,對workers更新需要加mainLock鎖
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根據執行緒池狀態判斷是否結束執行緒池
tryTerminate();
// 如果是例外結束(completedAbruptly=true),需要重新呼叫addWorker()增加一個執行緒,保持執行緒數量
// 如果是由getTask()回傳null導致的執行緒結束,需要進行以下判斷:
// 1)如果allowCoreThreadTimeOut=true且佇列不為空,那么需要至少保證有一個執行緒
// 2)如果allowCoreThreadTimeOut=false,那么需要保證執行緒數大于等于corePoolSize
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
關于執行緒池中作業執行緒的銷毀則是由processWorkerExit()方法來完成的,在這個方法中首先會判斷當前執行緒是因為執行出現例外還是超出存活時間導致需要發生回收的,如果是因為超出存活時間,先判斷執行緒池狀態之后再從作業集中移除當前執行緒即可,如果是由于例外導致的則需要先對執行緒池的作業執行緒數進行自減,然后再移除作業集中的作業執行緒,最后再呼叫addWorker()添加一個作業執行緒保證執行緒池內作業執行緒的數量,在上面的原始碼中我們也會看到tryTerminate()這個方法,那么我們也簡單分析一下它的原始碼:
//根據執行緒池狀態判斷是否結束執行緒池
final void tryTerminate() {
for (;;) {
int c = ctl.get(); // 獲取ctl
// 如果執行緒池運行狀態是RUNNING,或者大于等于TIDYING,或者運行狀態為
// SHUTDOWN且佇列為空,則直接return回傳
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果作業執行緒數不為0,則中斷一個空閑執行緒并return
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 嘗試將執行緒池狀態設定為TIDYING狀態
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//如果CAS成功,執行terminated()鉤子方法
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
如果執行緒池狀態不處于STOP或者TERMINATED狀態則直接回傳,反之執行terminated()鉤子函式,
到此關于執行緒池的復用原理就告一段落了,關于執行緒池的復用原理只需要理解死回圈+getTask即可大致明白執行緒池復用的思維,
五、自定義執行緒池實戰
再前面我們曾提到,JDK為我們提供的已經封裝好的執行緒池實作在高并發情況下都會存在OOM的風險,而通過前面分析我們也可以得知,JDK提供的執行緒池也是通過封裝ThreadPoolExecutor的構造,所以我們在生產環境時更應該自定義執行緒池來規避這些風險以及更好的操作執行緒池,注:在《阿里巴巴java開發規范手冊》中明確規定如下:

所以在一般生產環境使用創建執行緒都是通過自定義執行緒池來使用執行緒資源,代碼如下:
public static void main(String[] args){
// 執行緒工廠可通過 implements ThreadFactory介面自定義
// 任務拒絕策略可通過 implements RejectedExecutionHandler介面自定義
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 0,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 10;i++){
final int num = i;
threadPoolExecutor.execute(()->{
System.out.println("執行緒:" + Thread.currentThread().getName() + "正在執行:" + num + "個任務");
});
System.out.println("執行緒池中執行緒數目:" + threadPoolExecutor.getPoolSize() + ",佇列中等待執行的任務數目:" + threadPoolExecutor.getQueue().size() + ",已執行玩別的任務數目:"+threadPoolExecutor.getCompletedTaskCount());
}
}
五、執行緒池引數合理配置
本節來討論一個比較重要的話題:如何合理配置執行緒池大小,參考如下:

六、參考
- 《Java并發編程的藝術》
- 《java并發編程實戰》

來自小編福利的分享
這篇文章到這里面就結束了,喜歡小編分享的技術內容可以點贊關注哦,小編這里也整理了對應的技術書籍,《Java并發編程的藝術》以及整理成檔案,需要的小伙伴可以 點我 免費領取 ,每天感謝有你們的支持,我會繼續努力的,


轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/198725.html
標籤:AI
