1 前言
歡迎訪問南瓜慢說 www.pkslow.com獲取更多精彩文章!
Spring相關文章:Springboot-Cloud
Spring Batch遠程磁區對于大量資料的處理非常擅長,它的實作有多種方式,如本地Jar包模式、MQ模式、Kubernetes模式,這三種模式的如下:
(1)本地Jar包模式:磁區處理的worker為一個Java行程,從jar包啟動,通過jvm引數和資料庫傳遞引數;官方提供示例代碼,
(2)MQ模式:worker是一個常駐行程,Manager和Worker通過訊息佇列來傳遞引數;網上有不少相關示例代碼,
(3)Kubernetes模式:worker為K8s中的Pod,Manager直接啟動Pod來處理;網上并沒有找到任何示例代碼,
本文將通過代碼來講解第一種模式(本地Jar包模式),其它后續再介紹,

建議先看下面文章了解一下:
Spring Batch入門:通過例子講解Spring Batch入門,優秀的批處理框架
Spring Batch并行處理介紹:大量資料也不在話下,Spring Batch并行處理四種模式初探
2 代碼講解
本文代碼中,Manager和Worker是放在一起的,在同一個專案里,也只會打一個jar包而已;我們通過profile來區別是manager還是worker,也就是通過Spring Profile實作一份代碼,兩份邏輯,實際上也可以拆成兩份代碼,但放一起更方便測驗,而且代碼量不大,就沒有必要了,
2.1 專案準備
2.1.1 資料庫
首先我們需要準備一個資料庫,因為Manager和Worker都需要同步狀態到DB上,不能直接使用嵌入式的記憶體資料庫了,需要一個外部可共同訪問的資料庫,這里我使用的是H2 Database,安裝可參考:把H2資料庫從jar包部署到Kubernetes,并解決Ingress不支持TCP的問題,
2.1.2 引入依賴
maven引入依賴如下所示:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-task</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-deployer-local</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-integration</artifactId>
</dependency>
spring-cloud-deployer-local用于部署和啟動worker,非常關鍵;其它就是Spring Batch和Task相關的依賴;以及資料庫連接,
2.1.3 主類入口
Springboot的主類入口如下:
@EnableTask
@SpringBootApplication
@EnableBatchProcessing
public class PkslowRemotePartitionJar {
public static void main(String[] args) {
SpringApplication.run(PkslowRemotePartitionJar.class, args);
}
}
在Springboot的基礎上,添加了Spring Batch和Spring Cloud Task的支持,
2.2 關鍵代碼撰寫
前面的資料庫搭建和其它代碼沒有太多可講的,接下來就開始關鍵代碼的撰寫,
2.2.1 磁區管理Partitioner
Partitioner是遠程磁區中的核心bean,它定義了分成多少個區、怎么磁區,要把什么變數傳遞給worker,它會回傳一組<磁區名,執行背景關系>的鍵值對,即回傳Map<String, ExecutionContext>,把要傳遞給worker的變數放在ExecutionContext中去,支持多種型別的變數,如String、int、long等,實際上,我們不建議通過ExecutionContext來傳遞太多資料;可以傳遞一些標識或主鍵,然后worker自己去拿資料即可,
具體代碼如下:
private static final int GRID_SIZE = 4;
@Bean
public Partitioner partitioner() {
return new Partitioner() {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> partitions = new HashMap<>(gridSize);
for (int i = 0; i < GRID_SIZE; i++) {
ExecutionContext executionContext = new ExecutionContext();
executionContext.put("partitionNumber", i);
partitions.put("partition" + i, executionContext);
}
return partitions;
}
};
}
上面分成4個區,程式會啟動4個worker來處理;給worker傳遞的引數是partitionNumber,
2.2.2 磁區處理器PartitionHandler
PartitionHandler也是核心的bean,它決定了怎么去啟動worker,給它們傳遞什么jvm引數(跟之前的ExecutionContext傳遞不一樣),
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository) throws Exception {
Resource resource = this.resourceLoader.getResource(workerResource);
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep", taskRepository);
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize-enabled=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler
.setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs));
partitionHandler
.setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment));
partitionHandler.setMaxWorkers(2);
partitionHandler.setApplicationName("PkslowWorkerJob");
return partitionHandler;
}
上面代碼中:
resource是worker的jar包地址,表示將啟動該程式;
workerStep是worker將要執行的step;
commandLineArgs定義了啟動worker的jvm引數,如--spring.profiles.active=worker;
environment是manager的系統環境變數,可以傳遞給worker,當然也可以選擇不傳遞;
MaxWorkers是最多能同時啟動多少個worker,類似于執行緒池大小;設定為2,表示最多同時有2個worker來處理4個磁區,
2.2.3 Manager和Worker的Batch定義
完成了磁區相關的代碼,剩下的就只是如何定義Manager和Worker的業務代碼了,
Manager作為管理者,不用太多業務邏輯,代碼如下:
@Bean
@Profile("!worker")
public Job partitionedJob(PartitionHandler partitionHandler) throws Exception {
Random random = new Random();
return this.jobBuilderFactory.get("partitionedJob" + random.nextInt())
.start(step1(partitionHandler))
.build();
}
@Bean
public Step step1(PartitionHandler partitionHandler) throws Exception {
return this.stepBuilderFactory.get("step1")
.partitioner(workerStep().getName(), partitioner())
.step(workerStep())
.partitionHandler(partitionHandler)
.build();
}
Worker主要作用是處理資料,是我們的業務代碼,這里就演示一下如何獲取Manager傳遞過來的partitionNumber:
@Bean
public Step workerStep() {
return this.stepBuilderFactory.get("workerStep")
.tasklet(workerTasklet(null, null))
.build();
}
@Bean
@StepScope
public Tasklet workerTasklet(final @Value("#{stepExecutionContext['partitionNumber']}") Integer partitionNumber) {
return new Tasklet() {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Thread.sleep(6000); //增加延時,查看效果,通過jps:在jar情況下會新起java行程
System.out.println("This tasklet ran partition: " + partitionNumber);
return RepeatStatus.FINISHED;
}
};
}
通過運算式@Value("#{stepExecutionContext['partitionNumber']}") 獲取Manager傳遞過來的變數;注意要加注解@StepScope,
3 程式運行
因為我們分為Manager和Worker,但都是同一份代碼,所以我們先打包一個jar出來,不然manager無法啟動,配置資料庫和Worker的jar包地址如下:
spring.datasource.url=jdbc:h2:tcp://localhost:9092/test
spring.datasource.username=pkslow
spring.datasource.password=pkslow
spring.datasource.driver-class-name=org.h2.Driver
pkslow.worker.resource=file://pkslow/target/remote-partitioning-jar-1.0-SNAPSHOT.jar
執行程式如下:

可以看到啟動了4次Java程式,還給出日志路徑,
通過jps命令查看,能看到一個Manager行程,還有兩個worker行程:

4 復雜變數傳遞
前面講了Manager可以通過ExecutionContext傳遞變數,如簡單的String、long等,但其實它也是可以傳遞復雜的Java物件的,但對應的類需要可序列化,如:
import java.io.Serializable;
public class Person implements Serializable {
private Integer age;
private String name;
private String webSite;
//getter and setter
}
Manager傳遞:
executionContext.put("person", new Person(0, "pkslow", "www.pkslow.com"));
Worker接收:
@Value("#{stepExecutionContext['person']}") Person person
5 總結
本文介紹了Spring Batch遠程磁區的本地Jar包模式,只能在一臺機器上運行,所以也是無法真正發揮出遠程磁區的作用,但它對我們后續理解更復雜的模式是有很大幫助的;同時,我們也可以使用本地模式進行開發測驗,畢竟它只需要一個資料庫就行了,依賴很少,
歡迎關注微信公眾號<南瓜慢說>,將持續為你更新...

多讀書,多分享;多寫作,多整理,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/41974.html
標籤:Java
上一篇:mysql高級day1
