JDK自帶執行緒池
執行緒池的狀態
執行緒有如下狀態
- RUNNING狀態:Accept new tasks and process queued tasks
- SHUTDOWN狀態:Don't accept new tasks, but process queued tasks
- STOP狀態: Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
- TIDYING狀態:All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
- TERMINATED狀態:terminated() has completed The numerical order among these values matters, to allow ordered comparisons.
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // -1為全1,所以我們左移29位就是111開頭的狀態位
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
private static final int STOP = 1 << COUNT_BITS; // 001
private static final int TIDYING = 2 << COUNT_BITS; // 010
private static final int TERMINATED = 3 << COUNT_BITS; // 011
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
我們從上面的源代碼中可以看出,我們將狀態存盤在一個原子整型的前三位,然后將執行緒的容量存盤在后29位,將狀態和容量放在一起,這樣更新狀態和容量只需要進行一次cas操作,
下面的三個方法就是進行獲取狀態和作業執行緒數量和初始化狀態,
在源代碼注釋中解釋到,當未來原子整型不夠用了,就會將其升級為原子長整形,且狀態位也有擴展的空間,如果需要的話,
同時源代碼中也有表明各個狀態轉換的條件,可以ThreadPoolExecutor類中下載source查看,
執行緒池構造方法
引陣列成
- corePoolSize核心執行緒的數量
- maximumPoolSize最大的執行緒數量 PS: 最大執行緒數-核心執行緒數 = 急救執行緒的數量
- keepAliveTime 急救執行緒的存活時間
- unit 急救執行緒存活時間單位
- workQueue 阻塞佇列
- threadFactory 執行緒工廠 PS:執行緒工廠就是創造執行緒的工廠,為其進行給任務和名字
- handler 拒絕策略的實作 PS:就是當阻塞佇列滿了之后所要做的動作,死等,限時等(RocketMQ),交給呼叫者運行,直接拋棄,創建一個新執行緒(netty),拋出例外寫日志(dubbo),
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
這就是拒絕策略在JDK自帶的實作,
- AbortPolicy直接拋出一個RejectedExecutionException例外,dubbo應該是加以記錄一些更多的資訊 猜測
- CallerRunsPolicy就是讓呼叫者自己執行這個任務
- DiscardPolicy直接拋棄
- DiscardOldestPolicy拋棄早進入阻塞佇列的然后讓當前任務進入阻塞佇列

