我正在撰寫一個程式,在該程式中,我試圖查看給定的紅移如何在光譜中檢測到一組線以匹配原子線資料庫。紅移越接近線重疊,“分數”越低,紅移正確的機會就越高。
我通過回圈一系列可能的紅移來做到這一點,計算每個紅移的分數。在該外回圈中,我在檢測到的行集中的每一行內回圈以計算其 sub_score,并對內回圈求和以獲得總分。
我試圖用 numpy 對內回圈進行矢量化,但令人驚訝的是它實際上減慢了執行速度。在給出的示例中,嵌套的 for 回圈在我的筆記本電腦上執行大約需要 2.6 秒,而內部帶有 numpy 的單個 for 回圈需要大約 5.3 秒。
為什么矢量化內回圈會減慢速度?有沒有更好的方法來做到這一點,我錯過了?
import numpy as np
import time
def find_nearest_line(lines, energies):
# Return the indices (from the lines vector) that are closest to the energies vector supplied
# Vectorized with help from https://stackoverflow.com/a/53111759
energies = np.expand_dims(energies, axis=-1)
idx = np.abs(lines / energies - 1).argmin(axis=-1)
return idx
def calculate_distance_to_line(lines, energies):
# Returns distance between an array of lines and an array of energies)
z = (lines / energies) - 1
return z
rng = np.random.default_rng(2021)
det_lines = rng.random(1000)
atom_lines = rng.random(10000)
redshifts = np.linspace(-0.1, 0.1, 100)
# loop version
start=time.time()
scores = []
for redshift in redshifts:
atom_lines_shifted = atom_lines * (1 redshift)
score = 0
for det_line in det_lines:
closest_atom_line = find_nearest_line(atom_lines_shifted, det_line)
score = abs(calculate_distance_to_line(atom_lines_shifted[closest_atom_line], det_line))
scores.append(score)
print(time.time()-start)
print(scores)
# (semi)-vectorized version
start=time.time()
scores = []
for redshift in redshifts:
atom_lines_shifted = atom_lines * (1 redshift)
closest_atom_lines = find_nearest_line(atom_lines_shifted, det_lines)
score = np.sum(np.abs(calculate_distance_to_line(atom_lines_shifted[closest_atom_lines], det_lines)))
scores.append(score)
print(time.time()-start)
print(scores)
uj5u.com熱心網友回復:
Numpy 代碼通常會創建許多臨時陣列。find_nearest_line例如,您的函式就是這種情況。det_lines同時處理所有專案會導致創建許多相對較大的陣列(1000 * 10_000 * 8 = 76 MiB每個陣列)。問題是大陣列通常不適合 CPU 快取。如果是這樣,陣列需要以低得多的吞吐量和高得多的延遲存盤在 RAM 中。此外,分配/釋放更大的陣列需要更多的時間并且經常導致更多的頁面錯誤(由于大多數默認標準分配器的實際實作)。有時使用大陣列會更快,因為 CPython 解釋器的開銷很大,但兩種策略在實踐中都效率低下。
問題是演算法效率不高。實際上,您可以對陣列進行排序并使用二分搜索來更有效地找到最接近的值。np.searchsorted完成大部分作業,但它只回傳大于(或等于)目標值的最接近值的索引。因此,需要進行一些額外的操作來獲得最接近的值(可能大于或小于目標值)。請注意,由于二進制搜索,該演算法不會生成巨大的陣列。
scores = []
n = atom_lines.size
m = det_lines.size
line_idx = np.arange(m)
for redshift in redshifts:
atom_lines_shifted = atom_lines * (1 redshift)
sorted_atom_lines_shifted = np.sort(atom_lines_shifted)
close_idx = np.searchsorted(sorted_atom_lines_shifted, det_lines)
lower_bound = sorted_atom_lines_shifted[np.maximum(close_idx - 1, 0)]
upper_bound = sorted_atom_lines_shifted[np.minimum(close_idx, n - 1)]
bounds = np.hstack((lower_bound[:, None], upper_bound[:, None]))
closest_bound_idx = find_nearest_line(bounds, det_lines)
close_values = bounds[line_idx, closest_bound_idx]
score = np.sum(np.abs(calculate_distance_to_line(close_values, det_lines)))
scores.append(score)
由于atom_lines沒有被修改并且乘法保留了順序,因此可以通過atom_lines直接排序來進一步優化演算法:
scores = []
n = atom_lines.size
m = det_lines.size
line_idx = np.arange(m)
sorted_atom_lines = np.sort(atom_lines)
for redshift in redshifts:
sorted_atom_lines_shifted = sorted_atom_lines * (1 redshift)
close_idx = np.searchsorted(sorted_atom_lines_shifted, det_lines)
lower_bound = sorted_atom_lines_shifted[np.maximum(close_idx - 1, 0)]
upper_bound = sorted_atom_lines_shifted[np.minimum(close_idx, n - 1)]
bounds = np.hstack((lower_bound[:, None], upper_bound[:, None]))
closest_bound_idx = find_nearest_line(bounds, det_lines)
close_values = bounds[line_idx, closest_bound_idx]
score = np.sum(np.abs(calculate_distance_to_line(close_values, det_lines)))
scores.append(score)
最后一個實作在我的機器上快了大約300 倍。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/400872.html
下一篇:按第二個索引對元組進行排序
