主頁 > 軟體工程 > 基于Kafka和Elasticsearch構建實時站內搜索功能的實踐

基于Kafka和Elasticsearch構建實時站內搜索功能的實踐

2023-03-21 11:59:54 軟體工程

作者:京東物流 紀卓志

目前我們在構建一個多租戶多產品類網站,為了讓用戶更好的找到他們所需要的產品,我們需要構建站內搜索功能,并且它應該是實時更新的,本文將會討論構建這一功能的核心基礎設施,以及支持此搜索能力的技術堆疊,

問題的定義與決策

為了構建一個快速、實時的搜索引擎,我們必須做出某些設計決策,我們使用 MySQL 作為主資料庫存盤,因此有以下選擇:

  1. 直接在 MySQL 資料庫中查詢用戶在搜索框中輸入的每個關鍵詞,就像%#{word1}%#{word2}%...這樣, ??
  2. 使用一個高效的搜索資料庫,如 Elasticsearch,??

考慮到我們是一個多租戶應用程式,同時被搜索的物體可能需要大量的關聯操作(如果我們使用的是 MySQL 一類的關系型資料庫),因為不同型別的產品有不同的資料結構,所以我們還可以能需要同時遍歷多個資料表來查詢用戶輸入的關鍵詞,所以我們決定不使用直接在 MySQL 中查詢關鍵詞的方案,??

因此,我們必須決定一種高效、可靠的方式,將資料實時地從 MySQL 遷移到 Elasticsearch 中,接下來需要做出如下的決定:

  1. 使用 Worker 定期查詢 MySQL 資料庫,并將所有變化的資料發送到 Elasticsearch,??
  2. 在應用程式中使用 Elasticsearch 客戶端,將資料同時寫入到 MySQL 和 Elasticsearch 中,??
  3. 使用基于事件的流引擎,將 MySQL 資料庫中的資料更改作為事件,發送到流處理服務器上,經過處理后將其轉發到 Elasticsearch,??

選項 1 并不是實時的,所以可以直接排除,而且即使我們縮短輪詢間隔,也會造成全表掃描給資料庫造成查詢壓力,除了不是實時的之外,選項 1 無法支持對資料的洗掉操作,如果對資料進行了洗掉,那么我們需要額外的表記錄之前存在過的資料,這樣才能保證用戶不會搜索到已經洗掉了的臟資料,對于其他兩種選擇,不同的應用場景做出的決定可能會有所不同,在我們的場景中,如果選擇選項 2,那么我們可以預見一些問題:如過 Elasticsearch 建立網路連接并確認更新時速度很慢,那么這可能會降低我們應用程式的速度;或者在寫入 Elasticsearch 時發生了未知例外,我們該如何對這一操作進行重試來保證資料完整性;不可否認開發團隊中不是所有開發人員都能了解所有的功能,如果有開發人員在開發新的與產品有關的業務邏輯時沒有引入 Elasticsearch 客戶端,那么我們將在 Elasticsearch 中更新這次資料的更改,無法保證 MySQL 與 Elasticsearch 間的資料一致性,

接下來我們該考慮如何將 MySQL 資料庫中的資料更改作為事件,發送到流處理服務器上,我們可以在資料庫變更后,在應用程式中使用訊息管道的客戶端同步地將事件發送到訊息管道,但是這并沒有解決上面提到的使用 Elasticsearch 客戶端帶來的問題,只不過是將風險從 Elasticsearch 轉移到了訊息管道,最終我們決定通過采集 MySQL Binlog,將 MySQL Binlog 作為事件發送到訊息管道中的方式來實作基于事件的流引擎,關于 binlog 的內容可以點擊鏈接,在這里不再贅述,

服務簡介

