CronMan 分布式任務調度系統

github地址:CronMan, 歡迎star
歡迎朋友們站內私信交流~
簡介
CronMan是一款輕量級的分布式任務調度系統,隨著微服務化架構的逐步演進,單體架構逐漸演變為分布式、微服務架構,相應的也需要一個分布式任務調度系統來管理分布式架構中的定時任務,
已有的分布式任務調度系統如:Saturn、elastic-job、xxl-job都是非常優秀的開源作品,為了學習與交流,我設計了一款輕量級的分布式任務調度系統CronMan,具有一些嶄新的特性如:任務編排、任務結果傳遞、高可用、任務冪等性觸發等,彌補了上述開源系統的部分缺點,
內容一覽
-
功能
-
系統架構
- 整體架構
- 應用場景
- 設計思想
-
效果一覽
- 任務配置
- 任務監控
- 任務依賴
-
實踐和使用
- 使用流程
- 示例
功能
-
定時任務:調度系統最基礎的功能,定時完成用戶提出的任務,可實作秒級任務調度,
-
任務監控:監控任務運行的狀態:已就緒,禁用,執行中等,
-
任務控制:注冊、啟用、禁用、洗掉任務,
-
任務分片:對于一個大型任務,可以將任務分片置于不同執行器上執行,
-
彈性擴容、縮容:執行器可以水平擴展 ,同時某個執行器宕機/新增執行器對當前任務的執行無影響,
-
任務重觸發:自動記錄錯過未觸發的任務,重新觸發,
-
失效轉移:某臺執行器宕機后,歸屬于該執行器的任務會被重新分發到另外一臺執行器上,
-
任務編排:用戶可以編排任務依賴順序,任務形成一個有向無環圖,按照圖的順序依次呼叫,滿足串行,并行,多依賴需求,
-
任務結果傳遞:上游任務可以將本次運行結果傳遞給下游任務,
-
高可用:調度系統以集群形式部署,當其中一臺調度器宕機后,不會影響整體系統的運行,
-
任務冪等性:精密控制任務的執行次數,用容災處理來避免任務重復執行,
系統架構
整體架構


CronMan 可以分為三大模塊(調度器集群、控制中心和執行器集群)、兩大組件(路由轉發組件和持久化存盤組件),這三大模塊和兩大組件的作用如下:
- 任務調度器集群:負責調度任務,處理控制中心下發的請求等,
- 任務控制中心:負責任務的注冊、啟用、禁用、洗掉注冊等,對任務進行邏輯編排,提供實時監控功能,
- 任務執行器集群:負責接收調度器下發的任務請求,并執行任務邏輯,
- 路由轉發組件(Nginx):實作調度集群的高可用(如果對高可用不需要,那就可以不使用),
- 持久化存盤(DB):記錄任務的具體細節、已執行情況等,

應用場景
舉例:
-
場景
-
每天凌晨1點跑單查hive,進行每日清算,
-
用戶下單后長時間未支付,需要修改訂單狀態,
-
電商整點搶購,商品價格0點整開始優惠,
-
發放優惠券、發送短信提醒等,
-
-
解決
通過分布式系統來調度任務,最主要的功能就是將任務分解為子分片,每個分片系結一臺機器處理對應的任務,
對一般場景而言,一個任務可能由多個操作組合而成,而將這些操作分解依次派發給子分片來完成,
比如`` 發送短信``的任務,需要發送三千萬條生日祝福短信,使用十個分片,每個分片只需要發送三百萬條短信, 就極大緩解了機器的壓力, 比如`` 發放生日優惠券``的任務,需要先查用戶資訊庫,獲取需要派發優惠券的用戶資訊,再操作用戶卡券庫, 檢驗是否優惠券已存在、增添優惠券等流程,就可以用不同分片來處理不同操作,對復雜場景而言,如果存在分庫分表的情況下,可以對任務繼續細分,
比如`` 發放生日優惠券``的任務,需要先查用戶資訊庫,獲取需要派發優惠券的用戶資訊, 這時候可以選擇讓一臺分片機器專門負責查詢用戶資訊,一臺分片機器專門查詢用戶卡券資訊, 避免了多臺機器對同一資料庫資源的競爭, -
一般場景

- 復雜場景

