文章目錄
- 一、Java 中的執行緒池
- 1. 執行緒池狀態
- 2. 執行緒池主要屬性引數
- 3. 執行緒池的實作原理
- 3.1 ThreadPoolExecutor 執行緒池主要處理流程
- 3.2 執行緒池方法決議
- 4. 合理地配置執行緒池
- 5. 執行緒池的監控
- 二、手寫執行緒池
- 1. 實作阻塞佇列
- 2. 實作執行緒池
- 3. 測驗
- 4. 拒絕策略
- 4.1 帶超時的添加任務
- 4.2 拒絕策略(策略模式)
- 4.3 測驗利用帶超時時間的拒絕策略
- 5. 完整代碼
一、Java 中的執行緒池
1. 執行緒池狀態
ThreadPoolExecutor使用int的高3位來表示執行緒池狀態,低29位表示執行緒數量
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

執行緒池狀態和執行緒池中執行緒的數量由一個原子整型ctl來共同表示
- 使用一個數來表示兩個值的主要原因是:可以通過一次CAS同時更改兩個屬性的值
// 原子整數,前3位保存了執行緒池的狀態,剩余位保存的是執行緒數量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//去掉前三位保存執行緒狀態的位數,剩下的用于保存執行緒數量
private static final int COUNT_BITS = Integer.SIZE - 3;
// 2^COUNT_BITS次方,表示可以保存的最大執行緒數
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
獲取執行緒池狀態、執行緒數量以及合并兩個值的操作
// Packing and unpacking ctl
// 傳入 ctl 值 獲取運行狀態 該操作會讓除高3位以外的數全部變為0
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 傳入 ctl 值 獲取運行執行緒數 該操作會讓高3位為0
private static int workerCountOf(int c) { return c & CAPACITY; }
// 傳入 rs 運行狀態 wc 執行緒數量 計算ctl新值
private static int ctlOf(int rs, int wc) { return rs | wc; }
2. 執行緒池主要屬性引數
//阻塞佇列,用于存放來不及被核心執行緒執行的任務
private final BlockingQueue<Runnable> workQueue;
// 全域鎖,解決創建銷毀執行緒等執行緒安全問題
private final ReentrantLock mainLock = new ReentrantLock();
// 用于存放核心執行緒的容器,只有當持有鎖時才能夠獲取其中的元素
private final HashSet<Worker> workers = new HashSet<Worker>();
//執行緒工廠,給執行緒取名字
private volatile ThreadFactory threadFactory;
// 拒絕執行處理器 處理拒絕策略
private volatile RejectedExecutionHandler handler;
// 救急執行緒(或者核心執行緒)空閑時的最大生存時間
private volatile long keepAliveTime;
// 核心執行緒數
private volatile int corePoolSize;
// 最大執行緒數
// 最大執行緒數 - 核心執行緒數 = 九級執行緒數
private volatile int maximumPoolSize;
// 默認拒絕策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
- corePoolSize : (核心執行緒數量),如果呼叫了執行緒池的 prestartAllCoreThreads( ) 方法,執行緒池會提前創建并啟動所有基本執行緒,否則是懶惰創建
- workQueue:用于保存等待執行的任務的阻塞佇列,可以選擇以下幾個具體實作
- ① ArrayBlockingQueue:是一個基于陣列的有界阻塞佇列,按FIFO(先進先出原則)排序,新任務進來后,會放到該佇列的隊尾,有界的陣列可以防止資源耗盡問題,
- ② LinkedBlockingQuene:基于鏈表的無界阻塞佇列(其實最大容量為Interger.MAX),按照FIFO排序,由于該佇列的近似無界性,當執行緒池中執行緒數量達到corePoolSize后,再有新任務進來,會一直存入該佇列,而不會去創建新執行緒直到maxPoolSize,因此使用該作業佇列時,引數maxPoolSize其實是不起作用的,吞吐量通常要高于ArrayBlockingQueue ,靜態工廠方法 Executors.newFixedThreadPool( ) 使用了該佇列
- ③ SynchronousQuene 是一個不存盤元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則一直處于阻塞狀態,吞吐量通常要高于LinkedBlockingQuene ,靜態工廠方法 Executors.newCachedThreadPool() 用的是此佇列
- ④ PriorityBlockingQueue:具有優先級的無界阻塞佇列,優先級通過引數Comparator實作,
- maximumPoolSize:執行緒池最大執行緒數量,包括了核心執行緒數量和救急執行緒數量
- threadFactory:執行緒工廠,可以給執行緒設定名字等
- handler:拒絕執行處理器 處理拒絕策略 在處理程序中具體講解
- keepAliveTime:執行緒活動保持時間,執行緒池作業執行緒空閑后,保持存活的時間,所以,如果任務很多,并且每個任務執行時間很多,可以調大存活時間,提高執行緒利用率
- unit:空閑執行緒存活時間單位
3. 執行緒池的實作原理
3.1 ThreadPoolExecutor 執行緒池主要處理流程


