主頁 >  其他 > 復習一下Kafka

復習一下Kafka

2021-07-21 12:53:12 其他

一、前言

前幾天看了一篇《打造全球最大規模Kafka集群,UBER的多區域災備實踐》的一篇博客,Uber用kafka實踐,每天處理數億級訊息和幾個PB的資料,kafka現在成了Uber的技術堆疊的基石,并基于構建了一復雜的生態系統,它實作了kafka的多區部署、多區消費訊息和容災,今天我不重點說kafka在UBer的具體實踐,而是借用這個主題全面梳理一下kafka,重新復習一下,說到訊息佇列,耳熟能詳有rabbitMQ、activeMq、rocketMq、redis的MQ等等,在工程化越來越復雜的今天,訊息佇列已越來越重要也越來越常用,先來全域了解一下kafka:
在這里插入圖片描述

二、kafka簡介

2.1 什么是kafka

Kafka是一款分布式訊息發布和訂閱系統,它的特點是高性能、高吞吐量,最早設計的目的是作為LinkedIn的活動流和運營資料的處理管道,這些資料主要是用來對用戶做用戶畫像分析以及服務器性能資料的一些監控,所以kafka一開始設計的目標就是作為一個分布式、高吞吐量的訊息系統,所以適合運用在大資料傳輸場景,

2.2 Kafka的應用場景

由于kafka具有更好的吞吐量、內置磁區、冗余及容錯性的優點(kafka每秒可以處理幾十萬訊息),讓kafka成為了一個很好的大規模訊息處理應用的解決方案,所以在企業級應用長,主要會應用于如下幾個方面:

  • 行為跟蹤:kafka可以用于跟蹤用戶瀏覽頁面、搜索及其他行為,通過發布-訂閱模式實時記錄到對應的topic中,通過后端大資料平臺接入處理分析,并做更進一步的實時處理和監控;
  • 日志收集:日志收集方面,有很多比較優秀的產品,比如Apache Flume,很多公司使用kafka代理日志聚合,日志聚合表示從服務器上收集日志檔案,然后放到一個集中的平臺(檔案服務器)進行處理,在實際應用開發中,我們應用程式的log都會輸出到本地的磁盤上,排查問題的話通過linux命令來搞定,如果應用程式組成了負載均衡集群,并且集群的機器有幾十臺以上,那么想通過日志快速定位到問題,就是很麻煩的事情了,所以一般都會做一個日志統一收集平臺管理log日志用來快速查詢重要應用的問題,所以很多公司的套路都是把應用日志集中到kafka上,然后分別匯入到es和hdfs上,用來做實時檢索分析和離線統計資料備份等,而另一方面,kafka本身又提供了很好的api來集成日志并且做日志收集,

2.3 架構

一個典型的kafka集群包含若干Producer(可以是應用節點產生的訊息,也可以是通過Flume收集日志產生的事件),若干個Broker(kafka支持水平擴展)、若干個Consumer Group,以及一個zookeeper集群,kafka通過zookeeper管理集群配置及服務協同,Producer使用push模式將訊息發布到broker,consumer通過監聽使用pull模式從broker訂閱并消費訊息,多個broker協同作業,producer和consumer部署在各個業務邏輯中,三者通過zookeeper管理協調請求和轉發,這樣就組成了一個高性能的分布式訊息發布和訂閱系統,
在這里插入圖片描述

三、基礎知識

3.1 名詞解釋

  • 1)Broker
    Kafka集群包含一個或多個服務器,這種服務器被稱為broker,broker端不維護資料的消費狀態,提升了性能,直接使用磁盤進行存盤,線性讀寫,速度快:避免了資料在JVM記憶體和系統記憶體之間的復制,減少耗性能的創建物件和垃圾回收,
  • 2)Producer
    負責發布訊息到Kafka broker
  • 3)Consumer
    訊息消費者,向Kafka broker讀取訊息的客戶端,consumer從broker拉取(pull)資料并進行處理,
  • 4)Topic
    每條發布到Kafka集群的訊息都有一個類別,這個類別被稱為Topic,(物理上不同Topic的訊息分開存盤,邏輯上一個Topic的訊息雖然保存于一個或多個broker上但用戶只需指定訊息的Topic即可生產或消費資料而不必關心資料存于何處)
  • 5)Partition
    Parition是物理上的概念,每個Topic包含一個或多個Partition.
  • 6)Consumer Group
    每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬于默認的group)
  • 7)Topic & Partition
    Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條訊息放進哪個queue里,為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個檔案夾,該檔案夾下存盤這個Partition的所有訊息和索引檔案,

