我有一個 bash 腳本來激活一個 python 腳本:
#!/bin/bash
#SBATCH -J XXXXX
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=16
python my_python_script.py
python 腳本正在掃描一個非常大的檔案(~480,000,000 行)并創建一個字典,稍后將作為輸出檔案寫入:
with open (huge_file,'r') as hugefile, open (final_file, 'w') as final:
reader= csv.reader (hugefile, delimiter="\t")
writer= csv.writer (final, delimiter="\t")
d={}
for r in reader:
v=r[0] r[1]
if v not in d.keys():
d[v]=[r[5],r[4]]
else:
d[v].append([r[5],r[4]])
for k,v in d.items():
#analyses
nl = [different variables]
writer.writerow(nl)
由于檔案的大小,我想使用 16 個 CPU 來運行,但即使我在我的 bash 腳本中定義了 16 個 CPU,它也只使用了 1 個 CPU。
我閱讀了很多關于子流程的內容,但它似乎不適用于這種情況。我很想聽聽任何建議。
uj5u.com熱心網友回復:
Cores 在這里幫不了你,因為字典操作是微不足道的,而且速度非常快。
您在這里遇到了 I/O 問題,其中讀取和寫入檔案是瓶頸。
如果您使用多處理模塊,您可能會遇到其他問題。構建的字典將彼此獨立,因此您將擁有與其他資料相同的重復鍵。如果必須保留 CSV 資料的排序,可能因為它是時間序列資料,您將不得不合并字典中的陣列,然后將其排序作為一個附加步驟,除非您在合并 16 個字典時考慮到這個問題。這也意味著您將把 CSV 分成 16 個塊并在每個核心上單獨處理它們,以便您可以跟蹤排序。
您是否考慮過將巨大的 CSV 檔案讀入 SQLite 資料庫?這至少可以讓您更好地控制資料的訪問方式,因為在指定排序時,16 個行程可以同時訪問資料。
我真的懷疑這里有什么可以并行化的。即使您使用 multiprocessing 模塊,您也需要在考慮整個字典的同時撰寫整個檔案,這限制了您并行化此任務的方式。
uj5u.com熱心網友回復:
多處理很難應用,因為一切都需要分類到一個中央字典 d 中。幾個行程必須始終知道哪些鍵已經在 dict 中,這使得它變得非常復雜。因此,更簡單的解決方案是嘗試加快處理速度,同時保持在一個程序中。dict 和 list comprehension 似乎是一個很好的前進方向:
# prepare dict keys and empty list entries:
d = {r[0] r[1]: [] for r in reader}
# fill dict
[d[r[0] r[1]].append([r[5], r[4]]) for r in reader]
# d is ready for analysis
uj5u.com熱心網友回復:
這是一個如何使用多個行程的想法(尚未使用大檔案進行測驗,但確信在除錯時它會作業)。
第一步是使用Linuxsplit函式將大檔案分割成段:
bash> split -l 10000000 hugefile segment
這將創建每個具有 10,000,000 行的檔案,名稱將是segmentaa, segmentab, ....(參見 的man頁面split)
現在 Python 程式讀取這些檔案段,為每個檔案段啟動一個行程,然后將結果合并到一個 dict 中:
import multiprocessing as mp
import csv
# define process function working on a file segment
def proc_target(q, filename):
with open(filename, 'r') as file_segment:
reader = csv.reader(file_segment, delimiter="\t")
dd = dict()
def func(r):
key = r[0] r[1]
if key in dd:
dd[key].append([r[5], r[4]])
else:
dd[key] = [r[5], r[4]]
[func(r) for r in reader]
# send result via queue to main process
q.put(dd)
if __name__ == '__main__':
segment_names = ['segmentaa', 'segmentab', 'segmentac']: # maybe there are more file segments ...
processes = dict() # all objects needed are stored in this dict
mp.set_start_method('spawn')
# launch processes
for fn in segment_names :
processes[fn] = dict()
q = mp.Queue()
p = mp.Process(target=proc_target, args=(q, fn))
p.start()
processes[fn]["process"] = p
processes[fn]["queue"] = q
# read results
for fn in segment_names:
processes[fn]["result"] = processes[fn]["queue"].get()
processes[fn]["process"].join()
# consolidate all results
# start with first segment result and merge the others into it
d = processes[segment_names[0]]["result"]
# helper function for fast execution using list comprehension
def consolidate(key, value):
if key in d:
d[key].append(value)
else:
d[key] = value
# merge other results into d
for fn in segment_names[1:]:
[consolidate(key, value) for key, value in processes[fn]["result"].items()]
# d is ready
為了避免 I/O 瓶頸,將段分布在多個磁盤上并讓并行行程并行訪問不同的 I/O 資源可能是明智的。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/362989.html
