簡介
用xxl-job做后臺任務管理, 主要是快速解決定時任務的HA問題, 專案代碼量不大, 功能精簡, 沒有特殊依賴. 因為產品中用到了這個專案, 上午花了點時間研究了一下運行機制. 把看到的記一下.
- 專案地址
- https://github.com/xuxueli/xxl-job
- http://gitee.com/xuxueli0323/xxl-job
- 檔案 https://www.xuxueli.com/xxl-job/
環境
<!-- http://repo1.maven.org/maven2/com/xuxueli/xxl-job-core/ -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${最新穩定版本}</version>
</dependency>
運行需要 JDK1.8, MySQL5.7
資料庫結構
- 庫編碼 utf8mb4_unicode_ci
- Table: xxl_job_group
任務分組, 組名, 只支持一級分組, address_list 欄位支持多個執行端地址, 逗號分隔 - Table: xxl_job_info
任務主表, 記錄了任務明細, 調度明細以及預警設定 - Table: xxl_job_log
任務每次執行的日志 - Table: xxl_job_log_report
按日對執行日志進行統計的結果 - Table: xxl_job_logglue
- Table: xxl_job_registry
用于登記任務的執行者, 記錄group:分組, key:名稱, value:介面地址. 名稱是可以重復的, 介面地址會添加到任務分組表中的注冊欄位 - Table: xxl_job_user
簡單的登錄控制, 與其它表沒有關聯 - Table: xxl_job_lock
單欄位表, 用于并發時加鎖避免沖突
代碼結構
- 專案用到的都是常見組件, MyBatis, FreeMarker, Bootstrap, 當前版本基于SpringBoot 2.6.7
- 線上運行的是 xxl-job-admin 模塊, 提供執行端注冊, 任務發起和日志記錄等服務
- 專案中需要實作 xxl-job-executor, 專案中提供了例子
專案檔案結構如下
├───doc
│ ├───db # 初始化的sql
│ └───images
├───xxl-job-admin # 運行的服務端模塊, 提供界面和調度
│ └───src
│ ├───main
│ │ ├───java
│ │ │ └───com
│ │ │ └───xxl
│ │ │ └───job
│ │ │ └───admin
│ │ │ ├───controller
│ │ │ │ ├───annotation
│ │ │ │ ├───interceptor
│ │ │ │ └───resolver
│ │ │ ├───core
│ │ │ ├───dao
│ │ │ └───service
│ │ │ └───impl
│ │ └───resources
│ │ ├───i18n # 多國化, 簡繁英
│ │ ├───mybatis-mapper # xml形式的mapper
│ │ ├───static # 前端靜態檔案
│ │ └───templates # Freemarker模板
│ └───test
│ └───java
│
├───xxl-job-core # 公用jar包, 模塊內部依賴
│ └───src
│ └───main
│ └───java
│
└───xxl-job-executor-samples
├───xxl-job-executor-sample-frameless # 任務執行層示例
│ └───src
│ ├───main
│ │ ├───java
│ │ └───resources
│ └───test
│ └───java
└───xxl-job-executor-sample-springboot # 使用SpringBoot的執行層示例
└───src
├───main
│ ├───java
│ └───resources
└───test
運行機制
執行端需要準備以下資訊
-
adminAddresses 服務端地址, 例如 http://127.0.0.1:8080/xxl-job-admin
-
accessToken 貌似是服務端的token, 在呼叫服務端 api/registry, api/registryRemove 等操作時需要驗證
-
appname 執行端名稱
-
address 執行端地址, 和 ip:port 二選一, 存在則覆寫 ip:port
-
ip 執行端IP
-
port 執行端服務埠
-
執行端啟動后將自己注冊到服務端, 等待回呼
-
任務執行通過 XxlJobTrigger.processTrigger() 發起, 準備引數, 并在分組中選擇一個地址
-
根據這個地址取得 ExecutorBiz, 呼叫 executorBiz.run() 執行任務
-
服務端: 通過 ExecutorBizClient,
- 呼叫
XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class); - 其中
accessToken是服務端的accessToken
- 呼叫
-
執行端: 通過
ExecutorBizImpl.run()- 呼叫
XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());得到XxlJob方法 - 通過
XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason)執行
- 呼叫
非 Spring 的場景
通過呼叫 FrameLessXxlJobConfig.getInstance().initXxlJobExecutor() 這個方法將 XxlJobSimpleExecutor 實體化, 并注冊到xxl_job服務端
Spring 場景
- 在
@Configuration中, 將 XxlJobSpringExecutor 作為一個@Bean添加到 Spring context - XxlJobSpringExecutor 是 XxlJobExecutor 的子類并實作了
SmartInitializingSingleton介面的afterSingletonsInstantiated()方法 - 在
afterSingletonsInstantiated()方法中- 呼叫 initJobHandlerMethodRepository(), 在這個方法中, 找到所有
@XxlJob注解的方法 - 通過
registJobHandler(), 將@XxlJob方法添加到private static ConcurrentMap<String, IJobHandler> jobHandlerRepository - 呼叫
XxlJobExecutor.start(), 將自己注冊到 xxl_job 服務端
- 呼叫 initJobHandlerMethodRepository(), 在這個方法中, 找到所有
遠程呼叫服務
xxl_job 并未使用Spring的服務機制, 而是內部實作了一個偵聽指定IP+埠的服務. 這個實作對應的類是 EmbedServer, 服務基于 Netty, 核心代碼是
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true);
這行代碼注冊了內部的XxlJob方法
.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)
處理遠程請求時, 在下面的代碼中, 通過executorBiz.run(triggerParam)呼叫XxlJob方法
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
//...
// services mapping
try {
switch (uri) {
case "/beat":
return executorBiz.beat();
case "/idleBeat":
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
case "/run":
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
case "/kill":
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
case "/log":
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
default:
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
} catch (Exception e) {
//...
}
鎖機制
通過select ... for update實作的, 這個表并沒有放到 MyBatis, 在 JobScheduleHelper 中, 通過
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
得到鎖, 在方法末尾釋放
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/540335.html
標籤:Java
上一篇:區塊鏈,中心去,何曾著眼看君王?用Go語言實作區塊鏈技術,通過Golang秒懂區塊鏈
下一篇:day06-功能實作05