3.2 基礎引數說明

  • batch.size
    生產者發送多個訊息到broker上的同一個磁區時,為了減少網路請求帶來的性能開銷,通過批量的方式來提交訊息,可以通過這個引數來控制批量提交的位元組數大小,默認大小是16384byte,也就是16kb,意味著當一批訊息大小達到指定的batch.size的時候會統一發送
  • linger.ms
    Producer默認會把兩次發送時間間隔內收集到的所有Requests進行一次聚合然后再發送,以此提高吞吐量,而linger.ms就是為每次發送到broker的請求增加一些delay,以此來聚合更多的Message請求,這個有點想TCP里面的Nagle演算法,在TCP協議的傳輸中,為了減少大量小資料包的發送,采用了Nagle演算法,也就是基于小包的等-停協議,
    batch.size和linger.ms這兩個引數是kafka性能優化的關鍵引數,batch.size和linger.ms這兩者的作用是一樣的,如果兩個都配置了,那么怎么作業的呢?實際上,當二者都配置的時候,只要滿足其中一個要求,就會發送請求到broker上
  • group.id
    consumer group是kafka提供的可擴展且具有容錯性的消費者機制,既然是一個組,那么組內必然可以有多個消費者或消費者實體(consumer instance),它們共享一個公共的ID,即group ID,組內的所有消費者協調在一起來消費訂閱主題(subscribed topics)的所有磁區(partition),當然,每個磁區只能由同一個消費組內的一個consumer來消費.如下圖所示,分別有三個消費者,屬于兩個不同的group,那么對于firstTopic這個topic來說,這兩個組的消費者都能同時消費這個topic中的訊息,對于此時的架構來說,這個firstTopic就類似于ActiveMQ中的topic概念,如右圖所示,如果3個消費者都屬于同一個group,那么此時firstTopic就是一個Queue的概念
    在這里插入圖片描述
    在這里插入圖片描述
  • enable.auto.commit
    消費者消費位移的提交方式,true為自動提交,即consumer poll訊息后自動提交上次之前poll的所有訊息位移,若為false則需要手動提交,即consumer poll出的訊息需要手動提交訊息位移,提交訊息位移的方式有同步提交和異步提交,
  • auto.commit.interval.ms
    在enable.auto.commit 為true的情況下, 自動提交消費位移的間隔,默認值5000ms,那么消費者會在poll方法呼叫后每隔5000ms(由auto.commit.interval.ms指定)提交一次位移,和很多其 他操作一樣,自動提交消費位移也是由poll()方法來驅動的;在呼叫poll()時,消費者判斷是否到達提交時間(auto.commit.interval.ms指定的值),如果是則提交上一次poll回傳的最大位移,
  • auto.offset.reset
    這個引數是針對新的groupid中的消費者而言的,當有新groupid的消費者來消費指定的topic時,對于該引數的配置,會有不同的語意,
    auto.offset.reset=latest情況下,新的消費者將會從其他消費者最后消費的offset處開始消費Topic下的訊息,
    auto.offset.reset= earliest情況下,新的消費者會從該topic最早的訊息開始消費,
    auto.offset.reset=none情況下,新的消費者加入以后,由于之前不存在offset,則會直接拋出例外,
  • max.poll.records
    consumer是通過輪訓的方式使用poll()方法不斷獲取訊息的,max.poll.records引數可以限制每次呼叫poll回傳的訊息數,默認是500條,
  • max.poll.interval.ms
    默認值5分鐘,表示若5分鐘之內consumer沒有消費完上一次poll的訊息,也就是在5分鐘之內沒有呼叫下次的poll()函式,那么kafka會認為consumer已經宕機,所以會將該consumer踢出consumer group,緊接著就會發生rebalance,發生rebalance可能會發生重復消費的情況,

四、關于Topic和Partition

4.1 Topic

在kafka中,topic是一個存盤訊息的邏輯概念,可以認為是一個訊息集合,每條訊息發送到kafka集群的訊息都有一個類別,物理上來說,不同的topic的訊息是分開存盤的,每個topic可以有多個生產者向它發送訊息,也可以有多個消費者去消費其中的訊息,
在這里插入圖片描述

4.2 Partition(磁區)

