嗨,大家。很抱歉打擾你。
我有這個任務,我有一個存盤在一個串列中的哈希編碼串列,其中包含 30 個位置,值為 0 和 1。總的來說,我有超過 10000 個這樣的 30 大小(0/1)哈希碼,我想找到所有對這種哈希碼的差異低于給定閾值(例如 0、1、5),在這種情況下,這對將被視為“相似”哈希碼。
我已經在python3中使用雙“for回圈”實作了這一點(見下面的代碼),但我覺得它不夠高效,因為這似乎是一個O(N ^ 2),當N = 10000時它確實很慢甚至更大。
我的問題是有沒有更好的方法可以加快尋找相似哈希對的速度?理想情況下,我想在 O(N) 中?
注意效率我的意思是在給定的情況下找到相似的對而不是生成哈希編碼(這僅用于演示)。
我已經深入研究了這個問題,我找到的所有答案都是關于使用某種收集工具來查找相同的對,但在這里我有一個更一般的情況,即在給定閾值的情況下,這些對也可能是相似的。
我提供了生成示例散列編碼的代碼和我正在使用的當前低效程式。我希望你會發現這個問題很有趣,希望一些更好/更聰明/高級的程式員可以幫我解決這個問題。提前致謝。
import random
import numpy as np
# HashCodingSize = 10
# Just use this to test the program
HashCodingSize = 100
# HashCodingSize = 1000
# What can we do when we have the list over 10000, 100000 size ?
# This is where the problem is
# HashCodingSize = 10000
# HashCodingSize = 100000
#Generating "HashCodingSize" of list with each element has size of 30
outputCodingAllPy = []
for seed in range(HashCodingSize):
random.seed(seed)
listLength = 30
numZero = random.randint(1, listLength)
numOne = listLength - numZero
my_list = [0] * numZero [1] * numOne
random.shuffle(my_list)
# print(my_list)
outputCodingAllPy.append(my_list)
#Covert to np array which is better than python3 list I suppose?
outputCodingAll = np.asarray(outputCodingAllPy)
print(outputCodingAll)
print("The N is", len(outputCodingAll))
hashDiffThreshold = 0
#hashDiffThreshold = 1
#hashDiffThreshold = 5
loopRange = range(outputCodingAll.shape[0])
samePairList = []
#This is O(n^2) I suppose, is there better way ?
for i in loopRange:
for j in loopRange:
if j > i:
if (sum(abs(outputCodingAll[i,] - outputCodingAll[j,])) <= hashDiffThreshold):
print("The pair (", str(i), ", ", str(j), ") ")
samePairList.append([i, j])
print("Following pairs are considered the same given the threshold ", hashDiffThreshold)
print(samePairList)
Update2 RAM 問題當串列大小達到 100000 時,當前的速度解決方案仍然存在 RAM 問題(numpy.core._exceptions._ArrayMemoryError: Unable to allocate 74.5 GiB for an array with shape (100000, 100000) and data type int64) . 在這種情況下,任何對速度感興趣但沒有大 RAM 的人都可以考慮并行編程原始方法**
更新當前答案和基準測驗:
我已經簡要測驗了@Raibek 提供的答案,它確實比 for 回圈快得多,并且包含了其他人提供的大部分建議(也非常感謝他們)。現在我的問題已經解決了,對于任何對此問題進一步感興趣的人,您可以在接受的答案中參考@Raibek 或在下面查看我自己的測驗程式:
提示:對于那些在專案上完全沒有時間的人,你需要做的是將函式“bits_to_int”和“find_pairs_by_threshold_fast”帶回家,首先將0/1位轉換為整數,然后使用XOR查找所有小于閾值的對。希望這有助于更快。
from logging import raiseExceptions
import random
import numpy as np
#check elapsed time
import time
# HashCodingSize = 10
# HashCodingSize = 100
HashCodingSize = 1000
# What can we do when we have the list over 10000, 100000 size ?
# HashCodingSize = 10000
# HashCodingSize = 100000
#Generating "HashCodingSize" of list with each element has 30 size
outputCodingAllPy = []
for seed in range(HashCodingSize):
random.seed(seed)
listLength = 30
numZero = random.randint(1, listLength)
numOne = listLength - numZero
my_list = [0] * numZero [1] * numOne
random.shuffle(my_list)
# print(my_list)
outputCodingAllPy.append(my_list)
#Covert to np array which is better than python3 list
#Study how to convert bytes to integers
outputCodingAll = np.asarray(outputCodingAllPy)
print(outputCodingAll)
print("The N is", len(outputCodingAll))
hashDiffThreshold = 0
def myWay():
loopRange = range(outputCodingAll.shape[0])
samePairList = []
#This is O(n!) I suppose, is there better way ?
for i in loopRange:
for j in loopRange:
if j > i:
if (sum(abs(outputCodingAll[i,] - outputCodingAll[j,])) <= hashDiffThreshold):
print("The pair (", str(i), ", ", str(j), ") ")
samePairList.append([i, j])
return(np.array(samePairList))
#Thanks to Raibek
def bits_to_int(bits: np.ndarray) -> np.ndarray:
"""
https://stackoverflow.com/a/59273656/11040577
:param bits:
:return:
"""
assert len(bits.shape) == 2
# number of columns is needed, not bits.size
m, n = bits.shape
# -1 reverses array of powers of 2 of same length as bits
a = 2**np.arange(n)[::-1]
# this matmult is the key line of code
return bits @ a
#Thanks to Raibek
def find_pairs_by_threshold_fast(
coding_all_bits: np.ndarray,
listLength=30,
hashDiffThreshold=0
) -> np.ndarray:
xor_outer_matrix = np.bitwise_xor.outer(coding_all_bits, coding_all_bits)
# counting number of differences
diff_count_matrix = np.bitwise_and(xor_outer_matrix, 1)
for i in range(1, listLength):
diff_count_matrix = np.right_shift(np.bitwise_and(xor_outer_matrix, 2**i), i)
same_pairs = np.transpose(np.where(diff_count_matrix <= hashDiffThreshold))
# filtering out diagonal values
same_pairs = same_pairs[same_pairs[:, 0] != same_pairs[:, 1]]
# filtering out duplicates above diagonal
same_pairs.sort(axis=1)
same_pairs = np.unique(same_pairs, axis=0)
return same_pairs
start = time.time()
outResult1 = myWay()
print("My way")
print("Following pairs are considered the same given the threshold ", hashDiffThreshold)
print(outResult1)
end = time.time()
timeUsedOld = end - start
print(timeUsedOld)
start = time.time()
print('Helper Way updated')
print("Following pairs are considered the same given the threshold ", hashDiffThreshold)
outputCodingAll_bits = bits_to_int(outputCodingAll)
same_pairs_fast = find_pairs_by_threshold_fast(outputCodingAll_bits, 30, hashDiffThreshold)
print(same_pairs_fast)
end = time.time()
timeUsedNew = end - start
print(timeUsedNew)
print(type(outResult1))
print(type(same_pairs_fast))
if ((outResult1 == same_pairs_fast).all()) & (timeUsedNew < timeUsedOld):
print("The two methods have returned the same results, I have been outsmarted !")
print("The faster method used ", timeUsedNew, " while the old method takes ", timeUsedOld)
else:
raiseExceptions("Error, two methods do not return the same results, something must be wrong")
#Thanks to Raibek
#note this suffers from out of memoery problem
# def Helper1Way():
# outer_not_equal = np.not_equal.outer(outputCodingAll, outputCodingAll)
# diff_count_matrix = outer_not_equal.sum((1, 3)) // outputCodingAll.shape[1]
# samePairNumpy = np.transpose(np.where(diff_count_matrix <= hashDiffThreshold))
# # filtering out diagonal values
# samePairNumpy = samePairNumpy[samePairNumpy[:, 0] != samePairNumpy[:, 1]]
# # filtering out duplicates above diagonal
# samePairNumpy.sort(axis=1)
# samePairNumpy = np.unique(samePairNumpy, axis=0)
# return(np.array(samePairNumpy))
# start = time.time()
# outResult2 = Helper1Way()
# print('Helper Way')
# print("Following pairs are considered the same given the threshold ", hashDiffThreshold)
# print(outResult2)
# end = time.time()
# print(end - start)
uj5u.com熱心網友回復:
此版本利用整數的按位運算。將 numpy 二進制表示轉換為整數的方法是從這個答案https://stackoverflow.com/a/59273656/11040577中獲得的。
基準測驗結果表明,新方法比原來的方法快得多:
N = 1000, 0.194 秒 VS 3.332 秒
N = 10000, 17.417 秒 VS 338.628 秒
import random
import numpy as np
from time import perf_counter
def generate_codings(
HashCodingSize=100,
listLength=30
) -> np.ndarray:
# Generating "HashCodingSize" of list with each element has size of 30
outputCodingAllPy = []
for seed in range(HashCodingSize):
random.seed(seed)
numZero = random.randint(1, listLength)
numOne = listLength - numZero
my_list = [0] * numZero [1] * numOne
random.shuffle(my_list)
# print(my_list)
outputCodingAllPy.append(my_list)
# Covert to np array which is better than python3 list I suppose?
outputCodingAll = np.asarray(outputCodingAllPy)
return outputCodingAll
def find_pairs_by_threshold(
coding_all: np.ndarray,
hashDiffThreshold=0
) -> np.ndarray:
loopRange = range(coding_all.shape[0])
samePairList = []
#This is O(n!) I suppose, is there better way ?
for i in loopRange:
for j in loopRange:
if j > i:
if (sum(abs(coding_all[i,] - coding_all[j,])) <= hashDiffThreshold):
# print("The pair (", str(i), ", ", str(j), ") ")
samePairList.append([i, j])
return np.array(samePairList)
def bits_to_int(bits: np.ndarray) -> np.ndarray:
"""
https://stackoverflow.com/a/59273656/11040577
:param bits:
:return:
"""
assert len(bits.shape) == 2
# number of columns is needed, not bits.size
m, n = bits.shape
# -1 reverses array of powers of 2 of same length as bits
a = 2**np.arange(n)[::-1]
# this matmult is the key line of code
return bits @ a
def find_pairs_by_threshold_fast(
coding_all_bits: np.ndarray,
listLength=30,
hashDiffThreshold=0
) -> np.ndarray:
xor_outer_matrix = np.bitwise_xor.outer(coding_all_bits, coding_all_bits)
# counting number of differences
diff_count_matrix = np.bitwise_and(xor_outer_matrix, 1)
for i in range(1, listLength):
diff_count_matrix = np.right_shift(np.bitwise_and(xor_outer_matrix, 2**i), i)
same_pairs = np.transpose(np.where(diff_count_matrix <= hashDiffThreshold))
# filtering out diagonal values
same_pairs = same_pairs[same_pairs[:, 0] != same_pairs[:, 1]]
# filtering out duplicates above diagonal
same_pairs.sort(axis=1)
same_pairs = np.unique(same_pairs, axis=0)
return same_pairs
if __name__ == "__main__":
list_length = 30
hash_diff_threshold = 0
for hash_coding_size in (100, 1000, 10000):
# let's generate samples
output_coding_all = generate_codings(hash_coding_size, list_length)
print("The N is", len(output_coding_all))
# find_pairs_by_threshold bench
start_time = perf_counter()
same_pairs_etalon = find_pairs_by_threshold(output_coding_all, hash_diff_threshold)
end_time = perf_counter()
print(f"find_pairs_by_threshold() took {end_time-start_time} secs...")
print("Following pairs are considered the same given the threshold ", same_pairs_etalon)
# find_pairs_by_threshold_fast bench
# first, we should convert binary representations to int
start_time = perf_counter()
output_coding_all_bits = bits_to_int(output_coding_all)
end_time = perf_counter()
print(f"it took {end_time-start_time} secs to convert numpy array binary to ints...")
start_time = perf_counter()
same_pairs_fast = find_pairs_by_threshold_fast(output_coding_all_bits, list_length, hash_diff_threshold)
end_time = perf_counter()
print(f"find_pairs_by_threshold_fast() took {end_time-start_time} secs...")
# check if the results are the same
print(f"Two lists of pairs found by different methods are identical: {(same_pairs_fast == same_pairs_etalon).all()}")
第一個非常消耗記憶體的版本:
outer_not_equal = np.not_equal.outer(outputCodingAll, outputCodingAll)
diff_count_matrix = outer_not_equal.sum((1, 3)) // outputCodingAll.shape[1]
samePairNumpy = np.transpose(np.where(diff_count_matrix <= hashDiffThreshold))
# filtering out diagonal values
samePairNumpy = samePairNumpy[samePairNumpy[:, 0] != samePairNumpy[:, 1]]
# filtering out duplicates above diagonal
samePairNumpy.sort(axis=1)
samePairNumpy = np.unique(samePairNumpy, axis=0)
解決記憶體不足的更新
這個版本迭代“slice_size”的切片,最后連接所有迭代的結果。
例如,如果“numpy.core._exceptions._ArrayMemoryError”發生在 N=100,000 上,那么您可以使用“slice_size=1000”、“slice_size=10000”或其他切片大小,直到它在當前環境中最適合您。
def find_pairs_by_threshold_fast_v2(
coding_all_bits: np.ndarray,
listLength=30,
hashDiffThreshold=0,
slice_size=None
) -> np.ndarray:
if slice_size is None:
xor_outer_matrix = np.bitwise_xor.outer(coding_all_bits, coding_all_bits)
# counting number of differences
diff_count_matrix = np.bitwise_and(xor_outer_matrix, 1)
for i in range(1, listLength):
diff_count_matrix = np.right_shift(np.bitwise_and(xor_outer_matrix, 2 ** i), i)
same_pairs = np.transpose(np.where(diff_count_matrix <= hashDiffThreshold))
else:
same_pairs_list = []
for slice_starts in range(0, len(coding_all_bits), slice_size):
xor_outer_matrix = np.bitwise_xor.outer(coding_all_bits[slice_starts: slice_starts slice_size], coding_all_bits)
# counting number of differences
diff_count_matrix = np.bitwise_and(xor_outer_matrix, 1)
for i in range(1, listLength):
diff_count_matrix = np.right_shift(np.bitwise_and(xor_outer_matrix, 2**i), i)
same_pairs = np.transpose(np.where(diff_count_matrix <= hashDiffThreshold))
same_pairs[:, 0] = slice_starts
same_pairs_list.append(same_pairs)
same_pairs = np.concatenate(same_pairs_list)
# filtering out diagonal values
same_pairs = same_pairs[same_pairs[:, 0] != same_pairs[:, 1]]
# filtering out duplicates above diagonal
same_pairs.sort(axis=1)
same_pairs = np.unique(same_pairs, axis=0)
return same_pairs
uj5u.com熱心網友回復:
如果您只需要 30 位向量,那么將其表示為 32 位整數中的 30 位會更好。那么兩個“向量”之間的漢明距離就是xor兩個整數的位數。有計算整數中非零位數的有效演算法。這些可以很容易地使用numpy.
所以演算法是:
- 生成
HashCodingSize0 到 (1<<30)-1 之間的隨機整數。那是一條線numpy.random.randint() - 對于每個值與陣列進行異或運算(請參閱 參考資料
numpy.bitwise_xor),計算每個異或輸出值中的位數(向量化位計數演算法之一),并找到位計數小于或等于的索引hashDiffThreshold
這仍然是 O(n^2),但只是 python 中的一個回圈;回圈中的每個操作都在一個長度為 n 的向量上進行numpy呼叫。
uj5u.com熱心網友回復:
只要您listLength在計算機上的整數大小范圍內,我就會改用整數。然后,您可以xor將這些值(使用廣播一次對所有值進行異或)來獲得不同的位數,對這些位求和,然后用于nonzero查找符合哈希差異要求的索引。例如:
import numpy as np
import random
HashCodingSize = 10
listLength = 30
outputCodingAll = np.array([random.choice(range(2**listLength)) for _ in range(HashCodingSize)])
# sample result
# array([995834408, 173548139, 717311089, 87822983, 813938401,
# 363814224, 970707528, 907497995, 337492435, 361696322])
distance = bit_count(outputCodingAll[:, np.newaxis] ^ outputCodingAll)
# sample result
# array([[ 0, 10, 15, 18, 14, 18, 8, 12, 18, 16],
# [10, 0, 13, 14, 16, 24, 14, 14, 16, 18],
# [15, 13, 0, 23, 13, 15, 15, 17, 19, 15],
# [18, 14, 23, 0, 18, 16, 18, 12, 12, 14],
# [14, 16, 13, 18, 0, 16, 12, 14, 14, 14],
# [18, 24, 15, 16, 16, 0, 14, 16, 12, 6],
# [ 8, 14, 15, 18, 12, 14, 0, 12, 18, 14],
# [12, 14, 17, 12, 14, 16, 12, 0, 14, 14],
# [18, 16, 19, 12, 14, 12, 18, 14, 0, 12],
# [16, 18, 15, 14, 14, 6, 14, 14, 12, 0]], dtype=int32)
hashDiffThreshold = 10
samePairList = np.transpose(np.nonzero(distance < hashDiffThreshold))
# sample result
# array([[0, 0],
# [0, 6],
# [1, 1],
# [2, 2],
# [3, 3],
# [4, 4],
# [5, 5],
# [5, 9],
# [6, 0],
# [6, 6],
# [7, 7],
# [8, 8],
# [9, 5],
# [9, 9]], dtype=int64)
請注意,結果重復對(例如 [5, 9] 和 [9, 5]),因為它們都作為第一個和第二個運算元進行了測驗)。它還包括針對自身測驗的每個值(顯然是0)。如果需要,可以輕松過濾掉這些結果。
請注意,如果要將任何值轉換為串列,1并且0可以將數字格式化為長度的二進制字串listLength并將每個字符映射到 int,例如
list(map(int, f'{outputCodingAll[0]:0{listLength}b}'))
# sample output
# [0, 0, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1]
此代碼使用此答案bit_count中的函式:
def bit_count(arr):
# Make the values type-agnostic (as long as it's integers)
t = arr.dtype.type
mask = t(-1)
s55 = t(0x5555555555555555 & mask) # Add more digits for 128bit support
s33 = t(0x3333333333333333 & mask)
s0F = t(0x0F0F0F0F0F0F0F0F & mask)
s01 = t(0x0101010101010101 & mask)
arr = arr - ((arr >> 1) & s55)
arr = (arr & s33) ((arr >> 2) & s33)
arr = (arr (arr >> 4)) & s0F
return (arr * s01) >> (8 * (arr.itemsize - 1))
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/523817.html
