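I need to aggregate data from a high-frequency time series into a low-frequency one. For example, I have a price time series at one-minute frequency. For the same time range, I have a price time series at 1-hour frequency. For each hour, I need to compute some statistics from the one-minute data.
If both time series fit in memory, I can load them and solve this easily with the pandas apply function. Unfortunately, that is not my reality (and something tells me I am not the only one). So, to work around this, I am trying to use Dask, but when I try to mimic what I did with pandas, I get the following error:
Series getitem is only supported for other series objects with matching partition structure
The complete code and error follow, but first my questions. Am I doing something wrong? Is it possible to achieve this with Dask? Are there other options? I may have to split the dataset into smaller ones so that they fit in memory and can be processed with pandas.
Example
First, I generate the data with a random walk.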
import numpy as np
import pandas as pd
import dask.dataframe as dd
# parameters
size = 1000000
price_0 = 1.1300
# GENERATE MINUTE-WISE SIMULATED PRICE TIME SERIES
index = pd.date_range(end='2021-12-19', periods=size, freq='min')
wn = np.random.normal(loc=0, scale=0.0005, size=size) # white noise (normally distributed random data)
price = np.zeros(size).astype(float)
price[0] = price_0 # set first price
# random walk
for i in range(1, len(price)):
    price[i] = price[i-1] + wn[i-1]
df_min = pd.DataFrame({'date':index, 'bid':price, 'wn':wn})
df_min.set_index('date', inplace=True)
# GENERATE AN HOURLY-WISE TIME SERIES OUT OF THE MINUTE-WISE ONE
df_h = df_min['bid'].resample('1H').ohlc()
df_h['open_date'] = df_h.index
df_h['close_date'] = df_h['open_date'].shift(-1)
df_h.dropna(inplace=True)
Second, I tried using pandas
def my_func(row):
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    return df_min['bid'][(df_min.index >= start_date) & (df_min.index < end_date)].mean()
result_pd = df_h.apply(lambda row: my_func(row), axis=1)
result_pd
| date | |
|---|---|
| 2020-01-24 13:00:00 | 1.128743 |
| 2020-01-24 14:00:00 | 1.127739 |
| 2020-01-24 15:00:00 | 1.130548 |
| ... | |
| 2021-12-18 23:00:00 | 0.482139 |
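
For the specific statistic in this example (the mean of `bid` over each `[open_date, close_date)` hour), the row-wise `apply` can probably be replaced by a single `resample` call. The following is a minimal sketch on a small synthetic series, not on the data above; the names `minute_prices`, `hourly_mean` and `row_wise` are made up for illustration, and `'60min'` is used for the one-hour frequency only because the hour alias is spelled differently across pandas versions.

```python
import numpy as np
import pandas as pd

# Small synthetic minute-level series (3 full hours), for illustration only.
rng = np.random.default_rng(0)
index = pd.date_range("2021-01-01 00:00", periods=180, freq="min")
minute_prices = pd.Series(
    1.13 + rng.normal(0, 0.0005, size=180).cumsum(), index=index, name="bid"
)

# One pass: mean of every left-closed hourly bin [hh:00, hh+1:00).
hourly_mean = minute_prices.resample("60min").mean()

# Row-wise reference that mirrors the mask-based apply version.
one_hour = pd.Timedelta(hours=1)
row_wise = pd.Series(
    [
        minute_prices[
            (minute_prices.index >= start) & (minute_prices.index < start + one_hour)
        ].mean()
        for start in hourly_mean.index
    ],
    index=hourly_mean.index,
)

# Both approaches should agree up to floating-point rounding.
print(np.allclose(hourly_mean.to_numpy(), row_wise.to_numpy()))
```

One difference to keep in mind: the original code drops the final hour (its `close_date` is NaT after `shift(-1)`), whereas `resample` keeps it. Statistics that need custom, non-regular windows would still need a different approach.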
Finally, I tried using Dask
dd_min = dd.from_pandas(df_min, npartitions=10)
dd_h = dd.from_pandas(df_h, npartitions=6)
def my_func_2(row):
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    return dd_min['bid'][(dd_min.index >= start_date) & (dd_min.index < end_date)].mean()
res = dd_h.map_partitions(lambda df: df.apply(lambda row: my_func_2(row), axis=1), meta=('x', 'float64'))
result_dd = res.compute()
---------------------------------------------------------------------------
NotImplementedError Traceback (most recent call last)
<timed exec> in <module>
c:\mch_py_38\lib\site-packages\dask\base.py in compute(self, **kwargs)
284 dask.base.compute
285 """
--> 286 (result,) = compute(self, traverse=False, **kwargs)
287 return result
288
c:\mch_py_38\lib\site-packages\dask\base.py in compute(*args, **kwargs)
566 postcomputes.append(x.__dask_postcompute__())
567
--> 568 results = schedule(dsk, keys, **kwargs)
569 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
570
c:\mch_py_38\lib\site-packages\dask\threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
77 pool = MultiprocessingPoolExecutor(pool)
78
---> 79 results = get_async(
80 pool.submit,
81 pool._max_workers,
c:\mch_py_38\lib\site-packages\dask\local.py in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
515 _execute_task(task, data) # Re-execute locally
516 else:
--> 517 raise_exception(exc, tb)
518 res, worker_id = loads(res_info)
519 state["cache"][key] = res
c:\mch_py_38\lib\site-packages\dask\local.py in reraise(exc, tb)
323 if exc.__traceback__ is not tb:
324 raise exc.with_traceback(tb)
--> 325 raise exc
326
327
c:\mch_py_38\lib\site-packages\dask\local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
221 try:
222 task, data = loads(task_info)
--> 223 result = _execute_task(task, data)
224 id = get_id()
225 result = dumps((result, id))
c:\mch_py_38\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
c:\mch_py_38\lib\site-packages\dask\optimization.py in __call__(self, *args)
967 if not len(args) == len(self.inkeys):
968 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
970
971 def __reduce__(self):
c:\mch_py_38\lib\site-packages\dask\core.py in get(dsk, out, cache)
149 for key in toposort(dsk):
150 task = dsk[key]
--> 151 result = _execute_task(task, cache)
152 cache[key] = result
153 result = _execute_task(out, cache)
c:\mch_py_38\lib\site-packages\dask\core.py in _execute_task(arg, cache, dsk)
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
c:\mch_py_38\lib\site-packages\dask\utils.py in apply(func, args, kwargs)
33 def apply(func, args, kwargs=None):
34 if kwargs:
---> 35 return func(*args, **kwargs)
36 else:
37 return func(*args)
c:\mch_py_38\lib\site-packages\dask\dataframe\core.py in apply_and_enforce(*args, **kwargs)
5830 func = kwargs.pop("_func")
5831 meta = kwargs.pop("_meta")
-> 5832 df = func(*args, **kwargs)
5833 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
5834 if not len(df):
<timed exec> in <lambda>(df)
c:\mch_py_38\lib\site-packages\pandas\core\frame.py in apply(self, func, axis, raw, result_type, args, **kwargs)
8734 kwargs=kwargs,
8735 )
-> 8736 return op.apply()
8737
8738 def applymap(
c:\mch_py_38\lib\site-packages\pandas\core\apply.py in apply(self)
686 return self.apply_raw()
687
--> 688 return self.apply_standard()
689
690 def agg(self):
c:\mch_py_38\lib\site-packages\pandas\core\apply.py in apply_standard(self)
810
811 def apply_standard(self):
--> 812 results, res_index = self.apply_series_generator()
813
814 # wrap results
c:\mch_py_38\lib\site-packages\pandas\core\apply.py in apply_series_generator(self)
826 for i, v in enumerate(series_gen):
827 # ignore SettingWithCopy here in case the user mutates
--> 828 results[i] = self.f(v)
829 if isinstance(results[i], ABCSeries):
830 # If we have a view on v, we need to make a copy because
<timed exec> in <lambda>(row)
<ipython-input-30-8ca073921a34> in my_func_2(row)
5 start_date = pd.to_datetime(row['open_date'])
6 end_date = pd.to_datetime(row['close_date'])
----> 7 return dd_min['bid'][(dd_min.index >=start_date)&(dd_min.index < end_date)].mean()
c:\mch_py_38\lib\site-packages\dask\dataframe\core.py in __getitem__(self, key)
3260 graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self, key])
3261 return Series(graph, name, self._meta, self.divisions)
-> 3262 raise NotImplementedError(
3263 "Series getitem is only supported for other series objects "
3264 "with matching partition structure"
NotImplementedError: Series getitem is only supported for other series objects with matching partition structure
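Reply from a uj5u.com user:
Following @mdurant's suggestion and fixing what @SultanOrazbayev pointed out, it works! But I don't know what I am doing wrong, because the solution is slower than pandas itself.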
import pandas as pd
import dask.dataframe as dd
import numpy as np
# parameters
size = 1000000
price_0 = 1.1300
# GENERATE MINUTE-WISE SIMULATED PRICE TIME SERIES
index = pd.date_range(end='2021-12-19', periods=size, freq='min')
wn = np.random.normal(loc=0, scale=0.0005, size=size) # white noise (normally distributed random data)
price = np.zeros(size).astype(float)
price[0] = price_0 # set first price
# random walk
for i in range(1, len(price)):
    price[i] = price[i-1] + wn[i-1]
df_min = pd.DataFrame({'date':index, 'bid':price, 'wn':wn})
df_min.set_index('date', inplace=True)
# GENERATE AN HOURLY-WISE TIME SERIES OUT OF THE MINUTE-WISE ONE
df_h = df_min['bid'].resample('1H').ohlc()
df_h['open_date'] = df_h.index
df_h['close_date'] = df_h['open_date'].shift(-1)
df_h.dropna(inplace=True)
dd_min = dd.from_pandas(df_min, npartitions=10)
def my_func_2(row):
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    return dd_min.loc[((dd_min.index >= start_date) & (dd_min.index < end_date)), 'bid'].mean().compute()
%%time
df_h.dropna(inplace=True)
res = df_h.apply(lambda row: my_func_2(row), axis=1)
Wall time: 8min 46s
Pandas only
%%time
def my_func(row):
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    return df_min.loc[(df_min.index >= start_date) & (df_min.index < end_date), 'bid'].mean()
result_pd = df_h.apply(lambda row: my_func(row), axis=1)
Wall time: 2min 11s
This solution is outside my solution space.
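
Both timed versions evaluate a boolean mask over the whole minute-level index once per hourly row, and the Dask version additionally calls `.compute()` for every row, so the work grows roughly with rows times minutes. Assuming the minute index is sorted, one possible alternative is to locate each window with a binary search and read the mean off a prefix sum. This is only a sketch: `window_means` is a hypothetical helper, not part of pandas or Dask, and the demo data is synthetic.

```python
import numpy as np
import pandas as pd

def window_means(values, timestamps, starts, ends):
    """Mean of `values` over each half-open window [start, end).

    Assumes `timestamps` is sorted ascending; windows may have any length.
    """
    # Position of the first timestamp >= start and the first timestamp >= end.
    lo = np.searchsorted(timestamps, starts, side="left")
    hi = np.searchsorted(timestamps, ends, side="left")
    # Prefix sums: csum[k] is the sum of the first k values.
    csum = np.concatenate(([0.0], np.cumsum(values)))
    counts = hi - lo
    sums = csum[hi] - csum[lo]
    # Empty windows yield NaN, like the mean of an empty selection.
    return np.where(counts > 0, sums / np.maximum(counts, 1), np.nan)

# Synthetic demo: 180 minutes of data, three one-hour windows.
idx = pd.date_range("2021-01-01", periods=180, freq="min")
bid = np.arange(180, dtype=float)
starts = pd.date_range("2021-01-01", periods=3, freq="60min")
ends = starts + pd.Timedelta(hours=1)

means = window_means(bid, idx.to_numpy(), starts.to_numpy(), ends.to_numpy())
print(means)
```

Subtracting prefix sums can lose some floating-point precision on very long series compared with averaging each window directly, so this trade-off should be checked against the accuracy the statistics require.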
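Pandas + pyarrow solution.
I have now given up on Dask and decided to try a solution that combines pandas and pyarrow, reading the hourly-frequency dataset in chunks of n rows. From each chunk, I take the limits of its date range and use them as filters to read from the minute-level Parquet dataset with pyarrow: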
import pandas as pd
import numpy as np
import pyarrow.parquet  # import the submodule explicitly so pyarrow.parquet.read_table is available
# parameters
size = 1000000
price_0 = 1.1300
# GENERATE MINUTE-WISE SIMULATED PRICE TIME SERIES
index = pd.date_range(end='2021-12-19', periods=size, freq='min')
wn = np.random.normal(loc=0, scale=0.0005, size=size) # white noise (normally distributed random data)
price = np.zeros(size).astype(float)
price[0] = price_0 # set first price
# random walk
for i in range(1, len(price)):
    price[i] = price[i-1] + wn[i-1]
df_min = pd.DataFrame({'date':index, 'bid':price, 'wn':wn})
df_min.set_index('date', inplace=True)
# GENERATE AN HOURLY-WISE TIME SERIES OUT OF THE MINUTE-WISE ONE
df_h = df_min['bid'].resample('1H').ohlc()
df_h['open_date'] = df_h.index
df_h['close_date'] = df_h['open_date'].shift(-1)
df_h.dropna(inplace=True)
# SAVE FILES
df_min.to_parquet('minute_dataset.parquet')
df_h.to_csv('hourly_dataset.csv')
def my_func_2(row):
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    return df_min.loc[(df_min.index >= start_date) & (df_min.index < end_date), 'bid'].mean()
%%time
# READ-CALCULATE LOOP
full_df = list()
for df in pd.read_csv('hourly_dataset.csv', chunksize=10):
    date_from = pd.to_datetime(df.iloc[0]['open_date'])
    date_to = pd.to_datetime(df.iloc[-1]['close_date'])
    filter = [[('date', '>=', date_from), ('date', '<', date_to)]]
    df_min = pyarrow.parquet.read_table('minute_dataset.parquet', filters=filter).to_pandas()
    min_date_from = df_min.iloc[0].name
    min_date_to = df_min.iloc[-1].name
    df['mean'] = df.apply(lambda row: my_func_2(row), axis=1)
    full_df.append(df)
df_result = pd.concat(full_df)
Wall time: 1min 31s
This time the performance is within the solution space.
df_result

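Reply from a uj5u.com user:
This is a quick hack on the answer provided by the OP. Note that there is still a lot of room for improvement; this is just an illustration of using dask. The snippet below ran in 30 seconds on my machine, while the original snippet ran in about 52 seconds. The improvement is not stellar, but there is still plenty of room for optimization...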
import pandas as pd
import numpy as np
import pyarrow.parquet  # import the submodule explicitly so pyarrow.parquet.read_table is available
import dask
# parameters
size = 1000000
price_0 = 1.1300
# GENERATE MINUTE-WISE SIMULATED PRICE TIME SERIES
index = pd.date_range(end='2021-12-19', periods=size, freq='min')
wn = np.random.normal(loc=0, scale=0.0005, size=size) # white noise (normally distributed random data)
price = np.zeros(size).astype(float)
price[0] = price_0 # set first price
# random walk
for i in range(1, len(price)):
    price[i] = price[i-1] + wn[i-1]
df_min = pd.DataFrame({'date':index, 'bid':price, 'wn':wn})
df_min.set_index('date', inplace=True)
# GENERATE AN HOURLY-WISE TIME SERIES OUT OF THE MINUTE-WISE ONE
df_h = df_min['bid'].resample('1H').ohlc()
df_h['open_date'] = df_h.index
df_h['close_date'] = df_h['open_date'].shift(-1)
df_h.dropna(inplace=True)
# SAVE FILES
df_min.to_parquet('minute_dataset.parquet')
df_h.to_csv('hourly_dataset.csv')
def my_func_2(row):
    start_date = pd.to_datetime(row['open_date'])
    end_date = pd.to_datetime(row['close_date'])
    return df_min.loc[(df_min.index >= start_date) & (df_min.index < end_date), 'bid'].mean()
@dask.delayed
def delayed_computations(df):
    date_from = pd.to_datetime(df.iloc[0]['open_date'])
    date_to = pd.to_datetime(df.iloc[-1]['close_date'])
    filter = [[('date', '>=', date_from), ('date', '<', date_to)]]
    df_min = pyarrow.parquet.read_table('minute_dataset.parquet', filters=filter).to_pandas()
    min_date_from = df_min.iloc[0].name
    min_date_to = df_min.iloc[-1].name
    df['mean'] = df.apply(lambda row: my_func_2(row), axis=1)
    return df
# READ-CALCULATE LOOP
full_df = list()
for df in pd.read_csv('hourly_dataset.csv', chunksize=1000):
    full_df.append(delayed_computations(df))
full_df = dask.compute(*full_df)
df_result = pd.concat(full_df)
