重點內容
- 執行緒池的使?
- 創建執行緒池
- 提交任務
- 關閉執行緒池
- 執行緒池的原理
- 合理配置執行緒池
- 執行緒池的監控
1.執行緒池的創建
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
- corePoolSize:執行緒池的基本大小, 提前呼叫prestartAllCoreThreads(),會把所有的基本執行緒啟動 ,
- workQueue: ?于保存等待執?的任務的阻塞佇列,
- ArrayBlockingQueue 基于陣列實作的(先進先出),
- LinkedBlockingQueue 吞吐量要高于ArrayBlockingQueue,
- SynchronousQueue 吞吐量要高于LinkedBlockingQueue 不存盤元素的阻塞佇列,得等一個執行緒做移除操作才能繼續進行,要不會一直阻塞,
- PriorityBlockingQueue 具有優先級的無限阻塞佇列,
- maximumPoolSize: 執行緒池允許創建的最?執行緒數,
- threadFactory: ?于設定創建執行緒的工廠可以使用谷歌的開源方法,
- handler: 飽和策略,阻塞佇列和我們的執行緒的創建數都滿了的時候就會飽和選擇一個策略對新提交的策略進行處理,
- AbortPolicy 直接拋出例外,
- CallerRunsPolicy 只用呼叫者所在的執行緒來處理任務,
- DiscardOldestPolicy 丟棄佇列里最近的一個任務,
- DiscardPolicy 直接丟棄,
- ?定義 自己定義一個處理方式,
- keepAliveTime:執行緒池的?作執行緒空閑后,保持存活的時間,
- unit:執行緒活動保持時間的單位,
2.提交任務
execute:?于提交不需要回傳值的任務
submit:?于提交需要回傳值的任務
shutdown:終止的時候會拋出例外
shutdownNow:中止的時候不會拋出例外
- 執行緒池測驗代碼
/** @Classname ThreadPoolDemo @Author XW @Date 2021/12/17 23:15 */
public class ThreadPoolDemo {
private static ThreadFactory namedThreadFactory =
new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
private static ExecutorService pool =
new ThreadPoolExecutor(
2,
20,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(2),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
pool.execute(new NoResultThread(i));
/**
* submit 驗證 Future<String> future = pool.submit(new ResultThread()); try {
* System.out.println("main thread get result: " + future.get()); // future.get(100,
* TimeUnit.MICROSECONDS); } catch (Exception e) { e.printStackTrace(); }
*/
}
// shutdown 驗證
System.out.println("執行shutdown! ");
pool.shutdown(); // 會繼續執行并且完成所有未執行的任務, 新提交的任務會被reject(通過reject策略)
for (int i = 10; i < 12; i++) {
pool.execute(new NoResultThread(i));
}
/**
* shutdownnow 驗證 System.out.println("執行shutdownnow! "); List<Runnable> runnableList =
* pool.shutdownNow(); // 會清除所有未執行的任務并且在運行執行緒上呼叫interrupt()
*/
System.out.println("pool shutdown state: " + pool.isShutdown());
while (true) {
if (pool.isTerminated()) {
System.out.println("pool terminated!");
break;
} else {
System.out.println("pool terminated state: " + pool.isTerminated());
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static class NoResultThread implements Runnable {
private int taskNum;
public NoResultThread(int taskNum) {
this.taskNum = taskNum;
}
@Override
public void run() {
System.out.println("執行緒 " + Thread.currentThread().getName() + " 開始執行任務 " + this.taskNum);
try {
Thread.sleep(1000);
System.out.println("執行緒 " + Thread.currentThread().getName() + " 執行完任務 " + this.taskNum);
} catch (InterruptedException e) {
System.out.println(
"執行緒 "
+ Thread.currentThread().getName()
+ " 在執行任務 "
+ this.taskNum
+ " 時被中斷 :"
+ e.getMessage());
}
}
}
private static class ResultThread implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println(
Thread.currentThread().getState() + "----------" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Thread.currentThread().getName();
}
}
}
3.執行緒池的實作原理

? 首先會判斷corePoolSize核心執行緒池是否已經滿了,沒滿就直接創建執行緒執行任務,滿了再去判斷佇列是否滿了,佇列沒有滿的話在把任務放在佇列里面,佇列如果滿的話,會將當前的執行緒數量跟maximumPoolSize進行對比如果沒滿的話就創建執行緒執行任務,maximumPoolSize也滿了話就按照策略(handler)處理無法執行的任務,注意執行緒池只要創建執行緒就會獲取全域鎖,


執行緒會根據worker去執行緒池里面拿任務
- 執行緒池execute的原始碼
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
/** ctl記錄著workCount和runState */
int c = ctl.get();
/** 第一步: 如果執行緒池中的執行緒數量小于核心執行緒數,那么創建執行緒并執行*/
if (workerCountOf(c) < corePoolSize) { // workerCountOf(c): 獲取當前活動的執行緒數
/**
* 在執行緒池中新建一個新的執行緒
* command:需要執行的Runnable執行緒
* true:新增執行緒時,【當前活動的執行緒數】是否 < corePoolSize
* false:新增執行緒時,【當前活動的執行緒數】是否 < maximumPoolSize
*/
if (addWorker(command, true)) {
// 添加新執行緒成功,則直接回傳,
return;
}
// 添加新執行緒失敗,則重新獲取【當前活動的執行緒數】
c = ctl.get();
}
/**
* 第二步:如果當前執行緒池是運行狀態 并且 任務添加到佇列成功
* (即:case2: 如果workCount >= corePoolSize,創建執行緒往workQueue添加執行緒任務,等待執行)
*/
// BlockingQueue<Runnable> workQueue 和 Runnable command
if (isRunning(c) && workQueue.offer(command)) { // 添加command到workQueue佇列中,
// 重新獲取ctl
int recheck = ctl.get();
// 再次check一下,當前執行緒池是否是運行狀態,如果不是運行時狀態,則把剛剛添加到workQueue中的command移除掉,并呼叫拒絕策略
if (!isRunning(recheck) && remove(command)) {
reject(command);
} else if (workerCountOf(recheck) == 0) { // 如果【當前活動的執行緒數】為0,則執行addWork方法
/**
* null:只創建執行緒,但不去啟動
* false:添加執行緒時,根據maximumPoolSize來判斷
*
* 如果 workerCountOf(recheck) > 0, 則直接回傳,在佇列中的command稍后會出佇列并且執行
*/
addWorker(null, false);
}
}
/**
* 第三步:滿足以下兩種條件之一,進入第三步判斷陳述句
* case1: 執行緒池不是正在運行狀態,即:isRunning(c)==false
* case2: workCount >= corePoolSize 并且 添加workQueue佇列失敗,即:workQueue.offer(command)==false
*
* 由于第二個引數傳的是false,所以如果workCount < maximumPoolSize,則創建執行執行緒;否則,進入方法體執行reject(command)
*/
else if (!addWorker(command, false)) {
// 執行執行緒創建失敗的拒絕策略
reject(command);
}
}
- 執行緒池addWorker的原始碼
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
/** 步驟一:試圖將workerCount+1 */
for (; ; ) {
int c = ctl.get();
// 獲得運行狀態runState
int rs = runStateOf(c);
/**
* 只有如下兩種情況可以新增worker,繼續執行下去:
* case one: rs == RUNNING
* case two: rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()
*/
if (rs >= SHUTDOWN && // 即:非RUNNING狀態(請查看isRunning()方法),執行緒池例外,表示不再去接收新的執行緒任務了,回傳false
/**
* 當執行緒池是SHUTDOWN狀態時,表示不再接收新的任務了,所以:
* case1:如果firstTask!=null,表示要添加新任務,則:新增worker失敗,回傳false,
* case2:如果firstTask==null并且workQueue為空,表示佇列中的任務已經處理完畢,不需要添加新任務了,
* 則:新增worker失敗,回傳false
*/
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
return false;
}
for (; ; ) {
// 獲得當前執行緒池里的執行緒數
int wc = workerCountOf(c);
/**
* 滿足如下任意情況,則新增worker失敗,回傳false
* case1:大于等于最大執行緒容量,即:int CAPACITY = 00011111111111111111111111111111 = 536870911(十進制)
* case2:當core是true時:>= 核心執行緒數
* 當core是false時:>= 最大執行緒數
*/
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
return false;
}
// 當前作業執行緒數加1
if (compareAndIncrementWorkerCount(c)) {
break retry; // 成功加1,則跳出retry標識的這兩層for回圈
}
// 如果執行緒數加1操作失敗,則獲取當前最新的執行緒池運行狀態
c = ctl.get();
// 判斷執行緒池運行狀態(rs)是否改變;如果不同,則說明方法處理期間執行緒池運行狀態發生了變化,重新獲取最新runState
if (runStateOf(c) != rs) {
continue retry; // 跳出內層for回圈,繼續從第一個for回圈執行
}
}
}
/**
* 步驟二:workerCount成功+1后,創建Worker,加入集合workers中,并啟動Worker執行緒
*/
boolean workerStarted = false; /** 用于判斷新的worker實體是否已經開始執行Thread.start() */
boolean workerAdded = false; /** 用于判斷新的worker實體是否已經被添加到執行緒池的workers佇列中 */
Worker w = null; // AQS.Worker
try {
w = new Worker(firstTask); /** 創建Worker實體,每個Worker物件都會針對入參firstTask來創建一個執行緒, */
final Thread t = w.thread; /** 從Worker中獲得新建的執行緒t */
if (t != null) {
final ReentrantLock mainLock = this.mainLock; /** 加重入鎖 */
/** ----------lock() 嘗試加鎖操作!!獲得鎖后繼續執行,沒獲得則等待直到獲得鎖為止---------- */
mainLock.lock();
try {
int rs = runStateOf(ctl.get()); /** 獲得執行緒池當前的運行狀態runStatus */
/**
* 滿足如下任意條件,即可向執行緒池中添加執行緒:
* case1:執行緒池狀態為RUNNING,(請查看isRunning()方法)
* case2:執行緒池狀態為SHUTDOWN并且firstTask為null,
*/
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) { /** 因為t是新構建的執行緒,還沒有啟動,所以,如果是alive狀態,說明已經被啟動了,則拋出例外 */
throw new IllegalThreadStateException();
}
workers.add(w); /** workers中保存執行緒池中存在的所有work實體集合 */
int s = workers.size();
if (s > largestPoolSize) { /** largestPoolSize用于記錄執行緒池中曾經存在的最大的執行緒數量 */
largestPoolSize = s;
}
workerAdded = true;
}
} finally {
mainLock.unlock(); /** ----------unlock 解鎖操作!!---------- */
}
if (workerAdded) {
t.start(); /** 開啟執行緒,執行Worker.run() */
workerStarted = true;
}
}
} finally {
if (!workerStarted) { // 如果沒有開啟執行緒
addWorkerFailed(w); // 往執行緒池中添加worker失敗了
}
}
return workerStarted;
}
- 執行緒池runWorker的原始碼
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
/**
* 如果執行緒池正在停止,請確保執行緒被中斷;否則,請確保執行緒不被中斷,
* 這需要在第二種情況下重新檢查以處理shutdownNow競賽,同時清除中斷
*
* 同時滿足如下兩個條件,則執行wt.interrupt()
* 1>執行緒狀態為STOP、TIDYING、TERMINATED 或者 (當前執行緒被中斷(清除中斷標記)并且執行緒狀態為STOP、TIDYING、TERMINATED)
* 2>當前執行緒wt沒有被標記中斷
*/
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted()) {
wt.interrupt();
}
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); /** 真正做事兒的地方了 */
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
4.合理配置執行緒池
任務的性質
- CPU密集型 : N cpu + 1 配置盡可能小的執行緒,執行緒數要少一點,減少cpu頻繁的背景關系切換,提高cpu的利用率
- IO 密集型 :2 * N cpu 需要配置盡可能多的執行緒,這樣才能保證cpu能被充分的利用
- 混合型 :拆分成CPU密集型和IO密集型
- N = Runtime.getRuntime().availableProcessors()
任務的優先級 :PriorityBlockingQueue
任務的執?時間
- 不同規模的執行緒池
- PriorityBlockingQueue 讓執行時間比較短的執行緒先執行
任務的依賴性
- 增加執行緒數量
- 使?有界佇列保證系統的穩定性
5.執行緒池的監控
taskCount 任務的數量
completedTaskCount 運行的程序中完成的任務數量
largestPoolSize 曾經創建過的最大的執行緒數量
getPoolSize 執行緒數量
getActiveCount 獲取活動的執行緒數
擴展執行緒池:beforeExecute、afterExecute 在執行緒執行前,執行后做點什么
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/387683.html
標籤:Java
上一篇:Sentinel-Go 原始碼系列(三)滑動時間視窗演算法的工程實作
下一篇:java實作簡易的局域網對話系統
