我們有一個批處理系統,我們希望將其修改為使用多執行緒。該流程接收一個分隔的檔案,并通過 pandas 對其進行計算。
如果記錄總數超過閾值,我希望將資料框架分割成 N 個塊。然后,每個塊應該被輸送到執行緒池執行器中的一個執行緒,以完成計算,然后在最后,我將等待執行緒同步并將產生的DF串聯成一個。
問題是,我不確定如何像這樣拆分一個 Pandas DF。比方說,將有一個任意數量的執行緒,2個(作為一個例子),如果記錄數超過200000,我想開始拆分
因此,我的想法是,如果有一個執行緒,我將會把它拆分。
因此,這個想法是,如果我發送一個有200001條記錄的檔案,執行緒1將得到100000,執行緒2將得到100001。如果我發送一個有1000000條記錄的檔案,執行緒1將得到500000條,執行緒2將得到500000條。
(如果記錄總數不超過這個閾值,我就在一個單執行緒上執行這個程序)
我看到過相關的解決方案。
我已經看到了相關的解決方案,但沒有一個適用于我的情況。
uj5u.com熱心網友回復:
def do_something(df)。
if len(df) > some_threshold:
pivot = len(df)//2.
threading.Thread(target=do_something,args=(df[:pivot]).start()
return do_something(df[:pivot])
實際上_do_something_with_smallish_df(df)
可能嗎?
uj5u.com熱心網友回復:
下面,我已經包括了如何分割的示例代碼。然后,使用ThreadPoolExecutor,它將用八個執行緒執行代碼,在我的例子中(你也可以使用Thread庫)。process_pandas函式只是一個假函式,你可以使用任何你想要的東西:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor as th
threshold = 300 閾值
block_size=100
num_threads = 8 8
big_list = pd.read_csv('pandas_list.csv',delimiter=';', header=None)
blocks = []
if len(big_list) > threshold:
for i in range((len(big_list)/block_size)) 。
blocks.append(big_list[block_size*i:block_size*(i 1) ])
i=i 1]。
if i*block_size < len(big_list)。
blocks.append(big_list[block_size*i:] )
else:
blocks.append(big_list)
def process_pandas(df)。
print('Doing calculations...' )
indexes = list(df.index.value)
df.loc[indexes[0], 2] = 'change'。
return df
with th(num_threads) as ex:
結果 = ex.map(process_pandas,block)
final_dataframe = pd.concat(results, axis=0)
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/323988.html
標籤:
