主頁 > 後端開發 > Kafka如果丟了訊息,怎么處理的?

Kafka如果丟了訊息,怎么處理的?

2021-01-29 06:31:53 後端開發

Kafka存在丟訊息的問題,訊息丟失會發生在Broker,Producer和Consumer三種,Java面試寶典PDF完整版

Broker

Broker丟失訊息是由于Kafka本身的原因造成的,kafka為了得到更高的性能和吞吐量,將資料異步批量的存盤在磁盤中,訊息的刷盤程序,為了提高性能,減少刷盤次數,kafka采用了批量刷盤的做法,即,按照一定的訊息量,和時間間隔進行刷盤,這種機制也是由于linux作業系統決定的,將資料存盤到linux作業系統種,會先存盤到頁快取(Page cache)中,按照時間或者其他條件進行刷盤(從page cache到file),或者通過fsync命令強制刷盤,資料在page cache中時,如果系統掛掉,資料會丟失,

Broker在linux服務器上高速讀寫以及同步到Replica

上圖簡述了broker寫資料以及同步的一個程序,broker寫資料只寫到PageCache中,而pageCache位于記憶體,這部分資料在斷電后是會丟失的,pageCache的資料通過linux的flusher程式進行刷盤,刷盤觸發條件有三:

  • 主動呼叫sync或fsync函式

  • 可用記憶體低于閥值

  • dirty data時間達到閥值,dirty是pagecache的一個標識位,當有資料寫入到pageCache時,pagecache被標注為dirty,資料刷盤以后,dirty標志清除,

Broker配置刷盤機制,是通過呼叫fsync函式接管了刷盤動作,從單個Broker來看,pageCache的資料會丟失,

Kafka沒有提供同步刷盤的方式,同步刷盤在RocketMQ中有實作,實作原理是將異步刷盤的流程進行阻塞,等待回應,類似ajax的callback或者是java的future,下面是一段rocketmq的原始碼,

GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 刷盤

也就是說,理論上,要完全讓kafka保證單個broker不丟失訊息是做不到的,只能通過調整刷盤機制的引數緩解該情況,比如,減少刷盤間隔,減少刷盤資料量大小,時間越短,性能越差,可靠性越好(盡可能可靠),這是一個選擇題,

為了解決該問題,kafka通過producer和broker協同處理單個broker丟失引數的情況,一旦producer發現broker訊息丟失,即可自動進行retry,除非retry次數超過閥值(可配置),訊息才會丟失,此時需要生產者客戶端手動處理該情況,那么producer是如何檢測到資料丟失的呢?是通過ack機制,類似于http的三次握手的方式,

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=allThis means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting. kafka.apache.org/20/document…

以上的參考是kafka官方對于引數acks的解釋(在老版本中,該引數是request.required.acks),

  • acks=0,producer不等待broker的回應,效率最高,但是訊息很可能會丟,

  • acks=1,leader broker收到訊息后,不等待其他follower的回應,即回傳ack,也可以理解為ack數為1,此時,如果follower還沒有收到leader同步的訊息leader就掛了,那么訊息會丟失,按照上圖中的例子,如果leader收到訊息,成功寫入PageCache后,會回傳ack,此時producer認為訊息發送成功,但此時,按照上圖,資料還沒有被同步到follower,如果此時leader斷電,資料會丟失,

  • acks=-1,leader broker收到訊息后,掛起,等待所有ISR串列中的follower回傳結果后,再回傳ack,-1等效與 all ,這種配置下,只有leader寫入資料到pagecache是不會回傳ack的,還需要所有的ISR回傳“成功”才會觸發ack,如果此時斷電,producer可以知道訊息沒有被發送成功,將會重新發送,如果在follower收到資料以后,成功回傳ack,leader斷電,資料將存在于原來的follower中,在重新選舉以后,新的leader會持有該部分資料,資料從leader同步到follower,需要2步:

  1. 資料從pageCache被刷盤到disk,因為只有disk中的資料才能被同步到replica,

  2. 資料同步到replica,并且replica成功將資料寫入PageCache,在producer得到ack后,哪怕是所有機器都停電,資料也至少會存在于leader的磁盤內,

上面第三點提到了ISR的串列的follower,需要配合另一個引數才能更好的保證ack的有效性,ISR是Broker維護的一個“可靠的follower串列”,in-sync Replica串列,broker的配置包含一個引數:min.insync.replicas,該引數表示ISR中最少的副本數,如果不設定該值,ISR中的follower串列可能為空,此時相當于acks=1,

