我創建了一個函式 enc()
def enc():
password = bytes('asd123','utf-8')
salt = bytes('asd123','utf-8')
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=10000,
backend=default_backend())
key = base64.urlsafe_b64encode(kdf.derive(password))
f = Fernet(key)
for file in files:
with open(file,'rb') as original_file:
original = original_file.read()
encrypted = f.encrypt(original)
with open (file,'wb') as encrypted_file:
encrypted_file.write(encrypted)
它遍歷檔案中的每個檔案并對其進行加密。
files = ['D:/folder/asd.txt',
'D:/folder/qwe.mp4',
'D:/folder/qwe.jpg']
我想使用多執行緒或多處理來使其更快。是否可以?需要一些代碼幫助。
我試過多執行緒
thread = threading.Thread(target=enc)
thread.start()
thread.join()
但它似乎并沒有提高速度或時間。我需要一些幫助來實作多處理。謝謝。
uj5u.com熱心網友回復:
執行緒不是 CPU 密集型任務的最佳候選,除非任務正在執行,例如,由釋放全域解釋器鎖的 C 語言庫例程執行。無論如何,除非您并行運行多個行程,否則您肯定會通過多執行緒或多處理獲得任何性能提升。
假設您有 N 個任務和 M 個處理器來處理這些任務。如果任務是沒有 I/O 的純 CPU(不完全是你的情況),那么啟動多于 M 個行程來處理你的 N 個任務并沒有優勢,為此,多處理池是理想的情況。當 CPU 和 I/O 混合使用時,池大小大于 M可能是有利的,如果 I/O 很多而 CPU 很少,則池大小甚至可能大到 N。但在那種情況下,實際使用多執行緒池和多處理池(大小為 M)的組合會更好,其中多執行緒池用于所有 I/O 作業,多處理池用于 CPU 計算。以下代碼顯示了該技術:
from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import cpu_count
from functools import partial
def encrypt(key, b):
f = Fernet(key)
return f.encrypt(b)
def enc(key, process_pool, file):
with open(file,'rb') as original_file:
original = original_file.read()
encrypted = process_pool.apply(encrypt, args=(key, original,))
with open (file,'wb') as encrypted_file:
encrypted_file.write(encrypted)
def main():
password = bytes('asd123','utf-8')
salt = bytes('asd123','utf-8')
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=10000,
backend=default_backend())
key = base64.urlsafe_b64encode(kdf.derive(password))
files = ['D:/folder/asd.txt',
'D:/folder/qwe.mp4',
'D:/folder/qwe.jpg']
# Too many threads may be counter productive due to disk contention
# Should MAX_THREADS be unlimited?
# For a solid-state drive with no physical arm movement,
# an extremely large value, e.g. 500, probably would not hurt.
# For "regular" drives, one needs to experiment
MAX_THREADS = 500 # Essentially no limit
# compute number of processes in our pool
# the lesser of number of files to process and the number of cores we have:
pool_size = min(MAX_THREADS, cpu_count(), len(files))
# create process pool:
process_pool = Pool(pool_size)
# create thread pool:
thread_pool = ThreadPool(len(files))
worker = partial(enc, key, process_pool)
thread_pool.map(worker, files)
if __name__ == '__main__':
main()
評論
無論如何,重點是:假設您有 30 個檔案和 4 個內核,而不是 3 個檔案。@anarchy 發布的解決方案將啟動 30 個行程并計算f30 次,但實際上只能有效地利用 4 個處理器進行并行計算f和加密。我的解決方案將使用 30 個執行緒進行 I/O,但只啟動 4 個行程,因此f只計算4 次。您可以節省創建 26 個行程和 26 個f無用的計算。
除非您有固態驅動器,否則執行緒數少于 30 甚至可能更好,因為您的所有執行緒都在與同一個驅動器競爭,并且 (1) 每個檔案可能位于驅動器上完全不同的位置并執行并發 I/ O 針對此類檔案可能會適得其反,并且 (2) 任何特定驅動器都可以實作一些最大吞吐量。
所以也許我們應該有:
thread_pool = ThreadPool(min(len(files), MAX_THREADS))
whereMAX_THREADS設定為適合您特定驅動器的某個最大值。
更新
現在key只進行一次昂貴的計算。
使用 TKinter 運行 OP 的新問題
Actually you have two problems. Not only are multiple windows being opened, but you are probably also getting a pickle error trying to call the multiprocessing worker function encrypt because such functions must be defined at global scope and not be nested within another function as you have done.
On platforms that use method spawn to create new processes, such as Windows, to create and initialize each processes in the pool that is created with your process_pool = Pool(pool_size) statement, a new, empty address space is created and a new Python interpreter is launched that re-reads and re-executes the source program in order to initialize the address space before ultimately calling the worker function test. That means that every statement at global scope, i.e. import statements, variable declarations, function declarations, etc., are executed for this purpose. However, in the new subprocess variable __name__ will not be '__main__' so any statements within an if __name__ == '__main__' : block at global scope will not be executed. By the way, that is why for Windows platforms code at global scope that ultimately results in creating new processes is placed within such a block. Failure to do so would result in an infinite recursive process-creation loop if it were to go otherwise undetected. But you placed such a check on __name__ within a nested function where it serves no purpose.
But realizing that all statements at global scope will be executed as part of the initialization of every process in a multiprocessing pool, ideally you should only have at global scope those statements that are required for the initialization of those processes or at least "harmless" statements, i.e. statements whose presence are not overly costly to be executing or have no unpleasant side-effects. Harmful statements should also be placed within an if __name__ == '__main__' : block or moved to within a function.
It should be clear now that the statements you have that create the main window are "harmful" statements that you do not want executed by each newly created process. The tail end of your code should be as follows (I have also incorporated a MAX_THREADS constant to limit the maximum number of threads that will be created although here it is set arbitrarily large -- you should experiment with much smaller values such as 3, 5, 10, 20, etc. to see what gives you the best throughput):
def passerrorbox():
tk.messagebox.showerror('Password Error','Enter a Password')
fipasswordbox.delete(0,'end')
fisaltbox.delete(0,'end')
filistbox.delete(0,'end')
# Changes start here:
# Get rid of all nesting of functions:
def encrypt(key, a):
f = Fernet(key)
return f.encrypt(a)
def enc(key, process_pool, file):
# File Encryption
with open(file,'rb') as original_file:
original = original_file.read()
encrypted = process_pool.apply(encrypt, args=(key, original,))
with open (file,'wb') as encrypted_file:
encrypted_file.write(encrypted)
def encfile(): # was previously named main
password = bytes(fipasswordbox.get(), 'utf-8')
salt = bytes(fisaltbox.get(),'utf-8')
fileln = filistbox.get(0,'end')
if len(fileln) == 0:
fierrorbox()
elif len(password) == 0:
passerrorbox()
else:
file_enc_button['state']='disabled'
browsefi['state']='disabled'
fipasswordbox['state']='disabled'
fisaltbox['state']='disabled'
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=10000,
backend=default_backend())
key = base64.urlsafe_b64encode(kdf.derive(password))
# Too many threads may be counter productive due to disk contention
# Should MAX_THREADS be unlimited?
# For a solid-state drive with no physical arm movement,
# an extremely large value, e.g. 500, probably would not hurt.
# For "regular" drives, one needs to experiment
MAX_THREADS = 500 # Essentially no limit
pool_size = min(MAX_THREADS, cpu_count(), len(fileln))
process_pool = Pool(pool_size)
thread_pool = ThreadPool(min(MAX_THREADS, len(fileln)))
worker = partial(enc, key, process_pool)
thread_pool.map(worker, fileln)
fiencdone()
if __name__ == '__main__':
root = tk.Tk()
fileframe()
root.mainloop()
uj5u.com熱心網友回復:
你需要重新設計你的函式。
Python 不夠聰明,無法知道您需要多處理代碼的哪一部分。
很可能是 for 回圈正確,您希望并行加密檔案。所以你可以嘗試這樣的事情。
定義每個回圈需要運行的函式,然后在外面創建for回圈。然后像這樣使用多處理。
import multiprocessing
password = bytes('asd123','utf-8')
salt = bytes('asd123','utf-8')
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=10000,
backend=default_backend())
key = base64.urlsafe_b64encode(kdf.derive(password))
f = Fernet(key)
def enc(file):
with open(file,'rb') as original_file:
original = original_file.read()
encrypted = f.encrypt(original)
with open (file,'wb') as encrypted_file:
encrypted_file.write(encrypted)
if __name__ == '__main__':
jobs = []
for file in files:
p = multiprocessing.Process(target=enc, args=(file,))
jobs.append(p)
p.start()
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/354212.html
標籤:Python 多线程 加密 python-多处理