??每個topic可以劃分多個磁區(每個Topic至少有一個磁區),同一topic下的不同磁區包含的訊息是不同的,那么為什么要設定多partition呢?第一磁區存盤可以存盤更多的訊息,其次是為了提高吞吐量,如果只有一個partition,則所有訊息只能存盤在該partition內,消費時不管有多少個消費者也只能順序讀取該partition內的訊息,如果是多個partition,那么消費者就可以同時從多個partition內并發讀取訊息,正是這個原因才提高了吞吐量,
??每個訊息在被添加到磁區時,都會被分配一個offset(稱之為偏移量),它是訊息在此磁區中的唯一編號,kafka通過offset保證訊息在磁區內的順序,offset的順序不跨磁區,即kafka只保證在同一個磁區內的訊息是有序的,
??在多partition和多consumer的情況下,生產的訊息是具有順序性的,且根據partition的分發策略依次插入到相應的partition中,但是由于kafak只保證同一個partition內的訊息輸出有序性,所以多partition依次輸出的訊息順序并不能保證和生產訊息寫入的順序是一樣的,
??下圖中,對于名字為test的topic,做了3個磁區,分別是p0、p1、p2,每一條訊息發送到broker時,會根據partition的規則選擇存盤到哪一個partition,如果partition規則設定合理,那么所有的訊息會均勻的分布在不同的partition中,這樣就有點類似資料庫的分庫分表的概念,把資料做了分片處理,
在這里插入圖片描述

4.3 訊息分發消費&磁區策略

  • 4.3.1 生產者訊息分發策略

?? 訊息是kafka中最基本的資料單元,在kafka中一條訊息由key、value兩部分構成,在發送一條訊息時,我們可以指定這個key,那么producer會根據key和partition機制來判斷當前這條訊息應該發送并存盤到哪個partition中,我們可以根據需要進行擴展producer的partition機制,
?? 默認情況下,kafka采用的是hash取模的磁區演算法,如果Key為null,則會隨機分配一個磁區,這個隨機是在這個引數”metadata.max.age.ms”的時間范圍內隨機選擇一個,對于這個時間段內,如果key為null,則只會發送到唯一的磁區,當然也可以自己定義磁區略,

  • 4.3.2 消費者消費原理

在實際生產程序中,每個topic都會有多個partitions,多個partitions的好處在于,一方面能夠對broker上的資料進行分片有效減少了訊息的容量從而提升io性能,另外一方面,提高了消費端的消費能力,如果只有一個partitions,那么多consumer也只能順序讀取該partitions內的訊息,如果是多個partitions的話,那么多consumer就可以從多partitions并發生讀取topic訊息,這樣就提高了訊息斷的消費能力,所以一般會設定多個consumer去消費同一個topic的多個partitions, 也就是消費端的負載均衡機制,
??在多個partition以及多個consumer的情況下,消費者是如何消費訊息的,kafka存在consumer group的概念,也就是group.id一樣的consumer,這些consumer屬于一個consumer group,組內的所有消費者協調在一起來消費訂閱主題的所有磁區,當然每一個磁區只能由同一個消費組內的consumer來消費,那么同一個consumergroup里面的consumer是怎么去分配該消費哪個磁區里的資料的呢?如下圖所示,3個磁區,3個消費者,那么哪個消費者消分哪個磁區?
在這里插入圖片描述
對于上面這個圖來說,這3個消費者會分別消費test這個topic 的3個磁區,也就是每個consumer消費一個partition,

  • 演示1(3個partiton對應3個consumer)
    ? 創建一個帶3個磁區的topic
    ? 啟動3個消費者消費同一個topic,并且這3個consumer屬于同一個組
    ? 啟動發送者進行訊息發送
    演示結果:consumer1會消費partition0磁區、consumer2會消費partition1磁區、consumer3會消費partition2磁區,如果是2個consumer消費3個partition呢?會是怎么樣的結果?
  • 演示2(3個partiton對應2個consumer)
    ? 基于上面演示的案例的topic不變
    ? 啟動2個消費這消費該topic
    ? 啟動發送者進行訊息發送
    演示結果:consumer1會消費partition0/partition1磁區、consumer2會消費partition2磁區
  • 演示3(3個partition對應4個或以上consumer)
    演示結果:仍然只有3個consumer對應3個partition,其他的consumer無法消費訊息,通過這個演示的程序,引出接下來需要了解的kafka的磁區分配策略(Partition Assignment Strategy)
  • 4.3.3 consumer和partition的數量建議

如果consumer比partition多,是浪費,因為kafka的設計是在一個partition上是不允許并發的,所以consumer數不要大于partition數,如果consumer比partition少,一個consumer會對應于多個partitions,這里主要合理分配consumer數和partition數,否則會導致partition里面的資料被取的不均勻,最好partiton數目是consumer數目的整數倍,所以partition數目很重要,如果consumer從多個partition讀到資料,不保證資料間的順序性,kafka只保證在一個partition上資料是有序的,但多個partition,根據你讀的順序會有不同增減consumer,broker,partition會導致rebalance,所以rebalance后consumer對應的partition會發生變化

  • 4.3.4 磁區分配策略

