原始碼簡介
ThreadPoolExecutor是JDK中的執行緒池實作,這個類實作了一個執行緒池需要的各個方法,它提供了任務提交、執行緒管理、監控等方法,
下面是ThreadPoolExecutor類的構造方法原始碼,其他創建執行緒池的方法最終都會導向這個構造方法,共有7個引數:corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler,
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
這些引數都通過volatile修飾:
public class ThreadPoolExecutor extends AbstractExecutorService {
private final BlockingQueue<Runnable> workQueue;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
// 是否允許核心執行緒被回收
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
}
corePoolSize:核心執行緒數
執行緒池維護的最小執行緒數量,核心執行緒創建后不會被回收(注意:設定allowCoreThreadTimeout=true后,空閑的核心執行緒超過存活時間也會被回收),
大于核心執行緒數的執行緒,在空閑時間超過keepAliveTime后會被回收,
執行緒池剛創建時,里面沒有一個執行緒,當呼叫 execute() 方法添加一個任務時,如果正在運行的執行緒數量小于corePoolSize,則馬上創建新執行緒并運行這個任務,
maximumPoolSize:最大執行緒數
執行緒池允許創建的最大執行緒數量,
當添加一個任務時,核心執行緒數已滿,執行緒池還沒達到最大執行緒數,并且沒有空閑執行緒,作業佇列已滿的情況下,創建一個新執行緒,然后從作業佇列的頭部取出一個任務交由新執行緒來處理,而將剛提交的任務放入作業佇列尾部,
keepAliveTime:空閑執行緒存活時間
當一個可被回收的執行緒的空閑時間大于keepAliveTime,就會被回收,
可被回收的執行緒:
- 設定allowCoreThreadTimeout=true的核心執行緒,
- 大于核心執行緒數的執行緒(非核心執行緒),
unit:時間單位
keepAliveTime的時間單位:
TimeUnit.NANOSECONDS
TimeUnit.MICROSECONDS
TimeUnit.MILLISECONDS // 毫秒
TimeUnit.SECONDS
TimeUnit.MINUTES
TimeUnit.HOURS
TimeUnit.DAYS
workQueue:作業佇列
新任務被提交后,會先添加到作業佇列,任務調度時再從佇列中取出任務,作業佇列實作了BlockingQueue介面,
JDK默認的作業佇列有五種:
- ArrayBlockingQueue 陣列型阻塞佇列:陣列結構,初始化時傳入大小,有界,FIFO,使用一個重入鎖,默認使用非公平鎖,入隊和出隊共用一個鎖,互斥,
- LinkedBlockingQueue 鏈表型阻塞佇列:鏈表結構,默認初始化大小為Integer.MAX_VALUE,有界(近似無解),FIFO,使用兩個重入鎖分別控制元素的入隊和出隊,用Condition進行執行緒間的喚醒和等待,
- SynchronousQueue 同步佇列:容量為0,添加任務必須等待取出任務,這個佇列相當于通道,不存盤元素,
- PriorityBlockingQueue 優先阻塞佇列:無界,默認采用元素自然順序升序排列,
- DelayQueue 延時佇列:無界,元素有過期時間,過期的元素才能被取出,
threadFactory:執行緒工廠
創建執行緒的工廠,可以設定執行緒名、執行緒編號等,
默認執行緒工廠:
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
handler:拒絕策略
當執行緒池執行緒數已滿,并且作業佇列達到限制,新提交的任務使用拒絕策略處理,可以自定義拒絕策略,拒絕策略需要實作RejectedExecutionHandler介面,
JDK默認的拒絕策略有四種:
- AbortPolicy:丟棄任務并拋出RejectedExecutionException例外,
- DiscardPolicy:丟棄任務,但是不拋出例外,可能導致無法發現系統的例外狀態,
- DiscardOldestPolicy:丟棄佇列最前面的任務,然后重新提交被拒絕的任務,
- CallerRunsPolicy:由呼叫執行緒處理該任務,
默認拒絕策略:
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
自定義執行緒池工具
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 執行緒池工廠工具
*
* @author 向振華
* @date 2021/04/11 10:24
*/
public class ThreadPoolFactory {
/**
* 生成固定大小的執行緒池
*
* @param threadName 執行緒名稱
* @return 執行緒池
*/
public static ExecutorService createFixedThreadPool(String threadName) {
AtomicInteger threadNumber = new AtomicInteger(0);
return new ThreadPoolExecutor(
// 核心執行緒數
desiredThreadNum(),
// 最大執行緒數
desiredThreadNum() * 2,
// 空閑執行緒存活時間
60L,
// 空閑執行緒存活時間單位
TimeUnit.SECONDS,
// 作業佇列
new ArrayBlockingQueue<>(1024),
// 執行緒工廠
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, threadName + "-" + threadNumber.getAndIncrement());
}
},
// 拒絕策略
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
try {
//嘗試阻塞式加入任務佇列
executor.getQueue().put(r);
} catch (Exception e) {
//保持執行緒的中斷狀態
Thread.currentThread().interrupt();
}
}
}
});
}
/**
* 理想的執行緒數,使用 2倍cpu核心數
*/
public static int desiredThreadNum() {
return Runtime.getRuntime().availableProcessors() * 2;
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/275461.html
標籤:java
上一篇:設計模式:行為型-責任鏈模式
