spring batch簡介
spring batch是spring提供的一個資料處理框架,企業域中的許多應用程式需要批量處理才能在關鍵任務環境中執行業務操作,這些業務運營包括:
- 無需用戶互動即可最有效地處理大量資訊的自動化,復雜處理,這些操作通常包括基于時間的事件(例如月末計算,通知或通信),
- 在非常大的資料集中重復處理復雜業務規則的定期應用(例如,保險利益確定或費率調整),
- 集成從內部和外部系統接收的資訊,這些資訊通常需要以事務方式格式化,驗證和處理到記錄系統中,批處理用于每天為企業處理數十億的交易,
Spring Batch是一個輕量級,全面的批處理框架,旨在開發對企業系統日常運營至關重要的強大批處理應用程式,Spring Batch構建了人們期望的Spring Framework特性(生產力,基于POJO的開發方法和一般易用性),同時使開發人員可以在必要時輕松訪問和利用更高級的企業服務,Spring Batch不是一個schuedling的框架,
Spring Batch提供了可重用的功能,這些功能對于處理大量的資料至關重要,包括記錄/跟蹤,事務管理,作業處理統計,作業重啟,跳過和資源管理,它還提供更高級的技術服務和功能,通過優化和磁區技術實作極高容量和高性能的批處理作業,
Spring Batch可用于兩種簡單的用例(例如將檔案讀入資料庫或運行存盤程序)以及復雜的大量用例(例如在資料庫之間移動大量資料,轉換它等等) 上),大批量批處理作業可以高度可擴展的方式利用該框架來處理大量資訊,
Spring Batch架構介紹
一個典型的批處理應用程式大致如下:
- 從資料庫,檔案或佇列中讀取大量記錄,
- 以某種方式處理資料,
- 以修改之后的形式寫回資料,
其對應的示意圖如下:

spring batch的一個總體的架構如下:

在spring batch中一個job可以定義很多的步驟step,在每一個step里面可以定義其專屬的ItemReader用于讀取資料,ItemProcesseor用于處理資料,ItemWriter用于寫資料,而每一個定義的job則都在JobRepository里面,我們可以通過JobLauncher來啟動某一個job,
Spring Batch核心概念介紹
下面是一些概念是Spring batch框架中的核心概念,
什么是Job
Job和Step是spring batch執行批處理任務最為核心的兩個概念,
其中Job是一個封裝整個批處理程序的一個概念,Job在spring batch的體系當中只是一個最頂層的一個抽象概念,體現在代碼當中則它只是一個最上層的介面,其代碼如下:
/**
* Batch domain object representing a job. Job is an explicit abstraction
* representing the configuration of a job specified by a developer. It should
* be noted that restart policy is applied to the job as a whole and not to a
* step.
*/
public interface Job {
String getName();
boolean isRestartable();
void execute(JobExecution execution);
JobParametersIncrementer getJobParametersIncrementer();
JobParametersValidator getJobParametersValidator();
}
在Job這個介面當中定義了五個方法,它的實作類主要有兩種型別的job,一個是simplejob,另一個是flowjob,在spring batch當中,job是最頂層的抽象,除job之外我們還有JobInstance以及JobExecution這兩個更加底層的抽象,
一個job是我們運行的基本單位,它內部由step組成,job本質上可以看成step的一個容器,一個job可以按照指定的邏輯順序組合step,并提供了我們給所有step設定相同屬性的方法,例如一些事件監聽,跳過策略,
Spring Batch以SimpleJob類的形式提供了Job介面的默認簡單實作,它在Job之上創建了一些標準功能,一個使用java config的例子代碼如下:
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob")
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.end()
.build();
}
這個配置的意思是:首先給這個job起了一個名字叫footballJob,接著指定了這個job的三個step,他們分別由方法,playerLoad,gameLoad, playerSummarization實作,
什么是JobInstance
我們在上文已經提到了JobInstance,他是Job的更加底層的一個抽象,他的定義如下:
public interface JobInstance {
/**
* Get unique id for this JobInstance.
* @return instance id
*/
public long getInstanceId();
/**
* Get job name.
* @return value of 'id' attribute from <job>
*/
public String getJobName();
}
他的方法很簡單,一個是回傳Job的id,另一個是回傳Job的名字,
JobInstance指的是job運行當中,作業執行程序當中的概念,Instance本就是實體的意思,
比如說現在有一個批處理的job,它的功能是在一天結束時執行行一次,我們假定這個批處理job的名字為'EndOfDay',在這個情況下,那么每天就會有一個邏輯意義上的JobInstance, 而我們必須記錄job的每次運行的情況,
什么是JobParameters
在上文當中我們提到了,同一個job每天運行一次的話,那么每天都有一個jobIntsance,但他們的job定義都是一樣的,那么我們怎么來區別一個job的不同jobinstance了,不妨先做個猜想,雖然jobinstance的job定義一樣,但是他們有的東西就不一樣,例如運行時間,
spring batch中提供的用來標識一個jobinstance的東西是:JobParameters,JobParameters物件包含一組用于啟動批處理作業的引數,它可以在運行期間用于識別或甚至用作參考資料,我們假設的運行時間,就可以作為一個JobParameters,
例如, 我們前面的'EndOfDay'的job現在已經有了兩個實體,一個產生于1月1日,另一個產生于1月2日,那么我們就可以定義兩個JobParameter物件:一個的引數是01-01, 另一個的引數是01-02,因此,識別一個JobInstance的方法可以定義為:

因此,我么可以通過Jobparameter來操作正確的JobInstance
什么是JobExecution
JobExecution指的是單次嘗試運行一個我們定義好的Job的代碼層面的概念,job的一次執行可能以失敗也可能成功,只有當執行成功完成時,給定的與執行相對應的JobInstance才也被視為完成,
還是以前面描述的EndOfDay的job作為示例,假設第一次運行01-01-2019的JobInstance結果是失敗,那么此時如果使用與第一次運行相同的Jobparameter引數(即01-01-2019)作業引數再次運行,那么就會創建一個對應于之前jobInstance的一個新的JobExecution實體,JobInstance仍然只有一個,
JobExecution的介面定義如下:
public interface JobExecution {
/**
* Get unique id for this JobExecution.
* @return execution id
*/
public long getExecutionId();
/**
* Get job name.
* @return value of 'id' attribute from <job>
*/
public String getJobName();
/**
* Get batch status of this execution.
* @return batch status value.
*/
public BatchStatus getBatchStatus();
/**
* Get time execution entered STARTED status.
* @return date (time)
*/
public Date getStartTime();
/**
* Get time execution entered end status: COMPLETED, STOPPED, FAILED
* @return date (time)
*/
public Date getEndTime();
/**
* Get execution exit status.
* @return exit status.
*/
public String getExitStatus();
/**
* Get time execution was created.
* @return date (time)
*/
public Date getCreateTime();
/**
* Get time execution was last updated updated.
* @return date (time)
*/
public Date getLastUpdatedTime();
/**
* Get job parameters for this execution.
* @return job parameters
*/
public Properties getJobParameters();
}
每一個方法的注釋已經解釋的很清楚,這里不再多做解釋,只提一下BatchStatus,JobExecution當中提供了一個方法getBatchStatus用于獲取一個job某一次特地執行的一個狀態,BatchStatus是一個代表job狀態的列舉類,其定義如下:
public enum BatchStatus {STARTING, STARTED, STOPPING,
STOPPED, FAILED, COMPLETED, ABANDONED }
這些屬性對于一個job的執行來說是非常關鍵的資訊,并且spring batch會將他們持久到資料庫當中. 在使用Spring batch的程序當中spring batch會自動創建一些表用于存盤一些job相關的資訊,用于存盤JobExecution的表為batch_job_execution,下面是一個從資料庫當中截圖的實體:

什么是Step
每一個Step物件都封裝了批處理作業的一個獨立的階段,事實上,每一個Job本質上都是由一個或多個步驟組成,每一個step包含定義和控制實際批處理所需的所有資訊,任何特定的內容都由撰寫Job的開發人員自行決定,
一個step可以非常簡單也可以非常復雜,例如,一個step的功能是將檔案中的資料加載到資料庫中,那么基于現在spring batch的支持則幾乎不需要寫代碼,更復雜的step可能具有復雜的業務邏輯,這些邏輯作為處理的一部分,
與Job一樣,Step具有與JobExecution類似的StepExecution,如下圖所示:

什么是StepExecution
StepExecution表示一次執行Step, 每次運行一個Step時都會創建一個新的StepExecution,類似于JobExecution,但是,某個步驟可能由于其之前的步驟失敗而無法執行,且僅當Step實際啟動時才會創建StepExecution,
一次step執行的實體由StepExecution類的物件表示,每個StepExecution都包含對其相應步驟的參考以及JobExecution和事務相關的資料,例如提交和回滾計數以及開始和結束時間,
此外,每個步驟執行都包含一個ExecutionContext,其中包含開發人員需要在批處理運行中保留的任何資料,例如重新啟動所需的統計資訊或狀態資訊,下面是一個從資料庫當中截圖的實體:

什么是ExecutionContext
ExecutionContext即每一個StepExecution 的執行環境,它包含一系列的鍵值對,我們可以用如下代碼獲取ExecutionContext
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
什么是JobRepository
JobRepository是一個用于將上述job,step等概念進行持久化的一個類,它同時給Job和Step以及下文會提到的JobLauncher實作提供CRUD操作,
首次啟動Job時,將從repository中獲取JobExecution,并且在執行批處理的程序中,StepExecution和JobExecution將被存盤到repository當中,
@EnableBatchProcessing注解可以為JobRepository提供自動配置,
什么是JobLauncher
JobLauncher這個介面的功能非常簡單,它是用于啟動指定了JobParameters的Job,為什么這里要強調指定了JobParameter,原因其實我們在前面已經提到了,jobparameter和job一起才能組成一次job的執行,下面是代碼實體:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
上面run方法實作的功能是根據傳入的job以及jobparamaters從JobRepository獲取一個JobExecution并執行Job,
什么是Item Reader
ItemReader是一個讀資料的抽象,它的功能是為每一個Step提供資料輸入,當ItemReader以及讀完所有資料時,它會回傳null來告訴后續操作資料已經讀完,Spring Batch為ItemReader提供了非常多的有用的實作類,比如JdbcPagingItemReader,JdbcCursorItemReader等等,
ItemReader支持的讀入的資料源也是非常豐富的,包括各種型別的資料庫,檔案,資料流,等等,幾乎涵蓋了我們的所有場景,
下面是一個JdbcPagingItemReader的例子代碼:
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
JdbcPagingItemReader必須指定一個PagingQueryProvider,負責提供SQL查詢陳述句來按分頁回傳資料,
下面是一個JdbcCursorItemReader的例子代碼:
private JdbcCursorItemReader<Map<String, Object>> buildItemReader(final DataSource dataSource, String tableName,
String tenant) {
JdbcCursorItemReader<Map<String, Object>> itemReader = new JdbcCursorItemReader<>();
itemReader.setDataSource(dataSource);
itemReader.setSql("sql here");
itemReader.setRowMapper(new RowMapper());
return itemReader;
}
什么是Item Writer
既然ItemReader是讀資料的一個抽象,那么ItemWriter自然就是一個寫資料的抽象,它是為每一個step提供資料寫出的功能,寫的單位是可以配置的,我們可以一次寫一條資料,也可以一次寫一個chunk的資料,關于chunk下文會有專門的介紹,ItemWriter對于讀入的資料是不能做任何操作的,
Spring Batch為ItemWriter也提供了非常多的有用的實作類,當然我們也可以去實作自己的writer功能,
什么是Item Processor
ItemProcessor對專案的業務邏輯處理的一個抽象, 當ItemReader讀取到一條記錄之后,ItemWriter還未寫入這條記錄之前,I我們可以借助temProcessor提供一個處理業務邏輯的功能,并對資料進行相應操作,如果我們在ItemProcessor發現一條資料不應該被寫入,可以通過回傳null來表示,ItemProcessor和ItemReader以及ItemWriter可以非常好的結合在一起作業,他們之間的資料傳輸也非常方便,我們直接使用即可,
chunk 處理流程
spring batch提供了讓我們按照chunk處理資料的能力,一個chunk的示意圖如下:

它的意思就和圖示的一樣,由于我們一次batch的任務可能會有很多的資料讀寫操作,因此一條一條的處理并向資料庫提交的話效率不會很高,因此spring batch提供了chunk這個概念,我們可以設定一個chunk size,spring batch 將一條一條處理資料,但不提交到資料庫,只有當處理的資料數量達到chunk size設定的值得時候,才一起去commit.
java的實體定義代碼如下:

在上面這個step里面,chunk size被設為了10,當ItemReader讀的資料數量達到10的時候,這一批次的資料就一起被傳到itemWriter,同時transaction被提交,
skip策略和失敗處理
一個batch的job的step,可能會處理非常大數量的資料,難免會遇到出錯的情況,出錯的情況雖出現的概率較小,但是我們不得不考慮這些情況,因為我們做資料遷移最重要的是要保證資料的最終一致性,spring batch當然也考慮到了這種情況,并且為我們提供了相關的技術支持,請看如下bean的配置:

我們需要留意這三個方法,分別是skipLimit(),skip(),noSkip(),
skipLimit方法的意思是我們可以設定一個我們允許的這個step可以跳過的例外數量,假如我們設定為10,則當這個step運行時,只要出現的例外數目不超過10,整個step都不會fail,注意,若不設定skipLimit,則其默認值是0.
skip方法我們可以指定我們可以跳過的例外,因為有些例外的出現,我們是可以忽略的,
noSkip方法的意思則是指出現這個例外我們不想跳過,也就是從skip的所以exception當中排除這個exception,從上面的例子來說,也就是跳過所有除FileNotFoundException的exception,
那么對于這個step來說,FileNotFoundException就是一個fatal的exception,拋出這個exception的時候step就會直接fail
批處理操作指南
本部分是一些使用spring batch時的值得注意的點
批處理原則
在構建批處理解決方案時,應考慮以下關鍵原則和注意事項,
-
批處理體系結構通常會影響體系結構
-
盡可能簡化并避免在單批應用程式中構建復雜的邏輯結構
-
保持資料的處理和存盤在物理上靠得很近(換句話說,將資料保存在處理程序中),
-
最大限度地減少系統資源的使用,尤其是I / O. 在internal memory中執行盡可能多的操作,
-
查看應用程式I / O(分析SQL陳述句)以確保避免不必要的物理I / O. 特別是,需要尋找以下四個常見缺陷:
-
- 當資料可以被讀取一次并快取或保存在作業存盤中時,讀取每個事務的資料,
- 重新讀取先前在同一事務中讀取資料的事務的資料,
- 導致不必要的表或索引掃描,
- 未在SQL陳述句的WHERE子句中指定鍵值,
-
在批處理運行中不要做兩次一樣的事情,例如,如果需要資料匯總以用于報告目的,則應該(如果可能)在最初處理資料時遞增存盤的總計,因此您的報告應用程式不必重新處理相同的資料,
-
在批處理應用程式開始時分配足夠的記憶體,以避免在此程序中進行耗時的重新分配,
-
總是假設資料完整性最差,插入適當的檢查和記錄驗證以維護資料完整性,
-
盡可能實施校驗和以進行內部驗證,例如,對于一個檔案里的資料應該有一個資料條數紀錄,告訴檔案中的記錄總數以及關鍵欄位的匯總,
-
在具有真實資料量的類似生產環境中盡早計劃和執行壓力測驗,
-
在大批量系統中,資料備份可能具有挑戰性,特別是如果系統以24-7在線的情況運行,資料庫備份通常在在線設計中得到很好的處理,但檔案備份應該被視為同樣重要,如果系統依賴于檔案,則檔案備份程序不僅應該到位并記錄在案,還應定期進行測驗,
如何默認不啟動job
在使用java config使用spring batch的job時,如果不做任何配置,專案在啟動時就會默認去跑我們定義好的批處理job,那么如何讓專案在啟動時不自動去跑job呢?
spring batch的job會在專案啟動時自動run,如果我們不想讓他在啟動時run的話,可以在application.properties中添加如下屬性:
spring.batch.job.enabled=false
在讀資料時記憶體不夠
在使用spring batch做資料遷移時,發現在job啟動后,執行到一定時間點時就卡在一個地方不動了,且log也不再列印,等待一段時間之后,得到如下錯誤:

紅字的資訊為:Resource exhaustion event:the JVM was unable to allocate memory from the heap.
翻譯過來的意思就是專案發出了一個資源耗盡的事件,告訴我們java虛擬機無法再為堆分配記憶體,
造成這個錯誤的原因是: 這個專案里的batch job的reader是一次性拿回了資料庫里的所有資料,并沒有進行分頁,當這個資料量太大時,就會導致記憶體不夠用,解決的辦法有兩個:
- 調整reader讀資料邏輯,按分頁讀取,但實作上會麻煩一些,且運行效率會下降
- 增大service記憶體
來源:blog.csdn.net/topdeveloperr/article/details/84337956
近期熱文推薦:
1.1,000+ 道 Java面試題及答案整理(2021最新版)
2.終于靠開源專案弄到 IntelliJ IDEA 激活碼了,真香!
3.阿里 Mock 工具正式開源,干掉市面上所有 Mock 工具!
4.Spring Cloud 2020.0.0 正式發布,全新顛覆性版本!
5.《Java開發手冊(嵩山版)》最新發布,速速下載!
覺得不錯,別忘了隨手點贊+轉發哦!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/291987.html
標籤:其他
