我在np.concatenate有效地并行化方面遇到了一些麻煩。這是一個最小的作業示例。(我知道,在這里我可以總結a和b分開,但我集中于并行的連擊操作,因為這是我需要在我的專案做的事。然后我會做串聯陣列上的進一步操作,如排序。)
無論我在多少內核上運行它,它似乎總是需要相同的時間(~10 秒)。如果有的話,它在更多內核上會更慢。我嘗試nogil=True在裝飾器中使用 cc 釋放 GIL ,但無濟于事。請注意,即使沒有加速,在計算程序中所有內核都明顯在使用中。
有誰能夠幫我?
非常感謝
from numba import prange, njit
import numpy as np
@njit()
def cc():
r = np.random.rand(20)
a = r[r < 0.5]
b = r[r > 0.7]
c = np.concatenate((a, b))
return np.sum(c)
@njit(parallel=True)
def cc_wrap():
n = 10 ** 7
result = np.empty(n)
for i in prange(n):
result[i] = cc()
return result
cc_wrap()
uj5u.com熱心網友回復:
主要問題來自分配器的爭用。
該cc函式創建了許多隱式的小型臨時陣列。例如np.random.rand做到這一點,r < 0.5甚至a = r[condition],更不用說np.concatenate。臨時陣列通常需要使用給定的分配器在堆中分配。標準庫提供的默認分配器不能保證使用多執行緒可以很好地擴展。分配不能完美擴展,因為昂貴的隱式同步分配資料的執行緒之間需要。例如,一個執行緒可以分配一個被另一個執行緒洗掉的陣列。在最壞的情況下,分配器可以序列化分配/洗掉。因為與對分配的資料執行的作業相比,分配資料的成本是巨大的,所以同步的開銷占主導地位,并且整體執行是串行化的。實際上,情況更糟,因為順序時間已經被開銷所支配。
請注意,積極優化的編譯器可能會在堆疊上分配陣列,因為它們不會對函式進行轉義。然而,不幸的是,Numba 并沒有明顯地做到這一點。此外,假設 Numba 執行緒從不洗掉其他執行緒分配的資料(雖然我不完全確定,但可能是這種情況),可以使用每執行緒池調整分配器以很好地擴展。盡管如此,分配的記憶體池需要向通常也不能很好擴展的作業系統(尤其是 Windows AFAIK)請求。
最好的解決方案是避免創建隱式臨時陣列。這可以使用per-worker 本地臨時陣列與磁區演算法相結合。請注意,可以通過將型別指定為 Numba 來提前編譯這些函式。
這是由此產生的實作:
import numba as nb
import numpy as np
import random
@nb.njit('float64(float64[::1])')
def cc(tempBuffer):
assert tempBuffer.size >= 20
# View to the temporary scratch buffer
r = tempBuffer[0:20]
# Generate 20 random numbers without any allocation
for i in range(20):
r[i] = random.random()
j = 0
# Partition the array so to put values smaller than
# a condition in the beginning.
# After the loop, r[0:split] contains the filtered values.
for i in range(r.size):
if r[i] < 0.5:
r[i], r[j] = r[j], r[i]
j = 1
split = j
# Partition the rest of the array.
# After the loop, r[split:j] contains the other filtered values.
for i in range(j, r.size):
if r[i] > 0.7:
r[i], r[j] = r[j], r[i]
j = 1
# Note that extracting contiguous views it cheap as
# it does not create a new temporary array
# a = r[0:split]
# b = r[split:j]
c = r[0:j]
return np.sum(c)
@nb.njit('float64[:]()', parallel=True)
def cc_wrap():
n = 10 ** 7
result = np.empty(n)
# Preallocate some space once for all threads
globalTempBuffer = np.empty((nb.get_num_threads(), 64), dtype=np.float64)
for i in nb.prange(n):
threadId = nb.np.ufunc.parallel._get_thread_id()
threadLocalBuffer = globalTempBuffer[threadId]
result[i] = cc(threadLocalBuffer)
return result
cc_wrap()
請注意,執行緒本地操作有點棘手,通常不需要。在這種情況下,僅使用磁區演算法來減少分配就可以看到明顯的加速。然而,由于臨時陣列的大小和分配的數量非常小,分配的開銷仍然很大。
另請注意r,此代碼中并不嚴格要求陣列,因為可以就地對亂數求和。這可能不符合您對實際用例的需求。這是一個(更簡單的實作):
@nb.njit('float64()')
def cc():
s = 0.0
for i in range(20):
e = random.random()
if e < 0.5 or e > 0.7:
s = e
return s
@nb.njit('float64[:]()', parallel=True)
def cc_wrap():
n = 10 ** 7
result = np.empty(n)
for i in nb.prange(n):
result[i] = cc()
return result
cc_wrap()
以下是我的 6 核機器上的時序:
# Initial (sequential): 8.1 s
# Initial (parallel): 9.0 s
# Array-based (sequential): 2.50 s
# Array-based (parallel): 0.41 s
# In-place (sequential): 1.09 s
# In-place (parallel): 0.19 s
最后,最快的并行版本比原始版本快 47 倍(并且幾乎完美地擴展)。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/380850.html
下一篇:磁盤盤片旋轉和磁盤臂運動
