1.背景描述
??2020年團隊決定對elasticsearch升級,es(elasticsearch縮寫,下同)當前版本為0.9x,升級到5.x版本,es在本公司承載三個部分的業務,站內查詢,訂單資料統計,elk日志分析,
??對于站內查詢和訂單資料統計,當前業務架構是
??mysql -> canal -> kafka -> es
??(可以考慮使用kafka connector 代替canal)
2.難點
??難點是在升級的時候如何不影響當前業務,
3.具體步驟
A.部署es新集群
??下載5.x版本的es,在新的機器上部署新的集群,
B.pull代碼,升級代碼到es新版本
??由于從0.9x到5.x版本跨度比較大,許多java api都發生了變化,需要修復,
??一個坑是alias api 發生了語意變化,在后來的自測中修復了此問題,
C.重建索引
??我們使用索引重建程式來新建索引,重建索引具體步驟如下,我們稱線上索引為online index, 新創建的索引為new index,
??1.init
????重繪索引名映射關系,檢查當前alias只有一個物理索引,
????根據預定義的mapping,創建索引new index,
????設定在線索引記錄資料變更日志,即記錄線上索引消費kafka資料,并存盤為change log檔案.
??2.全量索引資料庫上的資料到new index
????從mysql查出資料同步到es中,如果有多個分表,就按照表順序同步,可以開啟多執行緒批量插入,
??3.對new index索引優化
????refresh, flush 索引,呼叫force-merge api,進行段合并,
??4.重放change log到new index中
????根據change log 轉換為es query,寫入到new index,????
??5.暫停線上索引的寫入
????因為online index和new index 使用的是相同的kafka consumer group,所以必須停掉online index的消費功能,
??6.關閉change log
????停止記錄在線索引記錄資料變更日志,
??7.第二階段重放change log
????根據change log 轉換為es query,寫入到new index,?
??8.洗掉change log
????洗掉線索引記錄資料變更日志,
??9.設定副本數
????new index創建索引的時候默認副本數為0,現在動態調整副本數為業務需要的值,比如對現實搜索業務設定兩個副本,對訂單統計類索引不需要副本,
PUT /new_index/_settings
{
"number_of_replicas": 2
}
????此階段可能會比較耗時,需要等待幾分鐘才能進行下一步操作,更好的做法是呼叫health api 查看分片狀態,
GET _cluster/health
{
"cluster_name" : "testcluster",
"status" : "yellow",
"timed_out" : false,
"number_of_nodes" : 1,
"number_of_data_nodes" : 1,
"active_primary_shards" : 1,
"active_shards" : 1,
"relocating_shards" : 0, // 重新定位的分片
"initializing_shards" : 0, // 初始化中的分片
"unassigned_shards" : 1, // 未分配的分片
"delayed_unassigned_shards": 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch": 0,
"task_max_waiting_in_queue_millis": 0,
"active_shards_percent_as_number": 50.0
}
??10.別名切換
POST /_aliases
{
"actions": [
{ "remove": { "index": "online_index", "alias": "my_index" }},
{ "add": { "index": "new_index", "alias": "my_index" }}
]
}
??11.運行在線索引 (從kafka里面讀取資料)
????new_index 開始從kafka里面消費最新資料,由于之前的操作可能會有延時,需要等待幾分鐘才能同步到最新資料,
??12.洗掉舊的索引
????洗掉old_index
詳細代碼步驟如下
// 1.init
logger.info("初始化");
ESHighLevelFactory esHighLevelFactory = ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName());
logger.info("重繪索引名映射關系");
if (!indexContext.refreshIndexName()) {
throw new IndexException("重繪索引映射關系失敗");
}
rebuildIndexName = indexContext.getPhysicalRebuildIndexName();
logger.info("初始化重建索引環境,當前重建索引名:" + rebuildIndexName);
logger.info("創建索引,索引名:" + rebuildIndexName);
boolean isCreate = false;
try {
isCreate = indexContext.getIndex().createIndex(rebuildIndexName);
} catch (Throwable t) {
logger.info("創建索引失敗,本次失敗可以不處理,將會自動重試 ...");
}
logger.info("設定在線索引記錄資料變更日志");
indexContext.startChangeLog();
// 2. 重建索引
logger.info("全量索引資料庫上的資料 ...");
long startRebulidTime = System.currentTimeMillis();
rebuild();
logger.info(" ------ 完成全量索引資料庫上的資料,對應索引" + rebuildIndexName + ",耗時" + ((System.currentTimeMillis() - startRebulidTime) / 1000)
+ " 秒 ------ ");
// 3. 索引優化 -- 是否調到變更重放完畢后做優化
logger.info("優化索引 ...");
long startOptimizeTime = System.currentTimeMillis();
ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, 1);
logger.info(" ------ 完成" + rebuildIndexName + "索引優化,耗時 " + ((System.currentTimeMillis() - startOptimizeTime) / 1000)
+ " 秒 ------ ");
// TODO 字符集設定
BufferedReader logReader = new BufferedReader(new FileReader(indexContext.getChangeLogFilePath()));
// 4. 重放變更日志
logger.info("重放本地資料變更日志[第一階段] ...");
long startReplay1Time = System.currentTimeMillis();
int replayChangeLogCount = replayChangeLogFirst(logReader);
logger.info(" ------ 完成[第一階段]的變更日志重放,行數" + replayChangeLogCount + " 耗時 "
+ ((System.currentTimeMillis() - startReplay1Time) / 1000) + " 秒 ------ ");
// 5. 暫停在線索引
logger.info("暫停在線索引");
indexContext.pauseOnlineIndex();
isPauseOnline.set(true);
// 6. 設定 在線索引只做索引更新 以及 關閉 change log
logger.info("停止變更日志");
indexContext.stopChangeLog();
// 7. 繼續重放 change log
logger.info("重放本地資料變更日志[第二階段] ...");
long startReplay2Time = System.currentTimeMillis();
replayChangeLogCount = replayChangeLogCount + replayChangeLogSecond(logReader);
if ((indexContext.getWriteChangeLogCount() - replayChangeLogCount) != 0) {
logger.error("變更日志,處于錯誤的狀態,統計的日志行數:" + indexContext.getWriteChangeLogCount() + ", 但實際只有:" + replayChangeLogCount);
}
logger.info(" ------ 完成[第二階段]的變更日志重放,行數" + replayChangeLogCount + " 耗時 "
+ ((System.currentTimeMillis() - startReplay2Time) / 1000) + " 秒 ------ ");
// 8. 洗掉變更日志, OnlineIndex.startChangeLog 有做環境清理,這里不執行
logger.info("簡單優化索引 ...");
long startSimpleOptimizeTime = System.currentTimeMillis();
ESHighLevelFactory.getInstance(rebuildIndexName).optimize(rebuildIndexName, null);
logger.info(" ------ 完成" + rebuildIndexName + "索引簡單優化,耗時 " + ((System.currentTimeMillis() - startSimpleOptimizeTime) / 1000)
+ " 秒 ------ ");
// 9. 設定副本數 (懷疑比較耗時~~~待確認)
logger.info("設定副本數 ...");
int replicas = 3;
if (rebuildIndexName.startsWith(IndexNameConst.ORDER_INDEX_PREFIX)) {
replicas = 1;
} else if (rebuildIndexName.startsWith(IndexNameConst.IndexName.activityTicket.getIndexName())) {
replicas = 2;
} else {
String replicasStr = Configuration.getInstance().loadDiamondProperty(Configuration.ES_INDEX_REPLICAS);
if (NumberUtils.isNumber(replicasStr)) {
replicas = NumberUtils.toInt(replicasStr);
}
}
ESHighLevelFactory.getInstance(rebuildIndexName).setReplicas(rebuildIndexName, replicas);
// 執行索引切換流程
// 預發、線上環境阻塞等待2分鐘同步資料后,再執行索引切換和洗掉舊索引邏輯
try {
if(IDCUtil.isBuildOrProduction()){
Thread.sleep(120 * 1000);
}
} catch (InterruptedException e) {
}
// 10. 別名切換
logger.info("索引切換:將" + rebuildIndexName + "設定為線上索引");
if (!indexContext.switchIndex(rebuildIndexName)) {
throw new IndexException("索引切換失敗:將" + rebuildIndexName + "設定為線上索引失敗");
}
// 11. 運行在線索引
logger.info("運行在線索引");
indexContext.keepRuningOnlineIndex();
isPauseOnline.set(false);
// 12. 洗掉原有在線索引
String oldOnlineIndexName = indexContext.getPhysicalRebuildIndexName();
logger.info("洗掉原有在線索引,索引名:" + oldOnlineIndexName);
if (!ESHighLevelFactory.getInstance(indexContext.getIndex().getIndexName()).deleteIndex(oldOnlineIndexName)) {
throw new IndexException("洗掉索引失敗,索引名:" + oldOnlineIndexName);
}
思考
如果只是簡單地新建索引,完全可以這樣做(使用不同的消費組)
??1.記錄時間戳
??2.全量索引資料的資料
??3.根據前面的時間戳找到kafka中的下標,下標得時間戳必須 < 記錄的時間戳
??4.根據上一步的下標開始索引資料
D.使用新集群進行業務測驗
??部署新的客戶端服務呼叫新的es集群,檢查業務是否正常,對站內查詢檢查搜索結果是否一致,對統計類查詢查看統計結果是否一致,
E.發布線上客戶端搜索代碼,修改es地址為新集群地址
??上線,觀察業務是否穩定,
F.下線舊的es集群
??釋放舊的es集群的資源,
4.總結
??es升級這份作業是兩年之前做的,現在來進行總結,部分細節可能會有疏漏,但是總結起來,依然后很多識訓,從架構,代碼細節上都有改進的空間,es重建代碼可以做得更通用,然后開源出來,
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/550233.html
標籤:其他
