池化技術(Pool)
池化技術 (Pool) 是一種很常見的編程技巧,我們日常作業中常見的有資料庫連接池、執行緒池、物件池等,它們的特點都是將“昂貴的”、“費時的”的資源維護在一個特定的“池子”中,規定其最小連接數、最大連接數、阻塞佇列等配置,方便進行統一管理和復用,通常還會附帶一些探活機制、強制回收、監控一類的配套功能,
執行緒池(Thread Pool)概念
顧名思義,執行緒池可以理解為一個裝有執行緒的池子,這個池可以用來更好的統一管理執行緒,執行緒池技術可以幫我們管理執行緒,避免增加創建執行緒和銷毀執行緒的資源損耗,我們可以通過執行緒池重復利用已有的執行緒,從而避免每次使用的時候都去創建,
執行緒池在 Java 中的抽象
執行緒池在Java中的抽象是 Executor 介面,但是真正的執行緒池實作其實是 ThreadPoolExecutor,該類提供了四個構造用于創建執行緒池,具體的方法會在下文中詳細解釋,除此之外,Java 還提供了幾種常用的配置好的執行緒池,通過 Executors提供的工廠方法可以快速的創建執行緒池來使用,這些工廠方法其實也是直接或間接的使用 ThreadPoolExecutor 的構造方法來創建的執行緒池,具體的方法將在下文中詳細解釋,
ThreadPoolExecutor
ThreadPoolExecutor 是 Java 中對于執行緒池的具體實作,在 java.util.concurrent 包下,其提供了四個構造用于配置和創建執行緒池,這四個構造方法如下:
/**
* @param corePoolSize 該執行緒池中核心執行緒數最大值
* @param maximumPoolSize 該執行緒池中執行緒總數最大值
* @param keepAliveTime 非核心執行緒閑置超時時長
* @param unit keepAliveTime 的單位
* @param workQueue 執行緒池中的任務佇列
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
}
/**
* @param corePoolSize 該執行緒池中核心執行緒數最大值
* @param maximumPoolSize 該執行緒池中執行緒總數最大值
* @param keepAliveTime 非核心執行緒閑置超時時長
* @param unit keepAliveTime 的單位
* @param workQueue 執行緒池中的任務佇列
* @param threadFactory 創建執行緒的工廠
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
}
/**
* @param corePoolSize 該執行緒池中核心執行緒數最大值
* @param maximumPoolSize 該執行緒池中執行緒總數最大值
* @param keepAliveTime 非核心執行緒閑置超時時長
* @param unit keepAliveTime 的單位
* @param workQueue 執行緒池中的任務佇列
* @param handler 飽和策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
}
/**
* @param corePoolSize 該執行緒池中核心執行緒數最大值
* @param maximumPoolSize 該執行緒池中執行緒總數最大值
* @param keepAliveTime 非核心執行緒閑置超時時長
* @param unit keepAliveTime 的單位
* @param workQueue 執行緒池中的任務佇列
* @param threadFactory 創建執行緒的工廠
* @param handler 飽和策略
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}
引數介紹:
-
int corePoolSize
執行緒池的核心執行緒數,默認情況下,核心執行緒會在執行緒池中一致存活,即使它們處于閑置狀態,如果將 ThreadPoolExecutor 的 allowCoreThreadTimeOut 屬性設定為 true,那么閑置的核心執行緒在等待新任務到來時會有超時策略,這個時間間隔由 keepAliveTime 所指定,當等待時間超過 keepAliveTime 所指定的時長后,核心執行緒就會被終止, -
int maximumPoolSize
執行緒池所能容納的最大執行緒數,當活動執行緒數達到這個數值后,后續的新任務將會被阻塞, -
long keepAliveTime
非核心執行緒閑置時的超時時長,超過這個時長,非核心執行緒就會被回收,當 ThreadPoolExecutor 的 allowCoreThreadTimeOut 屬性設定為 true 時,keepAliveTime 同樣會作用于核心執行緒, -
TimeUnit unit
用于指定 keepAliveTime 引數的時間單位,這是一個列舉, -
BlockingQueue workQueue
執行緒池中的任務佇列,通過執行緒池的 execute 方法提交的 Runnable 物件會存盤在這個佇列中, -
ThreadFactory threadFactory
執行緒工廠,為執行緒池提供創建新執行緒的功能,ThreadFactory 是一個介面,它只有一個方法:Thread newThread(Runnable r),原始碼如下:
/**
* An object that creates new threads on demand. Using thread factories
* removes hardwiring of calls to {@link Thread#Thread(Runnable) new Thread},
* enabling applications to use special thread subclasses, priorities, etc.
*
* <p>
* The simplest implementation of this interface is just:
* <pre> {@code
* class SimpleThreadFactory implements ThreadFactory {
* public Thread newThread(Runnable r) {
* return new Thread(r);
* }
* }}</pre>
*
* The {@link Executors#defaultThreadFactory} method provides a more
* useful simple implementation, that sets the created thread context
* to known values before returning it.
* @since 1.5
* @author Doug Lea
*/
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
- RejectedExecutionHandler handler
飽和策略,當執行緒池無法執行新任務時,這可能是由于任務佇列已滿或者是無法成功執行任務,這個時候 ThreadPoolExecutor 會呼叫 handler 的 void rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法來通知呼叫者,默認情況下 rejectedExecution() 方法會直接拋出一個 RejectedExecutionException 例外,RejectedExecutionHandler 原始碼如下:
/**
* A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
*
* @since 1.5
* @author Doug Lea
*/
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
ThreadPoolExecutor 為 RejectedExecutionHandler 提供了四個可選值:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy,其中 AbortPolicy 是默認值,它會直接拋出一個 RejectedExecutionException 例外,詳細解釋:
-
AbortPolicy
該策略是默認的拒絕策略,該策略將拋出未檢查的 RejectedExecutionException,我們可以捕獲這個例外,然后根據需求做相應的處理 -
CallerRunsPolicy
直接在 execute 方法的呼叫執行緒中運行被拒絕的任務,除非執行程式已關閉,在這種情況下任務將被丟棄, -
DiscardPolicy
它默默地丟棄被拒絕的任務, -
DiscardOldestPolicy
丟棄最早的未處理請求,然后重試 execute,
執行任務的規則
- 如果執行緒池中的執行緒數量未達到核心執行緒的數量,那么會直接啟動一個核心執行緒來執行任務,
- 如果執行緒池中的執行緒數量已經達到或者超過核心執行緒的數量,那么任務會被插入到任務佇列中排隊等待執行,
- 如果在步驟2中無法將任務插入到任務佇列中,這往往是由于佇列任務已滿,這個時候如果執行緒數量未達到執行緒池規定的最大值,那么會立刻啟動一個非核心執行緒來執行任務,
- 如果步驟3中的執行緒數量已經達到執行緒池規定的最大值,那么就拒絕執行此任務, ThreadPoolExecutor 會呼叫 RejectedExecutionHandler 的 rejectedExecution() 方法來通知呼叫者,
執行緒池的分類
上文中提到通過 Executors提供的工廠方法可以快速的創建執行緒池來使用,那么 JDK 1.8 的Executors 總共可以歸納為提供了6種執行緒池的創建:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-ZjJjUzAE-1634115316459)(https://upload-images.jianshu.io/upload_images/27041669-b3b4224a2e45091d.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]
- FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
這是一種執行緒數固定的執行緒池,當執行緒處于空閑狀態時,它們并不會被回收,除非執行緒池被關閉了,當所有執行緒都處于活動狀態時,新任務都會處于等待狀態,直到有執行緒空閑出來,由于 FixedThreadPool 只有核心執行緒并且這些核心執行緒不會被回收,這意味著它能夠更加快速的回應外界的請求,另外,任務佇列也是沒有大小限制的,
- CachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
這是一種執行緒數量不定的執行緒池,該執行緒池只有非核心執行緒,并且最大執行緒數為 Integer.MAX_VALUE,當執行緒池中的執行緒都處于活動狀態時,執行緒池會創建新的執行緒來處理新任務,否則就會利用空閑的執行緒來處理新任務,執行緒池中的空閑執行緒都有超時機制,這個超時時間為60秒,超過該時間閑置執行緒就會被回收,和FixedThreadPool 不同的是,CachedThreadPool 的任務佇列其實相當于一個空集合,這將導致任何任務都會被立即執行,因為在這種場景下 SynchronousQueue 是無法插入任務的,SynchronousQueue 是一個非常特殊的佇列,在很多情況下可以把它簡單理解為一個無法存盤元素的佇列,該執行緒池比較適合執行大量的耗時較少的任務,當整個執行緒池都處于空閑狀態時,執行緒池中的執行緒都會超時而被停止,這個時候 CachedThreadPool 之中實際是沒有任務執行緒的,它幾乎是不占用任何系統資源的,
- ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
這是一個核心執行緒數量固定的,而非核心執行緒數無限制,并且當非核心執行緒閑置時會被立即回收的執行緒池,該執行緒池主要用于執行定時任務和具有固定周期的重復任務,
- SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
這是一個內部只有一個執行緒的執行緒池,它確保所有的任務都在同一個執行緒中按順序執行,該執行緒池的意義在于統一所有外界任務到一個執行緒中,這使得這些任務之間不需要處理執行緒同步的問題,
- SingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
這是一個可以周期性執行任務的單執行緒執行緒池,
WorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
這是JDK1.8引入的執行緒池,該執行緒池使用所有可用處理器作為目標并行度,創建一個竊取執行緒的池,擁有多個任務佇列,其中一個任務可以產生其他較小的任務,這些任務被添加到并行處理執行緒的佇列中,如果一個執行緒完成了作業并且無事可做,則可以從另一執行緒的佇列中"竊取"作業,這個執行緒池不會保證任務的順序執行,因為它的作業方式是搶占式的,該執行緒池主要使用了 ForkJoinPool來作為實作,基于 work-stealing 演算法,
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/312161.html
標籤:其他
上一篇:重磅首發!騰訊前晚最新爆出的“Android Studio零基礎入門教材“,GitHub已評“鉆級“,看完我愛了!
