開心一刻
旁邊的女乘客太吵,我實在忍無可忍,便對她說:“你能不能讓我睡會兒?”
她揮手就給了我一個耳光:“你個臭流氓!”
我頓時就清醒了,理論到:“你讓我睡一會怎么了嗎”
她害羞的低下了頭,說道:“人家不是隨便的人”
我:“我也不是隨便的人,下一站我們下車把話說清楚”

任務調度
相信大家對任務調度都不陌生,說的通熟一點就是定時任務;這個在我們的專案中或多或少都存在,我們可以用 JDK 自帶的(Timer、ScheduledExecutor)來實作,也可以用 Spring 的 Scheduler 來實作,不管用以上哪種方式,我們都是在單機上跑,如果我們以集群的方式部署,會不會出現什么問題 ?
集群中的各個節點都會執行定時調度,會有重復執行的問題,那怎么辦? 我們可以加配置,只啟動某個節點的定時任務,但是這時候又會出現單點問題
那有沒有什么辦法,既能避免重復執行,又不會出現單點問題呢? 分布式調度應運而生,常見的分布式任務調度框架有:quartz 、cronsun、Elastic-job、saturn、lts、TBSchedule、xxl-job 等
quartz 我已經簡單講過,有興趣的可以去看看:請點我,你們會發現:樓主壓根就沒講 quartz 的集群模式,你們發現的很對,我就是沒講,就問你氣不氣 ?

既然你們對 quartz 已經有了一定的了解了 ,那么它的集群模式交給你們自己了
今天我們就一起來了解下另外一個分布式調度平臺:xxl-job
關于 xxl-job 是什么、有什么特性、發展歷程、接入了哪些公司、各個版本的新特性等等問題,我都不會去講,因為官方檔案已經說的非常清楚了,xxl-job 是國產的,如果檔案還看不懂,那就需要回學校再造了,但是我還是想強調下它的架構圖

通過這個架構圖,我們可以對其有個大致的了解;大體上分為調度中心 和 執行器,調度中心通過調度規則(cron運算式)對執行器中的任務進行調度,執行器收到調度后,執行具體的任務(Job)
既然官方檔案都說的非常細致了,那我還能講什么呢 ? 好像確實么什么可以說的了, 那今天就到這吧,大家散會!

大家先別急著走,雖然下面的內容在官方檔案中已經存在,但是卻很容易被我們忽略;我會在搭建的程序中來穿插著一些問題,來鞏固我們容易忽略的點
單節點搭建
我們先搭一個簡單的,調度中心 和 執行器 都先搭建成單節點
按照官方的檔案來,一步一步很容易搭建成功
原始碼下載
原始碼地址:xxl-job,可以 git clone 也可以 Download ZIP ,不管何種方式,我們拿到了原始碼,匯入到 IDEA,結構如下

初始化 “調度資料庫”
SQL 腳本在原始碼中已存在,路徑: xxl-job-master\doc\db\tables_xxl_job.sql ,執行此腳本,創建資料庫和表,如下圖