同一個group中的消費者對于一個topic中的多個partition,存在一定的磁區分配策略,每個消費者都可以設定自己的磁區分配策略,對于消費組而言,會從各個消費者上報過來的磁區分配策略中選舉一個彼此都贊同的策略來實作整體的磁區分配,在kafka中,存在三種磁區分配策略,一種是Range(默認)、 另一種是RoundRobin(輪詢)、StickyAssignor(粘性),

  • 1)、RangeAssignor(范圍磁區)
    Range策略是對每個主題而言的,首先對同一個主題里面的磁區按照序號進行排序,并對消費者按照字母順序進行排序,
    假設我們有10個磁區,3個消費者,排完序的磁區將會是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消費者執行緒排完序將會是C1-0, C2-0, C3-0,然后將partitions的個數除于消費者執行緒的總數來決定每個消費者執行緒消費幾個磁區,如果除不盡,那么前面幾個消費者執行緒將會多消費一個磁區,在我們的例子里面,我們有10個磁區,3個消費者執行緒, 10 / 3 = 3,而且除不盡,那么消費者執行緒 C1-0 將會多消費一個磁區.
    結果看起來是這樣的:
    C1-0 將消費 0, 1, 2, 3 磁區
    C2-0 將消費 4, 5, 6 磁區
    C3-0 將消費 7, 8, 9 磁區
    假如我們有11個磁區,那么最后磁區分配的結果看起來是這樣的:
    C1-0 將消費 0, 1, 2, 3 磁區
    C2-0 將消費 4, 5, 6, 7 磁區
    C3-0 將消費 8, 9, 10 磁區
    假如我們有2個主題(T1和T2),分別有10個磁區,那么最后磁區分配的結果看起來是這樣的:
    C1-0 將消費 T1主題的 0, 1, 2, 3 磁區以及 T2主題的 0, 1, 2, 3磁區
    C2-0 將消費 T1主題的 4, 5, 6 磁區以及 T2主題的 4, 5, 6磁區
    C3-0 將消費 T1主題的 7, 8, 9 磁區以及 T2主題的 7, 8, 9磁區
    可以看出,C1-0 消費者執行緒比其他消費者執行緒多消費了2個磁區,這種分配方式明顯的一個問題是隨著消費者訂閱的Topic的數量的增加,不均衡的問題會越來越嚴重,所以最好的情況就是partiton數目是consumer數目的整數倍,可以有效避免這個弊端,
  • 2)、RoundRobin(輪詢)
    輪詢磁區策略是把所有partition和所有consumer執行緒都列出來,然后按照hashcode進行排序,注意上一種range磁區是針對每一個topic而言的,而輪訓磁區是相對于所有的partition和consumer而言的,最后通過輪詢演算法分配partition給消費執行緒,如果消費組內,所有消費者訂閱的Topic串列是相同的(每個消費者都訂閱了相同的Topic),那么分配結果是盡量均衡的(消費者之間分配到的磁區數的差值不會超過1),如果訂閱的Topic串列是不同的,那么分配結果是不保證“盡量均衡”的,因為某些消費者不參與一些Topic的分配,
    在我們的例子里面,假如按照 hashCode 排序完的topic-partitions組依次為T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我們的消費者執行緒排序為C1-0, C1-1, C2-0, C2-1(c1和c2 consumer group都訂閱了t1),最后磁區分配的結果為:
    C1-0 將消費 T1-5, T1-2, T1-6 磁區;
    C1-1 將消費 T1-3, T1-1, T1-9 磁區;
    C2-0 將消費 T1-0, T1-4 磁區;
    C2-1 將消費 T1-8, T1-7 磁區;
    相對于RangeAssignor,在訂閱多個Topic的情況下,RoundRobinAssignor的方式能消費者之間盡量均衡的分配到磁區(分配到的磁區數的差值不會超過1——RangeAssignor的分配策略可能隨著訂閱的Topic越來越多,差值越來越大)
    對于訂閱組內消費者訂閱Topic不一致的情況:假設有三個消費者分別為C1-0、C2-0、C3-0,有3個Topic T1、T2、T3,分別擁有1、2、3個磁區,并且C1-0訂閱T1,C2-0訂閱T1和T2,C3-0訂閱T1、T2、T3,那么RoundRobinAssignor的分配結果如下:
    在這里插入圖片描述
    看上去分配已經盡量的保證均衡了,不過可以發現C3-0承擔了4個磁區的消費而C2-0和C1-0都是承擔一個磁區,如果T2-1分配給c2-0,均衡性是不是更好呢?帶個這個問題,繼續下面的這次策略,
  • 3) StickyAssignor(粘性)
    盡管RoundRobinAssignor已經在RangeAssignor上做了一些優化來更均衡的分配磁區,但是在一些情況下依舊會產生嚴重的分配偏差,比如消費組中訂閱的Topic串列不相同的情況下,更核心的問題是無論是RangeAssignor,還是RoundRobinAssignor,當前的磁區分配演算法都沒有考慮上一次的分配結果,顯然,在執行一次新的分配之前,如果能考慮到上一次分配的結果,盡量少的調整磁區分配的變動,顯然是能節省很多開銷的,它主要有兩個目的:
  • 1、磁區的分配盡可能的均勻
  • 2、磁區的分配盡可能和上次分配保持相同,也就是rebalance之后磁區的分配盡量和之前的磁區分配相同,
    當兩者發生沖突時, 第 一 個目標優先于第二個目標, 第一個目標是每個分配演算法都盡量嘗試去完成的,而第二個目標才真正體現出StickyAssignor特性的,