如上圖中:

  • acks=0,總耗時f(t) = f(1),

  • acks=1,總耗時f(t) = f(1) + f(2),

  • acks=-1,總耗時f(t) = f(1) + max( f(A) , f(B) ) + f(2),

性能依次遞減,可靠性依次升高,

Producer

Producer丟失訊息,發生在生產者客戶端,

為了提升效率,減少IO,producer在發送資料時可以將多個請求進行合并后發送,被合并的請求咋發送一線快取在本地buffer中,快取的方式和前文提到的刷盤類似,producer可以將請求打包成“塊”或者按照時間間隔,將buffer中的資料發出,通過buffer我們可以將生產者改造為異步的方式,而這可以提升我們的發送效率,

但是,buffer中的資料就是危險的,在正常情況下,客戶端的異步呼叫可以通過callback來處理訊息發送失敗或者超時的情況,但是,一旦producer被非法的停止了,那么buffer中的資料將丟失,broker將無法收到該部分資料,又或者,當Producer客戶端記憶體不夠時,如果采取的策略是丟棄訊息(另一種策略是block阻塞),訊息也會被丟失,抑或,訊息產生(異步產生)過快,導致掛起執行緒過多,記憶體不足,導致程式崩潰,訊息丟失,

producer采取批量發送的示意圖

異步發送訊息生產速度過快的示意圖

根據上圖,可以想到幾個解決的思路:

  • 異步發送訊息改為同步發送消,或者service產生訊息時,使用阻塞的執行緒池,并且執行緒數有一定上限,整體思路是控制訊息產生速度,

  • 擴大Buffer的容量配置,這種方式可以緩解該情況的出現,但不能杜絕,

  • service不直接將訊息發送到buffer(記憶體),而是將訊息寫到本地的磁盤中(資料庫或者檔案),由另一個(或少量)生產執行緒進行訊息發送,相當于是在buffer和service之間又加了一層空間更加富裕的緩沖層,

Consumer

Consumer消費訊息有下面幾個步驟:

  1. 接收訊息

  2. 處理訊息

  3. 反饋“處理完畢”(commited)

Consumer的消費方式主要分為兩種:

  • 自動提交offset,Automatic Offset Committing

  • 手動提交offset,Manual Offset Control

Consumer自動提交的機制是根據一定的時間間隔,將收到的訊息進行commit,commit程序和消費訊息的程序是異步的,也就是說,可能存在消費程序未成功(比如拋出例外),commit訊息已經提交了,此時訊息就丟失了,

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 自動提交開關
props.put("enable.auto.commit", "true");
// 自動提交的時間間隔,此處是1s
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
        // 呼叫poll后,1000ms后,訊息狀態會被改為 committed
 ConsumerRecords<String, String> records = consumer.poll(100);
 for (ConsumerRecord<String, String> record : records)
  insertIntoDB(record); // 將訊息入庫,時間可能會超過1000ms
}

上面的示例是自動提交的例子,如果此時,insertIntoDB(record)發生例外,訊息將會出現丟失,接下來是手動提交的例子:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// 關閉自動提交,改為手動提交
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
        // 呼叫poll后,不會進行auto commit
   ConsumerRecords<String, String> records = consumer.poll(100);
   for (ConsumerRecord<String, String> record : records) {
    buffer.add(record);
   }
   if (buffer.size() >= minBatchSize) {
    insertIntoDb(buffer);
                // 所有訊息消費完畢以后,才進行commit操作
    consumer.commitSync();
    buffer.clear();
   }
 }

將提交型別改為手動以后,可以保證訊息“至少被消費一次”(at least once),但此時可能出現重復消費的情況,重復消費不屬于本篇討論范圍,

上面兩個例子,是直接使用Consumer的High level API,客戶端對于offset等控制是透明的,也可以采用Low level API的方式,手動控制offset,也可以保證訊息不丟,不過會更加復雜,Java面試寶典PDF完整版

  try {
     while(running) {
         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
         for (TopicPartition partition : records.partitions()) {
             List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
             for (ConsumerRecord<String, String> record : partitionRecords) {
                 System.out.println(record.offset() + ": " + record.value());
             }
             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
             // 精確控制offset
             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
         }
     }
  } finally {
    consumer.close();
  }



轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/253808.html

標籤:Java

上一篇:JDK8(一)Object

下一篇:Mysql優化操作學習紀錄

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more