設計思想
系統本質
分布式任務調度系統和rpc系統其實非常相似,rpc系統通常是多個client呼叫一個server的某個服務,而分布式任務調度系統是一個server通知多個client去執行他們的某個服務,
服務發現/注冊
作為分布式的系統,最先需要考慮的問題自然是服務發現/注冊,主流的兩種做法分別是:1.以elastic-job為代表的無中心化解決方案,通過引入Zookeeper來實作分布式協調,優點很明顯,缺點則是無法實作高可用(zk本身的缺點),并且系統復雜度提高了很多,2.以xxl-job為代表的中心化解決方案,調度中心通過DB鎖保證集群分布式調度的一致性,
我的理解是:分布式任務調度系統主要的目的是緩解大型定時任務(如在某個時間點發送千萬張優惠券)對單臺機器的壓力,將任務分散到多臺機器上,偏向于分布式處理資料,所以調度器本身不參與資料處理,只是單純地下發任務、發送請求等,沒必要引入第三方組件,
同時,受RocketMQ中NameServer設計的啟發,我認為調度器本身和NameServer是非常類似的:1.NameServer節點之間互不通信,調度器也不需要互相通信 2.NameServer本身不參與業務處理,調度器也不需要 3.Producer和Consumer只需要和隨機一個NameServer建立連接即可,而用戶發送的請求也只需要一個調度器來回應處理即可, 4.NameServer的主要功能是為Client提供路由能力(簡單來說就是找到一個活著的節點),而調度器的功能也是為任務找到活著的執行器,下放給它執行,
所以我參考了NameServer的模式,使執行器與調度器集群的每一個節點建立長連接,通過心跳機制定時注冊資訊到所有的調度器來向調度器證明自身還存活,在路由注冊操作中引入了ReadWriteLock讀寫鎖,允許多個任務控制器并發讀,保證了任務發送時的高并發,同一時刻調度器只能處理一個執行器的心跳包,多個心跳包串行處理,
一致性與任務冪等性
關于一致性與任務冪等性,系統需要保證請求與任務不能重復觸發,這里涉及到多種可能的情況:
-
用戶在Web端發送的
請求只能有一個調度器來處理 -
某個發送任務同一時刻只能由一個調度器觸發, -
某臺執行器與調度器連接斷開,但是它并沒有宕機,任務還在執行,而調度器在長時間沒有接收到它的心跳包后認定它斷聯,根據失效轉移策略,任務又會被發送到另外一臺執行器上執行,
-
某臺執行器完成了任務,通知執行器
任務已完成的訊息剛發送給某臺調度器,這臺調度器就宕機了,而其他調度器根據失效轉移策略,任務又會被發送到另外一臺執行器上執行, -
某臺調度器對資料庫的多次操作是否會沖突?比如mapper.selectAll()后進行mapper.update(),在這兩步中間另外一個執行緒也mapper.select(),這時此執行緒讀到的就是舊資料,
處理情況一:用戶通過Web端發送了任務監控與編排請求,通過Nginx來控制請求只發送到一臺調度器上,
處理情況二:調度中心通過競爭DB排他鎖來保證,某個任務同一時刻只能由一個調度器拿到,并且調度器會修改任務的狀態,從而確保另外的調度器無法拿到這個任務,避免了任務的重復執行,
處理情況三:執行器與調度器斷開后,會立即中斷自身所有的執行任務,
處理情況四:執行器發送完任務已完成的訊息后,會等待調度器的回應,如果超時或者錯誤,就會將任務已完成的訊息持久化到本地,定時掃描訊息,直到任務已完成的訊息被調度器接受,
處理情況五:為保證資料庫的一致性,普通的方法是開啟事務保持ACID,本系統為了提供秒級的任務調度,使用讀寫鎖來控制并發問題,在JVM層面提供并發控制而不是丟給資料庫處理,盡可能減小時間開銷,
任務編排
任務編排是一項比較重要的功能,它是指多個任務之間可以編排執行順序,譬如ABC三個任務,可以編排A任務先執行,等A任務執行完后BC任務方可執行,同時A任務會產生運行結果,這部分運行結果可以傳遞給BC任務,BC任務以此來進行分支任務執行,
CronMan實作的編排功能為:
-
普通任務編排:允許任務以有向無環圖(DAG)的依賴關系運行,等所有的前置依賴任務執行完畢后,可以根據任務的型別(被動任務、主動任務)運行,
-
普通任務結果傳遞:允許任務將本次允許結果傳遞給下游任務(暫時只支持1個上游任務對應多個下游任務,不支持多對1),
-
任務結果自傳遞:在DAG圖的基礎上,允許自環的結構(也就是自己依賴于自己),可以將本次任務的運行結果傳遞給下一次任務,任務可以根據上一次的運行情況來自我判斷該如何執行任務,
效果一覽
任務配置
任務在這里進行注冊,根據任務型別來配置不同的任務,目前共有四種任務,定時任務需要配置Cron運算式,到達對應時間,任務就會觸發;被動任務不需要配置Cron運算式,上游任務全部完成后,被動任務自動開始觸發;Java任務是普通Java程式的任務,需要配置類名、方法名、引數型別以及引數;Shell任務則是shell腳本任務,需要配置腳本內容,也可以稍后在監控頁面配置,