五、磁區副本

5.1.1 副本機制

??我們已經知道Kafka的每個topic都可以分為多個Partition,并且同一topic的多個partition會均勻分布在集群的各個節點下,雖然這種方式能夠有效的對資料進行分片,但是對于每個partition來說,都是單點的,當其中一個partition不可用的時候,那么這部分訊息就沒辦法消費,所以kafka為了提高partition的可靠性而提供了副本的概念(Replica),通過副本機制來實作冗余備份,
?? 每個磁區可以有多個副本,并且在副本集合中會存在一個leader的副本,所有的讀寫請求都是由leader副本來進行處理,剩余的其他副本都做為follower副本,follower副本會從leader副本同步訊息日志,和redis cluster中的節點概念相同,leader副本為redis cluster中的主節點,follower副本為redis cluster中的備節點,
??一般情況下,同一個磁區的多個副本會被均勻分配到集群中的不同broker上,當leader副本所在的broker出現故障后,可以重新選舉新的leader副本繼續對外提供服務,通過這樣的副本機制來提高kafka集群的可用性,
在這里插入圖片描述
通常follower副本和leader副本不會在同一個broker上,這種是為了保證當leader副本所在broker宕機后,follower副本可繼續提供服務,

5.1.2 副本選舉機制

Kafka提供了資料復制演算法保證,如果leader副本所在的broker節點宕機或者出現故障,或者磁區的leader節點發生故障,這個時候怎么處理呢?kafka必須要保證從follower副本中選擇一個新的leader副本,那么kafka是如何實作選舉的呢?要了解leader選舉,我們需要了解幾個概念,Kafka磁區下有可能有很多個副本(replica)用于實作冗余,從而進一步實作高可用,副本根據角色的不同可分為3類:

  • leader副本:回應clients端讀寫請求的副本
  • follower副本:被動地備份leader副本中的資料,不能回應clients端讀寫請求,
  • ISR副本:Zookeeper中為每一個partition動態的維護了一個ISR,這個ISR里的所有replica都跟上了leader,只有ISR里的成員才能有被選為leader的可能,ISR副本包含了leader副本和所有與leader副本保持同步的follower副本,注意是和保持同步,不包含和leader副本沒保持同步的follower副本,
5.1.3 副本協同機制

剛剛提到了,訊息的讀寫操作都只會由leader節點來接收和處理,follower副本只負責同步資料以及當leader副本所在的broker掛了以后,會從ISR副本中的follower副本中選取新的leader,
寫請求首先由Leader副本處理,之后follower副本會從leader上拉取寫入的訊息,這個程序會有一定的延遲,導致follower副本中保存的訊息略少于leader副本,但是只要沒有超出閾值都可以容忍,但是如果一個follower副本出現例外,比如宕機、網路斷開等原因長時間沒有同步到訊息,那這個時候,leader就會把它踢出去,kafka通過ISR集合來維護一個磁區副本資訊
在這里插入圖片描述
一個新leader被選舉并被接受客戶端的訊息成功寫入,Kafka確保從同步副本串列中選舉一個副本為leader;leader負責維護和跟蹤ISR(in-Sync replicas , 副本同步佇列)中所有follower滯后的狀態,當producer發送一條訊息到broker后,leader寫入訊息并復制到所有follower,訊息提交之后才被成功復制到所有的同步副本,

5.1.4 ISR

