Python實作基于動態時間規整的股市交易策略測驗
- 一、策略簡介
- 二、策略原理
- 三、代碼實作
- 四、測驗策略
- 五、完整代碼
一、策略簡介
該專案采用Python語言,利用動態時間規整演算法對滬深300市場構建一種交易策略,并采用夏普比率來對該策略模擬交易的結果進行評估,
本專案只是對交易策略的初探,不涉及投資組合、風隙訓釋和資金管理等等,也不被視為任何型別的投資建議!歡迎相關專業人士指教,
二、策略原理
動態時間規整是一種衡量兩段時間序列的相似度的演算法,具體運算細節這里不多贅述,
我們將過去一段時間的滬深300每日漲跌率(當天收盤價與前一天收盤價相比)按照特定天數分成若干個時間序列,并設定持有時間,
為描述方便,商定:在時間序列結尾買入,過了持有時間后賣出,算作一次完整“交易”,
該策略會將正在被測驗的時期與過去的若干已知時間序列進行動態時間規整并找到相似度高的時間序列,若這些時間序列的“交易”得到了正收益,我們就期望在現時期進行一次“交易”,
三、代碼實作
使用到的庫:
import pandas as pd
import matplotlib.pyplot as plt
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean
import numpy as np
import math
pandas和numpy幫助我們進行資料處理,plt用來畫圖,fastdtw用來進行動態時間規整,
spy = pd.read_csv('300.csv')
spy = spy.set_index(['Date'])
spy_c = spy['Close']
讀取滬深300從2005年到2021年的歷史資料,并取出收盤價資料,可以畫個圖看一下:

分別設定三個參量:時間序列的長度,持有天數,判定相似度高的閾值,
tlen = 5
hold = 7
thre = 5
計算每日漲跌率并按照tlen分成若干時間序列,并計算每個時間序列對應的一次完整“交易”的回報,將兩者放入tseries中:
tseries = []
for i in range(tlen,len(spy_c)-hold-1,tlen):
pctc = spy_c.iloc[i-tlen:i].pct_change()[1:].values*100
res = (spy_c[i+hold+1] - spy_c[i])/spy_c[i] * 100
tseries.append((pctc,res))
對每個序列都與其他序列依次計算相似度,并保存兩個序列的“交易”回報,dist越低代表相似度越高,
然后清理掉以下情況的資料:
- 相似度為0,即相同的兩個序列,
- 兩個序列太近,中間還沒超過一個完整的持有周期,
- 距離超過閾值,即相似度過低的兩個序列,
- 第一個序列的“交易”回報為負,
def dtw_dist(x, y):
distance, path = fastdtw(x, y, dist=euclidean)
return distance
dist_pairs = []
for i in range(len(tseries)):
for j in range(len(tseries)):
dist = dtw_dist(tseries[i][0],tseries[j][0])
dist_pairs.append((i,j,dist,tseries[i][1],tseries[j][1]))
dist_frame = pd.DataFrame(dist_pairs,columns=['A','B','Dist','A Ret','B Ret'])
sf = dist_frame[dist_frame['Dist']>0].sort_values(['A','B']).reset_index(drop=1)
sfe = sf[sf['A']+math.ceil(float(tlen+hold)/tlen) <= sf['B']]
winf = sfe[(sfe['Dist']<=thre)&(sfe['A Ret']>0)]
四、測驗策略
為評估我們的測驗結果,我們定義一個評估函式:
def get_stats(s,n=252):
cnt = len(s)
wins = len(s[s>0])
losses = len(s[s<0])
mean_trd = round(s.mean(),3)
sd = round(np.std(s),3)
sharpe_r = round((s.mean()/np.std(s))*np.sqrt(n),4)
print 'Trades:',cnt,\
'\nWins:',wins,\
'\nLosses:',losses,\
'\nMean:',mean_trd,\
'\nStd Dev:',sd,\
'\nSharpe Ratio:',sharpe_r
輸入該策略的所有模擬交易回報序列,回傳交易次數、盈利次數、虧損次數、平均收益率、回報標準差、修正后的夏普比率,
為方便對該策略的結果有個直觀的比較,我們取一個新的策略——固定每天以開盤價買入,以收盤價賣出,作為我們的基準線,
>>> id_rtn = ((spy['Close'] - spy['Open'])/spy['Open'])*100
>>> get_stats(id_rtn)
Trades: 3920
Wins: 2121
Losses: 1799
Mean: 0.127
Std Dev: 1.555
Sharpe Ratio: 1.3008
現在進行我們的策略的評估,
對于相似的有正盈利的歷史序列,我們會進行交易,如果發生了虧損的情況我們就洗掉相應歷史序列,
excluded = {}
return_list = []
def get_returns(r):
if excluded.get(r['A']) is None:
return_list.append(r['B Ret'])
if r['B Ret']<0:
excluded.update({r['A']:1})
winf.apply(get_returns,axis=1)
看看我們的策略的評估結果:
>>> get_stats(pd.Series(return_list))
>Trades: 1180
Wins: 753
Losses: 427
Mean: 1.665
Std Dev: 4.913
Sharpe Ratio: 5.3795
可以看到相比基準策略有了更高的夏普比率,
五、完整代碼
import pandas as pd
import matplotlib.pyplot as plt
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean
import numpy as np
import math
def dtw_dist(x, y):
distance, path = fastdtw(x, y, dist=euclidean)
return distance
def get_stats(s,n=252):
cnt = len(s)
wins = len(s[s>0])
losses = len(s[s<0])
mean_trd = round(s.mean(),3)
sd = round(np.std(s),3)
sharpe_r = round((s.mean()/np.std(s))*np.sqrt(n),4)
print 'Trades:',cnt,\
'\nWins:',wins,\
'\nLosses:',losses,\
'\nMean:',mean_trd,\
'\nStd Dev:',sd,\
'\nSharpe Ratio:',sharpe_r
spy = pd.read_csv('300.csv')
spy = spy.set_index(['Date'])
#spy = spy.sort_values(['Date'])
spy_c = spy['Close']
tlen = 5
hold = 7
thre = 5
tseries = []
for i in range(tlen,len(spy_c)-hold-1,tlen):
pctc = spy_c.iloc[i-tlen:i].pct_change()[1:].values*100
res = (spy_c[i+hold+1] - spy_c[i])/spy_c[i] * 100
tseries.append((pctc,res))
dist_pairs = []
for i in range(len(tseries)):
for j in range(len(tseries)):
dist = dtw_dist(tseries[i][0],tseries[j][0])
dist_pairs.append((i,j,dist,tseries[i][1],tseries[j][1]))
dist_frame = pd.DataFrame(dist_pairs,columns=['A','B','Dist','A Ret','B Ret'])
sf = dist_frame[dist_frame['Dist']>0].sort_values(['A','B']).reset_index(drop=1)
sfe = sf[sf['A']+math.ceil(float(tlen+hold)/tlen) <= sf['B']]
winf = sfe[(sfe['Dist']<=thre)&(sfe['A Ret']>0)]
excluded = {}
return_list = []
def get_returns(r):
if excluded.get(r['A']) is None:
return_list.append(r['B Ret'])
if r['B Ret']<0:
excluded.update({r['A']:1})
winf.apply(get_returns,axis=1)
get_stats(pd.Series(return_list))
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/258951.html
標籤:python
