前提
Tomcat 10.1.x
Tomcat執行緒池介紹
Tomcat執行緒池,源于JAVA JDK自帶執行緒池,由于JAVA JDK執行緒池策略,比較適合處理 CPU 密集型任務,但是對于 I/O 密集型任務,如資料庫查詢,rpc 請求呼叫等,不是很友好,所以Tomcat在其基礎上進行了擴展,
任務處理流程

擴展執行緒池相關原始碼簡析
Tomcat中定義了一個StandardThreadExecutor類,該類實作了org.apache.catalina.Executor,org.apache.tomcat.util.threads.ResizableExecutor介面
該類內部定義了namePrefix(創建的執行緒名稱前綴,默認值tomcat-exec-),maxThreads(最大執行緒數,默認值 200),minSpareThreads(最小執行緒數,即核心執行緒數,默認值 25),maxIdleTime(執行緒最大空閑時間,毫秒為單位,默認值60秒),maxQueueSize(最大佇列大小,默認值 Integer.MAX_VALUE)等屬性,此外,還定義了一個org.apache.tomcat.util.threads.ThreadPoolExecutor型別的執行器物件,一個execute(Runnable command) 方法
當execute(Runnable command) 方法被呼叫時,會呼叫上述ThreadPoolExecutor類物件的execute方法
org.apache.catalina.core.StandardThreadExecutor.java
import org.apache.catalina.Executor;
import org.apache.catalina.LifecycleException;
import org.apache.catalina.LifecycleState;
import org.apache.catalina.util.LifecycleMBeanBase;
import org.apache.tomcat.util.res.StringManager;
import org.apache.tomcat.util.threads.ResizableExecutor;
import org.apache.tomcat.util.threads.TaskQueue;
import org.apache.tomcat.util.threads.TaskThreadFactory;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
public class StandardThreadExecutor extends LifecycleMBeanBase
implements Executor, ResizableExecutor {
protected static final StringManager sm = StringManager.getManager(StandardThreadExecutor.class);
// ---------------------------------------------- Properties
/**
* Default thread priority
*/
protected int threadPriority = Thread.NORM_PRIORITY;
/**
* Run threads in daemon or non-daemon state
*/
protected boolean daemon = true;
/**
* Default name prefix for the thread name
*/
protected String namePrefix = "tomcat-exec-";
/**
* max number of threads
*/
protected int maxThreads = 200;
/**
* min number of threads
*/
protected int minSpareThreads = 25;
/**
* idle time in milliseconds
*/
protected int maxIdleTime = 60000;
/**
* The executor we use for this component
*/
protected ThreadPoolExecutor executor = null;
/**
* the name of this thread pool
*/
protected String name;
/**
* The maximum number of elements that can queue up before we reject them
*/
protected int maxQueueSize = Integer.MAX_VALUE;
/**
* After a context is stopped, threads in the pool are renewed. To avoid
* renewing all threads at the same time, this delay is observed between 2
* threads being renewed.
*/
protected long threadRenewalDelay =
org.apache.tomcat.util.threads.Constants.DEFAULT_THREAD_RENEWAL_DELAY;
private TaskQueue taskqueue = null;
// ---------------------------------------------- Constructors
public StandardThreadExecutor() {
//empty constructor for the digester
}
//....此處代碼已省略
@Override
public void execute(Runnable command) {
if (executor != null) {
// Note any RejectedExecutionException due to the use of TaskQueue
// will be handled by the o.a.t.u.threads.ThreadPoolExecutor
executor.execute(command);
} else {
throw new IllegalStateException(sm.getString("standardThreadExecutor.notStarted"));
}
}
//....此處代碼已省略
}
當org.apache.tomcat.util.threads.ThreadPoolExecuto類物件的execute(Runnable command) 方法被呼叫時,會呼叫該類定義的一個executeInternal方法,并在捕獲到RejectedExecutionException例外時,嘗試再次將任務放入作業佇列中,
executeInternal方法中,通過代碼可知,當前執行緒數小于核心執行緒池大小時,會創建新執行緒,否則,會呼叫workQueue物件(org.apache.tomcat.util.threads.TaskQueue型別)的offer方法,將任務進行排隊,Tomcat通過控制workQueue.offer()方法的回傳值,實作了當前執行緒數超過核心執行緒池大小時,優先創建執行緒,而不是讓任務排隊,
org.apache.tomcat.util.threads.ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService {
//...此處代碼已省略
@Override
public void execute(Runnable command) {
submittedCount.incrementAndGet();
try {
executeInternal(command);
} catch (RejectedExecutionException rx) {
if (getQueue() instanceof TaskQueue) {
// If the Executor is close to maximum pool size, concurrent
// calls to execute() may result (due to Tomcat's use of
// TaskQueue) in some tasks being rejected rather than queued.
// If this happens, add them to the queue.
final TaskQueue queue = (TaskQueue) getQueue();
if (!queue.force(command)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@link RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
private void executeInternal(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 當前執行緒數小于核心執行緒數時,
if (addWorker(command, true)) { // 創建執行緒
return;
}
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //workQueue.offer(command)為false時,會走以下的else if分支,創建執行緒
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) {
reject(command);
} else if (workerCountOf(recheck) == 0) {
addWorker(null, false);
}
}
else if (!addWorker(command, false)) {
reject(command);
}
}
//...此處代碼已省略
}
org.apache.tomcat.util.threads.TaskQueue繼承于java.util.concurrent.LinkedBlockingQueue,并重寫了offer(排隊任務的方法),該方法中,當當前執行緒數大于核心執行緒數,小于最大執行緒數時,回傳false,導致上述executeInternal方法中workQueue.offer(command)為false,進而導致該分支代碼不被執行,執行addWorker(command, false)方法,創建新執行緒,
org.apache.tomcat.util.threads.TaskQueue
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.tomcat.util.res.StringManager;
/**
* As task queue specifically designed to run with a thread pool executor. The
* task queue is optimised to properly utilize threads within a thread pool
* executor. If you use a normal queue, the executor will spawn threads when
* there are idle threads and you won't be able to force items onto the queue
* itself.
*/
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
//...此處代碼已省略
/**
* Used to add a task to the queue if the task has been rejected by the Executor.
*
* @param o The task to add to the queue
*
* @return {@code true} if the task was added to the queue,
* otherwise {@code false}
*/
public boolean force(Runnable o) {
if (parent == null || parent.isShutdown()) {
throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
}
return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
}
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) {
return super.offer(o);
}
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) {
return super.offer(o);
}
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
return super.offer(o);
}
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) {
return false;
}
//if we reached here, we need to add it to the queue
return super.offer(o);
}
//...此處代碼已省略
}
參考鏈接
https://gitee.com/apache/tomcat/blob/10.1.x/java/org/apache/catalina/core/StandardThreadExecutor.java
作者:授客
微信/QQ:1033553122
全國軟體測驗QQ交流群:7156436
Git地址:https://gitee.com/ishouke
友情提示:限于時間倉促,文中可能存在錯誤,歡迎指正、評論!
作者五行缺錢,如果覺得文章對您有幫助,請掃描下邊的二維碼打賞作者,金額隨意,您的支持將是我繼續創作的源動力,打賞后如有任何疑問,請聯系我!!!
微信打賞
支付寶打賞 全國軟體測驗交流QQ群
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/533508.html
標籤:其他
上一篇:c++基礎2