ISR表示目前可用且訊息量與leader相差不多的副本集合,這是整個副本集合的一個子集,怎么去理解可用和相差不多這兩個詞呢?具體來說,ISR集合中的副本必須滿足兩個條件:

  • 1、副本所在節點必須維持著與zookeeper的連接
  • 2、副本最后一條訊息的offset與leader副本的最后一條訊息的offset之間的差值不能超過指定的閾值,(replica.lag.time.max.ms) replica.lag.time.max.ms:如果該follower在此時間間隔內一直沒有追上過leader的所有訊息,則該follower就會被剔除isr串列
  • 3、ISR資料保存在Zookeeper的 /brokers/topics//partitions//state 節點中
    follower副本把leader副本前的日志全部同步完成時,則認為follower副本已經追趕上了leader副本,這個時候會更新這個副本的lastCaughtUpTimeMs標識,kafka副本管理器會啟動一個副本過期檢查的定時任務,這個任務會定期檢查當前時間與副本的lastCaughtUpTimeMs的差值是否大于引數replica.lag.time.max.ms 的值,如果大于,則會把這個副本踢出ISR集合
    在這里插入圖片描述

5.1.5 所有副本不作業如何處理?

在ISR中至少有一個follower時,Kafka可以確保已經commit的資料不丟失,但如果某個Partition的所有Replica都宕機了,就無法保證資料不丟失了,這種情況下有兩種可行的方案:
1、等待ISR中的任一個Replica“活”過來,并且選它作為Leader
2、選擇第一個“活”過來的Replica(不一定是ISR中的)作為Leader,默認配置,
如果一定要等待ISR中的Replica“活”過來,那不可用的時間就可能會相對較長,而且如果ISR中的所有Replica都無法“活”過來了,或者資料都丟失了,這個Partition將永遠不可用,
選擇第一個“活”過來的Replica作為Leader,而這個Replica不是ISR中的Replica,那即使它并不保證已經包含了所有已commit的訊息,它也會成為Leader而作為consumer的資料源,默認情況下Kafka采用第二種策略,即unclean.leader.election.enable=true,也可以將此引數設定為false來啟用第一種策略,

5.1.6 副本資料同步原理

了解了副本的協同程序以后,還有一個最重要的機制,就是資料的同步程序,下圖中,深紅色部分表示test_replica磁區的leader副本,另外兩個節點上淺色部分表示follower副本
在這里插入圖片描述
Producer在發布訊息到某個Partition時,先通過ZooKeeper找到該Partition的Leader get /brokers/topics//partitions/2/state ,然后無論該Topic的Replication Factor為多少(也即該Partition有多少個Replica),Producer只將該訊息發送到該Partition的Leader,Leader會將該訊息寫入其本地Log,每個Follower都從Leader pull資料,這種方式上,Follower存盤的資料順序與Leader保持一致,Follower在收到該訊息并寫入其Log后,向Leader發送ACK,一旦Leader收到了ISR中的所有Replica的ACK,該訊息就被認為已經commit了,Leader將增加HW(HighWatermark)并且向Producer發送ACK,

  • LEO:即日志末端位移(log end offset),記錄了該副本底層日志(log)中下一條訊息的位移值,注意是下一條訊息!也就是說,如果LEO=10,那么表示該副本保存了10條訊息,位移值范圍是[0, 9],另外,leader LEO和follower LEO的更新是有區別的,可以看出leader副本和follower副本都有LEO,
  • HW:即所有follower副本中相對于leader副本最小的LEO值,HW是相對leader副本而言的,其HW值不會大于LEO值,小于等于HW值的所有訊息都被認為是“已備份”的(replicated),同理,leader副本和follower副本的HW更新是有區別的
    通過下面這幅圖來表達LEO、HW的含義,隨著follower副本不斷和leader副本進行資料同步,follower副本的LEO主鍵會后移并且追趕到leader副本,這個追趕上的判斷標準是當前副本的LEO是否大于或者等于leader副本的HW,如果follower在replica.lag.time.max.ms時間范圍內追趕上了leader副本,該follower副本則加入到ISR副本內,也可以使得之前被踢出的follower副本重新加入到ISR集合中;如果在replica.lag.time.max.ms時間范圍內follower副本沒追趕上leader副本,該follower副本會被從ISR副本范圍內踢出,可以看出ISR副本是一個由zookerper動態監控的變化的副本,

六、資料可靠性和持久性保證

6.1 生產者不丟數

