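As a high-performance time-series database, DolphinDB is often used in production for data cleaning, transformation, and loading. Airflow offers a good way to manage such ETL jobs in a structured manner. This tutorial presents a solution for ETL in production: it brings Python Airflow into a DolphinDB high-availability cluster and uses Airflow's features to manage DolphinDB ETL jobs more effectively. The overall architecture is as follows: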

1. Airflow
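1.1 Introduction to Airflow
Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. Based on directed acyclic graphs (DAGs), Airflow lets you define a set of interdependent tasks and runs them in dependency order. It provides a rich set of command-line tools for system administration, and its web UI makes it equally easy to manage scheduled tasks and monitor their status in real time, which simplifies operation and maintenance.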
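1.2 Selected Core Features of Airflow
- Incremental loading: When a table or dataset is small, it can be loaded in full. As data grows, however, extracting data incrementally at fixed intervals becomes the norm for ETL, and loading only the last hour, day, or week of data is a very common requirement. Airflow makes it easy to load incremental data at specific intervals.
- Processing historical data: Sometimes you have just finished a new workflow and need to go back to data from before the date the new code went into production. In this case, simply set the start date with the DAG's start_date parameter, and Airflow backfills the tasks from that start date. DAG parameters can also be used to process weeks, months, or years of data.
- Partitioning extracted data: Partitioning the data at its destination lets DAGs run in parallel, avoids locking the extracted data, and improves performance when the same data is read. Data that is no longer relevant can be archived and deleted from the database.
- Enforcing idempotency: The result of a DAG run should always be idempotent, meaning that when you run a process several times with the same parameters (even on different dates), the result is exactly the same.
- Conditional execution: Airflow has options to control how the tasks in a DAG run based on the success of previous instances.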
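The incremental-load and backfill behavior described above comes down to cutting time into fixed intervals starting at start_date. The minimal sketch below, in plain Python with no Airflow dependency, enumerates the daily intervals a scheduler would have to cover between a start date and a cutoff. It is a simplified model of catchup/backfill that assumes a fixed timedelta schedule, not Airflow's own implementation; daily_intervals is a hypothetical helper name.

```python
from datetime import datetime, timedelta

def daily_intervals(start, end, step=timedelta(days=1)):
    """Yield (interval_start, interval_end) pairs covering [start, end).

    Simplified model of backfill: an interval is yielded only once it has
    fully elapsed, so an interval ending after `end` is not produced.
    """
    cur = start
    while cur + step <= end:
        yield cur, cur + step
        cur += step

# Backfill three days of data starting from 2023-03-10
runs = list(daily_intervals(datetime(2023, 3, 10), datetime(2023, 3, 13)))
for lo, hi in runs:
    print(lo.date(), "->", hi.date())
```

Each pair is one unit of incremental work; rerunning a pair should overwrite the same slice of data, which is what keeps backfills idempotent.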
1.3 DolphinDBOperator
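DolphinDBOperator is an Airflow operator. With DolphinDBOperator, Airflow can connect to DolphinDB to write, query, and compute data. The parameters specific to DolphinDBOperator are:
- dolphindb_conn_id: specifies the DolphinDB connection, which can be configured under Connections
- sql: specifies the DolphinDB script to run
- file_path: specifies a DolphinDB .dos file to run as the script
Examples of using DolphinDBOperator:
- Run a script given in the sql parameter: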
# Create a shared table in DolphinDB
create_parameter_table = DolphinDBOperator(
task_id='create_parameter_table',
dolphindb_conn_id='dolphindb_test',
sql='''
undef(`paramTable,SHARED)
t = table(1:0, `param`value, [STRING, STRING])
share t as paramTable
'''
)
- Run a script from a .dos file given in file_path:
# CalAlpha001.dos is a DolphinDB script
case1 = DolphinDBOperator(
task_id='case1',
dolphindb_conn_id='dolphindb_test',
file_path=path + "/StreamCalculating/CalAlpha001.dos"
)
1.4 Installing and Deploying Airflow
- Hardware environment:

- Software environment:

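Notes:
1. This tutorial uses SQLite as the backend database. If Airflow fails to start because the SQLite version is too old, see Set up a Database Backend to upgrade SQLite or switch to a different default database.
2. It is recommended to deploy the DolphinDB service before starting. For installation, see the DolphinDB High Availability Cluster Deployment tutorial or the Docker-Compose-based DolphinDB multi-container cluster deployment tutorial.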
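- Host environment
- First, before installing Airflow, make sure that python3, dolphindb, and dolphindb-operator are installed on the host. Run the following commands to install the dependency packages; the packages are available in the attachments.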
pip install --force-reinstall dolphindb
pip install --force-reinstall dolphindbapi-1.0.0-py3-none-any.whl
pip install --force-reinstall apache_Airflow_providers_dolphindb-1.0.0-py3-none-any.whl
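The Airflow installation package used in this tutorial supports offline installation only; online installation will be provided after the official release.
- After installing the airflow.providers.dolphindb plugin, start Airflow:
For details on deploying and installing Airflow, see the official Airflow Quick Start. The core commands to start Airflow are: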
# Initialize the metadata database
airflow db init
# Create a user (admin@example.com is a placeholder; use your own email address)
airflow users create --username admin --firstname Peter --lastname Parker --role Admin --email admin@example.com --password admin
# Run the webserver as a daemon
airflow webserver --port 8080 -D
# Run the scheduler as a daemon
airflow scheduler -D
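- Run the following command to check whether Airflow started successfully:
ps -aux|grep airflow
The expected output is shown below, indicating that Airflow started successfully: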

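- After Airflow starts, log in to the Airflow web UI in a browser:
  - Default address: http://IP:8080
  - Default account: created when initializing the db; admin in this example
  - Default password: created when initializing the db; admin in this example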

- Enter the username and password created above to open the Airflow UI, as shown below:

- Fill in the DolphinDB connection information to connect to the DolphinDB database.


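Once the connection succeeds, specify dolphindb_conn_id='dolphindb_test' in DolphinDBOperator to run DolphinDB scripts. With this preparation done, the following sections use an ETL process for stock snapshot data as an example to show how Airflow interacts with DolphinDB.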
2. Scheduling Market Data ETL with Airflow
2.1 Overall ETL Architecture


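Directory structure of the functional module code:
- add: incremental data ETL
  - addLoadSnapshot: imports the day's new raw snapshot data
  - addProcessSnapshot: converts the incremental snapshots into ArrayVector and cleans the data
  - addFactor: synthesizes daily K-line and 1-minute K-line data from the new data and stores them
  - addETL.py: builds the incremental-data DAG
- full: full data ETL
  - loadSnapshot: creates the snapshot table and imports the data
  - processSnapshot: creates the table for cleaned snapshots, converts the data into ArrayVector, cleans it, and stores the result
  - Factor: creates the factor tables, processes the cleaned data into daily and 1-minute K-line data, and stores them
  - fullETL.py: builds the full-data DAG
External data source -> ODS data source: import raw data from the external data source into DolphinDB
ODS data source -> DWD data detail: clean the raw data, converting its multi-level quotes into ArrayVector and removing duplicates
DWD data detail -> DWB/DWS data summary: compute K-line data from the cleaned snapshot data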
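Note:
This tutorial uses DolphinDB's module feature and the DolphinDB client tool to manage DolphinDB scripts as an engineering project. For details, see DolphinDB Tutorial: Modules and the DolphinDB client software tutorial.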
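2.2 Data Description
This tutorial uses five days of level-2 snapshot data for all stocks in the market, from 2021.01.04 to 2021.01.08. The structure of the snapshot table in DolphinDB is shown below. The eight fields BidOrderQty, BidPrice, BidNumOrders, BidOrders, OfferPrice, OfferOrderQty, OfferNumOrders, and OfferOrders each contain multi-level data and are stored in DolphinDB with the ArrayVector data type: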

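2.3 Core DolphinDB Cleaning Scripts
2.3.1 Creating Distributed Databases and Tables
- Create the table for raw snapshot data:
Create the database and table that store the raw snapshot data. The core code is as follows: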
module loadSnapshot::createSnapshotTable
//Create the database and table for raw snapshot data
def createSnapshot(dbName, tbName){
login("admin", "123456")
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
db1 = database(, VALUE, 2020.01.01..2021.01.01)
db2 = database(, HASH, [SYMBOL, 50])
//Composite partitioning by day and stock code
db = database(dbName,COMPO,[db1,db2],engine='TSDB')
colName = ["SecurityID","DateTime","PreClosePx","OpenPx","HighPx","LowPx","LastPx","TotalVolumeTrade","TotalValueTrade","InstrumentStatus","BidPrice0","BidPrice1","BidPrice2","BidPrice3","BidPrice4","BidPrice5","BidPrice6","BidPrice7","BidPrice8","BidPrice9","BidOrderQty0","BidOrderQty1","BidOrderQty2","BidOrderQty3","BidOrderQty4","BidOrderQty5","BidOrderQty6","BidOrderQty7","BidOrderQty8","BidOrderQty9","BidNumOrders0","BidNumOrders1","BidNumOrders2","BidNumOrders3","BidNumOrders4","BidNumOrders5","BidNumOrders6","BidNumOrders7","BidNumOrders8","BidNumOrders9","BidOrders0","BidOrders1","BidOrders2","BidOrders3","BidOrders4","BidOrders5","BidOrders6","BidOrders7","BidOrders8","BidOrders9","BidOrders10","BidOrders11","BidOrders12","BidOrders13","BidOrders14","BidOrders15","BidOrders16","BidOrders17","BidOrders18","BidOrders19","BidOrders20","BidOrders21","BidOrders22","BidOrders23","BidOrders24","BidOrders25","BidOrders26","BidOrders27","BidOrders28","BidOrders29","BidOrders30","BidOrders31","BidOrders32","BidOrders33","BidOrders34","BidOrders35","BidOrders36","BidOrders37","BidOrders38","BidOrders39","BidOrders40","BidOrders41","BidOrders42","BidOrders43","BidOrders44","BidOrders45","BidOrders46","BidOrders47","BidOrders48","BidOrders49","OfferPrice0","OfferPrice1","OfferPrice2","OfferPrice3","OfferPrice4","OfferPrice5","OfferPrice6","OfferPrice7","OfferPrice8","OfferPrice9","OfferOrderQty0","OfferOrderQty1","OfferOrderQty2","OfferOrderQty3","OfferOrderQty4","OfferOrderQty5","OfferOrderQty6","OfferOrderQty7","OfferOrderQty8","OfferOrderQty9","OfferNumOrders0","OfferNumOrders1","OfferNumOrders2","OfferNumOrders3","OfferNumOrders4","OfferNumOrders5","OfferNumOrders6","OfferNumOrders7","OfferNumOrders8","OfferNumOrders9","OfferOrders0","OfferOrders1","OfferOrders2","OfferOrders3","OfferOrders4","OfferOrders5","OfferOrders6","OfferOrders7","OfferOrders8","OfferOrders9","OfferOrders10","OfferOrders11","OfferOrders12","OfferOrders13","OfferOrders14","OfferOrders15","OfferOrders16","OfferOrders17","OfferOrders18","OfferOrders19","OfferOrders20","OfferOrders21","OfferOrders22","OfferOrders23","OfferOrders24","OfferOrders25","OfferOrders26","OfferOrders27","OfferOrders28","OfferOrders29","OfferOrders30","OfferOrders31","OfferOrders32","OfferOrders33","OfferOrders34","OfferOrders35","OfferOrders36","OfferOrders37","OfferOrders38","OfferOrders39","OfferOrders40","OfferOrders41","OfferOrders42","OfferOrders43","OfferOrders44","OfferOrders45","OfferOrders46","OfferOrders47","OfferOrders48","OfferOrders49","NumTrades","IOPV","TotalBidQty","TotalOfferQty","WeightedAvgBidPx","WeightedAvgOfferPx","TotalBidNumber","TotalOfferNumber","BidTradeMaxDuration","OfferTradeMaxDuration","NumBidOrders","NumOfferOrders","WithdrawBuyNumber","WithdrawBuyAmount","WithdrawBuyMoney","WithdrawSellNumber","WithdrawSellAmount","WithdrawSellMoney","ETFBuyNumber","ETFBuyAmount","ETFBuyMoney","ETFSellNumber","ETFSellAmount","ETFSellMoney"]
colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","INT","INT","INT","INT"]
schemaTable = table(1:0,colName, colType)
db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`DateTime`SecurityID, compressMethods={DateTime:"delta"}, sortColumns=`SecurityID`DateTime, keepDuplicates=ALL)
}
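For snapshot data, this tutorial uses a composite partitioning scheme: the first level partitions by day, and the second level hashes the stock code into 50 partitions. For how to choose a partitioning scheme based on your data, see the DolphinDB partitioned database tutorial.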
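To make the two-level scheme concrete, here is a small Python model of how one (DateTime, SecurityID) row maps to a composite partition: the first level is the trading date (VALUE), the second a hash bucket among 50. DolphinDB uses its own internal hash for HASH partitions; zlib.crc32 is only a stand-in so the example is deterministic, so the bucket numbers will not match DolphinDB's. partition_of is a hypothetical helper.

```python
import zlib
from datetime import datetime

HASH_BUCKETS = 50  # mirrors database(, HASH, [SYMBOL, 50]) in the script above

def partition_of(ts: datetime, security_id: str) -> tuple:
    """Return (date_partition, hash_bucket) for one snapshot row.

    zlib.crc32 is a stand-in hash; DolphinDB's HASH function differs, so only
    the structure (day first, then symbol bucket) is meaningful here.
    """
    day = ts.date()                                            # level 1: VALUE by day
    bucket = zlib.crc32(security_id.encode()) % HASH_BUCKETS   # level 2: HASH by symbol
    return day, bucket

p1 = partition_of(datetime(2021, 1, 4, 9, 30), "600000")
p2 = partition_of(datetime(2021, 1, 4, 14, 55), "600000")
p3 = partition_of(datetime(2021, 1, 5, 9, 30), "600000")
print(p1 == p2, p1 == p3)  # True False
```

All snapshots of one stock on one day land in the same partition, which is why per-day jobs such as dropPartition and per-day reprocessing stay cheap.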
- Create the table for cleaned snapshot data:
Create the database and table that store the cleaned snapshot data in array format. The core code is as follows:
module processSnapshot::createSnapshot_array
//Create the table for cleaned snapshot data
def createProcessTable(dbName, tbName){
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
db1 = database(, VALUE, 2020.01.01..2021.01.01)
db2 = database(, HASH, [SYMBOL, 50])
//Composite partitioning by day and stock code
db = database(dbName,COMPO,[db1,db2],engine='TSDB')
colName = ["SecurityID","DateTime","PreClosePx","OpenPx","HighPx","LowPx","LastPx","TotalVolumeTrade","TotalValueTrade","InstrumentStatus","BidPrice","BidOrderQty","BidNumOrders","BidOrders","OfferPrice","OfferOrderQty","OfferNumOrders","OfferOrders","NumTrades","IOPV","TotalBidQty","TotalOfferQty","WeightedAvgBidPx","WeightedAvgOfferPx","TotalBidNumber","TotalOfferNumber","BidTradeMaxDuration","OfferTradeMaxDuration","NumBidOrders","NumOfferOrders","WithdrawBuyNumber","WithdrawBuyAmount","WithdrawBuyMoney","WithdrawSellNumber","WithdrawSellAmount","WithdrawSellMoney","ETFBuyNumber","ETFBuyAmount","ETFBuyMoney","ETFSellNumber","ETFSellAmount","ETFSellMoney"]
colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL","DOUBLE[]","INT[]","INT[]","INT[]","DOUBLE[]","INT[]","INT[]","INT[]","INT","INT","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","INT","INT","INT","INT"]
schemaTable = table(1:0, colName, colType)
db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`DateTime`SecurityID, compressMethods={DateTime:"delta"}, sortColumns=`SecurityID`DateTime, keepDuplicates=ALL)
}
- Create the tables for K-line results:
Create the table for minute-level K-line results. The core code is as follows:
module Factor::createFactorOneMinute
//Create the table for minute K-line factors
def createFactorOneMinute(dbName, tbName){
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
//VALUE partitions by day
db = database(dbName, VALUE, 2021.01.01..2021.01.03,engine = `TSDB)
colName = `TradeDate`TradeTime`SecurityID`Open`High`Low`Close`Volume`Amount`Vwap
colType =[DATE, MINUTE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE]
tbSchema = table(1:0, colName, colType)
db.createPartitionedTable(table=tbSchema,tableName=tbName,partitionColumns=`TradeDate,sortColumns=`SecurityID`TradeTime,keepDuplicates=ALL)
}
Create the table for daily K-line results. The core code is as follows:
module Factor::createFactorDaily
//Create the table for daily K-lines
def createFactorDaily(dbName, tbName){
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
//RANGE partitions by year
db = database(dbName, RANGE, datetimeAdd(2000.01M,(0..50)*12, "M"),engine = `TSDB)
colName = `TradeDate`SecurityID`Open`High`Low`Close`Volume`Amount`Vwap
colType =[DATE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE]
tbSchema = table(1:0, colName, colType)
db.createPartitionedTable(table=tbSchema,tableName=tbName,partitionColumns=`TradeDate,sortColumns=`SecurityID`TradeDate,keepDuplicates=ALL)
}
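The daily K-line table uses RANGE partitions whose boundaries are datetimeAdd(2000.01M, (0..50)*12, "M"), that is, January of every year from 2000 to 2050, so each partition [boundary_i, boundary_i+1) holds one year. The Python sketch below rebuilds that boundary list and finds a date's partition with bisect. It only models the RANGE semantics (lower bound inclusive, upper bound exclusive) and is not DolphinDB code; range_partition is a hypothetical helper.

```python
import bisect
from datetime import date

# January of each year 2000..2050, mirroring datetimeAdd(2000.01M, (0..50)*12, "M")
boundaries = [date(2000 + k, 1, 1) for k in range(51)]

def range_partition(d: date) -> int:
    """Index i of the RANGE partition [boundaries[i], boundaries[i+1]) holding d."""
    i = bisect.bisect_right(boundaries, d) - 1
    if i < 0 or i >= len(boundaries) - 1:
        raise ValueError(f"{d} is outside the partition range")
    return i

print(len(boundaries) - 1)                 # 50 yearly partitions
print(range_partition(date(2021, 1, 4)))   # 21, i.e. [2021.01, 2022.01)
```

Because the last boundary is exclusive, dates from 2050.01.01 onward fall outside the scheme and would need additional partitions.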
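2.3.2 Data Cleaning
This tutorial divides snapshot data cleaning into two main parts:
- Store the multi-level prices and order quantities as arrays
- Remove duplicate records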
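In the DolphinDB script that follows, fixedLengthArrayVector merges per-level columns such as BidPrice0 … BidPrice9 into a single array-vector column per row. A plain-Python model of that reshaping on one row looks like this; pack_levels and to_array_row are hypothetical helpers, and the example uses only 3 levels to keep it short.

```python
def pack_levels(row: dict, prefix: str, levels: int) -> list:
    """Collect row[prefix+'0'] .. row[prefix+str(levels-1)] into one list,
    like fixedLengthArrayVector(prefix0, prefix1, ...) does for each row."""
    return [row[f"{prefix}{i}"] for i in range(levels)]

def to_array_row(row: dict, groups: dict) -> dict:
    """Replace each group of numbered level columns with one list-valued column."""
    out = {k: v for k, v in row.items()
           if not any(k.startswith(p) and k[len(p):].isdigit() for p in groups)}
    for prefix, levels in groups.items():
        out[prefix] = pack_levels(row, prefix, levels)
    return out

raw = {"SecurityID": "600000", "BidPrice0": 10.01, "BidPrice1": 10.00, "BidPrice2": 9.99,
       "BidOrderQty0": 300, "BidOrderQty1": 500, "BidOrderQty2": 100}
packed = to_array_row(raw, {"BidPrice": 3, "BidOrderQty": 3})
print(packed)
# {'SecurityID': '600000', 'BidPrice': [10.01, 10.0, 9.99], 'BidOrderQty': [300, 500, 100]}
```

Storing the levels as one array column shrinks the schema from 194 to 42 columns, which is the motivation for the ArrayVector table created earlier.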
The processing flow and core code are as follows:
module processSnapshot::processSnapshotData
//Combine the level columns into arrays, remove duplicates, and count the duplicated rows
def mapProcess(mutable t, dbName, tbName){
n1 = t.size()
t = select SecurityID, DateTime, PreClosePx, OpenPx, HighPx, LowPx, LastPx, TotalVolumeTrade, TotalValueTrade, InstrumentStatus, fixedLengthArrayVector(BidPrice0, BidPrice1, BidPrice2, BidPrice3, BidPrice4, BidPrice5, BidPrice6, BidPrice7, BidPrice8, BidPrice9) as BidPrice, fixedLengthArrayVector(BidOrderQty0, BidOrderQty1, BidOrderQty2, BidOrderQty3, BidOrderQty4, BidOrderQty5, BidOrderQty6, BidOrderQty7, BidOrderQty8, BidOrderQty9) as BidOrderQty, fixedLengthArrayVector(BidNumOrders0, BidNumOrders1, BidNumOrders2, BidNumOrders3, BidNumOrders4, BidNumOrders5, BidNumOrders6, BidNumOrders7, BidNumOrders8, BidNumOrders9) as BidNumOrders, fixedLengthArrayVector(BidOrders0, BidOrders1, BidOrders2, BidOrders3, BidOrders4, BidOrders5, BidOrders6, BidOrders7, BidOrders8, BidOrders9, BidOrders10, BidOrders11, BidOrders12, BidOrders13, BidOrders14, BidOrders15, BidOrders16, BidOrders17, BidOrders18, BidOrders19, BidOrders20, BidOrders21, BidOrders22, BidOrders23, BidOrders24, BidOrders25, BidOrders26, BidOrders27, BidOrders28, BidOrders29, BidOrders30, BidOrders31, BidOrders32, BidOrders33, BidOrders34, BidOrders35, BidOrders36, BidOrders37, BidOrders38, BidOrders39, BidOrders40, BidOrders41, BidOrders42, BidOrders43, BidOrders44, BidOrders45, BidOrders46, BidOrders47, BidOrders48, BidOrders49) as BidOrders, fixedLengthArrayVector(OfferPrice0, OfferPrice1, OfferPrice2, OfferPrice3, OfferPrice4, OfferPrice5, OfferPrice6, OfferPrice7, OfferPrice8, OfferPrice9) as OfferPrice, fixedLengthArrayVector(OfferOrderQty0, OfferOrderQty1, OfferOrderQty2, OfferOrderQty3, OfferOrderQty4, OfferOrderQty5, OfferOrderQty6, OfferOrderQty7, OfferOrderQty8, OfferOrderQty9) as OfferQty, fixedLengthArrayVector(OfferNumOrders0, OfferNumOrders1, OfferNumOrders2, OfferNumOrders3, OfferNumOrders4, OfferNumOrders5, OfferNumOrders6, OfferNumOrders7, OfferNumOrders8, OfferNumOrders9) as OfferNumOrders, fixedLengthArrayVector(OfferOrders0, OfferOrders1, OfferOrders2, OfferOrders3, OfferOrders4, 
OfferOrders5, OfferOrders6, OfferOrders7, OfferOrders8, OfferOrders9, OfferOrders10, OfferOrders11, OfferOrders12, OfferOrders13, OfferOrders14, OfferOrders15, OfferOrders16, OfferOrders17, OfferOrders18, OfferOrders19, OfferOrders20, OfferOrders21, OfferOrders22, OfferOrders23, OfferOrders24, OfferOrders25, OfferOrders26, OfferOrders27, OfferOrders28, OfferOrders29, OfferOrders30, OfferOrders31, OfferOrders32, OfferOrders33, OfferOrders34, OfferOrders35, OfferOrders36, OfferOrders37, OfferOrders38, OfferOrders39, OfferOrders40, OfferOrders41, OfferOrders42, OfferOrders43, OfferOrders44, OfferOrders45, OfferOrders46, OfferOrders47, OfferOrders48, OfferOrders49) as OfferOrders, NumTrades, IOPV, TotalBidQty, TotalOfferQty, WeightedAvgBidPx, WeightedAvgOfferPx, TotalBidNumber, TotalOfferNumber, BidTradeMaxDuration, OfferTradeMaxDuration, NumBidOrders, NumOfferOrders, WithdrawBuyNumber, WithdrawBuyAmount, WithdrawBuyMoney, WithdrawSellNumber, WithdrawSellAmount, WithdrawSellMoney, ETFBuyNumber, ETFBuyAmount, ETFBuyMoney, ETFSellNumber, ETFSellAmount, ETFSellMoney from t where isDuplicated([SecurityID, DateTime], FIRST) = false
n2 = t.size()
loadTable(dbName, tbName).append!(t)
return n1,n2
}
def process(processDate, dbName_orig, tbName_orig, dbName_process, tbName_process){
dataString = temporalFormat(processDate, "yyyyMMdd")
//Check whether data for the processing date already exists in the database
todayCount = exec count(*) from loadTable(dbName_process, tbName_process) where date(DateTime)=processDate
//If the day's data already exists in the database, delete it first
if(todayCount != 0){
writeLog("Start to delete the process snapshot data, the delete date is: " + dataString)
dropPartition(database(dbName_process), processDate, tbName_process)
writeLog("Successfully deleted the process snapshot data, the delete date is: " + dataString)
}
//Start processing the data
writeLog("Start process Snapshot Data, the datetime is "+ dataString)
ds = sqlDS(sql(select=sqlCol("*"), from=loadTable(dbName_orig,tbName_orig),where=<date(DateTime)=processDate>))
n1,n2=mr(ds, mapProcess{, dbName_process, tbName_process}, +, , false)
if(n1 != n2){
writeLog("ERROR: Duplicated datas exists in " + dataString + ", Successfully drop " + string(n1-n2) + " pieces of data" )
}
writeLog("Successfully process the snapshot data, the processDate is: " + dataString)
}
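process() keeps the cleaning step idempotent: it drops any existing partition for the day before rewriting it, and mapProcess keeps only the first row for each (SecurityID, DateTime) key, as isDuplicated([SecurityID, DateTime], FIRST) = false does. The sketch below models both ideas in Python with a dict standing in for the partitioned table; dedup_first and reprocess_day are hypothetical names.

```python
from datetime import date, datetime

def dedup_first(rows):
    """Keep the first row per (SecurityID, DateTime) key; return (kept, dropped_count)."""
    seen, kept = set(), []
    for r in rows:
        key = (r["SecurityID"], r["DateTime"])
        if key not in seen:
            seen.add(key)
            kept.append(r)
    return kept, len(rows) - len(kept)

def reprocess_day(store: dict, day: date, raw_rows):
    """Idempotent rewrite: drop the day's data (like dropPartition), then write deduplicated rows."""
    store.pop(day, None)
    kept, dropped = dedup_first(raw_rows)
    store[day] = kept
    return dropped

t = datetime(2021, 1, 4, 9, 30)
raw = [{"SecurityID": "600000", "DateTime": t, "LastPx": 10.0},
       {"SecurityID": "600000", "DateTime": t, "LastPx": 10.1},   # duplicate key, dropped
       {"SecurityID": "000001", "DateTime": t, "LastPx": 18.2}]
store = {}
first = reprocess_day(store, date(2021, 1, 4), raw)
second = reprocess_day(store, date(2021, 1, 4), raw)  # a rerun gives the same result
print(first, second, len(store[date(2021, 1, 4)]))    # 1 1 2
```

Without the drop step, a rerun would append the same day twice; the delete-then-write order is what makes retries and backfills safe.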
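2.3.3 Synthesizing K-Lines from Cleaned Market Data
Synthesize minute-level K-lines and write them to the database. The core code is as follows: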
module Factor::calFactorOneMinute
//Synthesize minute K-lines and write them to the database
def calFactorOneMinute(dbName, tbName, mutable factorTable){
pt = loadTable(dbName, tbName)
//Process the data in groups of 10 days
dayList = schema(pt).partitionSchema[0]
if(dayList.size()>10) dayList = dayList.cut(10)
for(days in dayList){
//Compute minute K-lines
res = select first(LastPX) as Open, max(LastPx) as High, min(LastPx) as Low, last(LastPx) as Close, sum(TotalVolumeTrade) as Volume, sum(LastPx*totalVolumeTrade) as Amount, wavg(LastPx, TotalVolumeTrade) as Vwap from pt where date(DateTime) in days group by date(DateTime) as TradeDate,minute(DateTime) as TradeTime, SecurityID
writeLog("Start to append minute factor result , the days is: [" + concat(days, ",")+"]")
//Write the minute K-lines to the database
factorTable.append!(res)
writeLog("Successfully append the minute factor result to database, the days is: [" + concat(days, ",")+"]")
}
}
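For each group, the select above computes Open = first(LastPx), High/Low = max/min, Close = last(LastPx), Volume = sum(TotalVolumeTrade), Amount = sum(LastPx*TotalVolumeTrade), and Vwap = wavg(LastPx, TotalVolumeTrade). The Python sketch below mirrors those formulas for snapshots grouped by (date, minute, SecurityID), assuming rows are already sorted by time within each symbol; minute_bars is a hypothetical helper.

```python
from datetime import date, datetime

def minute_bars(rows):
    """Group rows by (date, 'HH:MM', SecurityID) and compute the same aggregates
    as the DolphinDB select. Assumes rows are time-ordered within each symbol."""
    groups = {}
    for r in rows:
        ts = r["DateTime"]
        key = (ts.date(), ts.strftime("%H:%M"), r["SecurityID"])
        groups.setdefault(key, []).append(r)
    bars = {}
    for key, g in groups.items():
        px = [r["LastPx"] for r in g]
        vol = [r["TotalVolumeTrade"] for r in g]
        amount = sum(p * v for p, v in zip(px, vol))  # sum(LastPx*TotalVolumeTrade)
        volume = sum(vol)                              # sum(TotalVolumeTrade)
        bars[key] = {"Open": px[0], "High": max(px), "Low": min(px), "Close": px[-1],
                     "Volume": volume, "Amount": amount,
                     "Vwap": amount / volume if volume else None}  # wavg(LastPx, TotalVolumeTrade)
    return bars

rows = [
    {"SecurityID": "600000", "DateTime": datetime(2021, 1, 4, 9, 30, 3), "LastPx": 10.0, "TotalVolumeTrade": 100},
    {"SecurityID": "600000", "DateTime": datetime(2021, 1, 4, 9, 30, 33), "LastPx": 10.4, "TotalVolumeTrade": 300},
    {"SecurityID": "600000", "DateTime": datetime(2021, 1, 4, 9, 31, 0), "LastPx": 10.2, "TotalVolumeTrade": 200},
]
bars = minute_bars(rows)
b = bars[(date(2021, 1, 4), "09:30", "600000")]
print(b["Open"], b["High"], b["Low"], b["Close"], b["Volume"])
```

The daily K-line function that follows uses the same aggregates, only grouped by date and SecurityID instead of by minute.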
Synthesize daily K-lines and write them to the database. The core code is as follows:
module Factor::calFactorDaily1
//Synthesize daily K-lines and write them to the database
def calFactorDaily(dbName, tbName, mutable factorTable){
pt = loadTable(dbName, tbName)
//Process the data in groups of 10 days
dayList = schema(pt).partitionSchema[0]
if(dayList.size()>10) dayList = dayList.cut(10)
for(days in dayList){
//Compute daily K-lines
res = select first(LastPX) as Open, max(LastPx) as High, min(LastPx) as Low, last(LastPx) as Close, sum(TotalVolumeTrade) as Volume, sum(LastPx*totalVolumeTrade) as Amount, wavg(LastPx, TotalVolumeTrade) as Vwap from pt where date(DateTime) in days group by date(DateTime) as TradeDate, SecurityID
writeLog("Start to append daily factor result , the days is: [" + concat(days, ",")+"]")
//Write the daily K-lines to the database
factorTable.append!(res)
writeLog("Successfully append the daily factor result to database, the days is: [" + concat(days, ",")+"]")
}
}
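Both K-line functions split the day partitions into groups of 10 with dayList.cut(10) when there are more than 10 days, so each query scans at most ten days of snapshots. A Python model of that batching is below; cut here is a hypothetical stand-in written to behave like DolphinDB's cut(X, size) with a scalar size, not a Python built-in.

```python
from datetime import date, timedelta

def cut(items, size):
    """Split a sequence into consecutive chunks of at most `size` items,
    modelling DolphinDB's cut(X, size) for a scalar size."""
    return [items[i:i + size] for i in range(0, len(items), size)]

days = [date(2021, 1, 1) + timedelta(days=k) for k in range(25)]
batches = cut(days, 10)
print([len(b) for b in batches])  # [10, 10, 5]
```

A sequence of 10 or fewer items still yields a single batch, so looping over the result always iterates over groups of days rather than over individual dates.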
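2.4 Incremental Data Cleaning
Incremental import follows the same overall logic as full import. The main differences are:
- Full data is imported in batches, while incremental data is imported on a daily schedule
- Full import submits jobs asynchronously with DolphinDB submitJob, while incremental import writes synchronously by calling append!
- For K-line processing, full data is processed in batches, while incremental processing handles only the current day's data
See the attachments for the detailed code differences.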
2.5 Generating DAGs in Airflow to Run Tasks
2.5.1 Creating a DAG Instance
An example of creating the full-data DAG instance:
with DAG(dag_id="ETLTest", start_date=datetime(2023, 3, 10), schedule_interval=None) as dag:
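dag_id specifies the DAG name and must be unique; start_date sets the date from which the DAG is scheduled; schedule_interval specifies the interval between two runs, and None means the DAG is not run automatically and must be triggered manually. An example of the incremental DAG: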
args = {
"owner" : "Airflow",
"start_date" : days_ago(1),
'retries' : 0
}
# catchup is a DAG-level argument, so it is passed to DAG() rather than placed in default_args
with DAG(dag_id="addETLTest", default_args = args, schedule_interval="0 12 * * *", catchup=False) as dag:
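In the incremental DAG, catchup specifies whether missed runs are backfilled, and retries specifies how many times a failed task is retried. schedule_interval="0 12 * * *" runs the task at 12:00 (UTC) every day. For details on schedule settings, see Scheduling & Triggers.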
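The cron expression "0 12 * * *" reads as minute 0, hour 12, every day. A stdlib-only sketch that computes the next fire times after a given UTC instant is below. It handles only this fixed "M H * * *" daily pattern (next_daily_run is a hypothetical helper), whereas Airflow parses full cron syntax in its own scheduler.

```python
from datetime import datetime, timedelta, timezone

def next_daily_run(after: datetime, hour: int = 12, minute: int = 0) -> datetime:
    """Next time strictly after `after` matching the cron pattern 'minute hour * * *'."""
    candidate = after.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= after:
        candidate += timedelta(days=1)
    return candidate

now = datetime(2023, 3, 30, 9, 15, tzinfo=timezone.utc)
first = next_daily_run(now)      # 2023-03-30 12:00 UTC, later the same day
second = next_daily_run(first)   # 2023-03-31 12:00 UTC
print(first.isoformat(), second.isoformat())
```

Since the times are in UTC, 12:00 UTC corresponds to 20:00 in UTC+8; adjust the hour in the cron expression if the job must run at a local-time moment.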
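2.5.2 Retrieving Airflow Variables
Variable values set in Airflow cannot be read directly from DolphinDB scripts. To make them available to later tasks, this tutorial writes the Airflow variables into a shared table, from which subsequent DolphinDB tasks read them. Example code: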
from airflow.models import Variable
# Read the variable values from Airflow
variable = ['ETL_dbName_origin', "ETL_tbName_origin", "ETL_dbName_process",
"ETL_tbName_process", 'ETL_dbName_factor','ETL_tbName_factor','ETL_dbName_factor_daily',
'ETL_tbName_factor_daily',"ETL_filedir", "ETL_start_date","ETL_end_date"]
value = []
for var in variable:
    value.append(str(Variable.get(var)))
variables =",".join(variable)
values = ",".join(value)
# Create a shared table with DolphinDBOperator and write the variable values into it
create_parameter_table = DolphinDBOperator(
task_id='create_parameter_table',
dolphindb_conn_id='dolphindb_test',
sql='''
undef(`paramTable,SHARED)
t = table(1:0, `param`value, [STRING, STRING])
share t as paramTable
'''
)
given_param = DolphinDBOperator(
task_id='given_param',
dolphindb_conn_id='dolphindb_test',
sql="params = split('" + variables + "',',');" + \
"values = split('" + values + "',',');" + \
"for(i in 0..(params.size()-1)){" + \
"insert into paramTable values(params[i], values[i]);}"
)
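given_param serializes the variable names and values as two comma-joined strings and splits them again inside DolphinDB. That round trip only works if no value contains a comma, or a single quote that would end the DolphinDB string literal. The sketch below rebuilds the same script string in plain Python and checks those two conditions first; build_param_script is a hypothetical helper, not part of the DolphinDB provider, and "dfs://origin_snapshot" is a made-up example value.

```python
def build_param_script(params: dict) -> str:
    """Build the DolphinDB script used by given_param from a name -> value dict.

    Raises ValueError for values that would break the comma split or the
    single-quoted string literal.
    """
    for name, value in params.items():
        if "," in value or "'" in value:
            raise ValueError(f"variable {name!r} contains ',' or a quote: {value!r}")
    variables = ",".join(params)
    values = ",".join(params.values())
    return (f"params = split('{variables}',',');"
            f"values = split('{values}',',');"
            "for(i in 0..(params.size()-1)){"
            "insert into paramTable values(params[i], values[i]);}")

script = build_param_script({"ETL_dbName_origin": "dfs://origin_snapshot",
                             "ETL_start_date": "2021.01.04"})
print(script)
```

Validating in Python fails the Airflow task early with a clear message instead of letting DolphinDB insert misaligned name/value pairs.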
After the given_param task runs, the parameter table and its values appear in the Shared Variables panel of DolphinDB GUI, as shown below:


The value of each DAG variable can be modified in Airflow by clicking the location shown below:

After the DAG is generated, the variables it uses can be modified dynamically on the following web page:

The following table lists the variables used in this project and their meanings:

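2.5.3 Running Tasks with DolphinDBOperator
- Processing full data with DolphinDBOperator
Use DolphinDBOperator to define the loading, cleaning, and computation steps above as tasks in the DAG.
The core code for full processing is as follows: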
loadSnapshot = DolphinDBOperator(
task_id='loadSnapshot',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
//Use modules to load the prepackaged table-creation and loading functions
use loadSnapshot::createSnapshotTable
use loadSnapshot::loadSnapshotData
//Read the parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName = params[`ETL_dbName_origin]
tbName = params[`ETL_tbName_origin]
startDate = date(params[`ETL_start_date])
endDate = date(params[`ETL_end_date])
fileDir = params[`ETL_filedir]
//Create the result database and table if they do not exist
if(not existsDatabase(dbName)){
loadSnapshot::createSnapshotTable::createSnapshot(dbName, tbName)
}
//Call the loading function in background jobs that write in parallel, to improve write throughput
start = now()
for (loadDate in startDate..endDate){
submitJob("loadSnapshot"+year(loadDate)+monthOfYear(loadDate)+dayOfMonth(loadDate), "loadSnapshot", loadSnapshot::loadSnapshotData::loadSnapshot{, dbName, tbName, fileDir}, loadDate)
}
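//Wait until all load jobs have finished so that later steps work on complete source data
do{
cnt = exec count(*) from getRecentJobs() where jobDesc="loadSnapshot" and endTime is null
//Poll once per second instead of busy-waiting
if(cnt != 0) sleep(1000)
}
while(cnt != 0)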
//Check whether any exception occurred during the import; if so, throw it
cnt = exec count(*) from pnodeRun(getRecentJobs) where jobDesc="loadSnapshot" and errorMsg is not null and receivedTime > start
if (cnt != 0){
error = exec errorMsg from pnodeRun(getRecentJobs) where jobDesc="loadSnapshot" and errorMsg is not null and receivedTime > start
throw error[0]
}
'''
)
processSnapshot = DolphinDBOperator(
task_id='processSnapshot',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
//Use modules to load the prepackaged table-creation and cleaning functions
use processSnapshot::createSnapshot_array
use processSnapshot::processSnapshotData
//Read the parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName_orig = params[`ETL_dbName_origin]
tbName_orig = params[`ETL_tbName_origin]
dbName_process = params[`ETL_dbName_process]
tbName_process = params[`ETL_tbName_process]
startDate = date(params[`ETL_start_date])
endDate = date(params[`ETL_end_date])
//Create the result database and table if they do not exist
if(not existsDatabase(dbName_process)){
processSnapshot::createSnapshot_array::createProcessTable(dbName_process, tbName_process)
}
//Process in parallel with background jobs to improve efficiency
start = now()
for (processDate in startDate..endDate){
submitJob("processSnapshot"+year(processDate)+monthOfYear(processDate)+dayOfMonth(processDate), "processSnapshot", processSnapshot::processSnapshotData::process{, dbName_orig, tbName_orig, dbName_process, tbName_process}, processDate)
}
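//Wait until all cleaning jobs have finished so that later steps work on complete source data
do{
cnt = exec count(*) from getRecentJobs() where jobDesc="processSnapshot" and endTime is null
//Poll once per second instead of busy-waiting
if(cnt != 0) sleep(1000)
}
while(cnt != 0)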
//Check whether any exception occurred during cleaning; if so, throw it
cnt = exec count(*) from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
if (cnt != 0){
error = exec errorMsg from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
throw error[0]
}
'''
)
calMinuteFactor = DolphinDBOperator(
task_id='calMinuteFactor',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
//Use modules to load the prepackaged table-creation and calculation functions
use Factor::createFactorOneMinute
use Factor::calFactorOneMinute
//Read the parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName = params[`ETL_dbName_process]
tbName = params[`ETL_tbName_process]
dbName_factor = params[`ETL_dbName_factor]
tbName_factor = params[`ETL_tbName_factor]
//Create the result database and table if they do not exist
if(not existsDatabase(dbName_factor)){
createFactorOneMinute(dbName_factor, tbName_factor)
}
factorTable = loadTable(dbName_factor, tbName_factor)
//Call the calculation function
calFactorOneMinute(dbName, tbName,factorTable)
'''
)
calDailyFactor = DolphinDBOperator(
task_id='calDailyFactor',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
//Use modules to load the prepackaged table-creation and calculation functions
use Factor::createFactorDaily
use Factor::calFactorDaily1
//Read the parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName = params[`ETL_dbName_process]
tbName = params[`ETL_tbName_process]
dbName_factor = params[`ETL_dbName_factor_daily]
tbName_factor = params[`ETL_tbName_factor_daily]
//Create the result database and table if they do not exist
if(not existsDatabase(dbName_factor)){
createFactorDaily(dbName_factor, tbName_factor)
}
//Call the calculation function
factorTable = loadTable(dbName_factor, tbName_factor)
Factor::calFactorDaily1::calFactorDaily(dbName, tbName,factorTable)
'''
)
Build the DAG according to the dependencies between tasks, for example:
start_task >> create_parameter_table >> given_param >> loadSnapshot >> processSnapshot >> calMinuteFactor >> calDailyFactor
- Loading incremental data with DolphinDBOperator
The code that builds the incremental tasks is as follows:
addLoadSnapshot = DolphinDBOperator(
task_id='addLoadSnapshot',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
//Use modules to load the prepackaged loading functions
use addLoadSnapshot::loadSnapshotData
//Read the parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName = params[`ETL_dbName_origin]
tbName = params[`ETL_tbName_origin]
fileDir = params[`ETL_filedir]
//Get the trading calendar
MarketDays = getMarketCalendar("CFFEX")
//Load the data only on trading days (fileDir was already read from the parameter table above)
if(today() in MarketDays ){
addLoadSnapshot::loadSnapshotData::loadSnapshot(today(), dbName, tbName, fileDir)
}
'''
)
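The incremental tasks only do work when today() is in the CFFEX trading calendar returned by getMarketCalendar. The same guard, together with the dated file name the loader is assumed to expect (following the yyyyMMddsnapshot.csv pattern of the attached 20210104snapshot.csv), is modelled below in Python. The hard-coded TRADING_DAYS set is a hypothetical stand-in for the real calendar, and snapshot_file and plan_incremental_load are hypothetical helpers; the actual naming logic lives in the loadSnapshotData module, which is not shown here.

```python
from datetime import date

# Hypothetical stand-in for getMarketCalendar("CFFEX"): the five trading days of the sample data
TRADING_DAYS = {date(2021, 1, 4), date(2021, 1, 5), date(2021, 1, 6),
                date(2021, 1, 7), date(2021, 1, 8)}

def snapshot_file(day: date, file_dir: str) -> str:
    """Assumed file naming: <ETL_filedir>/<yyyyMMdd>snapshot.csv."""
    return f"{file_dir.rstrip('/')}/{day:%Y%m%d}snapshot.csv"

def plan_incremental_load(day: date, file_dir: str):
    """Return the file to load for `day`, or None on a non-trading day."""
    if day not in TRADING_DAYS:
        return None
    return snapshot_file(day, file_dir)

print(plan_incremental_load(date(2021, 1, 4), "/data/snapshot/"))  # /data/snapshot/20210104snapshot.csv
print(plan_incremental_load(date(2021, 1, 9), "/data/snapshot"))   # None (a Saturday)
```

Skipping non-trading days inside the task, rather than in the cron expression, keeps the schedule simple while still leaving a successful (no-op) run in the Airflow history.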
addProcessSnapshot = DolphinDBOperator(
task_id='addProcessSnapshot',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
//Use modules to load the prepackaged cleaning functions
use addProcessSnapshot::processSnapshotData
//Read the parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName_orig = params[`ETL_dbName_origin]
tbName_orig = params[`ETL_tbName_origin]
dbName_process = params[`ETL_dbName_process]
tbName_process = params[`ETL_tbName_process]
//Get the trading calendar
MarketDays = getMarketCalendar("CFFEX")
//Process the data only on trading days
if(today() in MarketDays ){
addProcessSnapshot::processSnapshotData::process(today(), dbName_orig, tbName_orig, dbName_process, tbName_process)
}
'''
)
addCalMinuteFactor= DolphinDBOperator(
task_id='addCalMinuteFactor',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
//Use modules to load the prepackaged calculation functions
use addFactor::calFactorOneMinute
//Read the parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName = params[`ETL_dbName_process]
tbName = params[`ETL_tbName_process]
dbName_factor = params[`ETL_dbName_factor]
tbName_factor = params[`ETL_tbName_factor]
factorTable = loadTable(dbName_factor, tbName_factor)
//Get the trading calendar
MarketDays = getMarketCalendar("CFFEX")
//On trading days, call the calculation function to synthesize minute K-lines
if(today() in MarketDays ){
addFactor::calFactorOneMinute::calFactorOneMinute(dbName, tbName,today(), factorTable)
}
'''
)
addCalDailyFactor= DolphinDBOperator(
task_id='addCalDailyFactor',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
//Use modules to load the prepackaged calculation functions
use addFactor::calFactorDaily1
//Read the parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName = params[`ETL_dbName_process]
tbName = params[`ETL_tbName_process]
dbName_factor = params[`ETL_dbName_factor_daily]
tbName_factor = params[`ETL_tbName_factor_daily]
factorTable = loadTable(dbName_factor, tbName_factor)
//Get the trading calendar
MarketDays = getMarketCalendar("CFFEX")
//On trading days, call the calculation function to synthesize daily K-lines
if(today() in MarketDays ){
addFactor::calFactorDaily1::calFactorDaily(dbName, tbName,today(), factorTable)
}
'''
)
Build the DAG according to the dependencies between tasks, for example:
start_task >> create_parameter_table >> given_param >> addLoadSnapshot >> addProcessSnapshot >> addCalMinuteFactor >> addCalDailyFactor
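2.5.4 Generating the DAGs
Deploy the project with the following steps:
- Step 1: Deploy the DolphinDB project
Import the addETL and fullETL projects from the DolphinDB project into DolphinDB GUI (the DolphinDB client tool):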

Upload the modules in the addETL and fullETL projects to the DolphinDB server that Airflow is connected to:

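- Step 2: Deploy the Python project
Place the Python scripts of the Python project in the <Airflow_install_Dir/airflow/dags> directory. Note that newly created DAGs do not appear in the UI immediately: by default the DAG directory is rescanned every 5 minutes, and the interval can be adjusted with dag_dir_list_interval in airflow.cfg.
- Step 3: Import Airflow variables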
In the Airflow web UI, go to Admin --> Variables and upload the Variables.json file to import the variables into Airflow, then adjust their values to your environment.

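- Step 4: Upload the raw data file
Upload the data file to the server, and set the ETL_filedir variable in Airflow to the actual location of the file. If you run the incremental ETL task, change the date in the data file name to the current date, e.g. 20230330snapshot.csv, so that the task does not fail for lack of data.
The resulting DAGs are shown below:
Full data loading: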

Incremental data loading:

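After the DAG runs, a green task instance means the task succeeded; red means it failed; orange means an upstream task it depends on failed, so the task was not started.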
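In a linear chain such as create_parameter_table >> given_param >> loadSnapshot >> ... >> calDailyFactor, one failure stops everything after it: the failing task is marked failed (red) and every task downstream of it upstream_failed (orange). The Python sketch below models that propagation for a simple chain under Airflow's default all_success trigger rule; it is an illustration, not Airflow's scheduler code, and run_chain is a hypothetical helper.

```python
def run_chain(tasks, fail_at=None):
    """Simulate a linear DAG in which each task needs its upstream task to succeed.

    Returns a dict mapping task -> state, using Airflow's state names.
    """
    states, upstream_ok = {}, True
    for t in tasks:
        if not upstream_ok:
            states[t] = "upstream_failed"   # orange in the UI
        elif t == fail_at:
            states[t] = "failed"            # red
            upstream_ok = False
        else:
            states[t] = "success"           # green
    return states

chain = ["create_parameter_table", "given_param", "loadSnapshot",
         "processSnapshot", "calMinuteFactor", "calDailyFactor"]
print(run_chain(chain, fail_at="processSnapshot"))
```

When a task turns orange, look for the first red task upstream of it; that is where the log will contain the actual error.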
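3. Frequently Asked Questions (FAQ)
3.1 How do I capture the messages printed by print in a DolphinDB script?
- Messages printed by print in DolphinDB scripts go to standard output and can be found in airflow-scheduler.out, as shown below: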

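3.2 How do I check the completion status of asynchronous submitJob jobs in a DolphinDB script?
Use the DolphinDB function getRecentJobs to obtain information about background jobs, and then add checking logic to the DolphinDB DAG, for example: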
DolphinDBOperator(
task_id='processSnapshot',
dolphindb_conn_id='dolphindb_test',
sql='''
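//start must be set with start = now() before the jobs are submitted
//Check whether all jobs have finished
do{
cnt = exec count(*) from getRecentJobs() where jobDesc="processSnapshot" and endTime is null
//Poll once per second instead of busy-waiting
if(cnt != 0) sleep(1000)
}
while(cnt != 0)
//Check whether the background jobs succeeded; if any failed, throw the error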
cnt = exec count(*) from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
if (cnt != 0){
error = exec errorMsg from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
throw error[0]
}
'''
)
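The pattern above polls getRecentJobs until no matching job is still running and then checks errorMsg. The same wait-then-check logic, with a poll interval and an overall timeout so that a stuck job cannot block the Airflow worker indefinitely, is sketched below in Python. fetch_jobs is a hypothetical callable standing in for a getRecentJobs query and returns dicts with jobDesc, endTime, and errorMsg keys; the receivedTime > start filter is omitted for brevity.

```python
import time

def wait_for_jobs(fetch_jobs, job_desc, poll_s=1.0, timeout_s=3600.0, sleep=time.sleep):
    """Poll until no job with `job_desc` is unfinished, then raise on the first error.

    fetch_jobs() must return a list of dicts with 'jobDesc', 'endTime', 'errorMsg'.
    Returns the number of matching jobs when all finished without errors.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        jobs = [j for j in fetch_jobs() if j["jobDesc"] == job_desc]
        if all(j["endTime"] is not None for j in jobs):
            break
        if time.monotonic() >= deadline:
            raise TimeoutError(f"jobs '{job_desc}' still running after {timeout_s}s")
        sleep(poll_s)
    errors = [j["errorMsg"] for j in jobs if j["errorMsg"]]
    if errors:
        raise RuntimeError(errors[0])
    return len(jobs)

# Simulated job table: running on the first poll, finished on the second
snapshots = iter([
    [{"jobDesc": "processSnapshot", "endTime": None, "errorMsg": None}],
    [{"jobDesc": "processSnapshot", "endTime": "09:31", "errorMsg": None}],
])
done = wait_for_jobs(lambda: next(snapshots), "processSnapshot", sleep=lambda s: None)
print(done)  # 1
```

Raising an exception is what turns the Airflow task red, so the error message from the failed background job ends up in the task log.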
3.3 What should I do if the connection often times out and drops while running Airflow?

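This is likely caused by network latency. Set the relevant parameters when creating the connection: as shown in the figure above, set the KeepAliveTime and reconnect parameters in the DolphinDB connection.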
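3.4 With start_date set to the current date and a daily schedule, why does the task not run today?
- In Airflow, the earliest time a scheduled task can start is start_date + schedule_interval. For example, with start_date = 2023.03.16 and a daily schedule, the first run is scheduled for 2023.03.17, so no run happens on the start date itself.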
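The rule above follows from interval-based scheduling: the run whose data interval is [start_date, start_date + schedule_interval) is triggered only once that interval has ended, and its logical date is the start of the interval. The sketch below lists the first few (logical date, trigger time) pairs for the example in the answer; scheduled_runs is a hypothetical helper that models Airflow's behavior for a fixed timedelta schedule rather than calling Airflow.

```python
from datetime import datetime, timedelta

def scheduled_runs(start_date: datetime, interval: timedelta, n: int):
    """First n scheduled runs as (logical_date, triggered_at) pairs: each run covers
    [logical_date, logical_date + interval) and is triggered when that interval ends."""
    return [(start_date + k * interval, start_date + (k + 1) * interval) for k in range(n)]

runs = scheduled_runs(datetime(2023, 3, 16), timedelta(days=1), 2)
for logical, triggered in runs:
    print(logical.date(), "triggered at", triggered)
```

To get a run on the deployment day itself, set start_date one interval earlier or trigger the DAG manually.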
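3.5 How do I find the cause when a DolphinDBOperator task fails?
- When a task fails, DolphinDBOperator prints the detailed error message to the log. Check the log to locate the failing code and fix it. To view the log: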



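4. Summary
Starting from a common market data ETL case, this tutorial focuses on how to combine the Airflow scheduling framework with the DolphinDB database for structured ETL job management, which saves time, improves efficiency, and lowers operation and maintenance costs. Due to space limitations, some other operations in DolphinDB and Airflow are not covered in depth, and users should adapt the setup to their actual situation. Corrections and feedback on any errors or shortcomings in this tutorial are welcome.
Attachments
- Dependency packages: pydolphindb-1.0.0-py3-none-any.whl, apache_airflow_providers_dolphindb-1.0.0-py3-none-any.whl
- DolphinDB projects: addETL, fullETL
- Python projects: addETL.py, fullETL.py
- Data file: 20210104snapshot.csv
- Airflow variables: Variables.json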
