主頁 > 軟體設計 > 實時資料同步服務(canal+kafka)是如何保證訊息的順序性?

實時資料同步服務(canal+kafka)是如何保證訊息的順序性?

2020-09-10 05:39:48 軟體設計

 

上一篇 介紹了移山(資料遷移平臺)實時資料同步的整體架構; 
本文主要介紹移山(資料遷移平臺)實時資料同步是如何保證訊息的順序性,

可以訪問 這里 查看更多關于大資料平臺建設的原創文章,

一. 什么是訊息的順序性?

  1. 訊息生產端將訊息發送給同一個MQ服務器的同一個磁區,并且按順序發送;

  2. 消費消費端按照訊息發送的順序進行消費,

二. 為什么要保證訊息的順序性?

在某些業務功能場景下需要保證訊息的發送和接收順序是一致的,否則會影響資料的使用,

需要保證訊息有序的場景

移山的實時資料同步使用 canal 組件訂閱MySQL資料庫的日志,并將其投遞至 kafka 中(想了解移山實時同步服務架構設計的可以點 這里); 
kafka 消費端再根據具體的資料使用場景去處理資料(存入 HBase、MySQL 或直接做實時分析); 
由于binlog 本身是有序的,因此寫入到mq之后也需要保障順序,

  1. 假如現在移山創建了一個實時同步任務,然后訂閱了一個業務資料庫的訂單表;

  2. 上游業務,向訂單表里插入了一個訂單,然后對該訂單又做了一個更新操作,則 binlog 里會自動寫入插入操作和更新操作的資料,這些資料會被 canal server 投遞至 kafka broker 里面;

  3. 如果 kafka 消費端先消費到了更新日志,后消費到插入日志,則在往目標表里做操作時就會因為資料缺失導致發生例外,

三. 移山實時同步服務是怎么保證訊息的順序性

實時同步服務訊息處理整體流程如下:

我們主要通過以下兩個方面去保障保證訊息的順序性,

1. 將需要保證順序的訊息發送到同一個partition

1.1 kafka的同一個partition內的訊息是有序的
  • kafka 的同一個 partition 用一個write ahead log組織, 是一個有序的佇列,所以可以保證FIFO的順序;

  • 因此生產者按照一定的順序發送訊息,broker 就會按照這個順序把訊息寫入 partition,消費者也會按照相同的順序去讀取訊息;

  • kafka 的每一個 partition 不會同時被兩個消費者實體消費,由此可以保證訊息消費的順序性,

1.2 控制同一key分發到同一partition

要保證同一個訂單的多次修改到達 kafka 里的順序不能亂,可以在Producer 往 kafka 插入資料時,控制同一個key (可以采用訂單主鍵key-hash演算法來實作)發送到同一 partition,這樣就能保證同一筆訂單都會落到同一個 partition 內,

1.3 canal 需要做的配置

canal 目前支持的mq有kafka/rocketmq,本質上都是基于本地檔案的方式來支持了磁區級的順序訊息的能力,我們只需在配置 instance 的時候開啟如下配置即可:

1> canal.properties

# leader節點會等待所有同步中的副本確認之后再確認這條記錄是否發送完成
canal.mq.acks = all

備注:

  • 這樣只要至少有一個同步副本存在,記錄就不會丟失,

2> instance.properties

1 # 散列模式的磁區數
2 canal.mq.partitionsNum=2
3 # 散列規則定義 庫名.表名: 唯一主鍵,多個表之間用逗號分隔
4 canal.mq.partitionHash=test.lyf_canal_test:id

備注:

  • 同一條資料的增刪改操作 產生的 binlog 資料都會寫到同一個磁區內;

  • 查看指定topic的指定磁區的訊息,可以使用如下命令:

    bin/kafka-console-consumer.sh --bootstrap-server serverlist --topic topicname --from-beginning --partition 0

2. 通過日志時間戳和日志偏移量進行亂序處理

將同一個訂單資料通過指定key的方式發送到同一個 partition 可以解決大部分情況下的資料亂序問題,

