該賞金過期5天。回答這個問題有資格獲得 50聲望獎勵。 quantguy想引起更多人對這個問題的關注。
假設我們在 for 回圈中對幾乎相同的基本資料(可變)應用一組(就地)操作。什么是記憶體高效(和執行緒安全)的方式來做到這一點?
請注意,在每次迭代的 for 回圈中不應更改基本資料。
示例代碼:
假設我們在一個data目錄中有一些包含基本資料的 Excel 檔案。此外,我們在some_more_data目錄中有一些附加資料。我想data使用目錄中的檔案對從some_more_data目錄中檢索到的資料應用操作。之后我想將結果列印到一個新的泡菜檔案中。
import copy
import pickle
import pandas as pd
# Excel import function to obtain dictionary of pandas DataFrames.
def read_data(info_dict):
data_dict = dict()
for dname, dpath in info_dict.items():
data_dict[dname] = pd.read_excel(dpath, index_col=0)
return data_dict
# list of data files
data_list = {'price': 'data/price.xlsx', 'eps': 'data/eps.xlsx'}
raw_data = read_data(data_list)
# list of files used for operation (they, for example, have different indices)
some_more_data= {
'some_data_a': 'some_more_data/some_data_a.xlsx',
'some_data_b': 'some_more_data/some_data_b.xlsx'
}
some_more_data = read_data(some_more_data)
# Apply operation to data (explicitly use a for-loop)
for smd_k, smd_v in some_more_data.items():
rdata = copy.deepcopy(raw_data)
rdata['price'] = rdata['price'].reindex(smd_v.index)
rdata['eps'] = rdata['eps'].reindex(columns=smd_v.columns)
with open(f'data/changed_{smd_k}.pkl', 'wb') as handle:
pickle.dump(rdata, handle, protocol=pickle.HIGHEST_PROTOCOL)
我上面示例中的 deepcopy 操作是否執行緒安全(假設我想使用多執行緒)?或者我應該在 for 回圈中重復加載 Excel 中的資料(非常慢)?或者有更好的方法嗎?
謝謝您的幫助。
用于生成示例資料幀并將資料保存在 Excel 檔案中的代碼
注意目錄data 和some_more_data必須先手動創建
import pandas as pd
import numpy as np
price = pd.DataFrame([[-1.332298, 0.396217, 0.574269, -0.679972, -0.470584, 0.234379],
[-0.222567, 0.281202, -0.505856, -1.392477, 0.941539, 0.974867],
[-1.139867, -0.458111, -0.999498, 1.920840, 0.478174, -0.315904],
[-0.189720, -0.542432, -0.471642, 1.506206, -1.506439, 0.301714]],
columns=['IBM', 'MSFT', 'APPL', 'ORCL','FB','TWTR'],
index=pd.date_range('2000', freq='D', periods=4))
eps = pd.DataFrame([[-1.91, 1.63, 0.51, -.32, -0.84, 0.37],
[-0.56, 0.02, 0.56, 1.77, 0.99, 0.97],
[-1.67, -0.41, -0.98, 1.20, 0.74, -0.04],
[-0.80, -0.43, -0.12, 1.06, 1.59, 0.34]],
columns=['IBM', 'MSFT', 'APPL', 'ORCL','FB','TWTR'],
index=pd.date_range('2000', freq='D', periods=4))
some_data_a = pd.DataFrame(np.random.randint(0,100,size=(4, 6)), columns=['IBM', 'MSFT', 'APPL', 'ORCL','FB','TWTR'], index=pd.date_range('2001', freq='D', periods=4))
some_data_b = pd.DataFrame(np.random.randint(0,100,size=(20, 6)), columns=['GM', 'TSLA', 'IBM', 'MSFT', 'APPL', 'ORCL'], index=pd.date_range('2000', freq='D', periods=20))
price.to_excel('data/price.xlsx')
eps.to_excel('data/eps.xlsx')
some_data_a.to_excel('some_more_data/some_data_a.xlsx')
some_data_b.to_excel('some_more_data/some_data_b.xlsx')
uj5u.com熱心網友回復:
一旦你的raw_data字典被創建,我看不到它在哪里被修改(畢竟,這是使用deepcopy它的重點)。因此,雖然深度復制可變物件不是執行緒安全的,但這個特定物件在任何時候都不會發生變化。所以我不明白為什么會有問題。但是,deepcopy如果您沒有信心,您總是可以在鎖的控制下進行。
如果您使用多執行緒執行此操作,那么使用 athreading.Lock可能不會降低性能,因為深度復制操作都是 CPU 并且deepcopy無論如何您都無法實作任何并行性,因為您的執行緒已經為此鎖定了全域解釋器鎖 (GIL)函式(主要是 Python 位元組碼)。這種額外的鎖定只是防止在deepcopy操作程序中放棄你的時間片給另一個可能開始一個執行緒的執行緒deepcopy操作(但同樣,我仍然認為這不是問題)。但是,如果您使用多執行緒,那么并發 I/O 操作會帶來哪些性能提升?根據您使用的是硬碟驅動器還是固態驅動器以及該驅動器的特性,并發甚至可能會損害您的 I/O 性能。Pandas如果他們發布 GIL,您可能會從這些操作中獲得一些性能改進。
多處理確實提供了 CPU 密集型功能的真正并行性,在創建行程和將資料從一個地址空間傳遞到另一個地址空間(即一個行程到另一個行程)方面有自己的開銷。這種在串行處理中沒有的額外開銷必須通過并行計算所實作的節省來補償。從您所展示的內容中不清楚,如果這確實代表了您的實際情況,那么您將從這種并行性中獲得任何好處。但是,當然,您不必擔心執行緒安全性,deepcopy因為一旦每個行程都有該行程的副本raw_data,就會運行一個執行緒,該執行緒擁有自己的記憶體副本,彼此完全隔離。
概括
通常,
deepcopy對于可變物件不是執行緒安全的,但由于您的物件似乎沒有“變異”,因此應該不是問題。但是如果在多執行緒下運行,您可以deepcopy在 a 控制下將操作作為原子操作執行,multithreading.Lock而不會顯著降低性能。如果您正在使用多處理,并假設
raw_data沒有在共享記憶體中實作,那么每個行程將raw_data開始處理自己的副本。因此,即使另一個行程正在“變異”raw_data,只要任何一個行程正在運行單個執行緒,就無需擔心deepcopy.根據我所看到的代碼,目前尚不清楚多執行緒或多處理是否會實作任何性能改進。
基準
這對串行、多執行緒和多處理進行了基準測驗。也許每個字典中只有 2 個鍵,這不是一個現實的例子,但它給出了一個總體思路:
import copy
import pickle
import pandas as pd
import time
from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import cpu_count
# Excel import function to obtain dictionary of pandas DataFrames.
def read_data(info_dict):
data_dict = dict()
for dname, dpath in info_dict.items():
data_dict[dname] = pd.read_excel(dpath, index_col=0)
return data_dict
def serial(raw_data, some_more_data, suffix):
# Apply operation to data (explicitly use a for-loop)
for smd_k, smd_v in some_more_data.items():
rdata = copy.deepcopy(raw_data)
rdata['price'] = rdata['price'].reindex(smd_v.index)
rdata['eps'] = rdata['eps'].reindex(columns=smd_v.columns)
with open(f'data/changed_{smd_k}_{suffix}.pkl', 'wb') as handle:
pickle.dump(rdata, handle, protocol=pickle.HIGHEST_PROTOCOL)
def init_pool(r_d, sfx):
global raw_data, suffix
raw_data = r_d
suffix = sfx
def worker(smd_k, smd_v):
rdata = copy.deepcopy(raw_data)
rdata['price'] = rdata['price'].reindex(smd_v.index)
rdata['eps'] = rdata['eps'].reindex(columns=smd_v.columns)
with open(f'data/changed_{smd_k}_{suffix}.pkl', 'wb') as handle:
pickle.dump(rdata, handle, protocol=pickle.HIGHEST_PROTOCOL)
def benchmark1(raw_data, some_more_data):
start_time = time.time()
serial(raw_data, some_more_data, '1')
elapsed = time.time() - start_time
print('Serial time:', elapsed)
def benchmark2(raw_data, some_more_data):
start_time = time.time()
items = list(some_more_data.items())
pool_size = len(items)
pool = ThreadPool(pool_size, initializer=init_pool, initargs=(raw_data, '2'))
pool.starmap(worker, items)
elapsed = time.time() - start_time
print('Multithreading time:', elapsed)
pool.close()
pool.join()
def benchmark3(raw_data, some_more_data):
start_time = time.time()
items = list(some_more_data.items())
pool_size = min(len(items), cpu_count())
pool = Pool(pool_size, initializer=init_pool, initargs=(raw_data, '3'))
pool.starmap(worker, items)
elapsed = time.time() - start_time
print('Multiprocessing time:', elapsed)
pool.close()
pool.join()
def main():
# list of data files
data_list = {'price': 'data/price.xlsx', 'eps': 'data/eps.xlsx'}
raw_data = read_data(data_list)
# list of files used for operation (they, for example, have different indices)
some_more_data= {
'some_data_a': 'some_more_data/some_data_a.xlsx',
'some_data_b': 'some_more_data/some_data_b.xlsx'
}
some_more_data = read_data(some_more_data)
benchmark1(raw_data, some_more_data)
benchmark2(raw_data, some_more_data)
benchmark3(raw_data, some_more_data)
if __name__ == '__main__':
main()
印刷:
Serial time: 0.002997159957885742
Multithreading time: 0.013999462127685547
Multiprocessing time: 0.7790002822875977
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/403793.html
標籤:
上一篇:正則運算式匹配/中斷
