執行緒池的監控很重要,對于前面章節講的動態引數調整,其實還是得依賴于執行緒池監控的資料反饋之后才能做出調整的決策,還有就是執行緒池本身的運行程序對于我們來說像一個黑盒,我們沒辦法了解執行緒池中的運行狀態時,出現問題沒有辦法及時判斷和預警,
對于監控這類的場景,核心邏輯就是要拿到關鍵指標,然后進行上報,只要能實時拿到這些關鍵指標,就可以輕松實作監控以及預警功能,
ThreadPoolExecutor中提供了以下方法來獲取執行緒池中的指標,
- getCorePoolSize():獲取核心執行緒數,
- getMaximumPoolSize:獲取最大執行緒數,
- getQueue():獲取執行緒池中的阻塞佇列,并通過阻塞佇列中的方法獲取佇列長度、元素個數等,
- getPoolSize():獲取執行緒池中的作業執行緒數(包括核心執行緒和非核心執行緒),
- getActiveCount():獲取活躍執行緒數,也就是正在執行任務的執行緒,
- getLargestPoolSize():獲取執行緒池曾經到過的最大作業執行緒數,
- getTaskCount():獲取歷史已完成以及正在執行的總的任務數量,
除此之外,ThreadPoolExecutor中還提供了一些未實作的鉤子方法,我們可以通過重寫這些方法來實作更多指標資料的獲取,
- beforeExecute,在Worker執行緒執行任務之前會呼叫的方法,
- afterExecute,在Worker執行緒執行任務之后會呼叫的方法,
- terminated,當執行緒池從狀態變更到TERMINATED狀態之前呼叫的方法,
比如我們可以在beforeExecute方法中記錄當前任務開始執行的時間,再到afterExecute方法來計算任務執行的耗時、最大耗時、最小耗時、平均耗時等,
執行緒池監控的基本原理
我們可以通過Spring Boot提供的Actuator,自定義一個Endpoint來發布執行緒池的指標資料,實作執行緒池監控功能,當然,除了Endpoint以外,我們還可以通過JMX的方式來暴露執行緒池的指標資訊,不管通過什么方法,核心思想都是要有一個地方看到這些資料,
了解對于Spring Boot應用監控得讀者應該知道,通過Endpoint發布指標資料后,可以采用一些主流的開源監控工具來進行采集和展示,如圖10-9所示,假設在Spring Boot應用中發布一個獲取執行緒池指標資訊的Endpoint,那么我們可以采用Prometheus定時去抓取目標服務器上的Metric資料,Prometheus會將采集到的資料通過Retrieval分發給TSDB進行存盤,這些資料可以通過Prometheus自帶的UI進行展示,也可以使用Grafana圖表工具通過PromQL陳述句來查詢Prometheus中采集的資料進行渲染,最后采用AlertManager這個組件來觸發預警功能,