為了對外提供統一的搜索介面,我們首先需要定義用于搜索的資料結構,對于大部分的搜索系統而言,對用戶展示的搜索結果通常包括為標題內容,這部分內容我們稱之可搜索內容(Searchable Content),在多租戶系統中我們還需要在搜索結果中標示出該搜索結果屬于哪個租戶,或用來過濾當前租戶下可搜索的內容,我們還需要額外的資訊來幫助用戶篩選自己想要搜索的產品類別,我們將這部分通用的但不用來進行搜索的內容稱為元資料(Metadata),最后,在我們展示搜索結果時可能希望根據不同型別的產品提供不同的展示效果,我們需要在搜索結果中回傳這些個性化展示所需要的原始內容(Raw Content),到此為止我們可以定義出了存盤到 Elasticsearch 中的通用資料結構:

{
	"searchable": {
		"title": "string",
		"content": "string"
	},
	"metadata": {
		"tenant_id": "long",
		"type": "long",
		"created_at": "date",
		"created_by": "string",
		"updated_at": "date",
		"updated_by": "string"
	},
	"raw": {}
}

基礎設施

Apache Kafka:Apache Kafka 是開源的分布式事件流平臺,我們使用 Apache kafka 作為資料庫事件(插入、修改和洗掉)的持久化存盤,

mysql-binlog-connector-java:我們使用mysql-binlog-connector-java從 MySQL Binlog 中獲取資料庫事件,并將它發送到 Apache Kafka 中,我們將單獨啟動一個服務來完成這個程序,

在接收端我們也將單獨啟動一個服務來消費 Kafka 中的事件,并對資料進行處理然后發送到 Elasticsearch 中,

Q:為什么不使用Elasticsearch connector之類的連接器對資料進行處理并發送到Elasticsearch中?
A:在我們的系統中是不允許將大文本存入到MySQL中的,所以我們使用了額外的物件存盤服務來存放我們的產品檔案,所以我們無法直接使用連接器將資料發送到Elasticsearch中,
Q:為什么不在發送到Kafka前就將資料進行處理?
A:這樣會有大量的資料被持久化到Kafka中,占用Kafka的磁盤空間,而這部分資料實際上也被存盤到了Elasticsearch,
Q:為什么要用單獨的服務來采集binlog,而不是使用Filebeat之類的agent?
A:當然可以直接在MySQL資料庫中安裝agent來直接采集binlog并發送到Kafka中,但是在部分情況下開發者使用的是云服務商或其他基礎設施部門提供的MySQL服務器,這種情況下我們無法直接進入服務器安裝agent,所以使用更加通用的、無侵入性的C/S結構來消費MySQL的binlog,

配置技術堆疊

我們使用 docker 和 docker-compose 來配置和部署服務,為了簡單起見,MySQL 直接使用了 root 作為用戶名和密碼,Kafka 和 Elasticsearch 使用的是單節點集群,且沒有設定任何鑒權方式,僅供開發環境使用,請勿直接用于生產環境,

version: "3"
services:
  mysql:
    image: mysql:5.7
    container_name: mysql
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: app
    ports:
      - 3306:3306
    volumes:
      - mysql:/var/lib/mysql
  zookeeper:
    image: bitnami/zookeeper:3.6.2
    container_name: zookeeper
    ports:
      - 2181:2181
    volumes:
      - zookeeper:/bitnami
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:2.7.0
    container_name: kafka
    ports:
      - 9092:9092
    volumes:
      - kafka:/bitnami
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
    volumes:
      - elasticsearch:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
volumes:
  mysql:
    driver: local
  zookeeper:
    driver: local
  kafka:
    driver: local
  elasticsearch:
    driver: local

在服務啟動成功后我們需要為 Elasticsearch 創建索引,在這里我們直接使用 curl 呼叫 Elasticsearch 的 RESTful API,也可以使用 busybox 基礎鏡像創建服務來完成這個步驟,