2.1 特殊場景

對于一個有著先后順序的訊息A、B,正常情況下應該是A先發送完成后再發送B,但是在例外情況下:

  • A發送失敗了,B發送成功,而A由于重試機制在B發送完成之后重試發送成功了;

  • 這時對于本身順序為AB的訊息順序變成了BA,

移山的實時同步服務會在將訂閱到的資料存入HBase之前再加一層亂序處理 ,

2.2 binlog里的兩個重要資訊

使用 mysqlbinlog 查看 binlog:

/usr/bin/mysqlbinlog --base64-output=decode-rows -v /var/lib/mysql/mysql-bin.000001

執行時間和偏移量:

備注:

  1. 每條資料都會有執行時間和偏移量這兩個重要資訊,下邊的校驗邏輯核心正是借助了這兩個值

  2. 執行的sql 陳述句在 binlog 中是以base64編碼格式存盤的,如果想查看sql 陳述句,需要加上:--base64-output=decode-rows -v 引數來解碼;

  3. 偏移量:

    • Position 就代表 binlog 寫到這個偏移量的地方,也就是寫了這么多位元組,即當前 binlog 檔案的大小;

    • 也就是說后寫入資料的 Position 肯定比先寫入資料的 Position 大,因此可以根據 Position 大小來判斷訊息的順序,

3.訊息亂序處理演示

3.1 在訂閱表里插入一條資料,然后再做兩次更新操作:
MariaDB [test]> insert into lyf_canal_test (name,status,content) values('demo1',1,'demo1 test');
Query OK, 1 row affected (0.00 sec)
 
MariaDB [test]> update lyf_canal_test set name = 'demo update' where id = 13;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
 
MariaDB [test]> update lyf_canal_test set name = 'demo update2',content='second update',status=2 where id = 13;
Query OK, 1 row affected (0.00 sec)
3.2 產生三條需要保證順序的訊息

插入,第一次更新,第二次更新這三次操作產生的 binlog 被 canal server 推送至 kafka 中的訊息分別稱為:訊息A,訊息B,訊息C

  • 訊息A: 

  • 訊息B: 

  • 訊息C: 

3.3 網路原因造成訊息亂序

假設由于不可知的網路原因:

  • kafka broker收到的三條訊息分別為:訊息A,訊息C,訊息B

  • 則 kafka 消費端消費到的這三條訊息先后順序就是:訊息A,訊息C,訊息B

  • 這樣就造成了訊息的亂序,因此訂閱到的資料在存入目標表前必須得加亂序校驗處理

3.4 訊息亂序處理邏輯

我們利用HBase的特性,將資料主鍵做為目標表的 rowkey,當 kafka 消費端消費到資料時,亂序處理主要流程(摘自禧云數芯大資料平臺技術白皮書)如下:

demo的三條訊息處理流程如下: 
1> 判斷訊息A 的主鍵id做為rowkey在hbase的目標表中不存在,則將訊息A的資料直接插入HBase: 

2> 訊息C 的主鍵id做為rowkey,已經在目標表中存在,則這時需要拿訊息C 的執行時間和表中存盤的執行時間去判斷:

  • 如果訊息C 中的執行時間小于表中存盤的執行時間,則證明訊息C 是重復訊息或亂序的訊息,直接丟棄;

  • 訊息C 中的執行時間大于表中存盤的執行時間,則直接更新表資料(本demo即符合該種場景): 

  • 訊息C 中的執行時間等于表中存盤的執行時間,則這時需要拿訊息C 的偏移量和表中存盤的偏移量去判斷:

    • 訊息C 中的偏移量小于表中存盤的偏移量,則證明訊息C 是重復訊息,直接丟棄;

    • 訊息C 中的偏移量大于等于表中存盤的偏移量,則直接更新表資料,

