8.1 執行緒池
設計執行緒池的原因
- 執行緒是一種系統資源,每次創建一個新的執行緒,都需要占用分配記憶體空間,如果高并發下, 對于每個任務,如果都開啟一個執行緒去處理的話,對記憶體的占用過大,甚至爆出OOM,
- 執行緒數量過多,而CPU只有幾個,那必定由很多執行緒處于等待,而頻繁發生執行緒背景關系切換,也會導致效率問題
所以盡可能的重用已有的執行緒去處理任務,
阻塞佇列:放任務的,執行緒池暫時處理不了的任務先放到阻塞佇列里來;執行緒池能處理了,再從阻塞佇列中去取任務執行
// 注意一下這個ReentrantLock的從條件變數中喚醒,之前沒有正確理解
// 現在的理解是: 從ReentrantLock的條件變數喚醒之后,跟synchronized從waitSet中喚醒一樣,要進入entryList中等待獲取鎖,
// 獲取鎖成功之后, 才能運行, 不然你以為, 最后的那個釋放鎖是怎么釋放的
@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
// 任務佇列
private ArrayDeque<T> queue = new ArrayDeque<>();
// 鎖
private ReentrantLock lock = new ReentrantLock();
// 生產者條件變數 (任務佇列滿了的時候, 等待)
Condition fullWaitSet = lock.newCondition();
// 消費者條件變數 (任務佇列為空的時候, 等待)
Condition emptyWaitSet = lock.newCondition();
// 任務佇列最大容量
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
// 帶超時阻塞獲取
public T poll(long timeout, TimeUnit unit) {
long nanos = unit.toNanos(timeout);
lock.lock();
try {
while (queue.isEmpty()) {
if (nanos <= 0) { // nanos小于或等于0, 就代表不需要等待了
return null;
}
try {
nanos = emptyWaitSet.awaitNanos(nanos); // 回傳值是nanos減去等待的時間(即還需等待的時間)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal(); // 通知因為滿了而等待的執行緒(因為滿了而等待的執行緒肯定是等著往佇列里面添加任務嘛)
return t;
} finally {
lock.unlock(); // 這里才釋放的鎖
}
}
// 阻塞獲取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) { // 當任務佇列中沒有任務時,就等待
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal(); // 喚醒在fullWaitSet中等待的隨機一個執行緒,但這個被喚醒的執行緒并不是馬上運行,
return t; // 而是要進入entryList中等待獲取鎖,鎖被釋放了,它才能去競爭鎖(注意一下,這里還并未釋放鎖哦)
} finally { // 下面的這個put方法在lock.lock()這句,在take方法里獲取鎖但未釋放前,外面執行緒進不來,
lock.unlock();// 里面的執行緒又在entryList中等待獲取鎖,所以保證了安全
} // 這里才釋放的鎖
}
// 阻塞添加
public void put(T element) {
lock.lock();
try {
while (queue.size() == capacity) { // 當任務佇列滿時, 就等待(此時,因為不能添加任務了嘛)
try {
log.debug("任務佇列已滿,等待加入任務佇列: {}",element);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任務佇列: {}",element);
queue.addLast(element);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
}
執行緒池
@Slf4j(topic = "c.ThreadPool")
public class ThreadPool {
// 任務佇列
private BlockingQueue<Runnable> taskQueue;
// 作業執行緒集合
HashSet<Worker> workers = new HashSet<>();
// 核心執行緒數
private int coreSize;
// 獲取任務時的超時時間
private long timeout;
// 超時時間的時間單位
private TimeUnit timeUnit;
// 執行任務:
// 如果當前的執行緒數還沒有超過核心執行緒數,那么就創建一個執行緒去執行這個提交的任務
// 如果當前的執行緒數已經超過了核心執行緒數,那么就把這個任務放到BlockingQueue中
public void execute(Runnable task) {
synchronized (workers) { // 這里既包含對worker的讀,也包含對worker的寫,所以需要保護
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("創建執行緒: {}, 關聯任務: {}",worker,task);
workers.add(worker);
worker.start();
} else {
taskQueue.put(task);
}
}
}
// 構造一個自定義執行緒池物件
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapacity);
}
class Worker extends Thread{
// 執行的任務
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
// Thread#start 方法會開啟執行緒執行執行緒執行緒物件的run方法,所以這里重寫run方法
@Override
public void run() { // 自己的任務執行完了的話,再看下任務佇列里面還有沒有任務,沒想到里面是個坑,哈哈
// while (task != null || (task = taskQueue.take())!=null) { // 死等
// 沒有任務就等待,有任務就回傳,感覺這個執行緒就是個工具人,除非時呼叫的是超時獲取的方法
while (task != null || (task = taskQueue.poll(timeout,timeUnit))!=null) {
try {
log.debug("執行任務...{}",task);
task.run();
} finally {
task = null; // task執行完了的話,就置為null
}
}
synchronized (workers) {
workers.remove(this);
log.debug("移除執行緒: {}",this);
}
}
}
}
測驗
@Slf4j(topic = "c.TestPool")
public class TestPool {
public static void main(String[] args) {
ThreadPool pool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
for (int i = 0; i < 5; i++) {
final int j = i;
pool.execute(()->{
log.debug("{}",j);
});
}
}
}
測驗結果
- 兩個核心執行緒,它們執行添加的5個任務,有3個任務添加到了任務佇列里面,兩個執行緒處理完了自己的任務之后,再從任務佇列中去取任務,然后執行,如果超過規定的時間還沒取到任務,這個執行緒的任務就完成了,然后就把這個執行緒移除掉,
17:37:44.339 [main] DEBUG c.ThreadPool - 創建執行緒: Thread[Thread-0,5,main], 關聯任務: task@1323468230
17:37:44.342 [main] DEBUG c.ThreadPool - 創建執行緒: Thread[Thread-1,5,main], 關聯任務: task@897697267
17:37:44.342 [Thread-0] DEBUG c.ThreadPool - 執行任務...task@1323468230
17:37:44.342 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@38997010
17:37:44.342 [Thread-0] DEBUG c.TestPool - 0
17:37:44.342 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@1942406066
17:37:44.342 [Thread-1] DEBUG c.ThreadPool - 執行任務...task@897697267
17:37:44.342 [Thread-1] DEBUG c.TestPool - 1
17:37:44.342 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@1213415012
17:37:44.343 [Thread-0] DEBUG c.ThreadPool - 執行任務...task@38997010
17:37:44.343 [Thread-1] DEBUG c.ThreadPool - 執行任務...task@1942406066
17:37:44.343 [Thread-0] DEBUG c.TestPool - 2
17:37:44.343 [Thread-1] DEBUG c.TestPool - 3
17:37:44.343 [Thread-0] DEBUG c.ThreadPool - 執行任務...task@1213415012
17:37:44.343 [Thread-0] DEBUG c.TestPool - 4
17:37:45.343 [Thread-1] DEBUG c.ThreadPool - 移除執行緒: Thread[Thread-1,5,main]
17:37:45.343 [Thread-0] DEBUG c.ThreadPool - 移除執行緒: Thread[Thread-0,5,main]
但是上面存在問題:假設如果main執行緒提交的任務,超過了任務佇列的大小,并且核心執行緒也處理不過來,那么main執行緒就被阻塞住了,測驗如下,
并且我們可以看到,main執行緒本來要提交15個執行緒,但是只有2個正在被執行,10個添加到了任務佇列,還剩3個沒有著落,main執行緒就被阻塞住了,
@Slf4j(topic = "c.TestPool")
public class TestPool {
public static void main(String[] args) {
ThreadPool pool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
for (int i = 0; i < 15; i++) {
final int j = i;
pool.execute(()->{
try {
Thread.sleep(1000000L); // 模擬處理時長
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}",j);
});
}
}
}
/*測驗結果*/ // 程式沒有結束,/*
17:43:55.790 [main] DEBUG c.ThreadPool - 創建執行緒: Thread[Thread-0,5,main], 關聯任務: task@1323468230
17:43:55.795 [main] DEBUG c.ThreadPool - 創建執行緒: Thread[Thread-1,5,main], 關聯任務: task@897697267
17:43:55.796 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@38997010
17:43:55.796 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@1942406066
17:43:55.796 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@1213415012
17:43:55.796 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@1688376486
17:43:55.796 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@2114664380
17:43:55.796 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@999661724
17:43:55.796 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@1793329556
17:43:55.797 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@445884362
17:43:55.797 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@1031980531
17:43:55.797 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@721748895
17:43:55.797 [main] DEBUG c.BlockingQueue - 任務佇列已滿,等待加入任務佇列: element@1642534850
17:43:55.798 [Thread-0] DEBUG c.ThreadPool - 執行任務...task@1323468230
17:43:55.799 [Thread-1] DEBUG c.ThreadPool - 執行任務...task@897697267
*/
但是上面存在問題:假設如果main執行緒提交的任務,超過了任務佇列的大小,并且核心執行緒也處理不過來,那么main執行緒就被阻塞住了,測驗如下,
并且我們可以看到,main執行緒本來要提交15個執行緒,但是只有2個正在被執行,10個添加到了任務佇列,還剩3個沒有著落,main執行緒就被阻塞住了,可以看到main執行緒現在控制不了這個局面了,所以執行緒池還應該設計成讓main執行緒可以選擇遇到這種情況下應該要怎么做,是死等?讓呼叫者放棄任務?讓呼叫者超時等待?讓呼叫者拋出例外?
@Slf4j(topic = "c.TestPool")
public class TestPool {
public static void main(String[] args) {
ThreadPool pool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10);
for (int i = 0; i < 15; i++) {
final int j = i;
pool.execute(()->{
try {
Thread.sleep(1000000L); // 模擬處理時長
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}",j);
});
}
}
}
/*測驗結果*/ // 程式沒有結束,/*
17:43:55.790 [main] DEBUG c.ThreadPool - 創建執行緒: Thread[Thread-0,5,main], 關聯任務: task@1323468230
17:43:55.795 [main] DEBUG c.ThreadPool - 創建執行緒: Thread[Thread-1,5,main], 關聯任務: task@897697267
17:43:55.796 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@38997010
17:43:55.796 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@1942406066
17:43:55.796 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@1213415012
17:43:55.796 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@1688376486
17:43:55.796 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@2114664380
17:43:55.796 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@999661724
17:43:55.796 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@1793329556
17:43:55.797 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@445884362
17:43:55.797 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@1031980531
17:43:55.797 [main] DEBUG c.BlockingQueue - 加入任務佇列: element@721748895
17:43:55.797 [main] DEBUG c.BlockingQueue - 任務佇列已滿,等待加入任務佇列: element@1642534850
17:43:55.798 [Thread-0] DEBUG c.ThreadPool - 執行任務...task@1323468230
17:43:55.799 [Thread-1] DEBUG c.ThreadPool - 執行任務...task@897697267
*/
因此抽象出一個拒絕策略介面,把任務佇列滿了時的代碼控制權交給main執行緒決定
其實跟上面比較,改動的并不多,主要是:當核心執行緒滿了的時候的邏輯,當還有任務提交時,就把任務繼續交給任務佇列,而在任務佇列里面添加一個邏輯:當任務佇列滿了的時候,就執行拒絕策略,拒絕策略回傳的引數就是當前的佇列this和當前提交的任務,把這兩個引數交給執行緒池的屬性拒絕策略物件的reject方法處理,
public interface RejectPolicy<T> {
void reject(BlockingQueue<T> blockingQueue, T task);
}
// 注意一下這個ReentrantLock的從條件變數中喚醒,之前沒有正確理解
// 現在的理解是: 從ReentrantLock的條件變數喚醒之后,跟synchronized從waitSet中喚醒一樣,要進入entryList中等待獲取鎖,
// 獲取鎖成功之后, 才能運行, 不然你以為, 最后的那個釋放鎖是怎么釋放的
@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
// 任務佇列
private ArrayDeque<T> queue = new ArrayDeque<>();
// 鎖
private ReentrantLock lock = new ReentrantLock();
// 生產者條件變數 (任務佇列滿了的時候, 等待)
Condition fullWaitSet = lock.newCondition();
// 消費者條件變數 (任務佇列為空的時候, 等待)
Condition emptyWaitSet = lock.newCondition();
// 任務佇列最大容量
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
// 帶超時阻塞獲取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
if (nanos <= 0) { // nanos小于或等于0, 就代表不需要等待了
return null;
}
try {
nanos = emptyWaitSet.awaitNanos(nanos); // 回傳值是nanos減去等待的時間(即還需等待的時間)
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal(); // 通知因為滿了而等待的執行緒(因為滿了而等待的執行緒肯定是等著往佇列里面添加任務嘛)
return t;
} finally {
lock.unlock(); // 這里才釋放的鎖
}
}
// 阻塞獲取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) { // 當任務佇列中沒有任務時,就等待
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal(); // 喚醒在fullWaitSet中等待的隨機一個執行緒,但這個被喚醒的執行緒并不是馬上運行,
return t; // 而是要進入entryList中等待獲取鎖,鎖被釋放了,它才能去競爭鎖(注意一下,這里還并未釋放鎖哦)
} finally { // 下面的這個put方法在lock.lock()這句,在take方法里獲取鎖但未釋放前,外面執行緒進不來,
lock.unlock();// 里面的執行緒又在entryList中等待獲取鎖,所以保證了安全
} // 這里才釋放的鎖
}
// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capacity) { // 當任務佇列滿時, 就等待(此時,因為不能添加任務了嘛)
try {
log.debug("任務佇列已滿,等待加入任務佇列: {}", "task@" + task.hashCode());
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任務佇列: {}", "element@" + task.hashCode());
queue.addLast(task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
// 帶超時阻塞添加
public boolean offer(T task, long timeout, TimeUnit timeUnit) {
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while (queue.size() == capacity) { // 當任務佇列滿時, 就等待(此時,因為不能添加任務了嘛)
try {
if (nanos <= 0) {
return false;
}
log.debug("任務佇列已滿,等待加入任務佇列: {}", "task@" + task.hashCode());
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任務佇列: {}", "element@" + task.hashCode());
queue.addLast(task);
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
// 在上面原本的put方法上, 引入拒絕策略介面
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capacity) { // 當任務佇列滿時, 就等待(此時,因為不能添加任務了嘛)
log.debug("任務佇列已滿,執行拒絕策略...{}", "task@" + task.hashCode());
rejectPolicy.reject(this, task);
} else {
log.debug("加入任務佇列: {}", "element@" + task.hashCode());
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}
@Slf4j(topic = "c.ThreadPool")
public class ThreadPool {
// 任務佇列
private BlockingQueue<Runnable> taskQueue;
// 作業執行緒集合
HashSet<Worker> workers = new HashSet<>();
// 核心執行緒數
private int coreSize;
// 獲取任務時的超時時間
private long timeout;
// 超時時間的時間單位
private TimeUnit timeUnit;
private RejectPolicy rejectPolicy;
// 執行任務:
// 如果當前的執行緒數還沒有超過核心執行緒數,那么就創建一個執行緒去執行這個提交的任務
// 如果當前的執行緒數已經超過了核心執行緒數,那么就把這個任務放到BlockingQueue中
public void execute(Runnable task) {
synchronized (workers) { // 這里既包含對worker的讀,也包含對worker的寫,所以需要保護
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
log.debug("創建執行緒: {}, 關聯任務: {}",worker,"task@"+task.hashCode());
workers.add(worker);
worker.start();
} else {
// taskQueue.put(task); // 死等 (原來是死等,現在改成帶有拒絕策略的做法)
// 代碼運行到這里,提交的任務已經超過了核心執行緒數,此時考慮到兩種情況
// 1. 提交的任務還可以放到阻塞佇列中
// 2. 阻塞佇列如果也滿了,需要把代碼的控制權返還到呼叫者,讓呼叫者決定采取什么方式?
// 1. 死等 2.超時等 3.放棄任務 4.拋出例外 ...
// 所以這里,抽象出一個介面出來RejectPolicy,讓呼叫者在創建執行緒池時,傳入
// 因此,這里的拒絕策略是當前執行緒池的一個屬性,
// 而如果在提交的任務已經超過了核心執行緒數的情況下,把接下來的運行交給任務佇列去做
taskQueue.tryPut(rejectPolicy,task);
}
}
}
// 構造一個自定義執行緒池物件
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,
int queueCapacity,RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
class Worker extends Thread{
// 執行的任務
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
// Thread#start 方法會開啟執行緒執行執行緒執行緒物件的run方法,所以這里重寫run方法
@Override
public void run() { // 自己的任務執行完了的話,再看下任務佇列里面還有沒有任務,沒想到里面是個坑,哈哈
// while (task != null || (task = taskQueue.take())!=null) { // 死等
// 沒有任務就等待,有任務就回傳,感覺這個執行緒就是個工具人
while (task != null || (task = taskQueue.poll(timeout,timeUnit))!=null) {
try { // 除非時呼叫的是超時獲取的方法
log.debug("執行任務...{}","task@"+task.hashCode());
task.run();
} finally {
task = null; // task執行完了的話,就置為null
}
}
synchronized (workers) {
workers.remove(this);
log.debug("移除執行緒: {}",this);
}
}
}
}
@Slf4j(topic = "c.TestPool")
public class TestPool {
public static void main(String[] args) {
ThreadPool pool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, 10,
(queue, task) -> {
// 1.死等
// queue.put(task);
// 2.超時等待
// boolean flag = queue.offer(task, 500, TimeUnit.MILLISECONDS);
// log.debug("添加任務結果: {}", flag);
// 3.讓呼叫者放棄任務
// log.debug("什么都不干,直接放棄任務...");
// 4.拋出例外, 當然此時,主執行緒后面添加的任務都沒了
// throw new RuntimeException("任務佇列滿了");
// 5.主執行緒自己做
// task.run();
}
);
for (int i = 0; i < 15; i++) {
final int j = i;
pool.execute(() -> {
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}", j);
});
}
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/260709.html
標籤:其他