配置&部署 調度中心
組態檔: appliction.properties ,內容如下
### web server.port=8080 server.servlet.context-path=/xxl-job-admin ### actuator management.server.servlet.context-path=/actuator management.health.mail.enabled=false ### resources spring.mvc.servlet.load-on-startup=0 spring.mvc.static-path-pattern=/static/** spring.resources.static-locations=classpath:/static/ ### freemarker spring.freemarker.templateLoaderPath=classpath:/templates/ spring.freemarker.suffix=.ftl spring.freemarker.charset=UTF-8 spring.freemarker.request-context-attribute=request spring.freemarker.settings.number_format=0.########## ### mybatis mybatis.mapper-locations=classpath:/mybatis-mapper/*Mapper.xml #mybatis.type-aliases-package=com.xxl.job.admin.core.model ### xxl-job, datasource spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai spring.datasource.username=root spring.datasource.password=root_pwd spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver ### datasource-pool spring.datasource.type=com.zaxxer.hikari.HikariDataSource spring.datasource.hikari.minimum-idle=10 spring.datasource.hikari.maximum-pool-size=30 spring.datasource.hikari.auto-commit=true spring.datasource.hikari.idle-timeout=30000 spring.datasource.hikari.pool-name=HikariCP spring.datasource.hikari.max-lifetime=900000 spring.datasource.hikari.connection-timeout=10000 spring.datasource.hikari.connection-test-query=SELECT 1 ### xxl-job, email spring.mail.host=smtp.qq.com spring.mail.port=25 [email protected] [email protected] spring.mail.password=xxx spring.mail.properties.mail.smtp.auth=true spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true spring.mail.properties.mail.smtp.socketFactory.class=javax.net.ssl.SSLSocketFactory ### xxl-job, access token xxl.job.accessToken= ### xxl-job, i18n (default is zh_CN, and you can choose "zh_CN", "zh_TC" and "en") xxl.job.i18n=zh_CN ## xxl-job, triggerpool max size xxl.job.triggerpool.fast.max=200 xxl.job.triggerpool.slow.max=100 ### xxl-job, log retention days xxl.job.logretentiondays=30View Code
需要改的地方不多,埠號可能需要根據實際情況進行修改,然后就是資料庫的地址、用戶名和密碼需要改成自己的,email服務器最好配上(告警用的上),調度中心與執行器之間的安全訪問 token( xxl.job.accessToken ) 最好也配置上
出于演示,改下資料庫的配置就好,其他的保持默認;我們啟動調度中心,訪問: http://localhost:8080/xxl-job-admin ,默認登錄賬號 “admin/123456”, 登錄后運行界面如下圖所示

“調度中心” 已經部署成功
配置&部署 執行器
組態檔: application.properties ,內容如下
# web port server.port=8081 # no web #spring.main.web-environment=false # log config logging.config=classpath:logback.xml ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02" xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin ### xxl-job, access token xxl.job.accessToken= ### xxl-job executor appname xxl.job.executor.appname=xxl-job-executor-sample ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null xxl.job.executor.address= ### xxl-job executor server-info xxl.job.executor.ip= xxl.job.executor.port=9999 ### xxl-job executor log-path xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler ### xxl-job executor log-retention-days xxl.job.executor.logretentiondays=30View Code
埠號配置一個未被使用的埠,調度中心地址配置成我們之前部署的調度中心的地址即可;至于 xxl.job.accessToken ,和調度中心配置成一樣即可
組態檔中各個配置的注釋寫的非常清楚,大家根據實際情況進行配置即可
執行器的示例有好幾個,我們啟動 springboot 版本的;啟動不報錯就行了,它會自動注冊到調度中心,如下圖