JDK執行緒池和上一次的執行緒池不一樣的就是急救執行緒,
急救執行緒就是當阻塞佇列滿了之后,并不會像我上次的例子一樣直接進行拒絕策略的判斷,會創建一個急救執行緒或者已存活的急救執行緒進行執行任務,如果急救執行緒也滿了的話,才會進入拒絕策略的判斷,
JDK執行緒池的基本使用
固定大小執行緒池
ExecutorService executorService = Executors.newFixedThreadPool(2, new ThreadFactory() {
AtomicInteger ctl = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,"myThreadPoll-" + ctl.getAndIncrement());
return thread;
}
});
executorService.execute(()->{
log.debug("執行緒執行了一次");
});
executorService.execute(()->{
log.debug("執行緒執行了一次");
});
executorService.execute(()->{
log.debug("執行緒執行了一次");
});
我看了一下它的默認構造方法,它創造了一個Integer.MAX_VALUE大小的阻塞佇列,阻塞佇列其實和小測驗的阻塞佇列是差不多的,
快取執行緒池
主要是它的阻塞佇列的不同,其中核心數為0,然后通過阻塞佇列直到有執行緒對其進行取任務,不然就是一直阻塞的狀態,
@Slf4j
public class Test2 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(()->{
log.debug("執行成功");
});
executorService.execute(()->{
log.debug("執行成功");
});
}
}
A blocking queue in which each insert operation must wait for a corresponding remove operation by another thread, and vice versa.
翻譯:一種阻塞佇列,其中每個插入操作必須等待另一個執行緒執行相應的洗掉操作,反之亦然,
單執行緒執行緒池
顧名思義即單執行緒的執行緒池,不過有意思的一點就是這個使用了一個設計模式就是裝飾器模式,
@Slf4j
public class Test3 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(()->{
log.debug("hello");
});
executorService.execute(()->{
log.debug("hello");
});
}
}
首先是因為如果我們直接回傳ThreadPoolExecutor這個類的話,我們是知道了它的類,是可以直接使用強轉來實作修改執行緒的核心數以及一些引數,如下
@Slf4j
public class Test1 {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2, new ThreadFactory() {
AtomicInteger ctl = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r,"myThreadPoll-" + ctl.getAndIncrement());
return thread;
}
});
// 我們將其物件通過強轉直接修改了其的核心數,執行結構同樣生效
ThreadPoolExecutor executor= (ThreadPoolExecutor) executorService;
executor.setCorePoolSize(1);
executorService.execute(()->{
log.debug("執行緒執行了一次");
});
executorService.execute(()->{
log.debug("執行緒執行了一次");
});
executorService.execute(()->{
log.debug("執行緒執行了一次");
});
}
}
但是如果我們通過裝飾器模式將其進行包裝,然后包裝的物件回傳,是無法進行修改核心數的,更何況單執行緒執行緒池的情況下,我們需要保證核心數總為1把,不能讓其他人修改,展示部分代碼,回傳的是其的裝飾類,
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
ExecutorService方法使用
這個類就是執行緒池的介面類,掌管著執行緒池的方法,
- void shutdown() 繼續執行當前執行緒池中的任務和阻塞佇列中的任務,不再接收新的任務,
- List
shutdownNow() 嘗試停止所有正在執行的任務,停止正在等待的任務的處理,并回傳正在等待執行的任務的串列 - boolean isShutdown() 回傳當前執行緒池是否已經關閉
- boolean isTerminated() 在呼叫了shutdown或Shutdownow后,關閉后所有任務都已完成,則回傳true,如果沒有呼叫永遠不會回傳true,
- boolean awaitTermination(long timeout, TimeUnit unit) 阻塞,直到所有任務在關閉請求后完成執行,或超時發生,或當前執行緒中斷,以先發生的為準,
Future submit(Callable task) 回傳執行結果 List<Future > invokeAll(Collection<? extends Callable > tasks) 批量回傳結果 T invokeAny(Collection<? extends Callable > tasks) 執行任意一個并回傳結果,如果任何一個完成,其他正在執行的直接結束,
作業執行緒的饑餓現象
/**
* @Author 10276
* @Date 2022/5/11 20:52
*/
@Slf4j
public class Test4 {
public static void main(String[] args) {
ExecutorService rest = Executors.newFixedThreadPool(2);
rest.execute(()->{
log.debug("準備點餐");
Future<String> submit = rest.submit(() -> {
log.debug("正在坐菜");
return "菜";
});
try {
log.debug("上菜{}",submit.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
rest.execute(()->{
log.debug("準備點餐");
Future<String> submit = rest.submit(() -> {
log.debug("正在坐菜");
return "菜";
});
try {
log.debug("上菜{}",submit.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
一個執行緒池的導致兩個執行緒沒辦法繼續進行下去,沒有多余的執行緒取做接下來的事情,但這不是死鎖問題,歸結原因還是執行緒資源不夠,同時也無法繼續進行下去了,

解決方案:由此可以得出對于執行緒池數量的選擇和執行緒池中核心執行緒數量的選擇是十分重要的,
public static void main(String[] args) {
ExecutorService waitress = Executors.newFixedThreadPool(1);
ExecutorService cooker = Executors.newFixedThreadPool(1);
waitress.execute(()->{
log.debug("準備點餐");
Future<String> submit = cooker.submit(() -> {
log.debug("正在做菜");
return "湖南菜";
});
try {
log.debug("上菜:{}",submit.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
waitress.execute(()->{
log.debug("準備點餐");
Future<String> submit = cooker.submit(() -> {
log.debug("正在做菜");
return "廣東菜";
});
try {
log.debug("上菜:{}",submit.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/472358.html
標籤:其他
上一篇:led跑馬燈多種方法(移位法,位拼接法,呼叫模塊法,位移及位拼接語法,testbench的理解,源檔案的存盤路徑,計數器的個數,呼叫模塊的方式)