3> 訊息B 的主鍵id做為rowkey,已經在目標表中存在,則這時需要拿訊息B 的執行時間和表中存盤的執行時間去判斷:

  • 由于訊息B中的執行時間小于表中存盤的執行時間(即訊息C 的執行時間),因此訊息B 直接丟棄,

3.5 主要代碼

kafka 消費端將消費到的訊息進行格式化處理和組裝,并借助 HBase-client API 來完成對 HBase 表的操作,

1> 使用Put組裝單行資料

/**
* 包名: org.apache.hadoop.hbase.client.Put
* hbaseData 為從binlog訂閱到的資料,通過回圈,為目標HBase表
* 添加rowkey、列簇、列資料,
* 作用:用來對單個行執行加入操作,
*/
Put put = new Put(Bytes.toBytes(hbaseData.get("id")));
// hbaseData 為從binlog訂閱到的資料,通過回圈,為目標HBase表添加列簇和列
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(mapKey), Bytes.toBytes(hbaseData.get(mapKey)));

 

2> 使用 checkAndMutate,更新HBase表的資料

只有服務端對應rowkey的列資料與預期的值符合期望條件(大于、小于、等于)時,才會將put操作提交至服務端,

// 如果 update_info(列族) execute_time(列) 不存在值就插入資料,如果存在則回傳false
boolean res1 = table.checkAndMutate(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info")) .qualifier(Bytes.toBytes("execute_time")).ifNotExists().thenPut(put);
 
// 如果存在,則去比較執行時間
if (!res1) {
// 如果本次傳遞的執行時間大于HBase中的執行時間,則插入put
boolean res2 =table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"),
Bytes.toBytes("execute_time"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_time")),put);
 
// 執行時間相等時,則去比較偏移量,本次傳遞的值大于HBase中的值則插入put
if (!res2) {
boolean res3 = table.checkAndPut(Bytes.toBytes(hbaseData.get("id")),
Bytes.toBytes("update_info"), Bytes.toBytes("execute_position"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_position")),put);
}
}

 

四.總結

  1. 目前移山的實時同步服務,kafka 消費端是使用一個執行緒去消費資料;

  2. 如果將來有版本升級需求,將消費端改為多個執行緒去消費資料時,要考慮到多執行緒消費時有序的訊息會被打亂這種情況的解決辦法,

關注微信公眾號

歡迎大家關注我的微信公眾號閱讀更多文章:

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/1025.html

標籤:架構設計

上一篇:JS基礎回顧_滾動條

下一篇:實時資料同步服務(canal+kafka)的架構設計

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 面試突擊第一季,第二季,第三季

    第一季必考 https://www.bilibili.com/video/BV1FE411y79Y?from=search&seid=15921726601957489746 第二季分布式 https://www.bilibili.com/video/BV13f4y127ee/?spm_id_fro ......

    uj5u.com 2020-09-10 05:35:24 more
  • 第三單元作業總結

    1.前言 這應該是本學期最后一次寫作業總結了吧。總體來說,對作業的節奏也差不多掌握了,作業做起來的效率也更高了。雖然和之前的作業一樣,作業中都要用到新的知識,但是相比之前,更加懂得了如何利用工具以及資料。雖然之間卡過殼,但總體而言,這幾次作業還算完成的比較好。 2.作業程序總結 相比前兩個單元,此單 ......

    uj5u.com 2020-09-10 05:35:41 more
  • 北航OO(2020)第四單元博客作業暨課程總結博客

    北航OO(2020)第四單元博客作業暨課程總結博客 本單元作業的架構設計 在本單元中,由于UML圖具有比較清晰的樹形結構,因此我對其中需要進行查詢操作的元素進行了包裝,在樹的父節點中存盤所有孩子的參考。考慮到性能問題,我采用了快取機制,一次查詢后盡可能快取已經遍歷過的資訊,以減少遍歷次數。 本單元我 ......

    uj5u.com 2020-09-10 05:35:48 more
  • BUAA_OO_第四單元

    一、UML決議器設計 ? 先看下題目:第四單元實作一個基于JDK 8帶有效性檢查的UML(Unified Modeling Language)類圖,順序圖,狀態圖分析器 MyUmlInteraction,實際上我們要建立一個有向圖模型,UML中的物件(元素)可能與同級元素連接,也可與低級元素相連形成 ......

    uj5u.com 2020-09-10 05:35:54 more
  • 6.1邏輯運算子

    邏輯運算子 1. && 短路與 運算式1 && 運算式2 01.運算式1為true并且運算式2也為true 整體回傳為true 02.運算式1為false,將不會執行運算式2 整體回傳為false 03.只要有一個運算式為false 整體回傳為false 2. || 短路或 運算式1 || 運算式2 ......

    uj5u.com 2020-09-10 05:35:56 more
  • BUAAOO 第四單元 & 課程總結

    1. 第四單元:StarUml檔案決議 本單元采用了圖模型決議UML。 UML檔案可以抽象為圖、子圖、邊的邏輯結構。 在實作中,圖的節點包括類、介面、屬性,子圖包括狀態圖、順序圖等。 采用了三次遍歷UML元素的方法建圖,第一遍遍歷建點,第二、三次遍歷設定屬性、連邊,實作圖物件的初始化。這里借鑒了一些 ......

    uj5u.com 2020-09-10 05:36:06 more
  • 談談我對C# 多型的理解

    面向物件三要素:封裝、繼承、多型。 封裝和繼承,這兩個比較好理解,但要理解多型的話,可就稍微有點難度了。今天,我們就來講講多型的理解。 我們應該經常會看到面試題目:請談談對多型的理解。 其實呢,多型非常簡單,就一句話:呼叫同一種方法產生了不同的結果。 具體實作方式有三種。 一、多載 多載很簡單。 p ......

    uj5u.com 2020-09-10 05:36:09 more
  • Python 資料驅動工具:DDT

    背景 python 的unittest 沒有自帶資料驅動功能。 所以如果使用unittest,同時又想使用資料驅動,那么就可以使用DDT來完成。 DDT是 “Data-Driven Tests”的縮寫。 資料:http://ddt.readthedocs.io/en/latest/ 使用方法 dd. ......

    uj5u.com 2020-09-10 05:36:13 more
  • Python里面的xlrd模塊詳解

    那我就一下面積個問題對xlrd模塊進行學習一下: 1.什么是xlrd模塊? 2.為什么使用xlrd模塊? 3.怎樣使用xlrd模塊? 1.什么是xlrd模塊? ?python操作excel主要用到xlrd和xlwt這兩個庫,即xlrd是讀excel,xlwt是寫excel的庫。 今天就先來說一下xl ......

    uj5u.com 2020-09-10 05:36:28 more
  • 當我們創建HashMap時,底層到底做了什么?

    jdk1.7中的底層實作程序(底層基于陣列+鏈表) 在我們new HashMap()時,底層創建了默認長度為16的一維陣列Entry[ ] table。當我們呼叫map.put(key1,value1)方法向HashMap里添加資料的時候: 首先,呼叫key1所在類的hashCode()計算key1 ......

    uj5u.com 2020-09-10 05:36:38 more
最新发布
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:20:47 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:20:25 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:20:17 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:20:10 more
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:19:44 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:19:07 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:18:57 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:18:49 more
  • 05單件模式

    #經典的單件模式 public class Singleton { private static Singleton uniqueInstance; //一個靜態變數持有Singleton類的唯一實體。 // 其他有用的實體變數寫在這里 //構造器宣告為私有,只有Singleton可以實體化這個類! ......

    uj5u.com 2023-04-19 08:42:51 more
  • 【架構與設計】常見微服務分層架構的區別和落地實踐

    軟體工程的方方面面都遵循一個最基本的道理:沒有銀彈,架構分層模型更是如此,每一種都有各自優缺點,所以請根據不同的業務場景,并遵循簡單、可演進這兩個重要的架構原則選擇合適的架構分層模型即可。 ......

    uj5u.com 2023-04-19 08:42:41 more