元編程是指使用程式代碼來生成可以動態運行的程式代碼,元編程的目的一般是延遲執行代碼或動態創建代碼,
1. DolphinDB實作元編程的方法
DolphinDB支持使用元編程來動態創建運算式,包括函式呼叫的運算式、SQL查詢運算式等,DolphinDB有兩種實作元編程的方法:
(1)使用一對尖括號<>來表示需要延遲執行的動態代碼,例如,
a = <1 + 2 * 3>
typestr(a);
CODE
//a是元代碼,它的資料型別是CODE
eval(a);
7
//eval函式用于執行元代碼
(2)使用函式來創建各種運算式,常用的元編程函式包括expr, parseExpr, partial, sqlCol, sqlColAlias, sql, eval, makeCall. 下面介紹這幾個函式的用法,
- expr函式根據輸入的物件、運算子或其他元代碼生成元代碼,例如:
a = expr(1, +, 2, *, 3)
a.typestr();
CODE
a;
< 1 + 2 * 3 >
- parseExpr函式可以把字串轉換為元代碼,例如:
parseExpr("1+2")
< 1 + 2 >
- partial函式可以固定一個函式的部分引數,產生一個引數較少的函式,例如:
partial(add,1)(2)
3
def f(a,b):a pow b
g=partial(f, 2)
g(3)
8
- sqlCol, sqlColAlias和sql函式用于動態生成SQL運算式,
sqlCol函式可以將列名轉換成運算式,sqlColAlias常用于生成計算列的元代碼,sql函式可以動態地生成SQL陳述句,
sym = take(`GE,6) join take(`MSFT,6) join take(`F,6)
date=take(take(2017.01.03,2) join take(2017.01.04,4), 18)
PRC=31.82 31.69 31.92 31.8 31.75 31.76 63.12 62.58 63.12 62.77 61.86 62.3 12.46 12.59 13.24 13.41 13.36 13.17
vol=2300 3500 3700 2100 1200 4600 8800 7800 6400 4200 2300 6800 4200 5600 8900 2300 6300 9600
t1 = table(sym, date, PRC, vol);
sql(sqlCol("*"),t1)
< select * from t1 >
sql(sqlCol("*"),t1,[<sym="MSFT">,<PRC>=5000>])
< select * from t1 where sym == "MSFT",PRC >= 5000 >
sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,<sym="MSFT">,sqlCol("date"))
< select avg(vol) as avg_vol from t1 where sym == "MSFT" group by date >
sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,<sym="MSFT">,sqlCol("date"),,,,<avg(vol)>3000>)
< select avg(vol) as avg_vol from t1 where sym == "MSFT" group by date having avg(vol) > 3000 >
sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,<sym="MSFT">,sqlCol("date"),0)
< select avg(vol) as avg_vol from t1 where sym == "MSFT" context by date >
sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,<sym="MSFT">,sqlCol("date"),0,sqlCol("avg_vol"),0)
< select avg(vol) as avg_vol from t1 where sym == "MSFT" context by date csort avg_vol desc >
sql(sqlCol("*"),t1,,,,,,,sqlCol(`vol),0,5)
< select top 5 * from t1 order by vol desc >
- eval函式可以執行元代碼,例如:
a = <1 + 2 * 3>
eval(a);
7
sql(sqlColAlias(<avg(vol)>,"avg_vol"),t1,,sqlCol(["sym","date"])).eval();
sym date avg_vol
---- ---------- -------
F 2017.01.03 4900
F 2017.01.04 6775
GE 2017.01.03 2900
GE 2017.01.04 2900
MSFT 2017.01.03 8300
MSFT 2017.01.04 4925
//這里使用的t1是第(2)部分的t1
- makeCall函式可以根據指定的函式和輸入引數生成元代碼,例如,查詢表t1時,把date列輸出為字串,并以類似于03/01/2017的形式顯示,
sql([sqlColAlias(makeCall(temporalFormat,sqlCol(`date),"dd/MM/yyyy"),"date"),sqlCol(`sym),sqlCol(`PRC),sqlCol(`vol)],t1)
< select temporalFormat(date, "dd/MM/yyyy") as date,sym,PRC,vol from t1 >
2.DolphinDB元編程應用
2.1 更新磁區記憶體表
磁區記憶體表的更新、洗掉等操作不僅可以通過SQL陳述句完成,也可以通過元編程完成,創建磁區記憶體表:
n=1000000
sym=rand(`IBM`MSFT`GOOG`FB`IBM`MSFT,n)
date=rand(2018.01.02 2018.01.02 2018.01.02 2018.01.03 2018.01.03 2018.01.03,n)
price=rand(1000.0,n)
qty=rand(10000,n)
t=table(sym,date,price,qty)
db=database("",VALUE,`IBM`MSFT`GOOG`FB`IBM`MSFT)
trades=db.createPartitionedTable(t,`trades,`sym).append!(t)
2.1.1 更新資料
例如,更新股票代碼為IBM的交易數量:
trades[`qty,<sym=`IBM>]=<qty+100>
//等價于update trades set qty=qty+100 where sym=`IBM
2.1.2 新增一個列
例如,添加一個新的列volume,用于保存交易量:
trades[`volume]=<price*qty>
//等價于update trades set volume=price*qty
2.1.3 洗掉資料
例如,洗掉qty為0的資料:
trades.erase!(<qty=0>)
//等價于delete from trades where qty=0
2.1.4 動態生成過濾條件并更新資料
本例使用了以下資料表,
ind1=rand(100,10)
ind2=rand(100,10)
ind3=rand(100,10)
ind4=rand(100,10)
ind5=rand(100,10)
ind6=rand(100,10)
ind7=rand(100,10)
ind8=rand(100,10)
ind9=rand(100,10)
ind10=rand(100,10)
indNum=1..10
t=table(ind1,ind2,ind3,ind4,ind5,ind6,ind7,ind8,ind9,ind10,indNum)
我們需要對資料表進行更新操作,SQL陳述句如下:
update t set ind1=1 where indNum=1
update t set ind2=1 where indNum=2
update t set ind3=1 where indNum=3
update t set ind4=1 where indNum=4
update t set ind5=1 where indNum=5
update t set ind6=1 where indNum=6
update t set ind7=1 where indNum=7
update t set ind8=1 where indNum=8
update t set ind9=1 where indNum=9
update t set ind10=1 where indNum=10
如果資料表的列數較多,需要手工撰寫非常多的SQL陳述句,觀察以上陳述句可以發現,列名和過濾條件是有一定關系的,使用元編程可以非常方便地完成以上操作,
for(i in 1..10){
t["ind"+i,<indNum=i>]=1
}
2.2 在內置函式中使用元編程
DolphinDB的一些內置函式會使用到元編程,
2.2.1 視窗連接
在視窗連接(window join)中,需要為右表的視窗資料集指定一個或多個聚合函式以及這些函式運行時需要的引數,由于問題的描述和執行在兩個不同的階段,我們采用元編程來實作延后執行,
t = table(take(`ibm, 3) as sym, 10:01:01 10:01:04 10:01:07 as time, 100 101 105 as price)
q = table(take(`ibm, 8) as sym, 10:01:01+ 0..7 as time, 101 103 103 104 104 107 108 107 as ask, 98 99 102 103 103 104 106 106 as bid)
wj(t, q, -2 : 1, < [max(ask), min(bid), avg((bid+ask)*0.5) as avg_mid]>, `time)
sym time price max_ask min_bid avg_mid
--- -------- ----- ------- ------- -------
ibm 10:01:01 100 103 98 100.25
ibm 10:01:04 101 104 99 102.625
ibm 10:01:07 105 108 103 105.625
2.2.2 流計算引擎
DolphinDB有三種型別的流計算引擎:時間序列聚合引擎(createTimeSeriesAggregator)、橫截面引擎(createCrossSectionalAggregator)和例外檢測引擎(createAnomalyDetectionEngine),在使用這些流計算引擎時,需要為資料視窗中的資料集指定聚合函式或運算式以及它們運行時所需的引數,這種情況下,我們采用元編程來表示聚合函式或運算式以及它們所需的引數,以時間序列聚合引擎的應用為例:
share streamTable(1000:0, `time`sym`qty, [DATETIME, SYMBOL, INT]) as trades
output1 = table(10000:0, `time`sym`sumQty, [DATETIME, SYMBOL, INT])
agg1 = createTimeSeriesAggregator("agg1",60, 60, <[sum(qty)]>, trades, output1, `time, false,`sym, 50,,false)
subscribeTable(, "trades", "agg1", 0, append!{agg1}, true)
insert into trades values(2018.10.08T01:01:01,`A,10)
insert into trades values(2018.10.08T01:01:02,`B,26)
insert into trades values(2018.10.08T01:01:10,`B,14)
insert into trades values(2018.10.08T01:01:12,`A,28)
insert into trades values(2018.10.08T01:02:10,`A,15)
insert into trades values(2018.10.08T01:02:12,`B,9)
insert into trades values(2018.10.08T01:02:30,`A,10)
insert into trades values(2018.10.08T01:04:02,`A,29)
insert into trades values(2018.10.08T01:04:04,`B,32)
insert into trades values(2018.10.08T01:04:05,`B,23)
select * from output1
time sym sumQty
------------------- --- ------
2018.10.08T01:02:00 A 38
2018.10.08T01:03:00 A 25
2018.10.08T01:02:00 B 40
2018.10.08T01:03:00 B 9
2.3 定制報表
元編程可以用于定制報表,下例定義了一個用于生成報表的自定義函式,用戶只需要輸入資料表、欄位名稱以及欄位相應的格式字串即可,
def generateReport(tbl, colNames, colFormat, filter){
colCount = colNames.size()
colDefs = array(ANY, colCount)
for(i in 0:colCount){
if(colFormat[i] == "")
colDefs[i] = sqlCol(colNames[i])
else
colDefs[i] = sqlCol(colNames[i], format{,colFormat[i]})
}
return sql(colDefs, tbl, filter).eval()
}
創建模擬的歷史資料庫:
if(existsDatabase("dfs://historical_db")){
dropDatabase("dfs://historical_db")
}
n=5000000
dates=2012.09.01..2012.09.30
syms=symbol(`IBM`MSFT`GOOG`FB`AAPL)
t=table(rand(dates,n) as date, rand(syms,n) as sym, rand(200.0,n) as price, rand(1000..2000,n) as qty)
db1=database("",VALUE,dates)
db2=database("",VALUE,syms)
db=database("dfs://historical_db",COMPO,[db1,db2])
stock=db.createPartitionedTable(t,`stock,`date`sym).append!(t)
選擇2012年9月1日股票代碼為IBM的資料生成報表:
generateReport(stock,`date`sym`price`qty,["MM/dd/yyyy","","###.00","#,###"],<date=2012.09.01 and sym=`IBM >)
date sym price qty
---------- --- ------ -----
09/01/2012 IBM 90.97 1,679
09/01/2012 IBM 22.36 1,098
09/01/2012 IBM 133.42 1,404
09/01/2012 IBM 182.08 1,002
09/01/2012 IBM 144.67 1,468
09/01/2012 IBM 6.59 1,256
09/01/2012 IBM 73.09 1,149
09/01/2012 IBM 83.35 1,415
09/01/2012 IBM 93.13 1,006
09/01/2012 IBM 88.05 1,406
...
上面的陳述句等價于以下SQL陳述句:
select format(date,"MM/dd/yyyy") as date, sym, format(price,"###.00") as price, format(qty,"#,###") as qty from stock where date=2012.09.01 and sym=`IBM
2.4 物聯網中動態生成計算指標
在物聯網的實時流計算中,資料源包含tag, timestamp和value三個欄位,現在需要對輸入的原始資料進行實時的指標計算,由于每次收到的原始資料的tag數量和種類有可能不同,并且每次計算的指標也可能不同,我們無法將計算指標固定下來,因此這種情況下我們可以采用元編程的方法,我們需要定義一個配置表,將計算的指標放到該表中,可以根據實際增加、洗掉或修改計算指標,每次實時計算時,從配置表中動態地讀取需要計算的指標,并把計算的結果輸出到另外一個表中,
以下是示例代碼,pubTable是流資料的發布表,config表是存盤計算指標的配置表,由于計算指標有可能每次都不相同,這里采用的是并發版本控制表(mvccTable),subTable通過訂閱pubTable,對流資料進行實時計算,
t1=streamTable(1:0,`tag`value`time,[STRING,DOUBLE,DATETIME])
share t1 as pubTable
config = mvccTable(`index1`index2`index3`index4 as targetTag, ["tag1 + tag2", "sqrt(tag3)", "floor(tag4)", "abs(tag5)"] as formular)
subTable = streamTable(100:0, `targetTag`value, [STRING, FLOAT])
def calculateTag(mutable subTable,config,msg){
pmsg = select value from msg pivot by time, tag
for(row in config){
try{
insert into subTable values(row.targetTag, sql(sqlColAlias(parseExpr(row.formular), "value"), pmsg).eval().value)
}
catch(ex){print ex}
}
}
subscribeTable(,`pubTable,`calculateTag,-1,calculateTag{subTable,config},true)
//模擬寫入資料
tmp = table(`tag1`tag2`tag3`tag4 as tag, 1.2 1.3 1.4 1.5 as value, take(2019.01.01T12:00:00, 4) as time)
pubTable.append!(tmp)
select * from subTable
targetTag value
--------- --------
index1 2.5
index2 1.183216
index3 1
2.5 執行一組查詢,合并查詢結果
在資料分析中,有時我們需要對同一個資料集執行一組相關的查詢,并將查詢結果合并展示出來,如果每次都手動撰寫全部SQL陳述句,作業量大,并且擴展性差,通過元編程動態生成SQL可以解決這個問題,
本例使用的資料集結構如下(以第一行為例):
mt vn bc cc stt vt gn bk sc vas pm dls dt ts val vol
-------- ------- -- --- --- -- -- ---- -- --- -- ---------- ---------- ------ ----- -----
52354955 50982208 25 814 11 2 1 4194 0 0 0 2020.02.05 2020.02.05 153234 5.374 18600
我們需要對每天的資料都執行一組相關的查詢,比如:
select * from t where vn=50982208,bc=25,cc=814,stt=11,vt=2, dsl=2020.02.05, mt<52355979 order by mt desc limit 1
select * from t where vn=50982208,bc=25,cc=814,stt=12,vt=2, dsl=2020.02.05, mt<52355979 order by mt desc limit 1
select * from t where vn=51180116,bc=25,cc=814,stt=12,vt=2, dsl=2020.02.05, mt<52354979 order by mt desc limit 1
select * from t where vn=41774759,bc=1180,cc=333,stt=3,vt=116, dsl=2020.02.05, mt<52355979 order by mt desc limit 1
可以觀察到,這一組查詢中,過濾條件包含的列和排序列都相同,并且都是取排序后的第一行記錄,還有部分過濾條件的值相同,為此,我們撰寫了自定義函式bundleQuery:
def bundleQuery(tbl, dt, dtColName, mt, mtColName, filterColValues, filterColNames){
cnt = filterColValues[0].size()
filterColCnt =filterColValues.size()
orderByCol = sqlCol(mtColName)
selCol = sqlCol("*")
filters = array(ANY, filterColCnt + 2)
filters[filterColCnt] = expr(sqlCol(dtColName), ==, dt)
filters[filterColCnt+1] = expr(sqlCol(mtColName), <, mt)
queries = array(ANY, cnt)
for(i in 0:cnt) {
for(j in 0:filterColCnt){
filters[j] = expr(sqlCol(filterColNames[j]), ==, filterColValues[j][i])
}
queries.append!(sql(select=selCol, from=tbl, where=filters, orderBy=orderByCol, ascOrder=false, limit=1))
}
return loop(eval, queries).unionAll(false)
}
bundleQuery中各個引數的含義如下:
- tbl是資料表
- dt是過濾條件中日期的值
- dtColName是過濾條件中日期列的名稱
- mt是過濾條件中mt的值
- mtColName是過濾條件中mt列的名稱,以及排序列的名稱
- filterColValues是其他過濾條件中的值,用元組表示,其中的每個向量表示一個過濾條件,每個向量中的元素表示該過濾條件的值
- filterColNames是其他過濾條件中的列名,用向量表示
上面一組SQL陳述句,相當于執行以下代碼:
dt = 2020.02.05
dtColName = "dls"
mt = 52355979
mtColName = "mt"
colNames = `vn`bc`cc`stt`vt
colValues = [50982208 50982208 51180116 41774759, 25 25 25 1180, 814 814 814 333, 11 12 12 3, 2 2 2 116]
bundleQuery(t, dt, dtColName, mt, mtColName, colValues, colNames)
我們可以執行以下腳本把bundleQuery函式定義為函式視圖,這樣在集群的任何節點或者重啟系統之后,都可以直接使用該函式,
//please login as admin first
addFunctionView(bundleQuery)
3.小結
DolphinDB的元編程功能強大,使用簡單,能夠極大地提高程式開發效率,
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/245804.html
標籤:其他
