我有以下一段代碼,作業正常:
我有以下一段代碼,作業正常:
我有以下一段代碼,作業正常。
import numpy as np
arr1 = np. array([(1, 2, 3, 4], (2, 3, 4, 6),(4, 6。7, 9),(4, 7, 8, 9), (5, 7, 8, 9)] )
arr2 = np. array([(1, 3, 4, 5),(2,3。4, 5), (3, 4, 6, 7) ] )
m = []
for c in range(len(arr1)) 。
s = np.sum(np.any(arr1[c] == arr2[... , None], axis=1, axis=1)
m = np.append(m, np.amax(s, axis=0)
上面得到的預期結果如下(也就是說,對于arr1中的每一行,找到與arr2中任何一行匹配的最大數量,并將這個數字添加到輸出陣列中):
print(m)
[3.3.3.2.1.]
我面臨的問題是,當 arr1 增長到數百萬行時,即使 arr2 只有幾千行,計算仍然變得非常慢。
我曾嘗試使用以下代碼進行比較(還有許多其他嘗試,結果更糟糕),但這并不奏效:
我曾嘗試使用以下代碼進行比較(還有許多其他嘗試,結果更糟糕)。
c = (arr1[:, None, ...] == arr2[None, ...])
上面的代碼產生的結果非常接近我期望在做計數之前看到的第一階段的輸出(如下圖所示),但是元素間的比較沒有正確完成:
print(c)
[[[True False False] [False False] [False False] [False False] [False False 假的 假的]]
[[False True True False] [ True True False] [False False False False]]
[[False False False] [False False False] [False 真真偽假]]
[[False False False] [False False False][False False][False False]。 假的 假的 假的]]
[[False False False] [False False False] [False False False] [False 假的假的假的]]
對于如何優化第一段代碼(例如,它將不使用for回圈),以便在arr1有 /- 2 000 000行,而arr2有 /- 7 000行的情況下作業,有什么建議嗎?
uj5u.com熱心網友回復:
你可以嘗試使用python集:
l1 = [frozenset(i) fori in arr1]
l2 = [frozenset(i) for i in arr2]
[max(len(s1.intersection(s2) ) for s2 in l2) for s1 in l1] 。
輸出:
[3, 3, 3, 2, 1]
但是,說實話,在這里你必須進行2M*7K的比較,所以無論如何都要花時間。
uj5u.com熱心網友回復:
我將在這里直接跳到時間問題上:
np.random.seed(0xFFFF)
arr1 = np.random.randint(10, size=(20000, 5))
arr2 = np.random.randint(10, size=(70, 5))
def original(a1, a2)。
m = []
for c in range(len(a1)):
s = np.sum(np.any(a1[c] == a2[... , None], axis=1, axis=1)
m = np.append(m, np.amax(s, axis=0)
return m
我將輸入的大小從你的目標中截斷了100倍,以便能夠在我弱小的筆記本電腦上得到合理的數字。你可以根據需要進行推斷。
%timeit original(arr1, arr2)
1.11 s ± 19.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
將此與@mozway的解決方案使用集合進行比較:
def using_sets(a1, a2)。
l1 = [frozenset(i) fori in a1]
l2 = [frozenset(i) fori in a2]
return np.array([max(len(s1.intersection(s2) ) for s2 in l2) for s1 in l1】)
534 ms ± 6. 23 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
首先:numpy陣列的分配是不被攤銷的,不像list那樣。這意味著你不應該在回圈中使用np.append(從一個串列開始也沒有用):
def list_append(a1, a2)。
m = []
for c in range(len(a1)):
s = np.sum(np.any(a1[c] == a2[... , None], axis=1, axis=1)
m.append(np.amax(s, axis=0)
return np.array(m)
即使是少量的元素,這也是一個巨大的變化。在實踐中,隨著輸出大小的增加,其好處是將O(n^2)減少到O(n):
%timeit list_append(arr1, arr2)
856 ms ± 28.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
但是等等! 你一開始就知道輸出的大小。預先分配整個事情,并將結果存盤到正確的索引中,這樣做會好很多:
def preallocate(a1, a2)。
m = np.empty(a1.shape[0], int)
for c in range(len(a1))。
s = np.sum(np.any(a1[c] == a2[... , None], axis=1, axis=1)
m[c] = np.amax(s, axis=0)
return m
這次的收益要小得多,因為例子中的陣列并不龐大,而且list一開始就已經攤派了很多append:
%timeit preallocate(arr1, arr2)
842 ms ± 4.28 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
現在要解決的是問題的實質。首先,你可以通過使用np.isin而不是計算巨大的布爾掩碼來減少記憶體占用和速度:
def using_isin(a1, a2)。
m = np.empty(a1.shape[0], int)
for c in range(len(a1))。
s = np.sum(np.isin(a2, a1[c]), axis= 1)
m[c] = np.amax(s, axis=0)
return m
這就開始接近基于集合的方法了:
%timeit using_isin(arr1, arr2)
582 ms ± 1.55 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
如果陣列有排序的行,你可以完全避免線性搜索:
如果陣列有排序的行,你可以完全避免線性搜索。
def using_sorted(a1, a2)。
a1 = np.sort(a1, axis=1)
m = np.empty(a1.shape[0], int)
for c in range(len(a1)):
m[c] = (a1[c, np.clip(np.searchsorted(a1[c], a2), 0, a2.shape[1] - 1) ] == a2) 。 sum(1)。max()
return m
與np.isin中實作的線性搜索相比,二進制搜索的開銷實際上使其變得更糟:
%timeit using_sorted(arr1, arr2)
695 ms ± 2.59 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
當然,最快的解決方案將是使用numba:
def using_numba(a1, a2)。
m = np.empty(a1.shape[0], np.int32)
s = np.empty(a2.shape[0], a1.dtype)
for i in range(a1.shape[0]) 。
for j in range(a2.shape[0]) 。
s[j] = 0
for k in range(a1.shape[1]) 。
s[j] = a1[i, k] in a2[j] 。
m[i] = s.max()
return m
毫不奇怪,這比基于集合的方法要快~7倍:
%timeit using_numba(arr1, arr2)
73 ms ± 1.88 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
作為一個旁觀者,讓我們看一下你想用布爾掩碼做什么。對于arr1的每個元素,你想檢查它是否等于arr2的任何元素(shape: (2M, 5, 7k, 5))。) 如果arr1中的元素出現在arr2的某一行中,你要計算它,所以你將any應用于arr2的行所對應的維度(shape: (2M, 5, 7k))。) 然后你想知道在arr1的某一行中匹配的元素的總數,所以你在這個維度上求和(shape: (2M, 7k))。) 最后,你想要的是在arr2的各行中的最大和(shape: (2M)).
(arr1[..., None, None] == arr2)。 any(axis=3)。 sum(axis=1)。max(axis=1)
注意到你需要沿途制作的臨時陣列的大小。它們中至少有兩個必須同時存在,所以這個方法不太可能對所需的輸入起作用。
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/322422.html
標籤:
下一篇:對串列中的每個陣列進行求和
