我有兩組大的 (x, y) 點,我想在 Python 中將一組的每個點與另一組的“對應點”相關聯。
第二組也可以包含例外值,即額外的噪聲點,正如您在這張圖片中看到的,其中綠點多于紅點:

兩組點之間的關聯不是簡單的翻譯,正如您在這張圖片中看到的那樣:

在這兩個鏈接中,您可以找到紅點和綠點(原點在左上角的影像坐標串列):
https://drive.google.com/file/d/1fptkxEDYbIJ93r_OXJSstDHMfk67DDYo/view?usp=share_link https://drive.google.com/file/d/1Z_ghWIzUZv8sxfawOBoGG3fJz4h_z7Qv/view?usp=share_link
我的問題類似于這兩個:
將一組 x,y 點匹配到另一個已縮放、旋轉、平移且缺少元素的集合
當這些點集包含噪聲時如何對齊兩組點(平移 旋轉)?
但我有很多要點,所以這里提出的解決方案不適用于我的情況。我的點在行中有一定的結構,因此很難計算 Roto-Scale-Translation 函式,因為點的行會相互混淆。
uj5u.com熱心網友回復:
我找到了一種方法,可以使用兩個階段相當準確地恢復哪些點對應于哪些其他點。第一階段校正仿射變換,第二階段校正非線性失真。
注意:我選擇將紅點與綠點匹配,而不是相反。
假設
該方法做出三個假設:
- 它知道三個或更多綠點和匹配的紅點。
- 兩者之間的差異主要是線性的。
- 差異的非線性部分是區域相似的——即如果一個點具有 (-10, 10) 的匹配偏移量,則相鄰點將具有相似的偏移量。這是由 控制的
max_search_dist。
代碼
首先加載兩個資料集:
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import numpy as np
from sklearn.neighbors import NearestNeighbors
from scipy.spatial import KDTree
from collections import Counter
with open('red_points.json', 'rb') as f:
red_points = json.load(f)
red_points = pd.DataFrame(red_points, columns=list('xy'))
with open('green_points.json', 'rb') as f:
green_points = json.load(f)
green_points = pd.DataFrame(green_points, columns=list('xy'))
我發現擁有一個可視化兩個資料集的函式很有用:
def plot_two(green, red):
if isinstance(red, np.ndarray):
red = pd.DataFrame(red, columns=list('xy'))
if isinstance(green, np.ndarray):
green = pd.DataFrame(green, columns=list('xy'))
both = pd.concat([green.assign(hue='green'), red.assign(hue='red')])
ax = both.plot.scatter('x', 'y', c='hue', alpha=0.5, s=0.5)
ax.ticklabel_format(useOffset=False)
接下來,選擇三個綠色點,并提供它們的 XY 坐標。找到紅色對應的點,并提供它們的 XY 坐標。
green_sample = np.array([
[5221, 12460],
[2479, 2497],
[6709, 6303],
])
red_sample = np.array([
[5274, 12597],
[2375, 2563],
[6766, 6406],
])
接下來,使用這些點來找到仿射矩陣。這個仿射矩陣將涵蓋旋轉、平移、縮放和傾斜。因為它有六個未知數,你至少需要六個約束,否則方程是欠定的。這就是為什么我們至少需要早三分。
def add_implicit_ones(matrix):
b = np.ones((matrix.shape[0], 1))
return np.concatenate((matrix,b), axis=1)
def transform_points_affine(points, matrix):
return add_implicit_ones(points) @ matrix
def fit_affine_matrix(red_sample, green_sample):
red_sample = add_implicit_ones(red_sample)
X, _, _, _ = np.linalg.lstsq(red_sample, green_sample, rcond=None)
return X
X = fit_affine_matrix(red_sample, green_sample)
red_points_transformed = transform_points_affine(red_points.values, X)
現在我們進入非線性匹配步驟。這是在紅色的值被轉換以匹配綠色的值之后運行的。這是演算法:
- 從沒有非線性分量的紅點開始。靠近其中一個
green_sample點是一個不錯的選擇 - 仿射步驟將優先考慮讓這些點正確。在該點周圍以半徑搜索相應的綠點。將紅點與相應綠點之間的差異記錄為“漂移”。 - 查看該紅點的所有紅色鄰居。將這些添加到串列中進行處理。
- 在其中一個紅色鄰居處,計算所有相鄰紅色點的平均漂移。將該漂移添加到紅點,并在半徑范圍內搜索綠點。
- 紅點和相應的綠點之間的差異是這個紅點的漂移。
- 將這個點的所有紅色鄰居加入到串列中進行處理,然后回傳步驟 3。
def find_nn_graph(red_points_np):
nbrs = NearestNeighbors(n_neighbors=8, algorithm='ball_tree').fit(red_points_np)
_, indicies = nbrs.kneighbors(red_points_np)
return indicies
def point_search(red_points_np, green_points_np, starting_point, max_search_radius):
starting_point_idx = (((red_points_np - starting_point)**2).mean(axis=1)).argmin()
green_tree = KDTree(green_points_np)
dirty = Counter()
visited = set()
indicies = find_nn_graph(red_points_np)
# Mark starting point as dirty
dirty[starting_point_idx] = 1
match = {}
drift = np.zeros(red_points_np.shape)
# NaN = unknown drift
drift[:] = np.nan
while len(dirty) > 0:
point_idx, num_neighbors = dirty.most_common(1)[0]
neighbors = indicies[point_idx]
if point_idx != starting_point_idx:
neighbor_drift_all = drift[neighbors]
if np.isnan(neighbor_drift_all).all():
# All neighbors have no drift
# Unmark as dirty and come back to this one
del dirty[point_idx]
continue
neighbor_drift = np.nanmean(neighbor_drift_all, axis=0)
assert not np.isnan(neighbor_drift).any(), "No neighbor drift found"
else:
neighbor_drift = np.array([0, 0])
# Find the point in the green set
red_point = red_points_np[point_idx]
green_points_idx = green_tree.query_ball_point(red_point neighbor_drift, r=max_search_radius)
assert len(green_points_idx) != 0, f"No green point found near {red_point}"
assert len(green_points_idx) == 1, f"Too many green points found near {red_point}"
green_point = green_points_np[green_points_idx[0]]
real_drift = green_point - red_point
match[point_idx] = green_points_idx[0]
# Save drift
drift[point_idx] = real_drift
# Mark unvisited neighbors as dirty
if point_idx not in visited:
neighbors = indicies[point_idx, 1:]
neighbors = [n for n in neighbors if n not in visited]
dirty.update(neighbors)
# Remove this point from dirty
del dirty[point_idx]
# Mark this point as visited
visited.add(point_idx)
# Check that there are no duplicates
assert len(set(match.values())) == len(match)
# Check that every point in red_points_np was matched
assert len(match) == red_points_np.shape[0]
return match, drift
# This point is assumed to have a drift of zero
# Pick one of the points which was used for the linear correction
starting_point = green_sample[0]
# Maximum distance that a point can be found from where it is expected
max_search_radius = 10
green_points_np = green_points.values
match, drift = point_search(red_points_transformed, green_points_np, starting_point, max_search_radius)
接下來,這是一個可以用來審核匹配質量的工具。這顯示了前一千場比賽。在它下面是一個箭袋圖,其中箭頭從紅點指向匹配的綠點的方向。(注意:箭頭不按比例。)
red_idx, green_idx = zip(*match.items())
def show_match_subset(start_idx, length):
end_idx = start_idx length
plot_two(green_points_np[np.array(green_idx)][start_idx:end_idx], red_points_np[np.array(red_idx)][start_idx:end_idx])
plt.show()
red_xy = red_points_np[np.array(red_idx)][start_idx:end_idx]
red_drift_direction = drift[np.array(red_idx)][start_idx:end_idx]
plt.quiver(red_xy[:, 0], red_xy[:, 1], red_drift_direction[:, 0], red_drift_direction[:, 1])
show_subset(0, 1000)
情節:

匹配
這是我找到的匹配項的副本。JSON格式,key代表紅點檔案中點的索引,value代表綠點檔案中點的索引。https://pastebin.com/SBezpstu
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/537628.html
下一篇:SQL連接多列或單個計算列
