一.執行緒池的簡介與實作類
1.什么是執行緒池
字面意思可以抽象為所有水裝在一個池子中,執行緒池就是所有執行緒集合在一個堆疊空間所開辟出來的一個空間就是執行緒池,我們可以把要執行的多執行緒交給執行緒池來處理,和連接池的概念一樣,通過維護一定數量的執行緒池來達到多個執行緒的復用我可以這么理解,
2.執行緒池的好處
每個執行緒都要通過new Thread(xxRunnable).start()的方式來創建并運行一個執行緒亦或者繼承Runnable()介面實作run()來實作執行緒的使用,當啟用過多的執行緒會造成執行緒并發或者出現阻塞,這是有的可以使用MQ訊息佇列亦或者JVM底層優化進行full.gc()亦或者進行STW進行執行緒優化,執行緒池就是屬于一種方法執行緒少的話這不會是問題,而真實環境可能會開啟多個執行緒讓系統和程式達到最佳效率,當執行緒數達到一定數量就會耗盡系統的CPU和記憶體資源,也會造成GC頻繁收集和停頓,因為每次創建和銷毀一個執行緒都是要消耗系統資源的,如果為每個任務都創建執行緒這無疑是一個很大的性能瓶頸,所以,執行緒池中的執行緒復用極大節省了系統資源,當執行緒一段時間不再有任務處理時它也會自動銷毀,而不會長駐記憶體,
3.執行緒池的類

- corePoolSize:執行緒池的核心大小,也可以理解為最小的執行緒池大小,
- maximumPoolSize:最大執行緒池大小,
- keepAliveTime:空余執行緒存活時間,指的是超過corePoolSize的空余執行緒達到多長時間才進行銷毀,
- unit:銷毀時間單位,
- workQueue:存盤等待執行執行緒的作業佇列,
- threadFactory:創建執行緒的工廠,一般用默認即可,
- handler:拒絕策略,當作業佇列、執行緒池全已滿時如何拒絕新任務,默認拋出例外,
4.執行緒池的流程