- 使用者 發布任務
- 如果當前運行的執行緒少于 核心執行緒數(corePoolSize),則創建新執行緒來執行任務(這一步需要獲得全域鎖,不然會引發執行緒安全問題)
- 如果運行的執行緒等于或者大于corePoolSize 則將任務加入阻塞佇列(BlockQueue)
- 如果BlockQueue 已滿,無法將任務加入佇列,則創建新執行緒來處理任務(這同樣需要獲得全域鎖)
- 此處就用到救急執行緒,其數量就是最大執行緒數減去核心執行緒數的數量
- 如果創建新執行緒使當前運行的執行緒超出maximumPoolSize 任務將被拒絕,并呼叫RejectedExecutionHandler.rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法
-
拒絕策略 jdk 提供了 4 種實作

-
AbortPolicy:讓呼叫者拋出 RejectedExecutionException 例外,這是默認策略
-
CallerRunsPolicy:讓呼叫者運行任務
-
DiscardPolicy:放棄本次任務
-
DiscardOldestPolicy:放棄佇列中最早的任務,本任務取而代之
-
明白了上述內容 我們就可以看看原始碼是如何實作的
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 作業執行緒數小于核心執行緒數 就 呼叫addWorker 創建新執行緒
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果執行緒池還在運行,并且可以加入阻塞佇列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 重新檢查執行緒池是否還在運行 或者有的執行緒已經死了 必要時回滾佇列并且采用拒絕策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 執行緒池仍在運行 采用救急執行緒
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果創建執行緒失敗,則采用拒絕策略
else if (!addWorker(command, false))
reject(command);
}
3.2 執行緒池方法決議
① 添加執行緒的addWorker( ) 方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
// 獲取表示狀態和執行緒數的原子整數
int c = ctl.get();
// 獲取執行緒池狀態
int rs = runStateOf(c);
// 如果執行緒池狀態不是 RUNNING 或者 阻塞佇列中有任務 則創建執行緒失敗
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (; ; ) {
// 獲取執行緒個數
int wc = workerCountOf(c);
// 如果執行緒數大于容量 或者 大于核心執行緒數或者最大執行緒數(用哪個系結取決于傳入的core)則創建執行緒失敗
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 在多執行緒情況下 如果CAS創建執行緒 修改 原子整數 失敗 則回滾到retry 重新回圈
if (compareAndIncrementWorkerCount(c))
break retry;
// 重新獲取 表示狀態和執行緒個數的原子整數
c = ctl.get();
// 如果 運行狀態和當初不同,則回滾重新回圈
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 創建新執行緒處理任務
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 創建新執行緒需要獲得全域鎖
mainLock.lock();
try {
//加鎖的同時再次檢測 避免在釋放鎖之前呼叫了shut down
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 再次確認執行緒存活
if (t.isAlive())
throw new IllegalThreadStateException();
// 將該執行緒 加入到 HashSet集合中(執行緒池)
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 更新標志位
workerAdded = true;
}
} finally {
// 釋放鎖
mainLock.unlock();
}
// 如果作業執行緒成功添加,開始執行緒開始作業 并更新標志位
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果執行緒啟動失敗
// 呼叫addWorkerFailed(w)方法: 洗掉該作業執行緒 作業執行緒數減一,并且嘗試終止執行緒池
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
② 向執行緒池提交任務
- execute() : 上面已經分析過,用于提交不需要回傳值的任務,所以無法判斷任務是否被執行緒池執行成功
- submit() :用于提交需要回傳值的任務,執行緒池會回傳一個 future 物件,通過這個future物件可以判斷任務是否執行成功,并且可以通過future 的get () 方法來獲取回傳值,get() 方法會阻塞當前執行緒直到任務完成,也可以使用帶超時時間的get() ,這里著重分析該方法
使用
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
// 通過submit執行Callable中的call方法
Future<String> future = threadPool.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "OKK";
}
});
try {
// 通過future 來獲得回傳值
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
③ 關閉執行緒池的方法
shutdown() 將執行緒池的狀態設定成 SHUTDOWN 中斷沒有執行任務的執行緒,其他執行緒執行完任務,自己消亡
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
// 獲取全域鎖
mainLock.lock();
try {
// 通過安全管理器看是否有權關閉執行緒池
checkShutdownAccess();
// 將執行緒池狀態設定為 SHUTDOWN
advanceRunState(SHUTDOWN);
// 打斷 空閑的作業執行緒
interruptIdleWorkers();
// 給子類提供一些擴展方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 嘗試終結執行緒池
tryTerminate();
}
final void tryTerminate() {
for (; ; ) {
// 獲取存盤狀態和執行緒數量的 原子整數
int c = ctl.get();
// 如果存在以下三種情況,嘗試終結執行緒池失敗
// 1、執行緒池狀態為RUNNING
// 2、執行緒池狀態為 RUNNING SHUTDOWN STOP (狀態值大于TIDYING)
// 3、執行緒池狀態為SHUTDOWN,但阻塞佇列中還有任務等待執行
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
// 如果存活執行緒數不為0 打斷空閑的執行緒
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
// 獲取全域鎖
mainLock.lock();
try {
// 嘗試使用CAS將執行緒池狀態改為 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// 通過CAS將執行緒池狀態改為TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
shutdownNow() 首先將執行緒池狀態設定為STOP,然后嘗試停止所有的正在執行或暫停人物的執行緒,并回傳等待執行的任務的串列
public List<Runnable> shutdownNow() {
// 回傳的任務串列
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
// 獲取全域鎖
mainLock.lock();
try {
// 通過安全管理器看是否有權關閉執行緒池
checkShutdownAccess();
// 將執行緒池狀態設定為STOP
advanceRunState(STOP);
// 遍歷打斷所有執行緒
interruptWorkers();
// 將未執行的任務從佇列中移除,然后回傳給呼叫者
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
4. 合理地配置執行緒池
可以從以下角度分析:
- 任務的性質:CPU密集型任務、IO密集型任務、混合型任務
- CPU密集型應該配置盡可能小的執行緒,通常采用cpu核數+1能夠實作最優的CPU利用率,+1是保證當執行緒由于頁缺失故障(作業系統或其它原因導致暫停時,額外的這個執行緒就能頂上去,保證CPU時鐘周期不被浪費
- IO密集型應該配置盡可能多的執行緒,因為CPU不總是處于繁忙狀態,例如,當你執行業務計算時,這時候會使用CPU資源,但當你執行IO操作時、遠程RPC呼叫時,包括進行資料庫操作時,這時候CPU就閑下來了,你可以利用多執行緒提高它的利用率,
- 任務的優先級:高、中和低
- 任務的執行時間:長、中和短
- 任務的依賴性:是否依賴其他系統資源,如資料庫連接
注意:
建議使用有界佇列,有界佇列能增加系統的穩定性和預警能力,可以根據需要設大一點,比如幾千,
有一次,我們系統里后臺任務執行緒的佇列和執行緒池全滿了,不斷拋出拋棄任務的例外,通過排查發現是資料庫出現的問題,導致執行SQL變得非常緩慢,因為后臺任務執行緒池里的任務全是需要向資料庫查詢和插入資料的,所以導致執行緒池里的作業執行緒全部阻塞,任務積壓在執行緒池里,如果當時我們設定成無界佇列,那么執行緒池的佇列就會越來越多,最多可能會撐滿記憶體,OutOfMemory,導致整個系統不可用
5. 執行緒池的監控
如果在系統中大量使用執行緒池,則有必要對執行緒池進行監控,方便在出現問題時,快速定位問題,可以通過執行緒池提供的引數進行監控:
- largestPoolSize:執行緒池里曾經創建過的最大執行緒數量,可以判斷知道執行緒池是否滿過
- completedTaskCount:執行緒池已完成的任務數量
- getPoolSize( ):執行緒池的執行緒數量
- getActiveCount():獲取活動的執行緒數
也可以通過擴展執行緒池進行監控,通過繼承執行緒池來自定義執行緒池,重寫執行緒池的下列方法,在一些特定的時間段進行一些監控
- protected void beforeExecute(Thread t, Runnable r) { }
- protected void afterExecute(Runnable r, Throwable t) { }
- protected void terminated() { }
二、手寫執行緒池
1. 實作阻塞佇列
主要欄位
- 任務佇列queue 用于存放發布的任務
- ReentrantLock加鎖保證取放任務的執行緒安全
- fullWaitSet 和 emptyWaitSet 作為任務佇列滿或者空 時的等待佇列
主要方法
- 執行緒池獲取任務 T take()
- 多載方法,分別用于沒有時間限制的獲取任務以及帶超時的獲取任務
- 若任務佇列為空 進入等待佇列
- 不為空則選取第一個
- 主執行緒 添加任務 void put(T task)
- 獲取 任務佇列的任務個數 size ()
/**
* 阻塞佇列
* @param <T>
*/
class BlockQueue<T> {
/**
* 任務佇列
*/
private Deque<T> queue = new ArrayDeque<>();
/**
* 鎖
*/
private ReentrantLock lock = new ReentrantLock();
/**
* 任務佇列滿后 生產者進入等待佇列等待
*/
private Condition fullWaitSet = lock.newCondition();
/**
* 任務佇列空時,消費者進入等待佇列等待
*/
private Condition emptyWaitSet = lock.newCondition();
/**
* 容量
*/
private int capacity;
public BlockQueue(int capacity) {
this.capacity = capacity;
}
/**
* 從任務佇列獲取任務
* @return
*/
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
fullWaitSet.signal();
return task;
}finally {
lock.unlock();
}
}
/**
* 有超時時間的獲取任務
* @param timeout
* @param unit
* @return
*/
public T take(long timeout, TimeUnit unit) {
lock.lock();
try {
//將timeout 轉換成納秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
if (nanos <= 0) {
return null;
}
//回傳剩余的時間
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
fullWaitSet.signal();
return task;
}finally {
lock.unlock();
}
}
/**
* 添加任務方法
* @param task
*/
public void put(T task) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
/**
* 獲取任務佇列任務數
* @return
*/
public int size () {
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
}
2. 實作執行緒池
主要方法
- void execute (Runnable task)
- 如果 任務數量小于 執行緒池中的執行緒數,則創建Worker物件(實作Thread類)來執行任務
- 如果 大于執行緒數,則先放入阻塞佇列中存放
- Worker 的 run 方法
- 如果傳入的任務不為空 則執行傳入的任務
- 執行完成之后 繼續執行任務佇列中的任務
- 全部結束之后洗掉該執行緒
/**
* 執行緒池
*/
class ThreadPool {
/**
* 任務佇列
*/
private BlockQueue<Runnable> taskQueue;
/**
* 執行緒集合
*/
private HashSet<Worker> workers = new HashSet();
/**
* 核心執行緒數
*/
private int coreSize;
/**
* 獲取任務超時時間
*/
private long timeout;
/**
* 時間工具
*/
private TimeUnit timeUnit;
/**
* 執行任務
* @param task
*/
public void execute (Runnable task) {
synchronized (workers) {
// 當任務數沒有超過核心執行緒數,直接交給執行緒執行
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
System.out.println("新增worker物件"+worker);
worker.start();
}else { //超過核心執行緒數 就加入任務佇列暫存
System.out.println("執行緒數滿,將任務加入任務佇列" + task);
taskQueue.put(task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<>(queueCapacity);
}
/**
* 真正執行任務的執行緒
*/
private class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
while (task != null || (task = taskQueue.take(timeout, timeUnit)) != null) {
try {
System.out.println("正在執行任務"+task);
task.run();
}catch (Exception e) {
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workers) {
System.out.println("洗掉"+this+"執行緒");
workers.remove(this);
}
}
}
}
3. 測驗
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2,1000,TimeUnit.MILLISECONDS,10);
for (int i = 0; i < 5; i++) {
int j = i;
threadPool.execute(() -> {
System.out.println("任務"+j + "執行完成");
});
}
}

- 設定執行緒數為2個,執行代碼 前兩個任務進入時 創建兩個執行緒執行任務,之后任務則無法執行進入任務佇列
- 等兩個執行緒執行完任務繼續 獲取任務佇列中的任務執行,如果超過任務獲取等待時間,退出執行任務回圈,沒有任務后洗掉執行緒,
以上實作了基本的執行緒池,但是如果任務數量龐大,并且執行任務比較緩慢,任務佇列滿后,遲遲等不到解決,并且有新的任務來,會一直處于等待狀態,所以要添加拒絕策略
4. 拒絕策略
4.1 帶超時的添加任務
/**
* 待超時的添加任務
* @param task
* @param timeout
* @param timeUnit
* @return
*/
public boolean put(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if (nanos <= 0) {
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
4.2 拒絕策略(策略模式)
佇列滿后,新的任務可以選擇繼續死等,帶超時的等待,放棄執行任務,拋出例外等等很多解決策略,所以應該把選擇權交給工程師,提高可擴展性,把所有操作抽象成介面,讓使用者自己實作
/**
* 拒絕策略
* @param <T>
*/
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockQueue<T> queue, T task);
}
給 ThreadPool 執行緒池加入拒絕策略屬性,并在構造方法中初始化
/**
* 拒絕策略
*/
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
將執行任務中 加入任務佇列的方法改成 tryPut() 嘗試加入佇列
/**
* 執行任務
* @param task
*/
public void execute (Runnable task) {
synchronized (workers) {
// 當任務數沒有超過核心執行緒數,直接交給執行緒執行
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
System.out.println("新增worker物件"+worker);
worker.start();
}else { //超過核心執行緒數 就加入任務佇列暫存
//taskQueue.put(task);
taskQueue.tryPut(rejectPolicy, task);
}
}
}
并且在阻塞佇列中實作該方法
- 上鎖,保證佇列的執行緒安全
- 如果佇列滿了,則呼叫自己實作的拒絕策略處理
- 沒有滿則直接加入任務佇列
/**
* 嘗試加入任務佇列
* @param rejectPolicy
* @param task
*/
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capacity) {
rejectPolicy.reject(this,task);
}else {
System.out.println("加入任務佇列"+task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
4.3 測驗利用帶超時時間的拒絕策略
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,
1, (queue, task) -> {
queue.put(task,500,TimeUnit.MILLISECONDS);//測驗使用有超時時間的拒絕策略
});
for (int i = 0; i < 3; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務"+j + "執行完成");
});
}
}

- 設定阻塞佇列大小為 1, 執行緒池執行緒數為1,執行第一個任務時, 第二個任務進入任務佇列,第三個任務等待進入佇列,0.5秒后第一個任務沒有完成,任務佇列還是滿的,所以第三個任務放棄加入任務佇列,所以最后只完成了兩個任務,
5. 完整代碼
package threadPool_Test;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Description: 自定義執行緒池
* @Author: Aiguodala
* @CreateDate: 2021/4/11 14:59
*/
public class ThreadPoolDemo {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,1000,TimeUnit.MILLISECONDS,
1, (queue, task) -> {
queue.put(task,500,TimeUnit.MILLISECONDS);//測驗使用有超時時間的拒絕策略
});
for (int i = 0; i < 3; i++) {
int j = i;
threadPool.execute(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任務"+j + "執行完成");
});
}
}
}
/**
* 拒絕策略
* @param <T>
*/
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockQueue<T> queue, T task);
}
/**
* 執行緒池
*/
class ThreadPool {
/**
* 任務佇列
*/
private BlockQueue<Runnable> taskQueue;
/**
* 執行緒集合
*/
private HashSet<Worker> workers = new HashSet();
/**
* 核心執行緒數
*/
private int coreSize;
/**
* 獲取任務超時時間
*/
private long timeout;
/**
* 時間工具
*/
private TimeUnit timeUnit;
/**
* 拒絕策略
*/
private RejectPolicy<Runnable> rejectPolicy;
/**
* 執行任務
* @param task
*/
public void execute (Runnable task) {
synchronized (workers) {
// 當任務數沒有超過核心執行緒數,直接交給執行緒執行
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
System.out.println("新增worker物件"+worker);
worker.start();
}else { //超過核心執行緒數 就加入任務佇列暫存
//taskQueue.put(task);
taskQueue.tryPut(rejectPolicy, task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
/**
* 真正執行任務的執行緒
*/
private class Worker extends Thread{
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
while (task != null || (task = taskQueue.take(timeout, timeUnit)) != null) {
try {
System.out.println("正在執行任務"+task);
task.run();
}catch (Exception e) {
e.printStackTrace();
}finally {
task = null;
}
}
synchronized (workers) {
System.out.println("洗掉"+this+"執行緒");
workers.remove(this);
}
}
}
}
/**
* 阻塞佇列
* @param <T>
*/
class BlockQueue<T> {
/**
* 任務佇列
*/
private Deque<T> queue = new ArrayDeque<>();
/**
* 鎖
*/
private ReentrantLock lock = new ReentrantLock();
/**
* 任務佇列滿后 生產者進入等待佇列等待
*/
private Condition fullWaitSet = lock.newCondition();
/**
* 任務佇列空時,消費者進入等待佇列等待
*/
private Condition emptyWaitSet = lock.newCondition();
/**
* 容量
*/
private int capacity;
public BlockQueue(int capacity) {
this.capacity = capacity;
}
/**
* 從任務佇列獲取任務
* @return
*/
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
fullWaitSet.signal();
return task;
}finally {
lock.unlock();
}
}
/**
* 有超時時間的獲取任務
* @param timeout
* @param unit
* @return
*/
public T take(long timeout, TimeUnit unit) {
lock.lock();
try {
//將timeout 轉換成納秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
if (nanos <= 0) {
return null;
}
//回傳剩余的時間
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();
fullWaitSet.signal();
return task;
}finally {
lock.unlock();
}
}
/**
* 添加任務方法
* @param task
*/
public void put(T task) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
System.out.println("等待加入任務佇列"+task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("加入任務佇列"+task);
queue.addLast(task);
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
/**
* 待超時的添加任務
* @param task
* @param timeout
* @param timeUnit
* @return
*/
public boolean put(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if (nanos <= 0) {
return false;
}
System.out.println("等待加入任務佇列");
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("加入任務佇列"+task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
/**
* 獲取任務佇列任務數
* @return
*/
public int size () {
lock.lock();
try {
return queue.size();
}finally {
lock.unlock();
}
}
/**
* 嘗試加入任務佇列
* @param rejectPolicy
* @param task
*/
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capacity) {
rejectPolicy.reject(this,task);
}else {
System.out.println("加入任務佇列"+task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
參考《Java 并發編程的藝術》
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/276318.html
標籤:其他