# Elasticsearch
curl "http://localhost:9200/search" -XPUT -d '
{
  "mappings": {
    "properties": {
      "searchable": {
        "type": "nested",
        "properties": {
          "title": {
            "type": "text"
          },
          "content": {
            "type": "text"
          }
        }
      },
      "metadata": {
        "type": "nested",
        "properties": {
          "tenant_id": {
            "type": "long"
          },
          "type": {
            "type": "integer"
          },
          "created_at": {
            "type": "date"
          },
          "created_by": {
            "type": "keyword"
          },
          "updated_at": {
            "type": "date"
          },
          "updated_by": {
            "type": "keyword"
          }
        }
      },
      "raw": {
        "type": "nested"
      }
    }
  }
}'

核心代碼實作(SpringBoot + Kotlin)

Binlog 采集端:

    override fun run() {
        client.serverId = properties.serverId
        val eventDeserializer = EventDeserializer()
        eventDeserializer.setCompatibilityMode(
            EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
        )
        client.setEventDeserializer(eventDeserializer)
        client.registerEventListener {
            val header = it.getHeader<EventHeader>()
            val data = https://www.cnblogs.com/Jcloud/p/it.getData()
            if (header.eventType == EventType.TABLE_MAP) {
                tableRepository.updateTable(Table.of(data as TableMapEventData))
            } else if (EventType.isRowMutation(header.eventType)) {
                val events = when {
                    EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
                    EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
                    EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData)
                    else -> emptyList()
                }
                logger.info("Mutation events: {}", events)
                for (event in events) {
                    kafkaTemplate.send("binlog", objectMapper.writeValueAsString(event))
                }
            }
        }
        client.connect()
    }

在這段代碼里面,我們首先是對 binlog 客戶端進行了初始化,隨后開始監聽 binlog 事件,binlog 事件型別有很多,大部分都是我們不需要關心的事件,我們只需要關注 TABLE_MAP 和 WRITE/UPDATE/DELETE 就可以,當我們接收到 TABLE_MAP 事件,我們會對記憶體中的資料庫表結構進行更新,在后續的 WRITE/UPDATE/DELETE 事件中,我們會使用記憶體快取的資料庫結構進行映射,整個程序大概如下所示:

Table: ["id", "title", "content",...]
Row: [1, "Foo", "Bar",...]
=>
{
	"id": 1,
	"title": "Foo",
	"content": "Bar"
}

隨后我們將收集到的事件發送到 Kafka 中,并由 Event Processor 進行消費處理,

事件處理器

@Component
class KafkaBinlogTopicListener(
    val binlogEventHandler: BinlogEventHandler
) {

    companion object {
        private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
    }

    private val objectMapper = jacksonObjectMapper()

    @KafkaListener(topics = ["binlog"])
    fun process(message: String) {
        val binlogEvent = objectMapper.readValue<BinlogEvent>(message)
        logger.info("Consume binlog event: {}", binlogEvent)
        binlogEventHandler.handle(binlogEvent)
    }
}

首先使用SpringBoot Message Kafka提供的注解對事件進行消費,接下來將事件委托到binlogEventHandler去進行處理,實際上BinlogEventHandler是個自定義的函式式介面,我們自定義事件處理器實作該介面后通過 Spring Bean 的方式注入到KafkaBinlogTopicListener中,

@Component
class ElasticsearchIndexerBinlogEventHandler(
    val restHighLevelClient: RestHighLevelClient
) : BinlogEventHandler {
    override fun handle(binlogEvent: BinlogEvent) {
        val payload = binlogEvent.payload as Map<*, *>
        val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
        // Should delete from Elasticsearch
        if (binlogEvent.eventType == EVENT_TYPE_DELETE) {
            val deleteRequest = DeleteRequest()
            deleteRequest
                .index("search")
                .id(documentId)
            restHighLevelClient.delete(deleteRequest, DEFAULT)
        } else {
            // Not ever WRITE or UPDATE, just reindex
            val indexRequest = IndexRequest()
            indexRequest
                .index("search")
                .id(documentId)
                .source(
                    mapOf<String, Any>(
                        "searchable" to mapOf(
                            "title" to payload["title"],
                            "content" to payload["content"]
                        ),
                        "metadata" to mapOf(
                            "tenantId" to payload["tenantId"],
                            "type" to payload["type"],
                            "createdAt" to payload["createdAt"],
                            "createdBy" to payload["createdBy"],
                            "updatedAt" to payload["updatedAt"],
                            "updatedBy" to payload["updatedBy"]
                        )
                    )
                )
            restHighLevelClient.index(indexRequest, DEFAULT)
        }
    }
}

