摘要:今天我們就來一起手撕ScheduledThreadPoolExecutor類的源代碼,
本文分享自華為云社區《深度決議ScheduledThreadPoolExecutor類的源代碼》,作者:冰 河,
在之前的文章中,我們深度分析了ThreadPoolExecutor類的源代碼,而ScheduledThreadPoolExecutor類是ThreadPoolExecutor類的子類,今天我們就來一起手撕ScheduledThreadPoolExecutor類的源代碼,
構造方法
我們先來看下ScheduledThreadPoolExecutor的構造方法,源代碼如下所示,
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
從代碼結構上來看,ScheduledThreadPoolExecutor類是ThreadPoolExecutor類的子類,ScheduledThreadPoolExecutor類的構造方法實際上呼叫的是ThreadPoolExecutor類的構造方法,
schedule方法
接下來,我們看一下ScheduledThreadPoolExecutor類的schedule方法,源代碼如下所示,
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { //如果傳遞的Runnable物件和TimeUnit時間單位為空 //拋出空指標例外 if (command == null || unit == null) throw new NullPointerException(); //封裝任務物件,在decorateTask方法中直接回傳ScheduledFutureTask物件 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); //執行延時任務 delayedExecute(t); //回傳任務 return t; } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) //如果傳遞的Callable物件和TimeUnit時間單位為空 //拋出空指標例外 if (callable == null || unit == null) throw new NullPointerException(); //封裝任務物件,在decorateTask方法中直接回傳ScheduledFutureTask物件 RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); //執行延時任務 delayedExecute(t); //回傳任務 return t; }
從源代碼可以看出,ScheduledThreadPoolExecutor類提供了兩個多載的schedule方法,兩個schedule方法的第一個引數不同,可以傳遞Runnable介面物件,也可以傳遞Callable介面物件,在方法內部,會將Runnable介面物件和Callable介面物件封裝成RunnableScheduledFuture物件,本質上就是封裝成ScheduledFutureTask物件,并通過delayedExecute方法來執行延時任務,
在源代碼中,我們看到兩個schedule都呼叫了decorateTask方法,接下來,我們就看看decorateTask方法,
decorateTask方法
decorateTask方法源代碼如下所示,
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) { return task; } protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) { return task; }
通過原始碼可以看出decorateTask方法的實作比較簡單,接收一個Runnable介面物件或者Callable介面物件和封裝的RunnableScheduledFuture任務,兩個方法都是將RunnableScheduledFuture任務直接回傳,在ScheduledThreadPoolExecutor類的子類中可以重寫這兩個方法,
接下來,我們繼續看下scheduleAtFixedRate方法,
scheduleAtFixedRate方法
scheduleAtFixedRate方法源代碼如下所示,
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { //傳入的Runnable物件和TimeUnit為空,則拋出空指標例外 if (command == null || unit == null) throw new NullPointerException(); //如果執行周期period傳入的數值小于或者等于0 //拋出非法引數例外 if (period <= 0) throw new IllegalArgumentException(); //將Runnable物件封裝成ScheduledFutureTask任務, //并設定執行周期 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); //呼叫decorateTask方法,本質上還是直接回傳ScheduledFutureTask物件 RunnableScheduledFuture<Void> t = decorateTask(command, sft); //設定執行的任務 sft.outerTask = t; //執行延時任務 delayedExecute(t); //回傳執行的任務 return t; }
通過原始碼可以看出,scheduleAtFixedRate方法將傳遞的Runnable物件封裝成ScheduledFutureTask任務物件,并設定了執行周期,下一次的執行時間相對于上一次的執行時間來說,加上了period時長,時長的具體單位由TimeUnit決定,采用固定的頻率來執行定時任務,
ScheduledThreadPoolExecutor類中另一個定時調度任務的方法是scheduleWithFixedDelay方法,接下來,我們就一起看看scheduleWithFixedDelay方法,
scheduleWithFixedDelay方法
scheduleWithFixedDelay方法的源代碼如下所示,
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { //傳入的Runnable物件和TimeUnit為空,則拋出空指標例外 if (command == null || unit == null) throw new NullPointerException(); //任務延時時長小于或者等于0,則拋出非法引數例外 if (delay <= 0) throw new IllegalArgumentException(); //將Runnable物件封裝成ScheduledFutureTask任務 //并設定固定的執行周期來執行任務 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,triggerTime(initialDelay, unit), unit.toNanos(-delay)); //呼叫decorateTask方法,本質上直接回傳ScheduledFutureTask任務 RunnableScheduledFuture<Void> t = decorateTask(command, sft); //設定執行的任務 sft.outerTask = t; //執行延時任務 delayedExecute(t); //回傳任務 return t; }
從scheduleWithFixedDelay方法的源代碼,我們可以看出在將Runnable物件封裝成ScheduledFutureTask時,設定了執行周期,但是此時設定的執行周期與scheduleAtFixedRate方法設定的執行周期不同,此時設定的執行周期規則為:下一次任務執行的時間是上一次任務完成的時間加上delay時長,時長單位由TimeUnit決定,也就是說,具體的執行時間不是固定的,但是執行的周期是固定的,整體采用的是相對固定的延遲來執行定時任務,
如果大家細心的話,會發現在scheduleWithFixedDelay方法中設定執行周期時,傳遞的delay值為負數,如下所示,
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay));
這里的負數表示的是相對固定的延遲,
在ScheduledFutureTask類中,存在一個setNextRunTime方法,這個方法會在run方法執行完任務后呼叫,這個方法更能體現scheduleAtFixedRate方法和scheduleWithFixedDelay方法的不同,setNextRunTime方法的原始碼如下所示,
private void setNextRunTime() { //距離下次執行任務的時長 long p = period; //固定頻率執行, //上次執行任務的時間 //加上任務的執行周期 if (p > 0) time += p; //相對固定的延遲 //使用的是系統當前時間 //加上任務的執行周期 else time = triggerTime(-p); }
在setNextRunTime方法中通過對下次執行任務的時長進行判斷來確定是固定頻率執行還是相對固定的延遲,
triggerTime方法
在ScheduledThreadPoolExecutor類中提供了兩個triggerTime方法,用于獲取下一次執行任務的具體時間,triggerTime方法的原始碼如下所示,
private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); }
這兩個triggerTime方法的代碼比較簡單,就是獲取下一次執行任務的具體時間,有一點需要注意的是:delay < (Long.MAX_VALUE >> 1判斷delay的值是否小于Long.MAX_VALUE的一半,如果小于Long.MAX_VALUE值的一半,則直接回傳delay,否則需要處理溢位的情況,
我們看到在triggerTime方法中處理防止溢位的邏輯使用了overflowFree方法,接下來,我們就看看overflowFree方法的實作,
overflowFree方法
overflowFree方法的源代碼如下所示,
private long overflowFree(long delay) { //獲取佇列中的節點 Delayed head = (Delayed) super.getQueue().peek(); //獲取的節點不為空,則進行后續處理 if (head != null) { //從佇列節點中獲取延遲時間 long headDelay = head.getDelay(NANOSECONDS); //如果從佇列中獲取的延遲時間小于0,并且傳遞的delay //值減去從佇列節點中獲取延遲時間小于0 if (headDelay < 0 && (delay - headDelay < 0)) //將delay的值設定為Long.MAX_VALUE + headDelay delay = Long.MAX_VALUE + headDelay; } //回傳延遲時間 return delay; }
通過對overflowFree方法的原始碼分析,可以看出overflowFree方法本質上就是為了限制佇列中的所有節點的延遲時間在Long.MAX_VALUE值之內,防止在ScheduledFutureTask類中的compareTo方法中溢位,
ScheduledFutureTask類中的compareTo方法的原始碼如下所示,
public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }
compareTo方法的主要作用就是對各延遲任務進行排序,距離下次執行時間靠前的任務就排在前面,
delayedExecute方法
delayedExecute方法是ScheduledThreadPoolExecutor類中延遲執行任務的方法,源代碼如下所示,
private void delayedExecute(RunnableScheduledFuture<?> task) { //如果當前執行緒池已經關閉 //則執行執行緒池的拒絕策略 if (isShutdown()) reject(task); //執行緒池沒有關閉 else { //將任務添加到阻塞佇列中 super.getQueue().add(task); //如果當前執行緒池是SHUTDOWN狀態 //并且當前執行緒池狀態下不能執行任務 //并且成功從阻塞佇列中移除任務 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) //取消任務的執行,但不會中斷執行中的任務 task.cancel(false); else //呼叫ThreadPoolExecutor類中的ensurePrestart()方法 ensurePrestart(); } }
可以看到在delayedExecute方法內部呼叫了canRunInCurrentRunState方法,canRunInCurrentRunState方法的原始碼實作如下所示,
boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
可以看到canRunInCurrentRunState方法的邏輯比較簡單,就是判斷執行緒池當前狀態下能夠執行任務,
另外,在delayedExecute方法內部還呼叫了ThreadPoolExecutor類中的ensurePrestart()方法,接下來,我們看下ThreadPoolExecutor類中的ensurePrestart()方法的實作,如下所示,
void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
在ThreadPoolExecutor類中的ensurePrestart()方法中,首先獲取當前執行緒池中執行緒的數量,如果執行緒數量小于corePoolSize則呼叫addWorker方法傳遞null和true,如果執行緒數量為0,則呼叫addWorker方法傳遞null和false,
關于addWork()方法的原始碼決議,大家可以參考【高并發專題】中的《高并發之——通過ThreadPoolExecutor類的原始碼深度決議執行緒池執行任務的核心流程》一文,這里,不再贅述,
reExecutePeriodic方法
reExecutePeriodic方法的源代碼如下所示,
void reExecutePeriodic(RunnableScheduledFuture<?> task) { //執行緒池當前狀態下能夠執行任務 if (canRunInCurrentRunState(true)) { //將任務放入佇列 super.getQueue().add(task); //執行緒池當前狀態下不能執行任務,并且成功移除任務 if (!canRunInCurrentRunState(true) && remove(task)) //取消任務 task.cancel(false); else //呼叫ThreadPoolExecutor類的ensurePrestart()方法 ensurePrestart(); } }
總體來說reExecutePeriodic方法的邏輯比較簡單,但是,這里需要注意和delayedExecute方法的不同點:呼叫reExecutePeriodic方法的時候已經執行過一次任務,所以,并不會觸發執行緒池的拒絕策略;傳入reExecutePeriodic方法的任務一定是周期性的任務,
onShutdown方法
onShutdown方法是ThreadPoolExecutor類中的鉤子函式,它是在ThreadPoolExecutor類中的shutdown方法中呼叫的,而在ThreadPoolExecutor類中的onShutdown方法是一個空方法,如下所示,
void onShutdown() { }
ThreadPoolExecutor類中的onShutdown方法交由子類實作,所以ScheduledThreadPoolExecutor類覆寫了onShutdown方法,實作了具體的邏輯,ScheduledThreadPoolExecutor類中的onShutdown方法的原始碼實作如下所示,
@Override void onShutdown() { //獲取佇列 BlockingQueue<Runnable> q = super.getQueue(); //在執行緒池已經呼叫shutdown方法后,是否繼續執行現有延遲任務 boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); //在執行緒池已經呼叫shutdown方法后,是否繼續執行現有定時任務 boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); //在執行緒池已經呼叫shutdown方法后,不繼續執行現有延遲任務和定時任務 if (!keepDelayed && !keepPeriodic) { //遍歷佇列中的所有任務 for (Object e : q.toArray()) //取消任務的執行 if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false); //清空佇列 q.clear(); } //在執行緒池已經呼叫shutdown方法后,繼續執行現有延遲任務和定時任務 else { //遍歷佇列中的所有任務 for (Object e : q.toArray()) { //當前任務是RunnableScheduledFuture型別 if (e instanceof RunnableScheduledFuture) { //將任務強轉為RunnableScheduledFuture型別 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; //在執行緒池呼叫shutdown方法后不繼續的延遲任務或周期任務 //則從佇列中洗掉并取消任務 if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { if (q.remove(t)) t.cancel(false); } } } } //最終呼叫tryTerminate()方法 tryTerminate(); }
ScheduledThreadPoolExecutor類中的onShutdown方法的主要邏輯就是先判斷執行緒池呼叫shutdown方法后,是否繼續執行現有的延遲任務和定時任務,如果不再執行,則取消任務并清空佇列;如果繼續執行,將佇列中的任務強轉為RunnableScheduledFuture物件之后,從佇列中洗掉并取消任務,大家需要好好理解這兩種處理方式,最后呼叫ThreadPoolExecutor類的tryTerminate方法,
點擊關注,第一時間了解華為云新鮮技術~
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/519298.html
標籤:其他
上一篇:NFS共享檔案服務
下一篇:模糊測驗工具AFL原始碼淺析