根據每個任務不同的需求,相應地配置不同的分片數、是否允許失效轉移、是否允許錯過任務重觸發、選擇執行器的策略等,


可通過依賴任務來配置依賴的上游任務,構成一個DAG圖,任務以DAG圖的形式執行,只有當上游任務都完成時,下游任務才能開始,

任務監控
監控任務的狀態,共分為:已就緒、執行中、等待上游任務、已停用四種狀態,其中,已就緒代表任務就緒,只需等待設定時間一到/上游任務完成即可開始執行;執行中代表任務已交由執行器執行;等待上游任務為被動任務特有,代表上游任務一執行完就可以立馬執行;已停用代表任務暫時停止執行,
控制任務的執行,啟用、禁用、洗掉任務,
查看任務的DAG依賴圖,包括上游和下游的所有關聯任務,

對shell任務,可以通過控制中心在線修改任務的具體內容,

任務依賴
針對某項任務,可以查看它的關聯DAG圖,實時查看各項任務的狀態,
點擊 backUp 的查看依賴,就可以看見 backUp 的DAG任務依賴圖,可以看出backUp和多個任務相互有關聯,它的下游任務是cleanUp,backUp的狀態為已就緒,待設定時間到就會開始執行,而cleanUp在backUp完成之后,還需要等待它的上游任務washUp完成才能開始執行,

點擊 passive查看依賴,就可以看見 passive的DAG任務依賴圖,因為passive任務和其他任務沒有依賴關系,所以DAG圖只有它自己,當前的狀態為已停用,

實踐和使用
CronMan是SpringBoot專案,使用者暫時也需要用SpringBoot來使用執行器,(因為樓主要寫論文,所以沒有太多空閑時間用來適配各種其他的專案)
使用流程
1. 下載專案 & 打包執行器(executor)模塊
2. 配置資料庫
3. 將1.中打好的包作為依賴引入你自己的專案中
4. 為你的任務添加上@scheduleJob注解,并在資料庫/控制臺添加上這個定時任務
示例
設定任務:
添加定時任務,每分鐘備份一次資料庫demo和demo2,每個資料庫中各有一百萬條記錄,將sql存盤到磁盤中,設定兩個分片來完成這個任務,
-
首先下載整個專案,
git clone git@github.com:smmdwa/CronMan.git -
使用assembly:assembly 的maven插件可以快速進行打包(PS:這里不要用SpringBoot的打包插件,因為Spring Boot 中默認打包成的 jar 是
可執行jar,無法被依賴) -
利用專案中的sql/cronjob.sql 配置資料庫
-
配置調度器的application.yml
spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://192.168.213.130:3306/cronjob?characterEncoding=utf-8&useSSL=false&useAffectedRows=true username: root password: 123456 server: port: 8081 #netty監聽埠 remoting: address: 127.0.0.1:8088 #mybatis的相關配置 mybatis: mapper-locations: classpath:mapper/*.xml type-aliases-package: com.distribute.remoting.bean configuration: map-underscore-to-camel-case: true -
新建一個SpringBoot專案demo,這里采用的是本地依賴,新建lib目錄,放入2.中打好的jar包,引入依賴,
<dependency> <groupId>com.distribute</groupId> <artifactId>executor</artifactId> <version>0.0.1</version> <scope>system</scope> <systemPath>${project.basedir}/lib/executor-0.0.1-SNAPSHOT.jar</systemPath> </dependency> -
注入bean
@Configuration @ComponentScan(basePackages = {"com.distribute"}) public class CronConfig { } -
撰寫備份任務

-
配置demo專案的application.yml
executor: name: executor-1 ip: 127.0.0.1 port: 8092 remoting: address: 127.0.0.1:8088;127.0.0.1:8089 server: port: 8092 controller: channel: expiredTime: 30000 -
啟動一臺調度器和兩臺執行器(記得修改port和remoting.address)
-
成功!


轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/350873.html
標籤:其他
