主頁 > 資料庫 > elasticsearch升級和索引重建。

elasticsearch升級和索引重建。

2023-04-16 08:15:03 資料庫

1.背景描述

??2020年團隊決定對elasticsearch升級,es(elasticsearch縮寫,下同)當前版本為0.9x,升級到5.x版本,es在本公司承載三個部分的業務,站內查詢,訂單資料統計,elk日志分析,

??對于站內查詢和訂單資料統計,當前業務架構是

??mysql -> canal -> kafka -> es

??(可以考慮使用kafka connector 代替canal)

2.難點

??難點是在升級的時候如何不影響當前業務,

3.具體步驟

A.部署es新集群

??下載5.x版本的es,在新的機器上部署新的集群,

B.pull代碼,升級代碼到es新版本

??由于從0.9x到5.x版本跨度比較大,許多java api都發生了變化,需要修復,

??一個坑是alias api 發生了語意變化,在后來的自測中修復了此問題,

C.重建索引

??我們使用索引重建程式來新建索引,重建索引具體步驟如下,我們稱線上索引為online index, 新創建的索引為new index,

??1.init

????重繪索引名映射關系,檢查當前alias只有一個物理索引,

????根據預定義的mapping,創建索引new index,

????設定在線索引記錄資料變更日志,即記錄線上索引消費kafka資料,并存盤為change log檔案.

??2.全量索引資料庫上的資料到new index

????從mysql查出資料同步到es中,如果有多個分表,就按照表順序同步,可以開啟多執行緒批量插入,

??3.對new index索引優化

????refresh, flush 索引,呼叫force-merge api,進行段合并,

??4.重放change log到new index中

????根據change log 轉換為es query,寫入到new index,????

??5.暫停線上索引的寫入

????因為online index和new index 使用的是相同的kafka consumer group,所以必須停掉online index的消費功能,

??6.關閉change log

????停止記錄在線索引記錄資料變更日志,

??7.第二階段重放change log

????根據change log 轉換為es query,寫入到new index,?

??8.洗掉change log 

????洗掉線索引記錄資料變更日志,

??9.設定副本數 

????new index創建索引的時候默認副本數為0,現在動態調整副本數為業務需要的值,比如對現實搜索業務設定兩個副本,對訂單統計類索引不需要副本,

PUT /new_index/_settings
{
    "number_of_replicas": 2
}

????此階段可能會比較耗時,需要等待幾分鐘才能進行下一步操作,更好的做法是呼叫health api 查看分片狀態,

GET _cluster/health

{
  "cluster_name" : "testcluster",
  "status" : "yellow",
  "timed_out" : false,
  "number_of_nodes" : 1,
  "number_of_data_nodes" : 1,
  "active_primary_shards" : 1,
  "active_shards" : 1,
  "relocating_shards" : 0, // 重新定位的分片
  "initializing_shards" : 0, // 初始化中的分片
  "unassigned_shards" : 1, // 未分配的分片
  "delayed_unassigned_shards": 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch": 0,
  "task_max_waiting_in_queue_millis": 0,
  "active_shards_percent_as_number": 50.0
}

??10.別名切換 

POST /_aliases
{
    "actions": [
        { "remove": { "index": "online_index", "alias": "my_index" }},
        { "add":    { "index": "new_index", "alias": "my_index" }}
    ]
}

??11.運行在線索引 (從kafka里面讀取資料)

????new_index 開始從kafka里面消費最新資料,由于之前的操作可能會有延時,需要等待幾分鐘才能同步到最新資料,

??12.洗掉舊的索引

????洗掉old_index