在這里我們只需要簡單地判斷是否為洗掉操作就可以,如果是刪除操作需要在 Elasticsearch 中將資料洗掉,而如果是非洗掉操作只需要在 Elasticsearch 重新按照為檔案建立索引即可,這段代碼簡單地使用了 Kotlin 中提供的 mapOf 方法對資料進行映射,如果需要其他復雜的處理只需要按照 Java 代碼的方式撰寫處理器即可,

總結

其實 Binlog 的處理部分有很多開源的處理引擎,包括 Alibaba Canal,本文使用手動處理的方式也是為其他使用非 MySQL 資料源的同學類似的解決方案,大家可以按需所取,因地制宜,為自己的網站設計屬于自己的實時站內搜索引擎!

轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/547546.html

標籤:其他

上一篇:Mac作業環境初始化

下一篇:程式員“起名”頭痛根治指南

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • Git本地庫既關聯GitHub又關聯Gitee

    創建代碼倉庫 使用gitee舉例(github和gitee差不多) 1.在gitee右上角點擊+,選擇新建倉庫 ? 2.選擇填寫倉庫資訊,然后進行創建 ? 3.服務端已經準備好了,本地開始作準備 (1)Git 全域設定 git config --global user.name "成鈺" git c ......

    uj5u.com 2020-09-10 05:04:14 more
  • CODING DevOps 代碼質量實戰系列第二課,相約周三

    隨著 ToB(企業服務)的興起和 ToC(消費互聯網)產品進入成熟期,線上故障帶來的損失越來越大,代碼質量越來越重要,而「質量內建」正是 DevOps 核心理念之一。**《DevOps 代碼質量實戰(PHP 版)》**為 CODING DevOps 代碼質量實戰系列的第二課,同時也是本系列的 PHP ......

    uj5u.com 2020-09-10 05:07:43 more
  • 推薦Scrum書籍

    推薦Scrum書籍 直接上干貨,推薦書籍清單如下(推薦有順序的哦) Scrum指南 Scrum精髓 Scrum敏捷軟體開發 Scrum捷徑 硝煙中的Scrum和XP : 我們如何實施Scrum 敏捷軟體開發:Scrum實戰指南 Scrum要素 大規模Scrum:大規模敏捷組織的設計 用戶故事地圖 用 ......

    uj5u.com 2020-09-10 05:07:45 more
  • CODING DevOps 代碼質量實戰系列最后一課,周四發車

    隨著 ToB(企業服務)的興起和 ToC(消費互聯網)產品進入成熟期,線上故障帶來的損失越來越大,代碼質量越來越重要,而「質量內建」正是 DevOps 核心理念之一。 **《DevOps 代碼質量實戰(Java 版)》**為 CODING DevOps 代碼質量實戰系列的最后一課,同時也是本系列的 ......

    uj5u.com 2020-09-10 05:07:52 more
  • 敏捷軟體工程實踐書籍

    Scrum轉型想要做好,第一步先了解并真正落實Scrum,那么我推薦的Scrum書籍是要看懂并實踐的。第二步是團隊的工程實踐要做扎實。 下面推薦工程實踐書單: 重構:改善既有代碼的設計 決議極限編程 : 擁抱變化 代碼整潔代碼 程式員的職業素養 修改代碼的藝術 撰寫可讀代碼的藝術 測驗驅動開發 : ......

    uj5u.com 2020-09-10 05:07:55 more
  • Jenkins+svn+nginx實作windows環境自動部署vue前端專案

    前面文章介紹了Jenkins+svn+tomcat實作自動化部署,現在終于有空抽時間出來寫下Jenkins+svn+nginx實作自動部署vue前端專案。 jenkins的安裝和配置已經在前面文章進行介紹,下面介紹實作vue前端專案需要進行的哪些額外的步驟。 注意:在安裝jenkins和nginx的 ......

    uj5u.com 2020-09-10 05:08:49 more
  • CODING DevOps 微服務專案實戰系列第一課,明天等你

    CODING DevOps 微服務專案實戰系列第一課**《DevOps 微服務專案實戰:DevOps 初體驗》**將由 CODING DevOps 開發工程師 王寬老師 向大家介紹 DevOps 的基本理念,并探討為什么現代開發活動需要 DevOps,同時將以 eShopOnContainers 項 ......

    uj5u.com 2020-09-10 05:09:14 more
  • CODING DevOps 微服務專案實戰系列第二課來啦!

    近年來,工程專案的結構越來越復雜,需要接入合適的持續集成流水線形式,才能滿足更多變的需求,那么如何優雅地使用 CI 能力提升生產效率呢?CODING DevOps 微服務專案實戰系列第二課 《DevOps 微服務專案實戰:CI 進階用法》 將由 CODING DevOps 全堆疊工程師 何晨哲老師 向 ......

    uj5u.com 2020-09-10 05:09:33 more
  • CODING DevOps 微服務專案實戰系列最后一課,周四開講!

    隨著軟體工程越來越復雜化,如何在 Kubernetes 集群進行灰度發布成為了生產部署的”必修課“,而如何實作安全可控、自動化的灰度發布也成為了持續部署重點關注的問題。CODING DevOps 微服務專案實戰系列最后一課:**《DevOps 微服務專案實戰:基于 Nginx-ingress 的自動 ......

    uj5u.com 2020-09-10 05:10:00 more
  • CODING 儀表盤功能正式推出,實作作業資料可視化!

    CODING 儀表盤功能現已正式推出!該功能旨在用一張張統計卡片的形式,統計并展示使用 CODING 中所產生的資料。這意味著無需額外的設定,就可以收集歸納寶貴的作業資料并予之量化分析。這些海量的資料皆會以圖表或串列的方式躍然紙上,方便團隊成員隨時查看各專案的進度、狀態和指標,云端協作迎來真正意義上 ......

    uj5u.com 2020-09-10 05:11:01 more