配置調度規則&任務
調度中心通過調度規則對執行器中的任務進行調度,現在調度中心和執行器都部署好了,就缺調度規則和任務了
任務在示例代碼中已經存在了, SampleXxlJob.java :
package com.xxl.job.executor.service.jobhandler; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.context.XxlJobContext; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.log.XxlJobLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.util.Arrays; import java.util.concurrent.TimeUnit; /** * XxlJob開發示例(Bean模式) * * 開發步驟: * 1、在Spring Bean實體中,開發Job方法,方式格式要求為 "public ReturnT<String> execute(String param)" * 2、為Job方法添加注解 "@XxlJob(value="https://www.cnblogs.com/youzhibing/p/自定義jobhandler名稱", init = "JobHandler初始化方法", destroy = "JobHandler銷毀方法")",注解value值對應的是調度中心新建任務的JobHandler屬性的值, * 3、執行日志:需要通過 "XxlJobLogger.log" 列印執行日志; * * @author xuxueli 2019-12-11 21:52:51 */ @Component public class SampleXxlJob { private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class); /** * 1、簡單任務示例(Bean模式) */ @XxlJob("demoJobHandler") public ReturnT<String> demoJobHandler(String param) throws Exception { XxlJobLogger.log("XXL-JOB, Hello World."); for (int i = 0; i < 5; i++) { XxlJobLogger.log("beat at:" + i); TimeUnit.SECONDS.sleep(2); } return ReturnT.SUCCESS; } /** * 2、分片廣播任務 */ @XxlJob("shardingJobHandler") public ReturnT<String> shardingJobHandler(String param) throws Exception { // 分片引數 int shardIndex = XxlJobContext.getXxlJobContext().getShardIndex(); int shardTotal = XxlJobContext.getXxlJobContext().getShardTotal(); XxlJobLogger.log("分片引數:當前分片序號 = {}, 總分片數 = {}", shardIndex, shardTotal); // 業務邏輯 for (int i = 0; i < shardTotal; i++) { if (i == shardIndex) { XxlJobLogger.log("第 {} 片, 命中分片開始處理", i); } else { XxlJobLogger.log("第 {} 片, 忽略", i); } } return ReturnT.SUCCESS; } /** * 3、命令列任務 */ @XxlJob("commandJobHandler") public ReturnT<String> commandJobHandler(String param) throws Exception { String command = param; int exitValue = https://www.cnblogs.com/youzhibing/p/-1; BufferedReader bufferedReader = null; try { // command process Process process = Runtime.getRuntime().exec(command); BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream()); bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream)); // command log String line; while ((line = bufferedReader.readLine()) != null) { XxlJobLogger.log(line); } // command exit process.waitFor(); exitValue = process.exitValue(); } catch (Exception e) { XxlJobLogger.log(e); } finally { if (bufferedReader != null) { bufferedReader.close(); } } if (exitValue =https://www.cnblogs.com/youzhibing/p/= 0) { return IJobHandler.SUCCESS; } else { return new ReturnT<String>(IJobHandler.FAIL.getCode(), "command exit value("+exitValue+") is failed"); } } /** * 4、跨平臺Http任務 * 引數示例: * "url: http://www.baidu.com\n" + * "method: get\n" + * "data: content\n"; */ @XxlJob("httpJobHandler") public ReturnT<String> httpJobHandler(String param) throws Exception { // param parse if (param==null || param.trim().length()==0) { XxlJobLogger.log("param["+ param +"] invalid."); return ReturnT.FAIL; } String[] httpParams = param.split("\n"); String url = null; String method = null; String data = null; for (String httpParam: httpParams) { if (httpParam.startsWith("url:")) { url = httpParam.substring(httpParam.indexOf("url:") + 4).trim(); } if (httpParam.startsWith("method:")) { method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase(); } if (httpParam.startsWith("data:")) { data = httpParam.substring(httpParam.indexOf("data:") + 5).trim(); } } // param valid if (url==null || url.trim().length()==0) { XxlJobLogger.log("url["+ url +"] invalid."); return ReturnT.FAIL; } if (method==null || !Arrays.asList("GET", "POST").contains(method)) { XxlJobLogger.log("method["+ method +"] invalid."); return ReturnT.FAIL; } boolean isPostMethod = method.equals("POST"); // request HttpURLConnection connection = null; BufferedReader bufferedReader = null; try { // connection URL realUrl = new URL(url); connection = (HttpURLConnection) realUrl.openConnection(); // connection setting connection.setRequestMethod(method); connection.setDoOutput(isPostMethod); connection.setDoInput(true); connection.setUseCaches(false); connection.setReadTimeout(5 * 1000); connection.setConnectTimeout(3 * 1000); connection.setRequestProperty("connection", "Keep-Alive"); connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8"); // do connection connection.connect(); // data if (isPostMethod && data!=null && data.trim().length()>0) { DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); dataOutputStream.write(data.getBytes("UTF-8")); dataOutputStream.flush(); dataOutputStream.close(); } // valid StatusCode int statusCode = connection.getResponseCode(); if (statusCode != 200) { throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid."); } // result bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); StringBuilder result = new StringBuilder(); String line; while ((line = bufferedReader.readLine()) != null) { result.append(line); } String responseMsg = result.toString(); XxlJobLogger.log(responseMsg); return ReturnT.SUCCESS; } catch (Exception e) { XxlJobLogger.log(e); return ReturnT.FAIL; } finally { try { if (bufferedReader != null) { bufferedReader.close(); } if (connection != null) { connection.disconnect(); } } catch (Exception e2) { XxlJobLogger.log(e2); } } } /** * 5、生命周期任務示例:任務初始化與銷毀時,支持自定義相關邏輯; */ @XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") public ReturnT<String> demoJobHandler2(String param) throws Exception { XxlJobLogger.log("XXL-JOB, Hello World."); return ReturnT.SUCCESS; } public void init(){ logger.info("init"); } public void destroy(){ logger.info("destory"); } }View Code
是一個任務集,里面每一個被 @XxlJob 修飾的都是一個任務,我們以名為: demoJobHandler 的任務來做演示
任務已經定好,目前就只差調度規則了,我們去調度中心管理界面進行配置;默認情況下,xxl-job 會幫我們自動配置好一個任務,如下

直接用它是可以的,但是為了清楚怎么配置,我們重新配置一個

各個配置項的具體含義大家可以去看官方檔案,里面都有詳細的介紹;
簡單點來說上圖的配置,就是每隔 3 秒,調度中心會去調度 示例執行器 的 demoJobHandler 任務
啟動調度
配置和部署都已完成,現在差的就是啟動調度了,我們啟動它

然后我們就可以在調度日志頁面查看調度中心的調度日志了,如下所示

問題
現在不管是調度中心,還是執行器,都是單節點的,都存在單節點問題
那如何解決了,單節點的解決方案往往就是集群,我們可以將調度中心和執行器都部署成集群,而 xxl-job 又是支持的,而且集群部署非常簡單、方便
集群搭建
集群架構圖簡單如下

nginx 只是對調度中心的請求(調度中心管理頁面的操作)做負載均衡,它不涉及任務的調度與回呼,這里就不配置 nginx 了, 我們重點來看下調度中心集群與執行器集群的搭建
調度中心集群
調度中心集群的搭建非常簡單,只需要注意兩點:DB配置保持一致,集群機器時鐘保持一致(單機集群忽視)
出于演示,我們就做單機集群處理,那么我們只需要在 IDEA 中再啟動一個調度中心節點就好,埠號配置不一樣就好;調度中心共啟動兩個節點,之前的埠號是8080, 這個我們改成 8088

啟動之后,我們就可以從http://localhost:8080/xxl-job-admin,http://localhost:8088/xxl-job-admin 對調度中心控制臺進行訪問了,具體就不演示了, 大家可以自行去操作
生產環境下,會通過 nginx 對外暴露唯一地址,由 nginx 對這兩個(或者多個)進行負載均衡
執行器集群
搭建同樣非常簡單,只需要注意兩點
1、執行器回呼地址(xxl.job.admin.addresses)需要保持一致;執行器根據該配置進行執行器自動注冊等操作,
2、同一個執行器集群內AppName(xxl.job.executor.appname)需要保持一致;調度中心根據該配置動態發現不同集群的在線執行器串列
由于是單機集群搭建,埠的唯一性也需要注意

啟動之后,去調度中心修改路由策略為輪訓,再啟動任務調度,然后就可以去查看調度日志了
宕機測驗
這個就不演示了,大家自行去測驗,停掉某個節點,整個調度是否能正常完成
疑問
1、調度中心集群部署,任務調度的時候,會不會每個節點都發起調度請求,從而產生重復調度的問題
這個問題在官方檔案中有說明:基于資料庫的集群方案,資料庫選用Mysql;集群分布式并發環境中進行定時任務調度時,會在各個節點會上報任務,存到資料庫中,執行時會從資料庫中取出觸發器來執行,如果觸發器的名稱和執行時間相同,則只有一個節點去執行此任務,
因此對同一個調度,不會產生重復調度問題
2、執行器集群收到調度請求后,會不會每個節點都去執行任務
這個問題不成立,我們不是配置了路由策略嗎,調度中心會根據路由策略將調度請求發送給具體的某個執行器了,那何來每個執行器都執行任務呢 ?
如果官方檔案看的細的話,我們會發現有如下一段話

不只異步調度和異步執行,其實還包括異步回呼,xxl-job 中用到了大量的佇列、異步處理
當然還有一些其他的疑問,絕大部分在官方檔案都能找到答案,所以需要大家多讀、細讀
總結
1、單機模式,大家了解就好,生產環境肯定都是集群模式的;但 xxl-job 的集群部署也非常簡單
2、xxl-job 的全異步化&輕量級設計,可以保證使用有限的執行緒支撐大量的JOB并發運行
3、通篇都是在 xxl-job 的原始碼上進行的,如何將它應用進我們的實戰專案中了 ? 實戰篇,我們下期見
參考
XXL-JOB
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/144559.html
標籤:Java
上一篇:類物件a呼叫成員函式,在其中新建區域類物件回傳其參考,為什么區域變數沒被銷毀,反而把它的值賦給了main里的類物件b呢