當producer向leader發送資料時,可以通過request.required.acks引數來設定資料可靠性的級別:

  • request.required.acks=0
    producer寫入的一條訊息會立即回傳ack確認訊息,不管leader副本是否同步完或者ISR中的follower副本是否同步完,此配置丟失資料風險很大,生產環境很少使用,
  • request.required.acks=1(默認配置)
    producer寫入的一條訊息后會等到leader副本同步完成(不需要等到ISR內的follower副本同步完成)后立即回傳給客戶端ack訊息,該配置的風險是如果ISR內的follower副本還沒有完成資訊同步時,leader節點宕機了,然后通過選舉一個follower副本做為新的節點,此時就會有資料丟失的問題,相當于mysql的主從同步,優點就是可用性強,缺點就是弱一致性,可能造成資料丟失,
  • request.required.acks=-1
    producer寫入的一 條訊息需要等到磁區的leader 副本完成同步,且需要等待ISR集合中的所有follower副本都同步完之后才能回傳producer確認的ack,這樣就避免了部分資料被寫進了leader,還沒來得及被任何follower復制就宕機了,而造成資料丟失,類似于強一致性,追求強一致性也就意味著可用性(回應時間)會降低,設定成-1就可以保證寫入的資料不丟失了嗎?不一定,`比如當ISR中只有leader副本時(前面ISR那一節講到,ISR副本中的成員由于某些情況會增加也會減少,最少就只剩一個leader),當leader副本宕機后,所有資料丟失,
    為了避免資料的丟失,提高可靠性,避免ISR副本中只有一個leader副本情況的發生,可以使用引數min.insync.replicas來約束,該引數的意思是設定ISR中的最小副本數是多少,總數包含leader副本和follower副本之和,如果ISR中的副本數不夠引數min.insync.replicas所設定的值,客戶端會回傳例外,
    如果由于網路原因導致producer push資料失敗了,我們可以設定retries引數來進行重試,總結:producer訊息不丟失需要下面3中措施:
  • request.required.acks=-1
  • 設定min.insync.replicas引數
  • 設定retries引數

6.2 broker資料不丟失

上面已經介紹過unclean.leader.election.enable=false引數,
這里設定unclean.leader.election.enable=false,表示:如果ISR副本全部宕機后,等到ISR副本中的里一個副本啟動之后,并將他做為leader副本.

6.3 consumer資料不丟失

enable.auto.commit該引數默認為true,表明consumer在下次poll訊息時自動提交上次poll出的所有訊息的消費位移,如果設定為false,則需要用戶手動提交手動提交所有訊息的消費位移,

6.4 訊息重復消費和訊息丟失的場景

當 enable.auto.commit設定為true的時候會有訊息重復消費和訊息丟失的場景,
當應用端消費訊息時,還沒有提交消費位移的時候,此時kafka出現宕機,那么在kafka恢復之后,這些訊息將會重新被消費一遍,這就造成了重復消費,
比如consumer第一次poll出n條訊息進行消費,達到auto.commit.interval.ms時間后,cosumer會進行下一次poll并提交上次poll出的n條訊息的消費位移,如果第一次poll出的n條訊息客戶端還沒有消費完,此時客戶端宕機了,當客戶端重啟后,將會從第二次poll的位置開始拉取訊息,從而丟失第一次未提交消費位移的訊息,這就造成了資料丟失,

6.5 只能避免資料丟失而不能解決資料重復

當設定enable.auto.commit為false時,所有的訊息位移提交都為手動提交了,所有可以避免上面提到的資料丟失問題,可以保證consumer訊息時資料不會丟失,
手動提交有同步提交和異步提交,我們可以選擇在應用端處理完訊息后手動提交消費位移,如果在消費完訊息準備提交訊息位移的時候,應用端發生了宕機,那么重啟之后這些訊息還是會被重新消費一遍,所以通過配置enable.auto.commit引數為false只能避免消費端丟失訊息而不能避免消費端重復消費訊息.

七、訊息存盤持久化

訊息發送端發送訊息到broker上以后,訊息是如何持久化的呢?那么接下來去分析下訊息的存盤首先我們需要了解的是,kafka是使用日志檔案的方式來保存生產者和發送者的訊息,每條訊息都有一個offset值來表示它在磁區中的偏移量,Kafka中存盤的一般都是海量的訊息資料,為了避免日志檔案過大,Log并不是直接對應在一個磁盤上的日志檔案,而是對應磁盤上的一個目錄,這個目錄的命名規則是<topic_name>_<partition_id>

7.1 訊息的檔案存盤機制

一個topic的多個partition在物理磁盤上的保存路徑,路徑保存在 /tmp/kafka-logs/topic_partition,包含日志檔案、索引檔案和時間索引檔案
在這里插入圖片描述
kafka是通過分段的方式將Log分為多個LogSegment,LogSegment是一個邏輯上的概念,一個LogSegment對應磁盤上的一個日志檔案和一個索引檔案,其中日志檔案是用來記錄訊息的,索引檔案是用來保存訊息的索引,

7.2 LogSegment

假設kafka以partition為最小存盤單位,那么我們可以想象當kafka producer不斷發送訊息,必然會引起partition檔案的無線擴張,這樣對于訊息檔案的維護以及被消費的訊息的清理帶來非常大的挑戰,所以kafka 以segment為單位又把partition進行細分,每個partition相當于一個巨型檔案被平均分配到多個大小相等的segment資料檔案中(每個segment檔案中的訊息不一定相等),這種特性方便已經被消費的訊息的清理,提高磁盤的利用率,
segment file由2大部分組成,分別為index file和data file,此2個檔案一一對應,成對出現,后綴".index"和“.log”分別表示為segment索引檔案、資料檔案.
segment檔案命名規則:partion全域的第一個segment從0開始,后續每個segment檔案名為上一個segment檔案最后一條訊息的offset值進行遞增,數值最大為64位long大小,20位數字字符長度,沒有數字用0填充
在這里插入圖片描述

7.3 segment中index和log的對應關系

從所有分段中,找一個分段進行分析
為了提高查找訊息的性能,為每一個日志檔案添加2個索引索引檔案:OffsetIndex 和 TimeIndex,分別對應.index以及.timeindex, TimeIndex索引檔案格式:它是映射時間戳和相對offset
在這里插入圖片描述
如圖所示,index中存盤了索引以及物理偏移量, log存盤了訊息的內容,索引檔案的元資料執行對應資料檔案中message的物理偏移地址,舉個簡單的案例來說,以[4053,80899]為例,在log檔案中,對應的是第4053條記錄,物理偏移量(position)為80899. position是ByteBuffer的指標位置

7.4 如何通過offset查找message

查找的演算法是:

  • 1、根據offset的值,查找segment段中的index索引檔案,由于索引檔案命名是以上一個檔案的最后一個offset進行命名的,所以,使用二分查找演算法能夠根據offset快速定位到指定的索引檔案,
  • 2、找到索引檔案后,根據offset進行定位,找到索引檔案中的符合范圍的索引,(kafka采用稀疏索引的方式來提高查找性能)
  • 3、得到position以后,再到對應的log檔案中,從position出開始查找offset對應的訊息,將每條訊息的offset與目標offset進行比較,直到找到訊息
    比如說,我們要查找offset=2490這條訊息,那么先找到00000000000000000000.index, 然后找到[2487,49111]這個索引,再到log檔案中,根據49111這個position開始查找,比較每條訊息的offset是否大于等于2490,最后查找到對應的訊息以后回傳

7.5 日志的清除策略以及壓縮策略

1、日志清除策略

前面提到過,日志的分段存盤,一方面能夠減少單個檔案內容的大小,另一方面,方便kafka進行日志清理,日志的清理策略有兩個:

  • 1、根據訊息的保留時間,當訊息在kafka中保存的時間超過了指定的時間,就會觸發清理程序
  • 2、根據topic存盤的資料大小,當topic所占的日志檔案大小大于一定的閥值,則可以開始洗掉最舊的訊息,kafka會啟動一個后臺執行緒,定期檢查是否存在可以洗掉的訊息
    通過log.retention.bytes和log.retention.hours這兩個引數來設定,當其中任意一個達到要求,都會執行洗掉,默認的保留時間是:7天
2、日志壓縮策略

Kafka還提供了“日志壓縮(Log Compaction)”功能,通過這個功能可以有效的減少日志檔案的大小,緩解磁盤緊張的情況,在很多實際場景中,訊息的key和value的值之間的對應關系是不斷變化的,就像資料庫中的資料會不斷被修改一樣,消費者只關心key對應的最新的value,因此,我們可以開啟kafka的日志壓縮功能,服務端會在后臺啟動啟動Cleaner執行緒池,定期將相同的key進行合并,只保留最新的value值,日志的壓縮原理是
在這里插入圖片描述

八、小結

本文僅僅是自己復習一下,涉及并不太深入也并不太全面,如果想更深入了解,可以學習一下《深入理解Kafka- 核心設計與實踐原理》,作者是之前唯品會的同事朱忠華,他對rabbitMQ、Kafka了解非常深入,其實我也是拜讀他的幾篇博客,理解非常到位,另外新一代訊息中間件已問世,名字是Pulsar,下一代云原生訊息平臺、Apache 頂級專案 ,有空大家了解學習一下,他既擁有 Kafka 的優勢,又規避它的缺陷,同時還融合了 MQ 的一系列特性,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/289218.html

標籤:其他

上一篇:京東、知乎、天貓等各大平臺的K8S架構你知道多少?

下一篇:分布式鎖-三種實作方式簡述

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more