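Problem - ProcessPoolExecutor has not increased speed. Confirmed by tqdm
I know enough Python to copy and/or write a program that works. Each file takes about 40 seconds to load -> filter -> write. I have about 6,800 files to work through and want a better version that uses all of my processing power (6 cores), so I tried to write that version (below). It produces output, but it is slightly slower than my original function: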
from concurrent.futures import ProcessPoolExecutor
from glob import glob
from json import dump
from tqdm import tqdm
from pybufrkit.decoder import Decoder, generate_bufr_message
from pybufrkit.renderer import FlatJsonRenderer

decoder = Decoder()
DIRECTORY = 'C://blah/'
files = glob(DIRECTORY + '*.bufr')
PHI_MAX, PHI_MIN, LAMBDA_MAX, LAMBDA_MIN = x,x,x,x #Integers

def load_decode_filter(file):
    '''
    Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
    '''
    output_message = []
    with open(file, 'rb') as ins:
        for bufr_message in generate_bufr_message(
                decoder, ins.read()):
            input_list = FlatJsonRenderer().render(bufr_message)[3][2] #necessary for [mask] to function
            mask = [obj for obj in input_list if ((PHI_MAX > obj[
                12] > PHI_MIN) & (LAMBDA_MAX > obj[13] > LAMBDA_MIN))]
            output_message.extend(mask)
    return output_message

def main(files_in):
    '''
    attempt to initiate all cores in loading and filter bufr files
    '''
    with ProcessPoolExecutor(max_workers=6) as executor:
        with tqdm(range(len(files_in)), desc='files loaded',
                  position=0) as progress:
            futures = []
            for file in files_in:
                future = executor.submit(load_decode_filter(file), file)
                future.add_done_callback(lambda p: progress.update())
                futures.append(future)
            results = []
            for future in futures:
                result = future.result()
                results.append(result)
    with open(DIRECTORY + 'bufrout.json', 'w', encoding='utf-8') as f_o:
        dump(results, f_o)

if __name__ == '__main__':
    main(files)
I was hoping to at least reduce the processing time per file.
Update, Closing:
First of all, I'd like to thank everyone who commented as well as the answerer (I'm too new to upvote). It seems the only way to meaningfully increase efficiency would be to never decode in the first place and take what I want from the in-situ bufr data. That is simply beyond my current ability (this is my first exposure to code of any kind).
I plan to keep running (and am currently running) my initial version (f.bufr in, f.bufr_.txt out) as I am able, and I'll move processed files to a subdirectory after each "run". The silver lining is that I've learned enough doing this to be able to write a program that combines all the text output into one file. Thanks again.
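Answer from a uj5u.com user:
Q:
"Problem - ProcessPoolExecutor has not increased speed. Confirmed by tqdm"
A:
No. With all respect, your main problem is not the efficiency of the ProcessPoolExecutor() instance.
Your main problem is the choice of performance / efficiency (almost) anti-patterns, which Python, and all the more so Python sub-processes under Windows O/S, will punish by making you wait some 75 hours (6,800 files x ~40 s each = 272,000 s) to collect all the results (if the processing pipeline does indeed do what you expect it to do, which I cannot judge, but I guess it will not ... for the reasons listed below).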
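SUSPECT #1:
Best avoid 75 hours of producing nonsensical output:
Given the documented call signature of the .submit() method of a standard Py3 concurrent.futures.Executor() instance, your code does not meet this specification.
Instead of passing a reference to a function, main(), being the calling side, first performs for each and every one of the 6,800 files a full, pure-[SERIAL] METOP work-package processing (which produces an expensively collected, huge list of messages). That list is then (contrary to the documented requirement to pass a callable, which for a process pool must also be picklable, such as a module-level function) SER/sent/DES-transferred, again at immense RAM/CPU/TIME expense, to one of the worker processes in the Executor-managed pool. I doubt the worker can do anything reasonable upon receiving a list instead of a function (one meant to be executed in that remote process on the arguments delivered to it, as the call signature specifies). With the code as posted, the worker would try to call that list, so future.result() should raise TypeError ('list' object is not callable). Ouch...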
def main( files_in ):
    '''                                                                  __doc__
    attempt to initiate all cores in loading and filter bufr files
    '''
    with ProcessPoolExecutor( max_workers = 6
                              ) as executor: #---------------------------# eXe CONTEXT-mgr
        with tqdm( range( len( files_in ) ),
                   desc     = 'files loaded',
                   position = 0
                   ) as progress: #---------------------------------------# pro CONTEXT-in-CONTEXT-mgr
            futures = []
            for file in files_in: #---------------------------------------# LUXURY of top-level iterator, commanding 6800x times a pool of workers
                future = executor.submit( load_decode_filter( file ), #---# ??? WHY CALC THE RESULT BEFORE passing it to the .submit() ???
                                          file                        #---# std PARA
                                          )
                future.add_done_callback( lambda p: progress.update() )   # LUXURY of tqdm() for showing 75-hours of work ???
                futures.append( future ) #--------------------------------# LUXURY of no performance gain
            results = []
            for future in futures:
                result = future.result()
                results.append( result ) #--------------------------------# LUXURY of adverse performance gain
    with open( DIRECTORY + 'bufrout.json', 'w',
               encoding = 'utf-8'
               ) as f_o: #------------------------------------------------# f_o CONTEXT-mgr
        dump( results, f_o )
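For comparison, here is a minimal sketch of the call shape that .submit() documents: the callable comes first and its arguments follow, so the call happens inside a worker rather than in main(). To run without pybufrkit, it uses count_bytes as a hypothetical stand-in for load_decode_filter. run_all is an illustrative helper name, and the '*.bufr' pattern in the current directory is an assumption.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, as_completed
from glob import glob


def count_bytes(path):
    """Hypothetical stand-in for load_decode_filter(): returns the file size."""
    with open(path, 'rb') as ins:
        return len(ins.read())


def run_all(executor: Executor, worker, paths):
    """Submit worker(path) for every path and return {path: result}."""
    # Pass the function object and its argument separately.
    # executor.submit(worker(path), path) would instead run worker() right
    # here, serially, and hand its return value to the pool as the "callable".
    futures = {executor.submit(worker, path): path for path in paths}
    results = {}
    for future in as_completed(futures):
        results[futures[future]] = future.result()
    return results


if __name__ == '__main__':  # guard needed on Windows, where workers are spawned
    files = glob('*.bufr')  # assumption: input files sit in the current directory
    with ProcessPoolExecutor(max_workers=6) as pool:
        sizes = run_all(pool, count_bytes, files)
    print(len(sizes), 'files processed')
```

Because run_all accepts any Executor, the same function can be tried with a ThreadPoolExecutor first to check the logic before paying for worker processes.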
SUSPECT #2:
Better avoid any and all performance-degrading syntax constructs, if performance is the real goal:
Avoid the temptation of low-hanging-fruit SLOCs that look "sexy" but are paid for with immense add-on overhead costs.
Design the process flow so that end-to-end processing time improves through latency masking where possible (file I/O being the classic case), and avoid every reducible step (creating named variables that are sometimes never used is a similar sin).
Given that you run on Windows, your (though hidden) sub-process instantiation costs are the highest of all platforms: Windows has no fork(), so every worker is spawned as a brand-new Python interpreter process that re-imports the main module and rebuilds its module-level data structures. If that over-crowds your physical RAM, the O/S will start (for the rest of those 75 hours ...) a nasty war of thrashing, with virtual-memory-managed transfers from RAM to disk and from disk to RAM at roughly 10,000x the latency. That will effectively cripple any other CPU-to/from-RAM I/O operations, and we may forget any dreams of increasing performance.
Going by what the pybufrkit documentation promises, there is one more chance, a 10% ~ 30% performance boost, if your "filter" can be compiled using pybufrkit templates:
"(...) BUFR Template Compilation
The main purpose of Template Compilation is performance. However since bit operations are the most time consuming part in the overall processing. The performance gain somewhat is limited. Depending on the total number of descriptors to be processed for a message, template compilation provides 10 - 30% performance boost. Read the Docs "
As-was, entropy-reduced code :
def load_decode_filter( file ):
    '''
    Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
    '''
    output_message = []
    with open( file, 'rb' ) as ins: #----------------------------------- # ins CONTEXT-mgr
        for idx,           \
            bufr_message   \
            in             \
            enumerate( generate_bufr_message( decoder,    # LUXURY of enumerate for no real use
                                              ins.read()  # <-------------# ins.
                                              )
                       ):
            input_list = FlatJsonRenderer().render( bufr_message )[3][2] # LUXURY of JSON-(re)-decorations
            mask = [ obj for obj in input_list                           #
                         if ( ( PHI_MAX    > obj[12] > PHI_MIN    )
                            & ( LAMBDA_MAX > obj[13] > LAMBDA_MIN )
                              )
                     ]
            output_message.extend( mask )
    return output_message
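As a hedged illustration of filtering in a single pass, the sketch below writes each matching record to the output as soon as it is found, instead of first accumulating a list the way the comprehension above does. It assumes records are plain sequences with the latitude at index 12 and the longitude at index 13, as in the question. The names iter_matches and write_matches, the keyword names for the bounds, and the sample rows are hypothetical.

```python
import json
import sys
from typing import Iterable, Iterator, Sequence, TextIO


def iter_matches(records: Iterable[Sequence], phi_min, phi_max,
                 lambda_min, lambda_max) -> Iterator[Sequence]:
    """Yield records whose fields 12 and 13 lie strictly inside the box."""
    for rec in records:
        if phi_min < rec[12] < phi_max and lambda_min < rec[13] < lambda_max:
            yield rec


def write_matches(records: Iterable[Sequence], out: TextIO, **bounds) -> int:
    """Write one JSON document per line to out; return the number written."""
    written = 0
    for rec in iter_matches(records, **bounds):
        out.write(json.dumps(rec) + '\n')  # no intermediate list is built
        written += 1
    return written


if __name__ == '__main__':
    # Made-up sample rows: only the first one falls inside the box.
    rows = [[0] * 12 + [50.5, 8.2], [0] * 12 + [10.0, 8.2]]
    write_matches(rows, sys.stdout, phi_min=50, phi_max=51,
                  lambda_min=8, lambda_max=9)
```

Writing one JSON document per line also lets per-file outputs be concatenated later without re-parsing them.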
Performance tips, if you manage to use neither pybufrkit's native compiled templates nor its native scripted CLI tasking and have to resort to a Win/Py3 processing flow:
Given that the cost of creating full Python interpreter processes is paid anyway, and each of them re-creates the module-level state, your workers will already "know" the list of all files, so this embarrassingly independent file-by-file process will do best to:
- call gc.collect(); gc.disable() before spawning any pool of workers
- spawn only as many max_workers worker processes as there are physical CPU-RAM memory-I/O channels on your host hardware (the tasks are memory-bound, not CPU-bound)
- split, on the main() side, the list of files to process into max_workers-many, balanced-length, non-overlapping tuples of ( from_fileIDX, to_fileIDX )
- executor.submit() a reference to a block-processing function, with a single tuple of ( from_, to_ ), and arrange all the rest inside that block-processing function, including the latency-masked file-I/O storage of results (these can be merged later, using O/S text/binary file merging)
- prefer latency-masking flows. Syntax-sugared iterators might be nice in school-book examples, but here they are (un-maskable) performance killers: collecting a huge list with [ obj for obj in ... if ... ] never improves a stream-like (maskable-latency) process flow, because the huge list must first be collected in full, only to be (re-)iterated next so that its items can be written to the disk file one by one. Better to iterate / filter / conditionally execute the file-I/O ops in one single stream of steps (reducing RAM use, avoiding add-on overheads, and all that with maskable latencies)
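The splitting and block-submission steps from the list above could be sketched roughly as follows. This is an illustration under stated assumptions, not the answerer's code: split_ranges, process_block, and run_blocks are hypothetical names, and the byte count stands in for the real decode -> filter -> write work. FILES is assumed to be rebuilt identically (hence sorted) in every spawned worker, and the '*.bufr' pattern in the current directory is likewise an assumption.

```python
import gc
from concurrent.futures import Executor, ProcessPoolExecutor
from glob import glob

# Assumption: module-level code re-runs in each spawned worker, so every
# process sees the same sorted list and only index pairs need to be sent.
FILES = sorted(glob('*.bufr'))


def split_ranges(n_items: int, n_blocks: int):
    """Return at most n_blocks balanced, non-overlapping (start, stop) pairs."""
    if n_items <= 0:
        return []
    n_blocks = max(1, min(n_blocks, n_items))
    base, extra = divmod(n_items, n_blocks)
    ranges, start = [], 0
    for i in range(n_blocks):
        stop = start + base + (1 if i < extra else 0)  # first blocks take the remainder
        ranges.append((start, stop))
        start = stop
    return ranges


def process_block(span, paths=None):
    """Hypothetical block worker: handles paths[start:stop] within one task."""
    paths = FILES if paths is None else paths
    start, stop = span
    total = 0
    for path in paths[start:stop]:
        with open(path, 'rb') as ins:
            total += len(ins.read())  # stand-in for decode -> filter -> write
    return total


def run_blocks(executor: Executor, n_workers: int, n_files: int):
    """Submit one block per worker and collect the per-block results."""
    futures = [executor.submit(process_block, span)
               for span in split_ranges(n_files, n_workers)]
    return [future.result() for future in futures]


if __name__ == '__main__':
    gc.collect()   # tip from the list above: collect once ...
    gc.disable()   # ... then switch the collector off in this process
    with ProcessPoolExecutor(max_workers=6) as pool:
        print(run_blocks(pool, 6, len(FILES)))
```

With 6,800 files and 6 workers, split_ranges gives two blocks of 1,134 files and four of 1,133, so only six small tuples cross the process boundary instead of 6,800 task submissions.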
For more details you may like to read this and code from this and there directed examples.
Please credit the source when reposting. Link to this article: https://www.uj5u.com/qukuanlian/437082.html
Tags: performance parallel-processing low-latency large-data-volumes process-pool
