我正在運行一個 python 代碼來計算某些坐標之間的距離。原始資料如下所示:
a = np.array([[1,40,70],[2,41,71],[3,42,73]]) #id, latitude, longitude
我期待獲得每對之間的距離,結果應該是這樣的:
[1, 2, 100(km)]
[1, 3, 200(km)].
[2, 1, 100(km)]
[2, 3, 300(km)]
[3, 1, 200(km)]
[3, 2, 300(km)]
結果應該包含 pair(m,n) 和 pair(n,m)
實際資料有39000列,因此我對代碼效率有很大的要求。目前我正在使用一個非常愚蠢的雙回圈:
line = 0
result = np.zeros((6,3))
for i in a:
for j in a:
dis = getDistance(i[1],i[2],j[1],j[2]) # this is the function i made to calculate distance between two coordinates
result[line] = [i[0],j[0],dis]
line = 1
誰能幫我改進代碼?
uj5u.com熱心網友回復:
如果您的距離函式很簡單,您可以嘗試查找distance_matrix并將其轉換為類似于以下輸出的資料結構np.enumerate(distance_matrix):
def get_dist(X, Y):
return 100*np.hypot(X-X[:,None], Y-Y[:,None])
M = np.array([[1,40,70],[2,41,71],[3,42,73]])
names, X, Y = np.transpose(M)
distance_matrix = get_dist(X, Y)
>>> list(np.enumerate(distance_matrix))
[((0, 0), 0.0),
((0, 1), 141.4213562373095),
((0, 2), 360.5551275463989),
((1, 0), 141.4213562373095),
((1, 1), 0.0),
((1, 2), 223.60679774997897),
((2, 0), 360.5551275463989),
((2, 1), 223.60679774997897),
((2, 2), 0.0)]
請注意,我們需要為索引添加不同的名稱。此外,這些名稱和距離的型別不同,因此我們不能將它們都保存在一個(非結構化)陣列中。您可能希望避免迭代np.ndeumerate并以不同的方式查找名稱:
x,y = np.indices([len(M), len(M)])
>>> names[x].ravel(), names[y].ravel(), distance_matrix.ravel()
(array([1, 1, 1, 2, 2, 2, 3, 3, 3]),
array([1, 2, 3, 1, 2, 3, 1, 2, 3]),
array([ 0. , 141.42135624, 360.55512755, 141.42135624,
0. , 223.60679775, 360.55512755, 223.60679775,
0. ]))
或者:
>>> np.transpose([names[x].ravel(), names[y].ravel(), dist_matrix.ravel()])
array([[ 1. , 1. , 0. ],
[ 1. , 2. , 141.42135624],
[ 1. , 3. , 360.55512755],
[ 2. , 1. , 141.42135624],
[ 2. , 2. , 0. ],
[ 2. , 3. , 223.60679775],
[ 3. , 1. , 360.55512755],
[ 3. , 2. , 223.60679775],
[ 3. , 3. , 0. ]])
uj5u.com熱心網友回復:
首先,您要進行兩次計算。您的代碼:
for i in a:
for j in a:
從計算出的距離i,以j從j到i,即使這些是相同的距離。只需執行以下操作,您就可以將時間減少一半
for i in range(len(a)):
for j in range(i 1, len(a)):
D[i,j] = distance_calc(i,j)
即使您需要兩個方向的距離,我也不會計算兩次,只需在兩個地方分配值。但是如果你可以向量化你的代碼,它可能會加速
for i in range(len(a)):
D[i,i 1:] = distance_calc(i)
為您的問題舉一個例子,任何計算都將以弧度完成,所以我想首先將您的位置轉換為弧度而不是度數。即便如此,我們也可以加快速度。我在這里假設您正在使用Haversine estiamte 來計算球形地球上的距離。我的演算法基于這篇文章中的資訊,該資訊在對您的問題的評論中參考。首先,我將考慮N=1000分數而不是你的全套。
import numpy as np
import time
r_Earth = 6371
N = 1000
def haversine_one_element(data1, data2):
# calculates the Haversine distance one element at a time
lat1 = data1[0]
lng1 = data1[1]
lat2 = data2[0]
lng2 = data2[1]
diff_lat = lat1 - lat2
diff_lng = lng1 - lng2
d = np.sin(diff_lat/2)**2 np.cos(lat1)*np.cos(lat2) * np.sin(diff_lng/2)**2
return 2 * r_Earth * np.arcsin(np.sqrt(d))
def haversine_slow(a, N, do_all = True):
D = np.zeros((N,N))
for i in range(N):
if do_all:
for j in range(N):
D[i,j] = haversine_one_element(a[i,1:], a[j,1:])
else:
for j in range(i 1,N): # note that D[i,i] = 0 so we can skip it
D[i,j] = D[j,i] = haversine_one_element(a[i,1:], a[j,1:])
a = np.array([np.arange(N).ravel(),
np.random.random(N) * 360 - 180,
np.arcsin(np.random.random(N) * 2 - 1) * 180 / np.pi]).transpose()
a[:,1:] = a[:,1:] * np.pi / 180
# Doing the entire array
start = time.time()
haversine_slow(a,N)
print(time.time() - start) # 8.777195453643799
# Doing only the upper half
start = time.time()
haversine_slow(a,N,do_all = False)
print(time.time() - start) # 4.547634840011597
但是,我們可以重寫我們的Haversine 公式以一次接收一系列值,以便我們可以一次計算從一個點到所有其他點的距離。
def haversine(data1, data2):
# data1, data2 are the data arrays with 2 cols and they hold
# lat., lng. values in those cols respectively
lat1 = data1[0]
lng1 = data1[1]
lat2 = data2[0,:]
lng2 = data2[1,:]
diff_lat = lat1 - lat2
diff_lng = lng1 - lng2
d = np.sin(diff_lat/2)**2 np.cos(lat1)*np.cos(lat2) * np.sin(diff_lng/2)**2
return 2 * r_Earth * np.arcsin(np.sqrt(d))
def haversine_d(a,N):
D = np.zeros((N,N))
for i in range(N):
d1 = a[i,1:]
d2 = a[i 1:,1:].transpose()
D[i,i 1:] = haversine(d1, d2)
D[i 1:,i] = D[i, i 1:].transpose()
# only return flattened upper triangular part of matirx as
return(D)
start = time.time()
d1 = haversine_d(a,N)
print(time.time() - start) # 0.03420424461364746
So, for my case here, that has reduced the compute time by another factor of 100, and I think it will be reduced more for longer vectors like what you are talking about. (Subject to memory-bound problems, see below.)
However, there is one more thing that I think we can do, if we look at the problem in terms of linear algebra instead of trigonometry. If you've taken linear algebra the you should know the relationship between dot products and cosines and we can use that information to our advantage by describing the locations as vectors in (x,y,z) instead of longitiude and latitude:
def get_xyz(a):
x = np.cos(a[:,1]) * np.cos(a[:,2])
y = np.cos(a[:,1]) * np.sin(a[:,2])
z = np.sin(a[:,1])
v = np.stack([x, y, z]).transpose()
return(np.hstack([a[:,:1], v]))
def distance_dot(a,N):
b = get_xyz(a)
D = np.zeros((N,N))
for i in range(N):
D[i,i 1:] = np.arccos(b[i,1] * b[i 1:,1] \
b[i,2] * b[i 1:,2] \
b[i,3] * b[i 1:,3]) * r_Earth
D[i 1:,i] = D[i, i 1:].transpose()
return(D)
start = time.time()
distance_dot(a,N) # 0.019799470901489258
print(time.time() - start)
這種代數方法應該在浮點計算的截斷范圍內給出相同的答案;如果您嘗試使用==它比較半正弦和線性代數的測驗將失敗,但如果您執行類似的操作<= 10**-10,它將通過。
在我的 1000 點資料集上,這些方法將計算時間減少了大約 400 倍。對于更大的資料集,我沒有運行前兩個,N=40000但對于后兩個,Haversine 和線性代數測驗大約在同一時間(290 秒)出現,但這可能是因為問題最終需要所有記憶體我的筆記本電腦。如果記憶體不是問題,我會預計問題會大致按比例縮放,N**2因此Haversine 大約為 60 s,線性代數為 30 s。
如果您開始遇到記憶體問題,可能有一種方法可以通過計算陣列塊并隨時保存到磁盤來加速它,但在解決這個問題的時間里,除非您需要生成生產級代碼或一遍又一遍地這樣做,我可能不會打擾。
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/361508.html
上一篇:用多個值替換單個值
