我用 Python 撰寫了以下 NumPy 代碼:
def inbox_(points, polygon):
""" Finding points in a region """
ll = np.amin(polygon, axis=0) # lower limit
ur = np.amax(polygon, axis=0) # upper limit
in_idx = np.all(np.logical_and(ll <= points, points < ur), axis=1) # points in the range [boolean]
return in_idx
def operation_(r, gap, ends_ind):
""" calculation formula which is applied on the points specified by inbox_ function """
r_active = np.take(r, ends_ind) # taking values from "r" based on indices and shape (paired_values) of "ends_ind"
r_sub = np.subtract.reduce(r_active, axis=1) # subtracting each paired "r" determined by "ends_ind" [this line will be used in the final return formula]
r_add = np.add.reduce(r_active, axis=1) # adding each paired "r" determined by "ends_ind" [this line will be used in the final return formula]
paired_cent_dis = np.sum((r_add, gap), axis=0) # distance of the each two paired points
return (np.power(gap, 2) * (np.power(paired_cent_dis, 2) 5 * paired_cent_dis * r_add - 7 * np.power(r_sub, 2))) / (3 * paired_cent_dis) # Formula
def elapses(r, pos, gap, ends_ind, elem_vert, contact_poss):
if len(gap) > 0:
elaps = np.empty([len(elem_vert), ], dtype=object)
operate_ = operation_(r, gap, ends_ind)
#elbav = np.empty([len(elem_vert), ], dtype=object)
#con_num = 0
for i, j in enumerate(elem_vert): # loop for each section (cell or region) of a mesh
in_bool = inbox_(contact_poss, j) # getting boolean array for points within that section
elaps[i] = np.sum(operate_[in_bool]) # performing some calculations on that points and get the sum of them for each section
operate_ = operate_[np.invert(in_bool)] # slicing the arrays by deleting the points on which the calculations were performed to speed-up the code in next loops
contact_poss = contact_poss[np.invert(in_bool)] # as above
#con_num = sum(inbox_(contact_poss, j))
#inba_bool = inbox_(pos, j)
#elbav[i] = 4 * np.pi * np.sum(np.power(r[inba_bool], 3)) / 3
#pos = pos[np.invert(inba_bool)]
#r = r[np.invert(inba_bool)]
return elaps
r = np.load('a.npy')
pos = np.load('b.npy')
gap = np.load('c.npy')
ends_ind = np.load('d.npy')
elem_vert = np.load('e.npy')
contact_poss = np.load('f.npy')
elapses(r, pos, gap, ends_ind, elem_vert, contact_poss)
# a --------r-------> parameter corresponding to each coordinate (point); here radius (23605,) <class 'numpy.ndarray'> <class 'numpy.float64'>
# b -------pos------> coordinates of the points (23605, 3) <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.float64'>
# c -------gap------> if we consider points as spheres by that radii [r], it is maximum length for spheres' over-lap (103832,) <class 'numpy.ndarray'> <class 'numpy.float64'>
# d ----ends_ind----> indices for each over-laped spheres (103832, 2) <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.int64'>
# e ---elem_vert----> vertices of the mesh's sections or cells (2000, 8, 3) <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.float64'>
# f --contact_poss--> a coordinate between the paired spheres (103832, 3) <class 'numpy.ndarray'> <class 'numpy.ndarray'> <class 'numpy.float64'>
此代碼將被另一個具有大資料輸入的代碼頻繁呼叫。所以,加速這段代碼是必不可少的。我曾嘗試使用JAX和Numba庫中的jit裝飾器來加速代碼,但我無法正確使用它來使代碼更好。我已經在 Colab 上測驗了代碼(對于回圈數為 20、250 和 2000 的 3 個資料集)的速度,結果是:
11 (ms), 47 (ms), 6.62 (s) (per loop) <-- without the commented code lines in the code
137 (ms), 1.66 (s) , 4 (m) (per loop) <-- with activating the commented code lines in the code
這段代碼的作用是在一個范圍內找到一些坐標,然后對它們進行一些計算。
對于可以顯著加速此代碼的任何答案(我相信它可以),我將不勝感激。此外,對于通過更改(替代)使用的 NumPy 方法和……或數學運算的撰寫方法來加速代碼的任何有經驗的建議,我將不勝感激。
筆記:
- 建議的答案必須可由 python 版本 2 執行(適用于版本 2 和 3 非常好)
- 代碼中注釋的代碼行對于主要目標是不必要的,只是為了進一步評估而撰寫的。任何用建議的答案處理這些行的建議都值得贊賞(不需要)。
測驗資料集:
小資料集:https : //drive.google.com/file/d/1CswjyoqS8ogLmLQa_oNTOj221chDcbK8/view?usp=sharing
medium data set:https : //drive.google.com/file/d/14RJ0Ackx88NzQWloops5FagzuNQYDSrh/ view?usp=sharing
大資料集:https : //drive.google.com/file/d/1dJnXpb3HiAGcRC9PPTwui9joNcij4E_E/view?usp=sharing
uj5u.com熱心網友回復:
首先,可以改進演算法以提高效率。事實上,一個多邊形可以直接分配給每個點。這就像按多邊形對點進行分類。分類完成后,您可以按鍵執行一個/多個縮減,其中鍵是多邊形 ID。
這個新演算法包括:
- 計算多邊形的所有邊界框;
- 按多邊形對點進行分類;
- 按鍵執行縮減(其中鍵是多邊形 ID)。
這種方法比迭代每個多邊形的所有點并過濾屬性陣列(例如operate_和contact_poss)更有效。事實上,過濾是一項昂貴的操作,因為它需要完全讀取目標陣列(可能不適合 CPU 快取)然后寫回。更不用說這個操作需要分配/洗掉一個臨時陣列,如果它不是就地執行,并且該操作無法從大多數 x86/x86-64 平臺上的SIMD 指令實作中受益(因為它需要新的 AVX-512指令系統)。并行化也更困難,因為過濾步驟太快,執行緒無法使用,但步驟需要按順序完成。
關于演算法的實作,Numba可以用來加速整體計算。使用 Numba 的主要好處是大大減少了在當前實作中由 Numpy 創建的昂貴臨時陣列的數量。請注意,您可以為 Numba指定函式型別,以便它可以在定義時編譯函式。斷言可用于使代碼更健壯,并幫助編譯器了解給定維度的大小,從而生成明顯更快的代碼(Numba 的 JIT 編譯器可以展開回圈)。三元運算子可以幫助 JIT 編譯器生成更快的無分支 程式。
請注意,可以使用多個執行緒輕松并行化分類。然而,需要對常量傳播非常小心,因為一些關鍵常量(如作業陣列和斷言的形狀)往往不會傳播到執行緒執行的代碼,而傳播對于優化熱回圈(例如矢量化,展開)。另請注意,在具有多個內核(從 10 毫秒到 0.1 毫秒)的機器上創建多個執行緒可能會很昂貴。因此,僅在大輸入資料上使用并行實作通常更好。
這是最終的實作(同時使用 Python2 和 Python3):
@nb.njit('float64[::1](float64[::1], float64[::1], int64[:,::1])')
def operation_(r, gap, ends_ind):
""" calculation formula which is applied on the points specified by findMatchingPolygons_ function """
nPoints = ends_ind.shape[0]
assert ends_ind.shape[1] == 2
assert gap.size == nPoints
formula = np.empty(nPoints, dtype=np.float64)
for i in range(nPoints):
ind0, ind1 = ends_ind[i]
r0, r1 = r[ind0], r[ind1]
r_sub = r0 - r1
r_add = r0 r1
cur_gap = gap[i]
paired_cent_dis = r_add cur_gap
formula[i] = (cur_gap**2 * (paired_cent_dis**2 5 * paired_cent_dis * r_add - 7 * r_sub**2)) / (3 * paired_cent_dis)
return formula
# Use `parallel=True` for a parallel implementation
@nb.njit('int32[::1](float64[:,::1], float64[:,:,::1])')
def findMatchingPolygons_(points, polygons):
""" Attribute to all point a region """
nPolygons = polygons.shape[0]
nPolygonPoints = polygons.shape[1]
nPoints = points.shape[0]
assert points.shape[1] == 3
assert polygons.shape[2] == 3
# Compute the bounding boxes of all polygons
ll = np.empty((nPolygons, 3), dtype=np.float64)
ur = np.empty((nPolygons, 3), dtype=np.float64)
for i in range(nPolygons):
ll_x, ll_y, ll_z = polygons[i, 0]
ur_x, ur_y, ur_z = polygons[i, 0]
for j in range(1, nPolygonPoints):
x, y, z = polygons[i, j]
ll_x = x if x<ll_x else ll_x
ll_y = y if y<ll_y else ll_y
ll_z = z if z<ll_z else ll_z
ur_x = x if x>ur_x else ur_x
ur_y = y if y>ur_y else ur_y
ur_z = z if z>ur_z else ur_z
ll[i] = ll_x, ll_y, ll_z
ur[i] = ur_x, ur_y, ur_z
# Find for each point its corresponding polygon
pointPolygonId = np.empty(nPoints, dtype=np.int32)
# Use `nb.prange(nPoints)` for a parallel implementation
for i in range(nPoints):
x, y, z = points[i, 0], points[i, 1], points[i, 2]
pointPolygonId[i] = -1
for j in range(polygons.shape[0]):
if ll[j, 0] <= x < ur[j, 0] and ll[j, 1] <= y < ur[j, 1] and ll[j, 2] <= z < ur[j, 2]:
pointPolygonId[i] = j
break
return pointPolygonId
@nb.njit('float64[::1](float64[:,:,::1], float64[:,::1], float64[::1])')
def computeSections_(elem_vert, contact_poss, operate_):
nPolygons = elem_vert.shape[0]
elaps = np.zeros(nPolygons, dtype=np.float64)
pointPolygonId = findMatchingPolygons_(contact_poss, elem_vert)
for i, polygonId in enumerate(pointPolygonId):
if polygonId >= 0:
elaps[polygonId] = operate_[i]
return elaps
def elapses(r, pos, gap, ends_ind, elem_vert, contact_poss):
if len(gap) > 0:
operate_ = operation_(r, gap, ends_ind)
return computeSections_(elem_vert, contact_poss, operate_)
r = np.load('a.npy')
pos = np.load('b.npy')
gap = np.load('c.npy')
ends_ind = np.load('d.npy')
elem_vert = np.load('e.npy')
contact_poss = np.load('f.npy')
elapses(r, pos, gap, ends_ind, elem_vert, contact_poss)
這是在舊的 2 核機器 (i7-3520M) 上的結果:
Small dataset:
- Original version: 5.53 ms
- Proposed version (sequential): 0.22 ms (x25)
- Proposed version (parallel): 0.20 ms (x27)
Medium dataset:
- Original version: 53.40 ms
- Proposed version (sequential): 1.24 ms (x43)
- Proposed version (parallel): 0.62 ms (x86)
Big dataset:
- Original version: 5742 ms
- Proposed version (sequential): 144 ms (x40)
- Proposed version (parallel): 67 ms (x86)
Thus, the proposed implementation is up to 86 times faster than the original one.
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/399411.html
標籤:麻木的 表现 python-2.7 麻麻 数字表达式