最新发布
  • windows系統git使用ssh方式和gitee/github進行同步

    使用git來clone專案有兩種方式:HTTPS和SSH:
    HTTPS:不管是誰,拿到url隨便clone,但是在push的時候需要驗證用戶名和密碼;
    SSH:clone的專案你必須是擁有者或者管理員,而且需要在clone前添加SSH Key。SSH 在push的時候,是不需要輸入用戶名的,如果配置... ......

    uj5u.com 2023-04-19 08:41:12 more
  • windows系統git使用ssh方式和gitee/github進行同步

    使用git來clone專案有兩種方式:HTTPS和SSH:
    HTTPS:不管是誰,拿到url隨便clone,但是在push的時候需要驗證用戶名和密碼;
    SSH:clone的專案你必須是擁有者或者管理員,而且需要在clone前添加SSH Key。SSH 在push的時候,是不需要輸入用戶名的,如果配置... ......

    uj5u.com 2023-04-19 08:35:34 more
  • 2023年農牧行業6大CRM系統、5大場景盤點

    在物聯網、大資料、云計算、人工智能、自動化技術等現代資訊技術蓬勃發展與逐步成熟的背景下,數字化正成為農牧行業供給側結構性變革與高質量發展的核心驅動因素。因此,改造和提升傳統農牧業、開拓創新現代智慧農牧業,加快推進農牧業的現代化、資訊化、數字化建設已成為農牧業發展的重要方向。 當下,企業數字化轉型已經 ......

    uj5u.com 2023-04-18 08:05:44 more
  • 2023年農牧行業6大CRM系統、5大場景盤點

    在物聯網、大資料、云計算、人工智能、自動化技術等現代資訊技術蓬勃發展與逐步成熟的背景下,數字化正成為農牧行業供給側結構性變革與高質量發展的核心驅動因素。因此,改造和提升傳統農牧業、開拓創新現代智慧農牧業,加快推進農牧業的現代化、資訊化、數字化建設已成為農牧業發展的重要方向。 當下,企業數字化轉型已經 ......

    uj5u.com 2023-04-18 08:00:18 more
  • 計算機組成原理—存盤器

    計算機組成原理—硬體結構 二、存盤器 1.概述 存盤器是計算機系統中的記憶設備,用來存放程式和資料 1.1存盤器的層次結構 快取-主存層次主要解決CPU和主存速度不匹配的問題,速度接近快取 主存-輔存層次主要解決存盤系統的容量問題,容量接近與價位接近于主存 2.主存盤器 2.1概述 主存與CPU的聯 ......

    uj5u.com 2023-04-17 08:20:31 more
  • 談一談我對協同開發的一些認識

    如今各互聯網公司普通都使用敏捷開發,采用小步快跑的形式來進行專案開發。如果是小專案或者小需求,那一個開發可能就搞定了。但對于電商等復雜的系統,其功能多,結構復雜,一個人肯定是搞不定的,所以都是很多人來共同開發維護。以我曾經待過的商城團隊為例,光是后端開發就有七十多人。 為了更好地開發這類大型系統,往 ......

    uj5u.com 2023-04-17 08:18:55 more
  • 專案管理PRINCE2核心知識點整理

    PRINCE2,即 PRoject IN Controlled Environment(受控環境中的專案)是一種結構化的專案管理方法論,由英國政府內閣商務部(OGC)推出,是英國專案管理標準。
    PRINCE2 作為一種開放的方法論,是一套結構化的專案管理流程,描述了如何以一種邏輯性的、有組織的方法,... ......

    uj5u.com 2023-04-17 08:18:51 more
  • 談一談我對協同開發的一些認識

    如今各互聯網公司普通都使用敏捷開發,采用小步快跑的形式來進行專案開發。如果是小專案或者小需求,那一個開發可能就搞定了。但對于電商等復雜的系統,其功能多,結構復雜,一個人肯定是搞不定的,所以都是很多人來共同開發維護。以我曾經待過的商城團隊為例,光是后端開發就有七十多人。 為了更好地開發這類大型系統,往 ......

    uj5u.com 2023-04-17 08:18:00 more
  • 專案管理PRINCE2核心知識點整理

    PRINCE2,即 PRoject IN Controlled Environment(受控環境中的專案)是一種結構化的專案管理方法論,由英國政府內閣商務部(OGC)推出,是英國專案管理標準。
    PRINCE2 作為一種開放的方法論,是一套結構化的專案管理流程,描述了如何以一種邏輯性的、有組織的方法,... ......

    uj5u.com 2023-04-17 08:17:55 more
  • 計算機組成原理—存盤器

    計算機組成原理—硬體結構 二、存盤器 1.概述 存盤器是計算機系統中的記憶設備,用來存放程式和資料 1.1存盤器的層次結構 快取-主存層次主要解決CPU和主存速度不匹配的問題,速度接近快取 主存-輔存層次主要解決存盤系統的容量問題,容量接近與價位接近于主存 2.主存盤器 2.1概述 主存與CPU的聯 ......

    uj5u.com 2023-04-17 08:12:06 more