上一篇 介紹了移山(資料遷移平臺)實時資料同步的整體架構;
本文主要介紹移山(資料遷移平臺)實時資料同步是如何保證訊息的順序性,可以訪問 這里 查看更多關于大資料平臺建設的原創文章,
一. 什么是訊息的順序性?
訊息生產端將訊息發送給同一個MQ服務器的同一個磁區,并且按順序發送;
消費消費端按照訊息發送的順序進行消費,
二. 為什么要保證訊息的順序性?
在某些業務功能場景下需要保證訊息的發送和接收順序是一致的,否則會影響資料的使用,
需要保證訊息有序的場景
移山的實時資料同步使用
canal組件訂閱MySQL資料庫的日志,并將其投遞至 kafka 中(想了解移山實時同步服務架構設計的可以點 這里);
kafka 消費端再根據具體的資料使用場景去處理資料(存入 HBase、MySQL 或直接做實時分析);
由于binlog 本身是有序的,因此寫入到mq之后也需要保障順序,
-
假如現在移山創建了一個實時同步任務,然后訂閱了一個業務資料庫的訂單表;
-
上游業務,向訂單表里插入了一個訂單,然后對該訂單又做了一個更新操作,則 binlog 里會自動寫入插入操作和更新操作的資料,這些資料會被 canal server 投遞至 kafka broker 里面;
-
如果 kafka 消費端先消費到了更新日志,后消費到插入日志,則在往目標表里做操作時就會因為資料缺失導致發生例外,
三. 移山實時同步服務是怎么保證訊息的順序性
實時同步服務訊息處理整體流程如下:

我們主要通過以下兩個方面去保障保證訊息的順序性,
1. 將需要保證順序的訊息發送到同一個partition
1.1 kafka的同一個partition內的訊息是有序的
-
kafka 的同一個 partition 用一個write ahead log組織, 是一個有序的佇列,所以可以保證FIFO的順序;
-
因此生產者按照一定的順序發送訊息,broker 就會按照這個順序把訊息寫入 partition,消費者也會按照相同的順序去讀取訊息;
-
kafka 的每一個 partition 不會同時被兩個消費者實體消費,由此可以保證訊息消費的順序性,
1.2 控制同一key分發到同一partition
要保證同一個訂單的多次修改到達 kafka 里的順序不能亂,可以在Producer 往 kafka 插入資料時,控制同一個key (可以采用訂單主鍵key-hash演算法來實作)發送到同一 partition,這樣就能保證同一筆訂單都會落到同一個 partition 內,
1.3 canal 需要做的配置
canal 目前支持的mq有kafka/rocketmq,本質上都是基于本地檔案的方式來支持了磁區級的順序訊息的能力,我們只需在配置 instance 的時候開啟如下配置即可:
1> canal.properties
# leader節點會等待所有同步中的副本確認之后再確認這條記錄是否發送完成
canal.mq.acks = all
備注:
-
這樣只要至少有一個同步副本存在,記錄就不會丟失,
2> instance.properties
1 # 散列模式的磁區數 2 canal.mq.partitionsNum=2 3 # 散列規則定義 庫名.表名: 唯一主鍵,多個表之間用逗號分隔 4 canal.mq.partitionHash=test.lyf_canal_test:id
備注:
-
同一條資料的增刪改操作 產生的 binlog 資料都會寫到同一個磁區內;
-
查看指定topic的指定磁區的訊息,可以使用如下命令:
bin/kafka-console-consumer.sh --bootstrap-server serverlist --topic topicname --from-beginning --partition 0
2. 通過日志時間戳和日志偏移量進行亂序處理
將同一個訂單資料通過指定key的方式發送到同一個 partition 可以解決大部分情況下的資料亂序問題,
2.1 特殊場景
對于一個有著先后順序的訊息A、B,正常情況下應該是A先發送完成后再發送B,但是在例外情況下:
-
A發送失敗了,B發送成功,而A由于重試機制在B發送完成之后重試發送成功了;
-
這時對于本身順序為AB的訊息順序變成了BA,
移山的實時同步服務會在將訂閱到的資料存入HBase之前再加一層亂序處理 ,
2.2 binlog里的兩個重要資訊
使用 mysqlbinlog 查看 binlog:
/usr/bin/mysqlbinlog --base64-output=decode-rows -v /var/lib/mysql/mysql-bin.000001
執行時間和偏移量:

備注:
-
每條資料都會有執行時間和偏移量這兩個重要資訊,下邊的校驗邏輯核心正是借助了這兩個值;
-
執行的sql 陳述句在 binlog 中是以base64編碼格式存盤的,如果想查看sql 陳述句,需要加上:
--base64-output=decode-rows -v引數來解碼; -
偏移量:
-
Position 就代表 binlog 寫到這個偏移量的地方,也就是寫了這么多位元組,即當前 binlog 檔案的大小;
-
也就是說后寫入資料的 Position 肯定比先寫入資料的 Position 大,因此可以根據 Position 大小來判斷訊息的順序,
-
3.訊息亂序處理演示
3.1 在訂閱表里插入一條資料,然后再做兩次更新操作:
MariaDB [test]> insert into lyf_canal_test (name,status,content) values('demo1',1,'demo1 test'); Query OK, 1 row affected (0.00 sec) MariaDB [test]> update lyf_canal_test set name = 'demo update' where id = 13; Query OK, 1 row affected (0.00 sec) Rows matched: 1 Changed: 1 Warnings: 0 MariaDB [test]> update lyf_canal_test set name = 'demo update2',content='second update',status=2 where id = 13; Query OK, 1 row affected (0.00 sec)
3.2 產生三條需要保證順序的訊息
把插入,第一次更新,第二次更新這三次操作產生的 binlog 被 canal server 推送至 kafka 中的訊息分別稱為:訊息A,訊息B,訊息C,
-
訊息A:

-
訊息B:

-
訊息C:

3.3 網路原因造成訊息亂序
假設由于不可知的網路原因:
-
kafka broker收到的三條訊息分別為:訊息A,訊息C,訊息B;
-
則 kafka 消費端消費到的這三條訊息先后順序就是:訊息A,訊息C,訊息B
-
這樣就造成了訊息的亂序,因此訂閱到的資料在存入目標表前必須得加亂序校驗處理,
3.4 訊息亂序處理邏輯
我們利用HBase的特性,將資料主鍵做為目標表的 rowkey,當 kafka 消費端消費到資料時,亂序處理主要流程(摘自禧云數芯大資料平臺技術白皮書)如下:

demo的三條訊息處理流程如下:
1> 判斷訊息A 的主鍵id做為rowkey在hbase的目標表中不存在,則將訊息A的資料直接插入HBase: 
2> 訊息C 的主鍵id做為rowkey,已經在目標表中存在,則這時需要拿訊息C 的執行時間和表中存盤的執行時間去判斷:
-
如果訊息C 中的執行時間小于表中存盤的執行時間,則證明訊息C 是重復訊息或亂序的訊息,直接丟棄;
-
訊息C 中的執行時間大于表中存盤的執行時間,則直接更新表資料(本demo即符合該種場景):

-
訊息C 中的執行時間等于表中存盤的執行時間,則這時需要拿訊息C 的偏移量和表中存盤的偏移量去判斷:
-
訊息C 中的偏移量小于表中存盤的偏移量,則證明訊息C 是重復訊息,直接丟棄;
-
訊息C 中的偏移量大于等于表中存盤的偏移量,則直接更新表資料,
-
3> 訊息B 的主鍵id做為rowkey,已經在目標表中存在,則這時需要拿訊息B 的執行時間和表中存盤的執行時間去判斷:
-
由于訊息B中的執行時間小于表中存盤的執行時間(即訊息C 的執行時間),因此訊息B 直接丟棄,
3.5 主要代碼
kafka 消費端將消費到的訊息進行格式化處理和組裝,并借助 HBase-client API 來完成對 HBase 表的操作,
1> 使用Put組裝單行資料
/** * 包名: org.apache.hadoop.hbase.client.Put * hbaseData 為從binlog訂閱到的資料,通過回圈,為目標HBase表 * 添加rowkey、列簇、列資料, * 作用:用來對單個行執行加入操作, */ Put put = new Put(Bytes.toBytes(hbaseData.get("id"))); // hbaseData 為從binlog訂閱到的資料,通過回圈,為目標HBase表添加列簇和列 put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(mapKey), Bytes.toBytes(hbaseData.get(mapKey)));
2> 使用 checkAndMutate,更新HBase表的資料
只有服務端對應rowkey的列資料與預期的值符合期望條件(大于、小于、等于)時,才會將put操作提交至服務端,
// 如果 update_info(列族) execute_time(列) 不存在值就插入資料,如果存在則回傳false boolean res1 = table.checkAndMutate(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info")) .qualifier(Bytes.toBytes("execute_time")).ifNotExists().thenPut(put); // 如果存在,則去比較執行時間 if (!res1) { // 如果本次傳遞的執行時間大于HBase中的執行時間,則插入put boolean res2 =table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"), Bytes.toBytes("execute_time"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_time")),put); // 執行時間相等時,則去比較偏移量,本次傳遞的值大于HBase中的值則插入put if (!res2) { boolean res3 = table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"), Bytes.toBytes("execute_position"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_position")),put); } }
四.總結
-
目前移山的實時同步服務,kafka 消費端是使用一個執行緒去消費資料;
-
如果將來有版本升級需求,將消費端改為多個執行緒去消費資料時,要考慮到多執行緒消費時有序的訊息會被打亂這種情況的解決辦法,
關注微信公眾號
歡迎大家關注我的微信公眾號閱讀更多文章:
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/1025.html
標籤:架構設計
上一篇:JS基礎回顧_滾動條