詳細代碼步驟如下

        // 1.init
        logger.info("初始化");
        ESHighLevelFactory esHighLevelFactory = ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName());
        logger.info("重繪索引名映射關系");
        if (!indexContext.refreshIndexName()) {
            throw new IndexException("重繪索引映射關系失敗");
        }

        rebuildIndexName = indexContext.getPhysicalRebuildIndexName();

        logger.info("初始化重建索引環境,當前重建索引名:" + rebuildIndexName);
        logger.info("創建索引,索引名:" + rebuildIndexName);
        boolean isCreate = false;
        try {
            isCreate = indexContext.getIndex().createIndex(rebuildIndexName);
        } catch (Throwable t) {
            logger.info("創建索引失敗,本次失敗可以不處理,將會自動重試 ...");
        }

        logger.info("設定在線索引記錄資料變更日志");
        indexContext.startChangeLog();

        // 2. 重建索引
        logger.info("全量索引資料庫上的資料 ...");
        long startRebulidTime = System.currentTimeMillis();
        rebuild();
        logger.info(" ------  完成全量索引資料庫上的資料,對應索引" + rebuildIndexName + ",耗時" + ((System.currentTimeMillis() - startRebulidTime) / 1000)
            + " 秒    ------  ");

        // 3. 索引優化 -- 是否調到變更重放完畢后做優化
        logger.info("優化索引 ...");
        long startOptimizeTime = System.currentTimeMillis();
        ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, 1);
        logger.info(" ------  完成" + rebuildIndexName + "索引優化,耗時 " + ((System.currentTimeMillis() - startOptimizeTime) / 1000)
            + " 秒    ------  ");

        // TODO 字符集設定
        BufferedReader logReader = new BufferedReader(new FileReader(indexContext.getChangeLogFilePath()));

        // 4. 重放變更日志
        logger.info("重放本地資料變更日志[第一階段] ...");
        long startReplay1Time = System.currentTimeMillis();
        int replayChangeLogCount = replayChangeLogFirst(logReader);
        logger.info(" ------  完成[第一階段]的變更日志重放,行數" + replayChangeLogCount + " 耗時 "
            + ((System.currentTimeMillis() - startReplay1Time) / 1000) + " 秒    ------  ");

        // 5. 暫停在線索引
        logger.info("暫停在線索引");
        indexContext.pauseOnlineIndex();
        isPauseOnline.set(true);

        // 6. 設定 在線索引只做索引更新 以及 關閉 change log
        logger.info("停止變更日志");
        indexContext.stopChangeLog();

        // 7. 繼續重放 change log
        logger.info("重放本地資料變更日志[第二階段] ...");
        long startReplay2Time = System.currentTimeMillis();
        replayChangeLogCount = replayChangeLogCount + replayChangeLogSecond(logReader);
        if ((indexContext.getWriteChangeLogCount() - replayChangeLogCount) != 0) {
            logger.error("變更日志,處于錯誤的狀態,統計的日志行數:" + indexContext.getWriteChangeLogCount() + ", 但實際只有:" + replayChangeLogCount);
        }
        logger.info(" ------  完成[第二階段]的變更日志重放,行數" + replayChangeLogCount + " 耗時 "
            + ((System.currentTimeMillis() - startReplay2Time) / 1000) + " 秒    ------  ");

        // 8. 洗掉變更日志, OnlineIndex.startChangeLog 有做環境清理,這里不執行
        logger.info("簡單優化索引 ...");
        long startSimpleOptimizeTime = System.currentTimeMillis();
        ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, null);

        logger.info(" ------  完成" + rebuildIndexName + "索引簡單優化,耗時 " + ((System.currentTimeMillis() - startSimpleOptimizeTime) / 1000)
            + " 秒    ------  ");

        // 9. 設定副本數 (懷疑比較耗時~~~待確認)
        logger.info("設定副本數 ...");
        int replicas = 3;
        if (rebuildIndexName.startsWith(IndexNameConst.ORDER_INDEX_PREFIX)) {
            replicas = 1;
        } else if (rebuildIndexName.startsWith(IndexNameConst.IndexName.activityTicket.getIndexName())) {
            replicas = 2;
        } else {
            String replicasStr = Configuration.getInstance().loadDiamondProperty(Configuration.ES_INDEX_REPLICAS);
            if (NumberUtils.isNumber(replicasStr)) {
                replicas = NumberUtils.toInt(replicasStr);
            }
        }
        ESHighLevelFactory.getInstance(rebuildIndexName).setReplicas(rebuildIndexName, replicas);

        // 執行索引切換流程
        // 預發、線上環境阻塞等待2分鐘同步資料后,再執行索引切換和洗掉舊索引邏輯
        try {
            if(IDCUtil.isBuildOrProduction()){
                Thread.sleep(120 * 1000);
            }
        } catch (InterruptedException e) {
        }
        // 10. 別名切換
        logger.info("索引切換:將" + rebuildIndexName + "設定為線上索引");
        if (!indexContext.switchIndex(rebuildIndexName)) {
            throw new IndexException("索引切換失敗:將" + rebuildIndexName + "設定為線上索引失敗");
        }

        // 11. 運行在線索引
        logger.info("運行在線索引");
        indexContext.keepRuningOnlineIndex();
        isPauseOnline.set(false);

        // 12. 洗掉原有在線索引
        String oldOnlineIndexName = indexContext.getPhysicalRebuildIndexName();
        logger.info("洗掉原有在線索引,索引名:" + oldOnlineIndexName);
        if (!ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName()).deleteIndex(oldOnlineIndexName)) {
            throw new IndexException("洗掉索引失敗,索引名:" + oldOnlineIndexName);
        }

思考

如果只是簡單地新建索引,完全可以這樣做(使用不同的消費組) 