圖10-9中所涉及到的工具都是比較程度的開源監控組件,大家可以自行根據官方教程配置即可,而在本章節中要重點講解的就是如何自定義Endpoint發布執行緒池的Metric資料,
在Spring Boot應用中發布執行緒池資訊
對于執行緒池的監控實作,筆者開發了一個相對較為完整的小程式,主要涉及到幾個功能:
- 可以通過組態檔來構建執行緒池,
- 擴展了ThreadPoolExecutor的實作,
- 發布一個自定義的Endpoint,
該小程式包含的類以及功能說明如下:
- ThreadPoolExecutorForMonitor:擴展ThreadPoolExecutor的實作類,
- ThreadPoolConfigurationProperties:系結application.properties的配置屬性,
- ThreadPoolForMonitorManager:執行緒池管理類,實作執行緒池的初始化,
- ThreadPoolProperties:執行緒池基本屬性,
- ResizeLinkedBlockingQueue:這個類是直接復制了LinkedBlockingQueue,提供了
setCapacity方法,在前面有講解到,原始碼就不貼出來, - ThreadPoolEndpoint:自定義Endpoint,
ThreadPoolExecutorForMonitor
繼承了ThreadPoolExecutor,實作了beforeExecute和afterExecute,在原有執行緒池的基礎上新增了最短執行時間、最長執行時間、平均執行耗時的屬性,
public class ThreadPoolExecutorForMonitor extends ThreadPoolExecutor {
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
private static final String defaultPoolName="Default-Task";
private static ThreadFactory threadFactory=new MonitorThreadFactory(defaultPoolName);
public ThreadPoolExecutorForMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory,defaultHandler);
}
public ThreadPoolExecutorForMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,String poolName) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,new MonitorThreadFactory(poolName),defaultHandler);
}
public ThreadPoolExecutorForMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler,String poolName) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory,handler);
}
//最短執行時間
private long minCostTime;
//最長執行時間
private long maxCostTime;
//總的耗時
private AtomicLong totalCostTime=new AtomicLong();
private ThreadLocal<Long> startTimeThreadLocal=new ThreadLocal<>();
@Override
public void shutdown() {
super.shutdown();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
startTimeThreadLocal.set(System.currentTimeMillis());
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
long costTime=System.currentTimeMillis()-startTimeThreadLocal.get();
startTimeThreadLocal.remove();
maxCostTime=maxCostTime>costTime?maxCostTime:costTime;
if(getCompletedTaskCount()==0){
minCostTime=costTime;
}
minCostTime=minCostTime<costTime?minCostTime:costTime;
totalCostTime.addAndGet(costTime);
super.afterExecute(r, t);
}
public long getMinCostTime() {
return minCostTime;
}
public long getMaxCostTime() {
return maxCostTime;
}
public long getAverageCostTime(){//平均耗時
if(getCompletedTaskCount()==0||totalCostTime.get()==0){
return 0;
}
return totalCostTime.get()/getCompletedTaskCount();
}
@Override
protected void terminated() {
super.terminated();
}
static class MonitorThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
MonitorThreadFactory(String poolName) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = poolName+"-pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}
ThreadPoolConfigurationProperties
提供了獲取application.properties組態檔屬性的功能,
@ConfigurationProperties(prefix = "monitor.threadpool")
@Data
public class ThreadPoolConfigurationProperties {
private List<ThreadPoolProperties> executors=new ArrayList<>();
}
執行緒池的核心屬性宣告,
@Data
public class ThreadPoolProperties {
private String poolName;
private int corePoolSize;
private int maxmumPoolSize=Runtime.getRuntime().availableProcessors();
private long keepAliveTime=60;
private TimeUnit unit= TimeUnit.SECONDS;
private int queueCapacity=Integer.MAX_VALUE;
}
上述配置類要生效,需要通過@EnableConfigurationProperties開啟,我們可以在Main方法上開啟,代碼如下,
@EnableConfigurationProperties(ThreadPoolConfigurationProperties.class)
@SpringBootApplication
public class ThreadPoolApplication {
public static void main(String[] args) {
SpringApplication.run(ThreadPoolApplication.class, args);
}
}
application.properties
配置類創建好之后,我們就可以在application.properties中,通過如下方式來構建執行緒池,
monitor.threadpool.executors[0].pool-name=first-monitor-thread-pool
monitor.threadpool.executors[0].core-pool-size=4
monitor.threadpool.executors[0].maxmum-pool-size=8
monitor.threadpool.executors[0].queue-capacity=100
monitor.threadpool.executors[1].pool-name=second-monitor-thread-pool
monitor.threadpool.executors[1].core-pool-size=2
monitor.threadpool.executors[1].maxmum-pool-size=4
monitor.threadpool.executors[1].queue-capacity=40
ThreadPoolForMonitorManager
用來實作執行緒池的管理和初始化,實作執行緒池的統一管理,初始化的邏輯是根據application.properties中配置的屬性來實作的,
- 從配置類中獲得執行緒池的基本配置,
- 根據配置資訊構建ThreadPoolExecutorForMonitor實體,
- 把實體資訊保存到集合中,
@Component
public class ThreadPoolForMonitorManager {
@Autowired
ThreadPoolConfigurationProperties poolConfigurationProperties;
private final ConcurrentMap<String,ThreadPoolExecutorForMonitor> threadPoolExecutorForMonitorConcurrentMap=new ConcurrentHashMap<>();
@PostConstruct
public void init(){
poolConfigurationProperties.getExecutors().forEach(threadPoolProperties -> {
if(!threadPoolExecutorForMonitorConcurrentMap.containsKey(threadPoolProperties.getPoolName())){
ThreadPoolExecutorForMonitor executorForMonitor=new ThreadPoolExecutorForMonitor(
threadPoolProperties.getCorePoolSize(),
threadPoolProperties.getMaxmumPoolSize(),
threadPoolProperties.getKeepAliveTime(),
threadPoolProperties.getUnit(),
new ResizeLinkedBlockingQueue<>(threadPoolProperties.getQueueCapacity()),
threadPoolProperties.getPoolName());
threadPoolExecutorForMonitorConcurrentMap.put(threadPoolProperties.getPoolName(),executorForMonitor);
}
});
}
public ThreadPoolExecutorForMonitor getThreadPoolExecutor(String poolName){
ThreadPoolExecutorForMonitor threadPoolExecutorForMonitor=threadPoolExecutorForMonitorConcurrentMap.get(poolName);
if(threadPoolExecutorForMonitor==null){
throw new RuntimeException("找不到名字為"+poolName+"的執行緒池");
}
return threadPoolExecutorForMonitor;
}
public ConcurrentMap<String,ThreadPoolExecutorForMonitor> getThreadPoolExecutorForMonitorConcurrentMap(){
return this.threadPoolExecutorForMonitorConcurrentMap;
}
}
ThreadPoolEndpoint
使用Spring-Boot-Actuator發布Endpoint,用來暴露當前應用中所有執行緒池的Metric資料,
讀者如果不清楚在Spring Boot中自定義Endpoint,可以直接去Spring官方檔案中配置,比較簡單,
@Configuration
@Endpoint(id="thread-pool")
public class ThreadPoolEndpoint {
@Autowired
private ThreadPoolForMonitorManager threadPoolForMonitorManager;
@ReadOperation
public Map<String,Object> threadPoolsMetric(){
Map<String,Object> metricMap=new HashMap<>();
List<Map> threadPools=new ArrayList<>();
threadPoolForMonitorManager.getThreadPoolExecutorForMonitorConcurrentMap().forEach((k,v)->{
ThreadPoolExecutorForMonitor tpe=(ThreadPoolExecutorForMonitor) v;
Map<String,Object> poolInfo=new HashMap<>();
poolInfo.put("thread.pool.name",k);
poolInfo.put("thread.pool.core.size",tpe.getCorePoolSize());
poolInfo.put("thread.pool.largest.size",tpe.getLargestPoolSize());
poolInfo.put("thread.pool.max.size",tpe.getMaximumPoolSize());
poolInfo.put("thread.pool.thread.count",tpe.getPoolSize());
poolInfo.put("thread.pool.max.costTime",tpe.getMaxCostTime());
poolInfo.put("thread.pool.average.costTime",tpe.getAverageCostTime());
poolInfo.put("thread.pool.min.costTime",tpe.getMinCostTime());
poolInfo.put("thread.pool.active.count",tpe.getActiveCount());
poolInfo.put("thread.pool.completed.taskCount",tpe.getCompletedTaskCount());
poolInfo.put("thread.pool.queue.name",tpe.getQueue().getClass().getName());
poolInfo.put("thread.pool.rejected.name",tpe.getRejectedExecutionHandler().getClass().getName());
poolInfo.put("thread.pool.task.count",tpe.getTaskCount());
threadPools.add(poolInfo);
});
metricMap.put("threadPools",threadPools);
return metricMap;
}
}
如果需要上述自定義的Endpoint可以被訪問,還需要在application.properties檔案中配置如下代碼,意味著thread-pool Endpoint允許被訪問,
management.endpoints.web.exposure.include=thread-pool
TestController
提供使用執行緒池的方法,用來實作在呼叫之前和呼叫之后,通過Endpoint獲取到Metric資料的變化,
@RestController
public class TestController {
private final String poolName="first-monitor-thread-pool";
@Autowired
ThreadPoolForMonitorManager threadPoolForMonitorManager;
@GetMapping("/execute")
public String doExecute(){
ThreadPoolExecutorForMonitor tpe=threadPoolForMonitorManager.getThreadPoolExecutor(poolName);
for (int i = 0; i < 100; i++) {
tpe.execute(()->{
try {
Thread.sleep(new Random().nextInt(4000));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
return "success";
}
}
效果演示
訪問自定義Endpoint: http://ip:8080/actuator/thread-pool,就可以看到如下資料,我們可以把這個Endpoint配置到Prometheus中,Prometheus會定時抓取這些指標存盤并展示,從而完成執行緒池的整體監控,
{
"threadPools":[
{
"thread.pool.queue.name":"com.concurrent.demo.ResizeLinkedBlockingQueue",
"thread.pool.core.size":2,
"thread.pool.min.costTime":0,
"thread.pool.completed.taskCount":0,
"thread.pool.max.costTime":0,
"thread.pool.task.count":0,
"thread.pool.name":"second-monitor-thread-pool",
"thread.pool.largest.size":0,
"thread.pool.rejected.name":"java.util.concurrent.ThreadPoolExecutor$AbortPolicy",
"thread.pool.active.count":0,
"thread.pool.thread.count":0,
"thread.pool.average.costTime":0,
"thread.pool.max.size":4
},
{
"thread.pool.queue.name":"com.concurrent.demo.ResizeLinkedBlockingQueue",
"thread.pool.core.size":4,
"thread.pool.min.costTime":65,
"thread.pool.completed.taskCount":115,
"thread.pool.max.costTime":3964,
"thread.pool.task.count":200,
"thread.pool.name":"first-monitor-thread-pool",
"thread.pool.largest.size":4,
"thread.pool.rejected.name":"java.util.concurrent.ThreadPoolExecutor$AbortPolicy",
"thread.pool.active.count":4,
"thread.pool.thread.count":4,
"thread.pool.average.costTime":1955,
"thread.pool.max.size":8
}
]
}
總結
執行緒池的整體實作并不算太復雜,但是里面涉及到的一些思想和理論是可以值得我們去學習和借鑒,如基于阻塞佇列的生產者消費者模型的實作、動態擴容的思想、如何通過AQS來實作安全關閉執行緒池、降級方案(拒絕策略)、位運算等,實際上越底層的實作,越包含更多技術層面的思想和理論,
執行緒池在實際使用中,如果是新手,不建議直接用Executors中提供的工廠方法,因為執行緒池中的引數會影響到記憶體以及CPU資源的占用,我們可以自己集成ThreadPoolExecutor這個類,擴展一個自己的實作,也可以自己構造ThreadPoolExecutor實體,這樣能夠更好的了解執行緒池中核心引數的意義避免不必要的生產問題,
關注[跟著Mic學架構]公眾號,獲取更多精品原創

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/325235.html
標籤:Java
上一篇:JVM記憶體區域
