JUC自定義執行緒池練習

首先上面該執行緒池的大致流程
自定義阻塞佇列
- 首先定義一個雙向的佇列和鎖一定兩個等待的condition
- 本類用lock來控制多執行緒下的流程執行
- take和push方法就是死等,呼叫await就是等,后面優化為限時等待
- take呼叫后取出阻塞佇列的task后會呼叫fullWaitSet的signal方法來喚醒因為阻塞佇列滿了的執行緒將task放入阻塞佇列,
@Slf4j
class TaskQueue<T> {
// 雙向的阻塞佇列
private Deque<T> deque;
// 佇列最大容量
private int capacity;
// 鎖
private ReentrantLock lock = new ReentrantLock();
// 消費者任務池空的等待佇列
private Condition emptyWaitSet = lock.newCondition();
// 生產者任務池滿的等待佇列
private Condition fullWaitSet = lock.newCondition();
public TaskQueue(int capacity) {
this.capacity = capacity;
deque = new ArrayDeque<>(capacity);
}
// 死等take,即從阻塞佇列取出任務
public T take() {
lock.lock();
try {
while (deque.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("取走任務");
T task = deque.pollFirst();
fullWaitSet.signal();
return task;
} finally {
lock.unlock();
}
}
// 執行緒添加任務,屬于是死等添加
public void push(T task) {
lock.lock();
try {
while (deque.size() >= capacity) {
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("添加任務");
deque.offerLast(task);
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
public int getSize() {
lock.lock();
try {
return deque.size();
}finally {
lock.unlock();
}
}
}
優化,死等優化為超時等
- awaitNanos方法回傳的是等待的剩余時間,如果已經等了base時間就會回傳0,如果沒有就會回傳大于0即還沒有等待的時間,防止虛假喚醒導致重新等待時間加長,當然在本題的設計中不會出現虛假喚醒的情況,
public T poll(Long timeout,TimeUnit unit) {
lock.lock();
try {
long base = unit.toNanos(timeout);
while (deque.isEmpty()) {
try {
if (base <= 0){
return null;
}
base = emptyWaitSet.awaitNanos(base); // 回傳還剩下的時間
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("取走任務");
T task = deque.pollFirst();
fullWaitSet.signal();
return task;
} finally {
lock.unlock();
}
}
執行緒池類
- 成員變數如下,對于Worker就作業執行緒
@Slf4j
class ThreadPool {
// 阻塞佇列大小
private int capacity;
// 阻塞佇列
private TaskQueue<Runnable> taskQueue;
// 作業執行緒
private HashSet<Worker> workerSet = new HashSet<>();
// 核心數
private int coreNum;
// 超時等待時間
private long timeout;
// 超時等待單位
private TimeUnit unit;
// 拒絕策略
private RejectPolicy rejectPolicy;
// 執行緒物件
class Worker extends Thread {
private Runnable task;
public Worker(Runnable runnable) {
this.task = runnable;
}
@Override
public void run() {
// 就是執行緒把當前分配的任務做完,然后還要去阻塞佇列找活干,沒活就退出
// taks 如果不為空就執行然后講其置為空,后續再次進入回圈后會從阻塞佇列中再次取出task,
// 如果不為空就繼續執行,但是因為take死等,會導致無法結束
// 使用了這個超時等的方法,當無法取出時就會退出程式
while (task != null || (task = taskQueue.poll(timeout,unit)) != null) {
try {
log.debug("開始執行任務");
Thread.sleep(1000);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
// 當沒有任務可執行,執行緒自動銷毀,由于這是根據物件來銷毀,且hashset無序,所以這里無需保證其的執行緒安全,
workerSet.remove(this);
}
}
public ThreadPool(int capacity, int coreNum, long timeout, TimeUnit unit,RejectPolicy rejectPolicy) {
this.capacity = capacity;
this.coreNum = coreNum;
this.timeout = timeout;
this.unit = unit;
this.taskQueue = new TaskQueue<>(capacity);
this.rejectPolicy = rejectPolicy;
}
/**
* 當執行緒數大于核心數,就將任務放入阻塞佇列
* 否則創建執行緒進行處理
*
* @param runnable
*/
public void execute(Runnable runnable) {
// 需要synchronized關鍵字控制多執行緒下對執行方法的執行,保證共享變數workerSet安全,
synchronized (workerSet) {
// 如果已經存在的作業執行緒已經大于核心數,就不適合在進行創建執行緒了,創太多執行緒對于執行并不會加快,反而會因為執行緒不斷切換而拖累CPU的執行,
if (workerSet.size() >= coreNum) {
taskQueue.push(runnable);
} else {
// 如果作業執行緒小于核心數就可創建一個worker執行緒來作業
Worker worker = new Worker(runnable);
workerSet.add(worker);
worker.start();
}
}
}
}
測驗類
@Slf4j
public class MyThreadPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(3,2,1,TimeUnit.SECONDS,(taskQueue,task)->{
taskQueue.push(task);
});
for (int i = 0; i < 10; i++) {
int j = i;
threadPool.execute(() -> {
log.debug("任務{}", j);
});
}
}
}
優化---拒絕策略
我們沒有進行優化的就是當任務太多導致阻塞執行緒也滿了,此時任務執行緒就會進行阻塞,直到等到有人在執行緒池中取走任務,也就是push方法,我們在舊的方法中仍采用的是死等的方法,
但是方法中有很多死等,超時等,放棄任務,拋出例外,讓呼叫者自己執行任務等等方法,
我們就可用講其進行抽象,把操作交給呼叫者,
定義了如下的函式式介面,即為拒絕策略,
@FunctionalInterface
interface RejectPolicy<T>{
void reject(TaskQueue<T> taskQueue,T task);
}
將在TaskQueue任務佇列中定義不同的策略,我們只要傳入這個函式式介面的實作物件就可用實作定制拒絕的策略,
在TaskQueue類添加一個方法,用來呼叫拒絕策略
public void tryAndAdd(T task,RejectPolicy rejectPolicy){
lock.lock();
try {
if (deque.size() >= capacity) {
rejectPolicy.reject(this,task);
}else{
log.debug("添加任務");
deque.offerLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
更改了構造方法的執行緒池類,這樣就可用傳入一個自定義的拒絕策略,
@Slf4j
class ThreadPool {
// 阻塞佇列大小
private int capacity;
// 阻塞佇列
private TaskQueue<Runnable> taskQueue;
// 作業執行緒
private HashSet<Worker> workerSet = new HashSet<>();
// 核心數
private int coreNum;
// 超時等待時間
private long timeout;
// 超時等待單位
private TimeUnit unit;
// 拒絕策略
private RejectPolicy rejectPolicy;
// 執行緒物件
class Worker extends Thread {
private Runnable task;
public Worker(Runnable runnable) {
this.task = runnable;
}
@Override
public void run() {
while (task != null || (task = taskQueue.poll(timeout,unit)) != null) {
try {
log.debug("開始執行任務");
Thread.sleep(1000);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
workerSet.remove(this);
}
}
public ThreadPool(int capacity, int coreNum, long timeout, TimeUnit unit,RejectPolicy rejectPolicy) {
this.capacity = capacity;
this.coreNum = coreNum;
this.timeout = timeout;
this.unit = unit;
this.taskQueue = new TaskQueue<>(capacity);
this.rejectPolicy = rejectPolicy;
}
/**
* 當執行緒數大于核心數,就將任務放入阻塞佇列
* 否則創建執行緒進行處理
*
* @param runnable
*/
public void execute(Runnable runnable) {
synchronized (workerSet) {
if (workerSet.size() >= coreNum) {
taskQueue.tryAndAdd(runnable,rejectPolicy);
} else {
Worker worker = new Worker(runnable);
workerSet.add(worker);
worker.start();
}
}
}
}
將啟動類修改如下
@Slf4j
public class MyThreadPool {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(3,2,1,TimeUnit.SECONDS,(taskQueue,task)->{
// 采用死等的方法,當然我們可用在taskQueue中定義更多的方法讓呼叫者選擇
taskQueue.push(task);
});
for (int i = 0; i < 10; i++) {
int j = i;
threadPool.execute(() -> {
log.debug("任務{}", j);
});
}
}
}
這樣我們就完成了自定義的執行緒池,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/472356.html
標籤:其他
上一篇:Java實作3DES加密
下一篇:led跑馬燈多種方法(移位法,位拼接法,呼叫模塊法,位移及位拼接語法,testbench的理解,源檔案的存盤路徑,計數器的個數,呼叫模塊的方式)