??1.記錄時間戳 

??2.全量索引資料的資料

??3.根據前面的時間戳找到kafka中的下標,下標得時間戳必須 < 記錄的時間戳

??4.根據上一步的下標開始索引資料

D.使用新集群進行業務測驗

??部署新的客戶端服務呼叫新的es集群,檢查業務是否正常,對站內查詢檢查搜索結果是否一致,對統計類查詢查看統計結果是否一致,

E.發布線上客戶端搜索代碼,修改es地址為新集群地址

??上線,觀察業務是否穩定,

F.下線舊的es集群

??釋放舊的es集群的資源,

4.總結

??es升級這份作業是兩年之前做的,現在來進行總結,部分細節可能會有疏漏,但是總結起來,依然后很多識訓,從架構,代碼細節上都有改進的空間,es重建代碼可以做得更通用,然后開源出來,

轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/550233.html

標籤:其他

上一篇:Mysql 中,為什么 WHERE 使用別名會報錯,而 ORDER BY 不會報錯?

下一篇:MySQL MHA資訊的收集【Filebeat+logstash+MySQL】

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • GPU虛擬機創建時間深度優化

    **?桔妹導讀:**GPU虛擬機實體創建速度慢是公有云面臨的普遍問題,由于通常情況下創建虛擬機屬于低頻操作而未引起業界的重視,實際生產中還是存在對GPU實體創建時間有苛刻要求的業務場景。本文將介紹滴滴云在解決該問題時的思路、方法、并展示最終的優化成果。 從公有云服務商那里購買過虛擬主機的資深用戶,一 ......

    uj5u.com 2020-09-10 06:09:13 more
  • 可編程網卡芯片在滴滴云網路的應用實踐

    **?桔妹導讀:**隨著云規模不斷擴大以及業務層面對延遲、帶寬的要求越來越高,采用DPDK 加速網路報文處理的方式在橫向縱向擴展都出現了局限性。可編程芯片成為業界熱點。本文主要講述了可編程網卡芯片在滴滴云網路中的應用實踐,遇到的問題、帶來的收益以及開源社區貢獻。 #1. 資料中心面臨的問題 隨著滴滴 ......

    uj5u.com 2020-09-10 06:10:21 more
  • 滴滴資料通道服務演進之路

    **?桔妹導讀:**滴滴資料通道引擎承載著全公司的資料同步,為下游實時和離線場景提供了必不可少的源資料。隨著任務量的不斷增加,資料通道的整體架構也隨之發生改變。本文介紹了滴滴資料通道的發展歷程,遇到的問題以及今后的規劃。 #1. 背景 資料,對于任何一家互聯網公司來說都是非常重要的資產,公司的大資料 ......

    uj5u.com 2020-09-10 06:11:05 more
  • 滴滴AI Labs斬獲國際機器翻譯大賽中譯英方向世界第三

    **桔妹導讀:**深耕人工智能領域,致力于探索AI讓出行更美好的滴滴AI Labs再次斬獲國際大獎,這次獲獎的專案是什么呢?一起來看看詳細報道吧! 近日,由國際計算語言學協會ACL(The Association for Computational Linguistics)舉辦的世界最具影響力的機器 ......

    uj5u.com 2020-09-10 06:11:29 more
  • MPP (Massively Parallel Processing)大規模并行處理

    1、什么是mpp? MPP (Massively Parallel Processing),即大規模并行處理,在資料庫非共享集群中,每個節點都有獨立的磁盤存盤系統和記憶體系統,業務資料根據資料庫模型和應用特點劃分到各個節點上,每臺資料節點通過專用網路或者商業通用網路互相連接,彼此協同計算,作為整體提供 ......

    uj5u.com 2020-09-10 06:11:41 more
  • 滴滴資料倉庫指標體系建設實踐

    **桔妹導讀:**指標體系是什么?如何使用OSM模型和AARRR模型搭建指標體系?如何統一流程、規范化、工具化管理指標體系?本文會對建設的方法論結合滴滴資料指標體系建設實踐進行解答分析。 #1. 什么是指標體系 ##1.1 指標體系定義 指標體系是將零散單點的具有相互聯系的指標,系統化的組織起來,通 ......

    uj5u.com 2020-09-10 06:12:52 more
  • 單表千萬行資料庫 LIKE 搜索優化手記

    我們經常在資料庫中使用 LIKE 運算子來完成對資料的模糊搜索,LIKE 運算子用于在 WHERE 子句中搜索列中的指定模式。 如果需要查找客戶表中所有姓氏是“張”的資料,可以使用下面的 SQL 陳述句: SELECT * FROM Customer WHERE Name LIKE '張%' 如果需要 ......

    uj5u.com 2020-09-10 06:13:25 more
  • 滴滴Ceph分布式存盤系統優化之鎖優化

    **桔妹導讀:**Ceph是國際知名的開源分布式存盤系統,在工業界和學術界都有著重要的影響。Ceph的架構和演算法設計發表在國際系統領域頂級會議OSDI、SOSP、SC等上。Ceph社區得到Red Hat、SUSE、Intel等大公司的大力支持。Ceph是國際云計算領域應用最廣泛的開源分布式存盤系統, ......

    uj5u.com 2020-09-10 06:14:51 more
  • es~通過ElasticsearchTemplate進行聚合~嵌套聚合

    之前寫過《es~通過ElasticsearchTemplate進行聚合操作》的文章,這一次主要寫一個嵌套的聚合,例如先對sex集合,再對desc聚合,最后再對age求和,共三層嵌套。 Aggregations的部分特性類似于SQL語言中的group by,avg,sum等函式,Aggregation ......

    uj5u.com 2020-09-10 06:14:59 more
  • 爬蟲日志監控 -- Elastc Stack(ELK)部署

    傻瓜式部署,只需替換IP與用戶 導讀: 現ELK四大組件分別為:Elasticsearch(核心)、logstash(處理)、filebeat(采集)、kibana(可視化) 下載均在https://www.elastic.co/cn/downloads/下tar包,各組件版本最好一致,配合fdm會 ......

    uj5u.com 2020-09-10 06:15:05 more
