問題的高級視圖
我有 X 源,其中包含有關我們環境中資產(主機名、IP、MAC、作業系統等)的資訊。源包含 1500 到 150k 條目(至少是我現在使用的條目)。我的腳本應該查詢它們中的每一個,收集資料,通過合并來自不同來源的相同資產的資訊對其進行重復資料洗掉,并回傳所有條目的統一串列。我當前的實作確實有效,但對于更大的資料集來說速度很慢。我很好奇是否有更好的方法來完成我正在嘗試做的事情。
通用問題
通過合并相似條目來進行重復資料洗掉,但需要注意的是,合并兩個資產可能會改變生成的資產是否與合并前與前兩個資產相似的第三個資產相似。
示例:
~相似性, 合并
(之前)A~B~C
(之后)(A B)~C或(A B)!~C
我試圖尋找有同樣問題的人,我只找到了在 Python 中洗掉串列中重復可變物件的優雅方法是什么?,但它不包括對我來說至關重要的資料合并。
使用的類
為便于閱讀和理解而進行了簡化,洗掉了不需要的部分 - 一般功能完好無損。
class Entry:
def __init__(self, source: List[str], mac: List[str] = [], ip: List[str] = [], hostname: List[str] = [], os: OS = OS.UNKNOWN, details: dict = {}):
# SO: Sorting and sanitization removed for simplicity
self.source = source
self.mac = mac
self.ip = ip
self.hostname = hostname
self.os = os
self.details = details
def __eq__(self, other):
if isinstance(other, Entry):
return (self.source == other.source and
self.os == other.os and
self.hostname == other.hostname and
self.mac == other.mac and
self.ip == other.ip)
return NotImplemented
def is_similar(self, other) -> bool:
def same_entry(l1: list, l2: list) -> bool:
return not set(l1).isdisjoint(l2)
if isinstance(other, Entry):
if self.os == OS.UNKNOWN or other.os == OS.UNKNOWN or self.os == other.os:
empty_hostnames = self.hostname == [] or other.hostname == []
empty_macs = self.mac == [] or other.mac == []
return (same_entry(self.hostname, other.hostname) or
(empty_hostnames and same_entry(self.mac, other.mac)) or
(empty_hostnames and empty_macs and same_entry(self.ip, other.ip)))
return False
def merge(self, other: 'Entry'):
self.source = _merge_lists(self.source, other.source)
self.hostname = _merge_lists(self.hostname, other.hostname)
self.mac = _merge_lists(self.mac, other.mac)
self.ip = _merge_lists(self.ip, other.ip)
self.os = self.os if self.os != OS.UNKNOWN else other.os
self.details = _merge_dicts(self.details, other.details)
def representation(self) -> str:
# Might be useful if anyone wishes to run the code
return f'<Entry from {self.source}: hostname={self.hostname}, MAC={self.mac}, IP={self.ip}, OS={self.os.value}, details={self.details}>'
def _merge_lists(l1: list, l2: list):
return list(set(l1) | set(l2))
def _merge_dicts(d1: dict, d2: dict):
"""
Merge two dicts without overwriting any data.
"""
# If either is empty, return the other one
if not d1:
return d2
if not d2:
return d1
if d1 == d2:
return d1
result = d1
for k, v in d2.items():
if k in result:
result[k '_'] = v
else:
result[k] = v
return result
class OS(Enum):
'''
Enum specifying the operating system of the asset.
'''
UNKNOWN = 'Unknown'
WINDOWS = 'Windows'
LINUX = 'Linux'
MACOS = 'MacOS'
演算法
每個演算法從不同來源獲取一個條目串列,例如:
entries = [[entries from source A], [entries from source B], ..., [entries from source Z]]
Main deduplication function
It's the main function used in each algorithm. It takes list of entries from 2 different sources and combines that into list containing assets with information merged if needed.
It's probably the part I need help the most. It's the only way I could think of. Because of that, I focused on how to run this function multiple times faster, but making this one faster would be the best in terms of reducing runtime.
def deduplicate(en1: List[Entry], en2: List[Entry]) -> List[Entry]:
"""
Deduplicates entries from provided lists by merging similar entries.
Entries in the lists are supposed to be already deduplicated.
"""
# If either is empty, return the other one
if not en1:
return en2
if not en2:
return en1
result = []
# Iterate over longer and check for similar in shorter
if len(en2) > len(en1):
en1, en2 = en2, en1
for e in en1:
# walrus operator in Python 3.8 or newer
while (similar := next((y for y in en2 if y.is_similar(e)), None)) is not None:
e.merge(similar)
en2.remove(similar)
del similar
result.append(e)
result.extend(en2)
return result
A reason why normal deduplication (eg. using sets) isn't applicable here is because of merging one entry with another new entries might become similar, eg.:
In [2]: e1 = Entry(['SRC_A'], [], ['1.1.1.1'], [], OS.UNKNOWN)
In [3]: e2 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], ['1.1.1.1'], [], OS.UNKNOWN)
In [4]: e3 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], [], [], OS.UNKNOWN)
In [5]: e1.is_similar(e2)
Out[5]: True
In [6]: e1.is_similar(e3) # at first it's not similar
Out[6]: False
In [7]: e1.merge(e2)
In [8]: e1.is_similar(e3) # but after merging it is
Out[8]: True
1st approach - sequential
My first idea was the simplest one, just simple recursion.
def dedup_multiple(lists: List[List[Entry]]) -> List[Entry]:
"""Deduplication helper allowing for providing more than 2 sources."""
if len(lists) == 1:
return lists[0]
return deduplicate(lists[0], dedup_multiple(lists[1:]))
2nd approach - multithreading using Pool
That's the approach I'm using at the moment. So far it's the fastest one and fairly simple.
def async_dedup(lists: List[List[Entry]]) -> List[Entry]:
"""Asynchronous deduplication helper allowing for providing more than 2 sources."""
with mp.Pool() as pool:
while len(lists) > 1:
if len(lists) % 2 == 1:
lists.append([])
data = [(lists[i], lists[i 1]) for i in range(0, len(lists), 2)]
lists = pool.map_async(_internal_deduplication, data).get()
return lists[0]
def _internal_deduplication(en):
return deduplicate(*en)
But I realized really fast that if one task takes much longer than the rest (for example because deduplicating the biggest source), everything else wait instead of working.
3rd approach - multithreading using Queue and Process
As I was trying to speed up 2nd approach I came across How to use python multiprocessing pool in continuous loop and Filling a queue and managing multiprocessing in python, and I came up with the following solution.
def async_dedup2(lists: List[List[Entry]]) -> List[Entry]:
tasks_number = min(os.cpu_count(), len(lists) // 2)
args = lists[:tasks_number]
with mp.Manager() as manager:
queue = manager.Queue()
for l in lists[tasks_number:]:
queue.put(l)
processes = []
for arg in args:
proc = mp.Process(target=test, args=(queue, arg, ))
proc.start()
processes.append(proc)
for proc in processes:
proc.join()
return queue.get()
def test(queue: mp.Queue, arg: List[Entry]):
while not queue.empty():
try:
arg2: List[Entry] = queue.get()
except Empty:
continue
arg = deduplicate(arg, arg2)
queue.put(arg)
I thought it would be the best solution as there wouldn't be a moment when a data isn't processed if possible, but after testing it was almost always slightly slower than 2nd approach.
Runtime comparison
Source A 1510
Source B 1509
Source C 5000
Source D 4460
Source E 5000
Source F 2084
Deduplicating.....
SYNC - Execution time: 188.6127771000 - Count: 13540
ASYNC - Execution time: 68.249583 - Count: 13532
ASYNC2 - Execution time: 69.416046 - Count: 13532
Source A 1510
Source B 1509
Source C 11821
Source D 13871
Source E 5001
Source F 2333
Deduplicating.....
ASYNC - Execution time: 424.405793 - Count: 26229
ASYNC2 - Execution time: 522.697551 - Count: 26405
uj5u.com熱心網友回復:
總結:我們從條目到“草圖”集合定義了兩個草圖函式 f 和 g,使得兩個條目 e 和 e' 相似當且僅當 f(e) ∩ g(e') ≠ ?。然后我們可以有效地識別合并(見最后的演算法)。
我實際上將定義四個草圖函式 f os、 f addr、 g os和 g addr,我們從中構造
- f(e) = {(x, y) | x ∈ f os (e), y ∈ f addr (e)}
- g(e) = {(x, y) | x ∈ g os (e), y ∈ g addr (e)}。
f os和go os是四個中較簡單的一個。f os (e) 包括
- (1, e.
os), 如果 e.os是已知的 - (2,),如果 e。
os是已知的 - (3,),如果 e.
os未知。
g os (e) 包括
- (1, e.
os), 如果 e.os是已知的 - (2,),如果 e。
os未知 - (3,)。
f addr和 g addr比較復雜,因為有優先屬性,它們可以有多個值。盡管如此,同樣的技巧可以發揮作用。f地址(e) 包括
- (1,
h) 對于he 中的每個。hostname - (2,
m) 對于me 中的每個。mac,如果 e。hostname非空 - (3,
m) 對于me 中的每個。mac,如果 e。hostname是空的 - (4,
i) 對于ie 中的每個。ip,如果 e。hostname和 e。mac非空 - (5,
i) 對于ie 中的每個。ip,如果 e。hostname是空的,e。mac非空 - (6,
i) 對于ie 中的每個。ip,如果 e。hostname非空且 e。mac是空的 - (7,
i) 對于ie 中的每個。ip,如果 e。hostname和 e。mac是空的。
g地址(e) 包括
- (1,
h) 對于he 中的每個。hostname - (2,
m) 對于me 中的每個。mac,如果 e。hostname是空的 - (3,
m) 對于me 中的每個。mac - (4,
i) 對于ie 中的每個。ip,如果 e。hostname是空的,e。mac是空的 - (5,
i) 對于ie 中的每個。ip,如果 e。mac是空的 - (6,
i) 對于ie 中的每個。ip,如果 e。hostname是空的 - (7,
i) 對于ie 中的每個。ip.
演算法的其余部分如下。
將
defaultdict(list)草圖映射到條目識別符號串列。對于每個條目,對于每個條目的 f-sketches,將條目的識別符號添加到
defaultdict.初始化一個
set邊。對于每個條目,對于每個條目的 g-sketches,在 中查找 g-sketch
defaultdict并添加從條目識別符號到串列中每個其他識別符號的邊。
Now that we have a set of edges, we run into the problem that @btilly noted. My first instinct as a computer scientist is to find connected components, but of course, merging two entries may cause some incident edges to disappear. Instead you can use the edges as candidates for merging, and repeat until the algorithm above returns no edges.
import collections
import itertools
Entry = collections.namedtuple("Entry", ("os", "hostname", "mac", "ip"))
UNKNOWN = "UNKNOWN"
WINDOWS = "WINDOWS"
LINUX = "LINUX"
def f_os(e):
if e.os != UNKNOWN:
yield (1, e.os)
if e.os != UNKNOWN:
yield (2,)
if e.os == UNKNOWN:
yield (3,)
def g_os(e):
if e.os != UNKNOWN:
yield (1, e.os)
if e.os == UNKNOWN:
yield (2,)
yield (3,)
def f_addr(e):
for h in e.hostname:
yield (1, h)
if e.hostname:
for m in e.mac:
yield (2, m)
if not e.hostname:
for m in e.mac:
yield (3, m)
if e.hostname and e.mac:
for i in e.ip:
yield (4, i)
if not e.hostname and e.mac:
for i in e.ip:
yield (5, i)
if e.hostname and not e.mac:
for i in e.ip:
yield (6, i)
if not e.hostname and not e.mac:
for i in e.ip:
yield (7, i)
def g_addr(e):
for h in e.hostname:
yield (1, h)
if not e.hostname:
for m in e.mac:
yield (2, m)
for m in e.mac:
yield (3, m)
if not e.hostname and not e.mac:
for i in e.ip:
yield (4, i)
if not e.mac:
for i in e.ip:
yield (5, i)
if not e.hostname:
for i in e.ip:
yield (6, i)
for i in e.ip:
yield (7, i)
def f(e):
return set(itertools.product(f_os(e), f_addr(e)))
def g(e):
return set(itertools.product(g_os(e), g_addr(e)))
def is_similar(e, e_prime):
return not f(e).isdisjoint(g(e_prime))
# Begin testing code for is_similar
def original_is_similar(e, e_prime):
if e.os != UNKNOWN and e_prime.os != UNKNOWN and e.os != e_prime.os:
return False
if e.hostname and e_prime.hostname:
return not set(e.hostname).isdisjoint(set(e_prime.hostname))
if e.mac and e_prime.mac:
return not set(e.mac).isdisjoint(set(e_prime.mac))
return not set(e.ip).isdisjoint(set(e_prime.ip))
import random
def random_os():
return random.choice([UNKNOWN, WINDOWS, LINUX])
def random_names(prefix):
return [
"{}{}".format(prefix, random.randrange(10)) for n in range(random.randrange(3))
]
def random_entry():
return Entry(random_os(), random_names("H"), random_names("M"), random_names("I"))
def test_is_similar():
print("Testing is_similar()")
for rep in range(100000):
e = random_entry()
e_prime = random_entry()
got = is_similar(e, e_prime)
expected = original_is_similar(e, e_prime)
if got != expected:
print(e)
print(e_prime)
print("got", got)
print("expected", expected)
break
if __name__ == "__main__":
test_is_similar()
# End testing code
def find_edges(entries):
entries = list(entries)
posting_lists = collections.defaultdict(list)
for i, e in enumerate(entries):
for sketch in f(e):
posting_lists[sketch].append(i)
edges = set()
for i, e in enumerate(entries):
for sketch in g(e):
for j in posting_lists[sketch]:
if i < j:
edges.add((i, j))
return edges
# Begin testing code for find_edges
def test_find_edges():
print("Testing find_edges()")
entries = [random_entry() for i in range(1000)]
got = find_edges(entries)
expected = {
(i, j)
for (i, e) in enumerate(entries)
for (j, e_prime) in enumerate(entries)
if i < j and is_similar(e, e_prime)
}
print(len(expected))
assert got == expected
if __name__ == "__main__":
test_find_edges()
find_edges([random_entry() for i in range(10000)])
# End testing code for find_edges
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/335996.html
標籤:python multithreading algorithm merge duplicates
下一篇:秋招開獎了!!