1、判斷核心執行緒池是否已滿,沒滿則創建一個新的作業執行緒來執行任務,已滿則,
2、判斷任務佇列是否已滿,沒滿則將新提交的任務添加在作業佇列,已滿則,
3、判斷整個執行緒池是否已滿,沒滿則創建一個新的作業執行緒來執行任務,已滿則執行飽和策略,
(1、判斷執行緒池中當前執行緒數是否大于核心執行緒數,如果小于,在創建一個新的執行緒來執行任務,如果大于則
2、判斷任務佇列是否已滿,沒滿則將新提交的任務添加在作業佇列,已滿則,
3、判斷執行緒池中當前執行緒數是否大于最大執行緒數,如果小于,則創建一個新的執行緒來執行任務,如果大于,則執行飽和策略,)
5.Java中提供的執行緒池
Executors類提供了4種不同的執行緒池:newCachedThreadPool, newFixedThreadPool, newScheduledThreadPool, newSingleThreadExecutor,且這四種實作功能各有不同1、newCachedThreadPool:用來創建一個可以無限擴大的執行緒池,適用于負載較輕的場景,執行短期異步任務,(可以使得任務快速得到執行,因為任務時間執行短,可以很快結束,也不會造成cpu過度切換)
2、newFixedThreadPool:創建一個固定大小的執行緒池,因為采用無界的阻塞佇列,所以實際執行緒數量永遠不會變化,適用于負載較重的場景,對當前執行緒數量進行限制,(保證執行緒數可控,不會造成執行緒過多,導致系統負載更為嚴重)
3、newSingleThreadExecutor:創建一個單執行緒的執行緒池,適用于需要保證順序執行各個任務,
4、newScheduledThreadPool:適用于執行延時或者周期性任務,
6.execut()和submit()方法1、execute(),執行一個任務,沒有回傳值,
2、submit(),提交一個執行緒任務,有回傳值,
submit(Callable<T> task)能獲取到它的回傳值,通過future.get()獲取(阻塞直到任務執行完),一般使用FutureTask+Callable配合使用(IntentService中有體現),
submit(Runnable task, T result)能通過傳入的載體result間接獲得執行緒的回傳值,
submit(Runnable task)則是沒有回傳值的,就算獲取它的回傳值也是null,
Future.get方法會使取結果的執行緒進入阻塞狀態,知道執行緒執行完成之后,喚醒取結果的執行緒,然后回傳結果,
二.實作原理與簡介
1.執行緒池狀態
在ThreadPoolExecutor中定義了一個volatile變數,另外定義了幾個static final變數表示執行緒池的各個狀態:
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
unState表示當前執行緒池的狀態,它是一個volatile變數用來保證執行緒之間的可見性;
下面的幾個static final變數表示runState可能的幾個取值,
當創建執行緒池后,初始時,執行緒池處于RUNNING狀態;
如果呼叫了shutdown()方法,則執行緒池處于SHUTDOWN狀態,此時執行緒池不能夠接受新的任務,它會等待所有任務執行完畢;
如果呼叫了shutdownNow()方法,則執行緒池處于STOP狀態,此時執行緒池不能接受新的任務,并且會去嘗試終止正在執行的任務;
當執行緒池處于SHUTDOWN或STOP狀態,并且所有作業執行緒已經銷毀,任務快取佇列已經清慷訓執行結束后,執行緒池被設定為TERMINATED狀態,
2.執行操作
ThreadPoolExecutor類中其他的一些比較重要成員變數:
private final BlockingQueue<Runnable> workQueue; //任務快取佇列,用來存放等待執行的任務 private final ReentrantLock mainLock = new ReentrantLock(); //執行緒池的主要狀態鎖,對執行緒池狀態(比如執行緒池大小、runState等)的改變都要使用這個鎖 private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放作業集 private volatile long keepAliveTime; //執行緒存貨時間 private volatile boolean allowCoreThreadTimeOut; //是否允許為核心執行緒設定存活時間 private volatile int corePoolSize; //核心池的大小(即執行緒池中的執行緒數目大于這個引數時,提交的任務會被放進任務快取佇列) private volatile int maximumPoolSize; //執行緒池最大能容忍的執行緒數 private volatile int poolSize; //執行緒池中當前的執行緒數 private volatile RejectedExecutionHandler handler; //任務拒絕策略 private volatile ThreadFactory threadFactory; //執行緒工廠,用來創建執行緒 private int largestPoolSize; //用來記錄執行緒池中曾經出現過的最大執行緒數 · private long completedTaskCount; //用來記錄已經執行完畢的任務個數
execut()或submit()方法實作方法操作
public void execute(Runnable command) {
if (command == null)//首先,判斷提交的任務command是否為null,若是null,則拋出空指標例外
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
代碼實作操作:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
如果執行緒池中當前執行緒數不小于核心池大小,那么就會直接進入下面的if陳述句塊了,如果執行緒池中當前執行緒數小于核心池大小則執行addIfUnderCorePoolSize(command);如果執行完回傳false責第一層判斷完成然后執行第二層
if (runState == RUNNING && workQueue.offer(command))
如果當前執行緒池處于RUNNING狀態,則將任務放入任務快取佇列;如果當前執行緒池不處于RUNNING狀態或者任務放入快取佇列失敗,則執行:
addIfUnderMaximumPoolSize(command)
如果執行addIfUnderMaximumPoolSize方法失敗,則執行reject()方法進行任務拒絕處理,
if (runState == RUNNING && workQueue.offer(command))
這句的執行,如果說當前執行緒池處于RUNNING狀態且將任務放入任務快取佇列成功,則繼續進行判斷
if (runState != RUNNING || poolSize == 0)
這句判斷是為了防止在將此任務添加進任務快取佇列的同時其他執行緒突然呼叫shutdown或者shutdownNow方法關閉了執行緒池的一種應急措施,如果是這樣就執行:
ensureQueuedTaskHandled(command) 進行應急處理添加到任務快取佇列中的任務得到處理,
addIfUnderCorePoolSize和addIfUnderMaximumPoolSize()方法:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask); //創建執行緒去執行firstTask任務
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
首先獲取到鎖,因為這地方涉及到執行緒池狀態的變化,先通過if陳述句判斷當前執行緒池中的執行緒數目是否小于核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經判斷過了嗎,只有執行緒池當前執行緒數目小于核心池大小才會執行addIfUnderCorePoolSize方法的,為何這地方還要繼續判斷?原因很簡單,前面的判斷程序中并沒有加鎖,因此可能在execute方法判斷的時候poolSize小于corePoolSize,而判斷完之后,在其他執行緒中又向執行緒池提交了任務,就可能導致poolSize不小于corePoolSize了,所以需要在這個地方繼續判斷,然后接著判斷執行緒池的狀態是否為RUNNING,原因也很簡單,因為有可能在其他執行緒中呼叫了shutdown或者shutdownNow方法,然后就是執行
t = addThread(firstTask);
該方法傳的引數為提交的任務,回傳值為Thread型別,然后接著在下面判斷t是否為空,為空則表明創建執行緒失敗(即poolSize>=corePoolSize或者runState不等于RUNNING),否則呼叫t.start()方法啟動執行緒
addThread()方法:
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w); //創建一個執行緒,執行任務
if (t != null) {
w.thread = t; //將創建的執行緒的參考賦值為w的成員變數
workers.add(w);
int nt = ++poolSize; //當前執行緒數加1
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}
用提交的任務創建了一個Worker物件,然后呼叫執行緒工廠threadFactory創建了一個新的執行緒t,然后將執行緒t的參考賦值給了Worker物件的成員變數thread,接著通過workers.add(w)將Worker物件添加到作業集當中
Worker實作類:
private final class Worker implements Runnable {
private final ReentrantLock runLock = new ReentrantLock();
private Runnable firstTask;
volatile long completedTasks;
Thread thread;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
boolean isActive() {
return runLock.isLocked();
}
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {
try {
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
void interruptNow() {
thread.interrupt();
}
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
boolean ran = false;
beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實作,用戶可以根據
//自己需要多載這個方法和后面的afterExecute方法來進行一些統計資訊,比如某個任務的執行時間等
try {
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this); //當任務佇列中沒有任務時,進行清理作業
}
}
}
workes繼承了Runble所以Thread t = threadFactory.newThread(w)跟Thread t = new Thread(w);效果等同
Worker實作了Runnable介面并實作run()方法
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
首先執行的是通過構造器傳進來的任務firstTask,在呼叫runTask()執行完firstTask之后,在while回圈里面不斷通過getTask()去取新的任務來執行,那么去哪里取呢?自然是從任務快取佇列里面去取,getTask是ThreadPoolExecutor類中的方法,并不是Worker類中的方法,下面是getTask方法的實作:
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果執行緒數大于核心池大小或者允許為核心池執行緒設定空閑時間,
//則通過poll取任務,若等待一定的時間取不到任務,則回傳null
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) { //如果沒取到任務,即r為null,則判斷當前的worker是否可以退出
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers(); //中斷處于空閑狀態的worker
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
在getTask中,先判斷當前執行緒池狀態,如果runState大于SHUTDOWN(即為STOP或者TERMINATED),則直接回傳null,
如果runState為SHUTDOWN或者RUNNING,則從任務快取佇列取任務,
如果當前執行緒池的執行緒數大于核心池大小corePoolSize或者允許為核心池中的執行緒設定空閑存活時間,則呼叫poll(time,timeUnit)來取任務,這個方法會等待一定的時間,如果取不到任務就回傳null,
然后判斷取到的任務r是否為null,為null則通過呼叫workerCanExit()方法來判斷當前worker是否可以退出,我們看一下workerCanExit()的實作:
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
//如果runState大于等于STOP,或者任務快取佇列為空了
//或者 允許為核心池執行緒設定空閑存活時間并且執行緒池中的執行緒數目大于1
try {
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize));
} finally {
mainLock.unlock();
}
return canExit;
}
如果執行緒池處于STOP狀態、或者任務佇列已為慷訓者允許為核心池執行緒設定空閑存活時間并且執行緒數大于1時,允許worker退出,如果允許worker退出,則呼叫interruptIdleWorkers()中斷處于空閑狀態的worker,我們看一下interruptIdleWorkers()的實作
void interruptIdleWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) //實際上呼叫的是worker的interruptIfIdle()方法
w.interruptIfIdle();
} finally {
mainLock.unlock();
}
}
interruptIfIdle()方法:
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) { //注意這里,是呼叫tryLock()來獲取鎖的,因為如果當前worker正在執行任務,鎖已經被獲取了,是無法獲取到鎖的
//如果成功獲取了鎖,說明當前worker處于空閑狀態
try {
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
addIfUnderMaximumPoolSize方法的實作和addIfUnderCorePoolSize方法的實作思想非常相似,只不過前一個方法是在執行緒池中的執行緒數達到了核心池大小并且往任務佇列中添加任務失敗的情況下執行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
且只是if陳述句判斷條件中的poolSize < maximumPoolSize不同而已
- 如果當前執行緒池中的執行緒數目小于corePoolSize,則每來一個任務,就會創建一個執行緒去執行這個任務;
- 如果當前執行緒池中的執行緒數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務快取佇列當中,若添加成功,則該任務會等待空閑執行緒將其取出去執行;若添加失敗(一般來說是任務快取佇列已滿),則會嘗試創建新的執行緒去執行這個任務;
- 如果當前執行緒池中的執行緒數目達到maximumPoolSize,則會采取任務拒絕策略進行處理;
- 如果執行緒池中的執行緒數量大于 corePoolSize時,如果某執行緒空閑時間超過keepAliveTime,執行緒將被終止,直至執行緒池中的執行緒數目不大于corePoolSize;如果允許為核心池中的執行緒設定存活時間,那么核心池中的執行緒空閑時間超過keepAliveTime,執行緒也會被終止,
3.執行緒池初始化
- prestartCoreThread():初始化一個核心執行緒;
- prestartAllCoreThreads():初始化所有核心執行緒
public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null); //注意傳進去的引數是null
}
public int prestartAllCoreThreads() {
int n = 0;
while (addIfUnderCorePoolSize(null))//注意傳進去的引數是null
++n;
return n;
}
如果傳進去的引數為null,則最后執行執行緒會阻塞在getTask方法中的r = workQueue.take();且有任務在等待中
4.任務快取佇列及排隊策略
workQueue的型別為BlockingQueue<Runnable>,通常可以取下面三種型別:
1)ArrayBlockingQueue:基于陣列的先進先出佇列,此佇列創建時必須指定大小;
2)LinkedBlockingQueue:基于鏈表的先進先出佇列,如果創建時沒有指定此佇列大小,則默認為Integer.MAX_VALUE;
3)synchronousQueue:這個佇列比較特殊,它不會保存提交的任務,而是將直接新建一個執行緒來執行新來的任務,
5.任務拒絕策略
執行緒池的任務快取佇列已滿并且執行緒池中的執行緒數目達到maximumPoolSize可以實作下面四種形式
ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException例外, ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出例外, ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然后重新嘗試執行任務(重復此程序) ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
6.執行緒池的關閉
- shutdown():不會立即終止執行緒池,而是要等所有任務快取佇列中的任務都執行完后才終止,但再也不會接受新的任務
- shutdownNow():立即終止執行緒池,并嘗試打斷正在執行的任務,并且清空任務快取佇列,回傳尚未執行的任務
7.執行緒池容量的動態調整
- setCorePoolSize:設定核心池大小
- setMaximumPoolSize:設定執行緒池最大能創建的執行緒數目大小
8.實作方法
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
for(int i=0;i<15;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("執行緒池中執行緒數目:"+executor.getPoolSize()+",佇列等待執行的任務數目:"+
executor.getQueue().size()+",已執行完的任務數目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在執行task "+taskNum);
try {
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"執行完成");
}
}
Java中建議使用Executors類中提供的幾個靜態方法來創建執行緒池
Executors.newCachedThreadPool(); //創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE Executors.newSingleThreadExecutor(); //創建容量為1的緩沖池 Executors.newFixedThreadPool(int); //創建固定容量大小的緩沖池
/**
*靜態實作方法
**/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newFixedThreadPool創建的執行緒池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設定為1,也使用的LinkedBlockingQueue;
newCachedThreadPool將corePoolSize設定為0,將maximumPoolSize設定為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建執行緒運行,當執行緒空閑超過60秒,就銷毀執行緒,
著作權歸作者所有,商業轉載請聯系作者獲得授權,非商業轉載請注明出處,/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters and default thread factory. */publicThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){if(corePoolSize <0|| maximumPoolSize <=0|| maximumPoolSize < corePoolSize || keepAliveTime <0)thrownewIllegalArgumentException();if(workQueue == null || threadFactory == null || handler == null)thrownewNullPointerException();this.acc = System.getSecurityManager()== null ? null : AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/115044.html
標籤:其他
上一篇:C/C++編程筆記:新手易錯集錦之main函式錯誤,編程小白必備!
下一篇:Java 資料型別的包裝資料型別
