重復資料在資料分析和搜索中會造成錯誤,在我們的實際使用中,我們應該避免重復匯入的資料,重復資料有各種原因會造成,比如我們重復匯入同樣的資料,當我們寫入檔案時使用自動生成的 ID,那么同樣的檔案被匯入兩次,這樣會造成同樣的兩個一樣的檔案會保存于 Elasticsearch 中盡管它們的 ID 會有不同,在我之前的文章 “Beats:如何避免重復的匯入資料”,我詳細描述了如果使用 Beats 匯入資料時,避免重復資料,
避免在 Elasticsearch 索引中重復始終是一件好事, 但是,通過消除重復項,你可以獲得其他好處:節省磁盤空間,提高搜索準確性,提高硬體資源管理效率, 也許最重要的是,你減少了搜索的獲取時間,令人驚訝的是,有關該主題的檔案很少,因此我們提供了本教程,為你提供識別和管理索引中重復項的適當技術,
示例資料
這里有四個簡單的檔案,其中一個是另一個的重復資料, 我們建立一個叫做 employeeid 的索引,
POST employeeid/_bulk
{ "index" : { "_id" : "1" } }
{ "name" : "John", "organisation": "Apple", "employeeID": "23141A"}
{ "index" : { "_id" : "2" } }
{ "name" : "Sam", "organisation": "Tesla", "employeeID": "TE9829"}
{ "index" : { "_id" : "3" } }
{ "name" : "Sarah", "organisation": "Microsoft", "employeeID": "M54667"}
{ "index" : { "_id" : "4" } }
{ "name" : "John", "organisation": "Apple", "employeeID": "23141A"}
從上面的命令中,我們可以看得出來 ID 為 1 和 4 的兩個檔案完全是一樣的,盡管它們的 ID 是不同的,
資料匯入程序中避免重復檔案
在考慮如何在 Elasticsearch 中執行重復檢查之前,讓我們花點時間考慮一下不同型別的索引方案,
一種情況是在索引編制之前我們可以訪問源檔案, 在這種情況下,檢查資料并查找一個或多個包含唯一值的欄位相對容易, 也就是說,該欄位的每個不同值僅出現在一個檔案中, 在這種情況下,我們可以將該特定欄位設定為 Elasticsearch 索引的檔案 ID, 由于任何重復的源檔案也將具有相同的檔案 ID,因此 Elasticsearch 將確保這些重復檔案不會成為索引的一部分,
你可以參考我之前的文章 “Beats:如何避免重復的匯入資料”,
Upsert
另一種情況是一個或多個檔案具有相同的識別符號但內容不同, 當用戶編輯檔案并想使用相同的檔案 ID 重新索引該檔案時,通常會發生這種情況, 問題在于,當用戶嘗試重新索引時,Elasticsearch 不允許這樣做,因為它的檔案 ID 必須是唯一的,
解決方法是使用 Upsert API, Upsert 檢查特定檔案的存在,如果存在,Upsert 將使用 Upsert 的內容更新該檔案, 如果檔案不存在,Upsert 將創建具有相同內容的檔案, 無論哪種方式,用戶都將在相同的檔案 ID 下獲得內容更新,
在第三種情況下,在創建索引之前無法訪問資料集, 在這些情況下,我們將需要搜索索引并檢查重復項, 這就是我們在以下各節中演示的內容,
在寫入資料時,我們可以使用 Upsert 來進行,如果該檔案尚不存在,則將 Upsert 元素的內容作為新檔案插入, 如果檔案存在,則執行更新,比如:
POST test/_update/1
{
"script": {
"source": "ctx._source.counter += params.count",
"lang": "painless",
"params": {
"count": 4
}
},
"upsert": {
"counter": 1
}
}
在上面,如果 ID 為 1 的檔案已經存在沒那么將執行腳本,并把 count 的值加上 4,否則創建一個新的檔案,并把 count 欄位的值設定為 1,又比如:
POST sessions/_update/dh3sgudg8gsrgl
{
"scripted_upsert": true,
"script": {
"id": "my_web_session_summariser",
"params": {
"pageViewEvent": {
"url": "foo.com/bar",
"response": 404,
"time": "2014-01-01 12:32"
}
}
},
"upsert": {}
}
在上面,我們設定 scrpted_upsert 為 true,無論 ID 為 dh3sgudg8gsrgl 已經存在與否,id 為 my_web_session_summariser 的腳本將被執行,并把相應的引數傳入,
我們也可以直接使用 _update 對檔案直接更新,比如:
POST test/_update/1
{
"doc": {
"name": "new_name"
},
"doc_as_upsert": true
}
在上面,如果 ID 為 1 的檔案已經存在,那么它的欄位 name 值將被更新,否則創建一個新的 ID 為 1 的檔案,并設定它的欄位 name 值為 new_name,
在使用 Upsert 命令時,我們需要注意的是:我們必須提供一個 ID,另外它有兩個操作:檢查是否存在,并更新或者寫入,Upsert 的速度會比正常的自動生成 ID 的匯入速度慢,
當我們使用 Logstash 進行匯入時,我們也可以指定 Upsert,
檢查重復項的基本技巧
在上面的每個示例檔案中,我們看到三個欄位:name,organisation 及 employeeID,并且如果我們假設 name 欄位是唯一的,則可以將該欄位指定為檢查重復項的識別符號, 如果多個檔案的 name 欄位具有相同的值,則該檔案確實是重復的,
遵循此基本原理,我們可以執行簡單的術語聚合,以獲取 name 欄位每個值的檔案計數, 但是,這種簡單的聚合只會回傳該欄位每個值下的檔案計數, 這種方法在檢查重復項時沒有用,因為我們要檢查檔案中該欄位的一個或多個值的重復項, 為此,我們還需要應用 top_hits 聚合 - 一個子聚合器,其中每個存盤桶中均會聚合最匹配的檔案,
這是我們建議針對上面給出的示例檔案索引的查詢:
GET employeeid/_search
{
"size": 0,
"aggs": {
"duplicateCount": {
"terms": {
"field": "name.keyword",
"min_doc_count": 2
},
"aggs": {
"duplicateDocuments": {
"top_hits": {}
}
}
}
}
}
在這里,我們定義引數 min_doc_count,通過將此引數設定為 2,只有 doc_count 為 2 或更大的聚合桶將出現在聚合中(如以下結果所示),
上面的命令運行的結果是:
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 4,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ]
},
"aggregations" : {
"duplicateCount" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "John",
"doc_count" : 2,
"duplicateDocuments" : {
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "employeeid",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"name" : "John",
"organisation" : "Apple",
"employeeID" : "23141A"
}
},
{
"_index" : "employeeid",
"_type" : "_doc",
"_id" : "4",
"_score" : 1.0,
"_source" : {
"name" : "John",
"organisation" : "Apple",
"employeeID" : "23141A"
}
}
]
}
}
}
]
}
}
}
請務必注意,我們必須將 min_doc_count 的值設定為2, 否則,其他結果將出現在聚合中,并且我們將找不到可能存在的任何重復項,
對多個欄位中的值進行重復資料洗掉
我們上面所做的是一個非常基本的示例,該示例根據單個欄位中的值來標識重復檔案,這不是很有趣,還是有用的,在大多數情況下,檢查重復項需要檢查多個欄位,我們不能可靠地假設員工檔案之間存在重復項,而這些重復項僅在名稱欄位中包含多次出現的 “Bill” 值,在許多實際情況下,有必要檢查許多不同欄位之間的重復項,考慮到上面的示例資料集,我們需要檢查所有欄位中的重復項,
我們可以從上一節中擴展我們的方法,并執行多欄位術語聚合和 top-hits 聚合,我們可以對索引檔案中的所有三個欄位進行術語聚合,我們將再次指定 min_doc_count 引數,以僅獲取 doc_count 大于或等于 2 的存盤桶,我們還應用 top_hits 聚合以獲取正確的結果,為了容納多個欄位,我們使用腳本來幫助我們追加欄位值以在聚合中顯示:
GET employeeid/_search
{
"size": 0,
"aggs": {
"duplicateCount": {
"terms": {
"script": "doc['name.keyword'].value + doc['employeeID.keyword'].value + doc['organisation.keyword'].value",
"min_doc_count": 2
},
"aggs": {
"duplicateDocuments": {
"top_hits": {}
}
}
}
}
}
如下所示,運行此查詢的結果將顯示一個重復計數聚合,聚合 duplicateDocuments 包含在其中找到重復值的檔案, 我們可以對這些檔案進行交叉檢查和驗證,
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 4,
"relation" : "eq"
},
"max_score" : null,
"hits" : [ ]
},
"aggregations" : {
"duplicateCount" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "John23141AApple",
"doc_count" : 2,
"duplicateDocuments" : {
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "employeeid",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"name" : "John",
"organisation" : "Apple",
"employeeID" : "23141A"
}
},
{
"_index" : "employeeid",
"_type" : "_doc",
"_id" : "4",
"_score" : 1.0,
"_source" : {
"name" : "John",
"organisation" : "Apple",
"employeeID" : "23141A"
}
}
]
}
}
}
]
}
}
}
從上面,我們可以看出來有一個重復的檔案,
假如我們把匯入的檔案修改為:
POST employeeid/_bulk
{ "index" : { "_id" : "1" } }
{ "name" : "John", "organisation": "Apple", "employeeID": "23141A"}
{ "index" : { "_id" : "2" } }
{ "name" : "Sam", "organisation": "Tesla", "employeeID": "TE9829"}
{ "index" : { "_id" : "3" } }
{ "name" : "Sarah", "organisation": "Microsoft", "employeeID": "M54667"}
{ "index" : { "_id" : "4" } }
{ "name" : "John", "organisation": "Apple", "employeeID": "23141A"}
{ "index" : { "_id" : "5" } }
{ "name" : "Sarah", "organisation": "Microsoft", "employeeID": "M54667"}
在上面,ID 為 1 和 4 為重復檔案,3 和 5 的檔案為重復檔案,重新運行上面的查詢,我們可以看到:
"aggregations" : {
"duplicateCount" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "John23141AApple",
"doc_count" : 2,
"duplicateDocuments" : {
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "employeeid",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"name" : "John",
"organisation" : "Apple",
"employeeID" : "23141A"
}
},
{
"_index" : "employeeid",
"_type" : "_doc",
"_id" : "4",
"_score" : 1.0,
"_source" : {
"name" : "John",
"organisation" : "Apple",
"employeeID" : "23141A"
}
}
]
}
}
},
{
"key" : "SarahM54667Microsoft",
"doc_count" : 2,
"duplicateDocuments" : {
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "employeeid",
"_type" : "_doc",
"_id" : "3",
"_score" : 1.0,
"_source" : {
"name" : "Sarah",
"organisation" : "Microsoft",
"employeeID" : "M54667"
}
},
{
"_index" : "employeeid",
"_type" : "_doc",
"_id" : "5",
"_score" : 1.0,
"_source" : {
"name" : "Sarah",
"organisation" : "Microsoft",
"employeeID" : "M54667"
}
}
]
}
}
}
]
}
}
如上所示,重復的檔案都被顯示出來了,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/273684.html
標籤:其他