最新发布
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:33:24 more
  • MySQL中binlog備份腳本分享

    關于MySQL的二進制日志(binlog),我們都知道二進制日志(binlog)非常重要,尤其當你需要point to point災難恢復的時侯,所以我們要對其進行備份。關于二進制日志(binlog)的備份,可以基于flush logs方式先切換binlog,然后拷貝&壓縮到到遠程服務器或本地服務器 ......

    uj5u.com 2023-04-20 08:28:06 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:27:27 more
  • 快取與資料庫雙寫一致性幾種策略分析

    本文將對幾種快取與資料庫保證資料一致性的使用方式進行分析。為保證高并發性能,以下分析場景不考慮執行的原子性及加鎖等強一致性要求的場景,僅追求最終一致性。 ......

    uj5u.com 2023-04-20 08:26:48 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:26:35 more
  • 云時代,MySQL到ClickHouse資料同步產品對比推薦

    ClickHouse 在執行分析查詢時的速度優勢很好的彌補了MySQL的不足,但是對于很多開發者和DBA來說,如何將MySQL穩定、高效、簡單的同步到 ClickHouse 卻很困難。本文對比了 NineData、MaterializeMySQL(ClickHouse自帶)、Bifrost 三款產品... ......

    uj5u.com 2023-04-20 08:26:29 more
  • sql陳述句優化

    問題查找及措施 問題查找 需要找到具體的代碼,對其進行一對一優化,而非一直把關注點放在服務器和sql平臺 降低簡化每個事務中處理的問題,盡量不要讓一個事務拖太長的時間 例如檔案上傳時,應將檔案上傳這一步放在事務外面 微軟建議 4.啟動sql定時執行計劃 怎么啟動sqlserver代理服務-百度經驗 ......

    uj5u.com 2023-04-20 08:25:13 more
  • Redis 報”OutOfDirectMemoryError“(堆外記憶體溢位)

    Redis 報錯“OutOfDirectMemoryError(堆外記憶體溢位) ”問題如下: 一、報錯資訊: 使用 Redis 的業務介面 ,產生 OutOfDirectMemoryError(堆外記憶體溢位),如圖: 格式化后的報錯資訊: { "timestamp": "2023-04-17 22: ......

    uj5u.com 2023-04-20 08:24:54 more
  • day02-2-商鋪查詢快取

    功能02-商鋪查詢快取 3.商鋪詳情快取查詢 3.1什么是快取? 快取就是資料交換的緩沖區(稱作Cache),是存盤資料的臨時地方,一般讀寫性能較高。 快取的作用: 降低后端負載 提高讀寫效率,降低回應時間 快取的成本: 資料一致性成本 代碼維護成本 運維成本 3.2需求說明 如下,當我們點擊商店詳 ......

    uj5u.com 2023-04-20 08:24:03 more
  • day02-短信登錄

    功能實作02 2.功能01-短信登錄 2.1基于Session實作登錄 2.1.1思路分析 2.1.2代碼實作 2.1.2.1發送短信驗證碼 發送短信驗證碼: 發送驗證碼的介面為:http://127.0.0.1:8080/api/user/code?phone=xxxxx<手機號> 請求方式:PO ......

    uj5u.com 2023-04-20 08:23:11 more