JobStore
在之前的博文中,博主已經寫了關于Job的相關內容,本篇博文,博主將介紹JobStore相關的內容,
JobStore是存放Job和Trigger的地方,當我們呼叫Scheduler物件的scheduleJob時就會將其存入JobStore中,然后供quartzSchedulerThread使用,
為什么需要JobStore?
因為我們需要被Scheduler呼叫的任務大多數并不是一次性的任務,而是需要被定時觸發,或者某個時間點才能被觸發的,因此我們需要一個容器來存盤Job和Trigger的相關內容,
其次,quartz框架還有考慮到持久化存盤的場景,比如說將對應的資料存放到資料庫,這時候存放或者讀取資料庫里面的資料都需要有一個與之對應的容器,
介面定義
JobStore介面中定義的方法太多,這里博主只列出一下比較重要的方法,介面定義如下所示:
public interface JobStore {
//存盤job和tigger
void storeJobAndTrigger(JobDetail newJob, OperableTrigger newTrigger)
throws ObjectAlreadyExistsException, JobPersistenceException;
//獲取下一次需要進行觸發的觸發器
List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow)
throws JobPersistenceException;
//釋放獲取到的觸發器
void releaseAcquiredTrigger(OperableTrigger trigger);
//觸發器被觸發
List<TriggerFiredResult> triggersFired(List<OperableTrigger> triggers) throws JobPersistenceException;
//觸發器觸發完成
void triggeredJobComplete(OperableTrigger trigger, JobDetail jobDetail, CompletedExecutionInstruction triggerInstCode);
}
RAMJobStore
在quartz的默認配置中使用的就是RAMJobStore,顧名思義,RAMJobStore是基于記憶體來存盤的Job相關資料,也就是在程式重啟之后,對應的資料就會消失,并且它不支持集群,也就是說它不可以把需要調度的任務分配到多臺機器上面進行執行,博主這里就以RAMJobStore為例,講解一下RAMJobStore的相關實作,
RAMJobStore屬性
- jobsByKey,按jobKey進行分組的HashMap集合
- triggersByKey,按jobKey進行分組的HashMap集合
- jobsByGroup,按job的分組名進行分組的HashMap集合
- triggersByGroup,按trigger的分組名進行分組的HashMap集合
- timeTriggers,具有下次觸發時間的trigger集合
- triggers,trigger集合
- lock,操作RAMJobStore時需要獲取的鎖
- pausedTriggerGroups,被暫停的trigger分組名的HashSet集合
- pausedJobGroups,被暫停的job分組名的HashSet集合
- blockedJobs,被鎖住的任務key的HashSet集合
- misfireThreshold,失火閾值
- signaler,SchedulerSignaler信號器
acquireNextTriggers方法
總體來說,quartzSchedulerThread有設定idlewaitime時間,idlewaitime時間就是在這個空閑時間內如果沒有接收到調度器發生變化的信號(sigLock鎖的notify),它就會阻塞對應的時間(sigLock的wait(idlewaitime)方法),因此quartzSchedulerThread需要提前獲取到這個(now + idlewaitime)時間點內的trigger,否則會造成觸發器失火的情況,并且它會一次性獲取當前可用空閑執行緒個數的trigger,
acquireNextTriggers方法邏輯:
- 先獲取到lock鎖,防止此時的job和trigger發生變化,
- 如果timeTriggers的集合為空,那么直接回傳,
- while回圈獲取triggerWrapper,從timetriggers中獲取第一個triggerWrapper,因為timeTriggers是有序的triggerWrapper集合(按照觸發時間和優先級排序),接著從timeTriggers中移除triggerWrapper,
- 如果trigger的下一次觸發時間為空,則重新獲取triggerWrapper,
- 如果trigger的下一次觸發時間大于需要獲取的時間點,則跳出回圈(因為timeTrigger是有序的,第一個時間都不滿足了,就不用再繼續回圈了),然后回傳獲取到的trigger集合,
- 接著判斷該trigger是否失火,失火的條件為:當前時間小于這個(now-misfireTime)的時間點 并且失火策略為不能忽略(Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY),
a. 通知triggerListener該trigger發生失火
b. 接著呼叫trigger的updateAfterMisfire方法
c. 最后進行下一次觸發時間的判斷,如果下一次觸發時間為空,那么就說明trigger已經完成,從timeTriggers中移除自己 - 如果處理后的trigger的下一次執行時間跟之前的執行時間不一樣的話,則需要重新加入timeTrigger計算觸發順序,
- 判斷該trigger對應的job是否禁止并發執行,如果禁止,則需要將該job對應的其它trigger暫時從timeTriggers中移除,
- 設定triggerWrapper的狀態為ACQUIRED,設定本次觸發的triggerInstanceId,
- 如果獲取到的trigger數量等于需要獲取的數量,則可以跳出回圈,
- 結束回圈后需要判斷是否有被臨時移除的trigger,有的話,需要將triggerWrapper放回timeTriggers,
releaseAcquiredTrigger方法
在JobStore中呼叫triggerWrapper相關的方法時(如triggersFired方法),如果發生了例外,則會呼叫該方法,將其重新放入timeTriggers中,
releaseAcquiredTrigger方法邏輯:
- 先獲取到lock鎖,
- 判斷triggerWrapper的狀態是否為已獲取狀態,
- 如果是已獲取狀態,則將其重新加入到timeTriggers中,
triggersFired方法
- 先獲取到lock鎖,
- 回圈進行處理firedTriggers集合,每次從firedTriggers集合獲取triggerWrapper,
- 判斷triggerWrapper的狀態是否為ACQUIRED狀態,不是的話則continue,
- 呼叫trigger對應的triggered方法,更新trigger內部的屬性,
a. 計算下一次執行時間
b. 已執行的次數
c. 存盤上一次執行時間 - 設定triggerWrapper的狀態為WAITING(之前是ACQUIRED狀態)
- 判斷對應的job是否允許并發執行
a. 如果不允許的話,則將該job對應的所有triggerWrapper改為加鎖狀態(BLOCKED)
b. 如果允許的話,判斷下一次執行時間是否為null,不為null的話,重新加入timeTriggers中, - 回傳觸發器觸發執行結果(TriggerFiredResult),
triggeredJobComplete方法
正常情況下,Job被執行完畢的時候,會通知JobStore執行該方法,
- 先獲取到lock鎖
- 根據isPersistJobDataAfterExecution判斷需要是否持久化存盤jobDataMap
- 根據isConcurrentExectionDisallowed判斷job是否不允許并發執行
a. 如果不允許并發執行,需要從blockJobs中移除該job,解除對應trigger的鎖定狀態,如果解除狀態后的trigger狀態為WAITING狀態,則將trigger放入到timeTriggers結合中, - 根據完成策略CompletedExecutionInstruction對triggerWrapper進行處理
a. 如果CompletedExecutionInstruction默認值是NOOP,什么都不做,
b. 如果值是DELETE_TRIGGER,則需要洗掉對應的trigger,
c. ...
博主微信公眾號
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/442766.html
標籤:Java
