
在上一篇文章中,我們介紹了如何下載安裝部署SeaTunnel Zeta服務(3分鐘部署SeaTunnel Zeta單節點Standalone模式環境),接下來我們介紹一下SeaTunnel支持的第一個同步場景:離線批量同步,顧名思意,離線批量同步需要用戶定義好SeaTunnel JobConfig,選擇批處理模式,作業啟動后開始同步資料,當資料同步完成后作業完成退出,
下面以MySQL離線同步到StarRocks為例,介紹如何使用SeaTunnel進行離線同步作業的定義和運行,
1. 定義作業組態檔
SeaTunnel使用組態檔來定義作業,在這個示例中,作業的組態檔如下,檔案保存路徑~/seatunnel/apache-seatunnel-incubating-2.3.1/config/mysql_to_sr.config
#定義一些作業的運行引數,具體可以參考 https://seatunnel.apache.org/docs/2.3.1/concept/JobEnvConfig
env {
job.mode="BATCH" #作業的運行模式,BATCH=離線批同步,STREAMING=實時同步
job.name="SeaTunnel_Job"
checkpoint.interval=10000 #每10000ms進行一次checkpoint,后面會詳細介紹checkpoint對JDBC Source和StarRocks Sink這兩個連接器的影響
}
source {
Jdbc {
parallelism=5 # 并行度,這里是啟動5個Source Task來并行的讀取資料
partition_column="id" # 使用id欄位來進行split的拆分,目前只支持數字型別的主鍵列,而且該列的值最好是離線的,自增id最佳
partition_num="20" # 拆分成20個split,這20個split會被分配給5個Source Task來處理
result_table_name="Table9210050164000"
query="SELECT `id`, `f_binary`, `f_blob`, `f_long_varbinary`, `f_longblob`, `f_tinyblob`, `f_varbinary`, `f_smallint`, `f_smallint_unsigned`, `f_mediumint`, `f_mediumint_unsigned`, `f_int`, `f_int_unsigned`, `f_integer`, `f_integer_unsigned`, `f_bigint`, `f_bigint_unsigned`, `f_numeric`, `f_decimal`, `f_float`, `f_double`, `f_double_precision`, `f_longtext`, `f_mediumtext`, `f_text`, `f_tinytext`, `f_varchar`, `f_date`, `f_datetime`, `f_timestamp` FROM `sr_test`.`test1`"
password="root@123"
driver="com.mysql.cj.jdbc.Driver"
user=root
url="jdbc:mysql://st01:3306/sr_test?enabledTLSProtocols=TLSv1.2&rewriteBatchedStatements=true"
}
}
transform {
# 在本次示例中我們不需要做任務的Transform操作,所以這里為空,也可以將transform整個元素洗掉
}
sink {
StarRocks {
batch_max_rows=10240 #
source_table_name="Table9210050164000"
table="test2"
database="sr_test"
base-url="jdbc:mysql://datasource01:9030"
password="root"
username="root"
nodeUrls=[
"datasource01:8030" #寫入資料是通過StarRocks的Http介面
]
}
}
2. 作業配置說明
在這個作業定義檔案中,我們通過env定義了作業的運行模式是BATCH離線批處理模式,同時定義了作業的名稱是"SeaTunnel_Job",checkpoint.interval引數用來定義該作業程序中多久進行一次checkpoint,那什么是checkpoint,以及checkpoint在Apache SeaTunnel中的作用是什么呢?
2.1 checkpoint
查看官方檔案中對Apache SeaTunnel Zeta引擎checkpoint的介紹: https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/checkpoint-storage#introduction 發現checkpoint是用來使運行在Apache SeaTunnel Zeta中的作業能定期的將自己的狀態以快照的形式保存下來,當任務意外失敗時,可以從最近一次保存的快照中恢復作業,以實作任務的失敗恢復,斷點續傳等功能,其實checkpoint的核心是分布式快照演算法:Chandy-Lamport 演算法,是廣泛應用在分布式系統,更多是分布式計算系統中的一種容錯處理理論基礎,這里不詳細介紹Chandy-Lamport 演算法,接下來我們重點說明在本示例中checkpoint對這個同步任務的影響,
Apache SeaTunnel Zeta引擎在作業啟動時會啟動一個叫CheckpointManager的執行緒,用來管理這個作業的checkpoint,SeaTunnel Connector API提供了一套checkpoint的API,用于在引擎觸發checkpoint時通知具體的Connector進行相應的處理,SeaTunnel的Source和Sink連接器都是基于SeaTunnel Connector API開發的,只是不同的連接器對checkpoint API的實作細節不同,所以能實作的功能也不同,
2.1.1 checkpoint對JDBC Source的影響
在本示例中我們通過JDBC Source連接器的官方檔案https://seatunnel.apache.org/docs/2.3.1/connector-v2/source/Jdbc 可以發現如下內容:

這說明JDBC Source連接器實作了checkpoint相關的介面,通過原始碼我們可以得知,當checkpoint發生時,JDBC Source會將自己還未處理的split做為狀態的快照發送給CheckpointManager進行持久化保存,這樣當作業失敗并恢復時,JDBC Source會從最近一次保存的快照中讀取哪些split還未處理,然后接著處理這些split,
在該作業中通過partition_num=20,會將query引數中指定的sql陳述句的結果分成20個split進行處理,每個split會生成讀取它負責的資料的sql,這個sql是由query中指定的sql再加上一些where過濾條件組成的,這20個split會被分配給5個Source Task進行處理,理想情況下,每個Source Task會分配到4個split,假設在一次checkpoint時每個Source Task都只剩下一個split沒有處理,這個split的資訊會被保存下來,如果這之后作業掛掉了,作業會自動進行恢復,恢復時每個Source Task都會獲取到那個還未處理的split,并接著進行處理,如果作業不再報錯,這些split都處理完成后,作業運行完成,如果作業還是報錯(比如目標端StarRocks掛了,無法寫入資料),最終作業會以失敗狀態結束,
斷點續傳:
如果在作業失敗后,我們修復了問題,并且希望該作業接著之前的進度運行,只處理那些之前沒有被處理過的split,可以使用 sh seatunnel.sh -r jobId來讓作業ID為jobId的作業從斷點中恢復,
回到主題,checkpoint.interval=10000對于從Mysql中讀取資料意味著每過10s,SeaTunnel Zeta引擎就會觸發一次checkpoint操作,然后JDBC Source Task會被要求將自己還未處理的split資訊保存下來,這里需求注意的是,JDBC Source Task讀取資料是以split為單位的,如果checkpoint觸發時一個split中的資料正在被讀取還未完全發送給下游的StarRocks,它會等到這個split的資料處理完成之后才會回應這次checkpoint操作,這里一定要注意,如果MySQL中的資料量比較大,一個split的資料需要很長的時候才能處理完成,可能會導致checkpoint超時,關于checkpoint的超時時長可以引數https://seatunnel.apache.org/docs/2.3.1/seatunnel-engine/checkpoint-storage, 默認是1分鐘,
2.1.2 checkpoint對StarRocks Sink的影響
在Sink連接器的檔案上,我們也能看到如下圖中的標識:

這個標識代表該Sink連接器是否實作了精確處理一次的語意,如果該標識被選中,說明這個Sink連接器能保證發給它的資料它只會往目標端寫入一次,不會漏掉導致目標端資料丟失 ,也不會重復往目標端寫入,這一功能常見的實作方式是兩階段提交,支持事務的連接器一般會先開啟事務進行資料的寫入,當checkpoint發生時,將事務ID回傳給CheckManager進行持久化,當作業中的所有Task都回應了CheckManager的checkpoint請求后,第一階段完成,然后Apache SeaTunnel Zeta引擎會呼叫AggregateCommit的方法讓Sink對其事務進行提交,這個程序被稱為第二階段,第二階段完成后該次checkpoint完成,如果第二階段提交失敗,作業會失敗,然后自動恢復,恢復后會再次從第二階段開始,要求對事務進行提交,直到該事務提交完成,如果事務一直失敗,作業也將失敗,
并不是只有實作了exactly-once特性的Sink連接器才能保證目標端的資料不丟失不重復,如果目標端的資料庫支持以主鍵去重,那只要Sink連接器保證發送給它的資料至少往目標端寫入一次,無論重復寫入多少次,最終都不會導致目標端資料丟失或重復,在該示例中StarRocks Sink連接器即是使用了這種方式,StarRocks Sink連接器會將收到的資料先快取在記憶體中,當快取的行數達到batch_max_rows設定的10240行,就會發起一次寫入請求,將資料寫入到StarRocks中,如果MySQL中的資料量很小,達不到10240行,那就會在checkpoint觸發時進行StarRocks的寫入,
3. 運行作業
我們使用Apache SeaTunnel Zeta引擎來運行該作業
cd ~/seatunnel/apache-seatunnel-incubating-2.3.1
sh bin/seatunnel.sh --config config/mysql_to_sr.config
作業運行完成后可以看到如下資訊,說明作業狀態為FINISHED,讀取20w行資料,寫入StarRocks也是20w行資料,用時6s,

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/553561.html
標籤:其他
下一篇:返回列表
