Kafka
異步通信原理
觀察者模式
- 觀察者模式(Observer),又叫發布-訂閱模式(Publish/Subscribe)
- 定義物件間一種一對多的依賴關系,使得每當一個物件改變狀態,則所有依賴于它的物件都會得到通知并自動更新,
- 一個物件(目標物件)的狀態發生改變,所有的依賴物件(觀察者物件)都將得到通知,
生產者消費者模式
-
傳統模式
-
生產者直接將訊息傳遞給指定的消費者
-
耦合性特別高,當生產者或者消費者發生變化,都需要重寫業務邏輯
- 生產者消費者模式
-
-
生產者消費者模式
-
通過一個容器來解決生產者和消費者的強耦合問題,生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊
-
資料傳遞流程
-
生產者負責向緩沖區里面添加資料單元
-
消費者負責從緩沖區里面取出資料單元
- 一般遵循先進先出的原則
-
-
緩沖區
-
解耦
- 假設生產者和消費者分別是兩個類,如果讓生產者直接呼叫消費者的某個方法,那么生產者對于消費者就會產生依賴
-
支持并發
- 生產者直接呼叫消費者的某個方法程序中函式呼叫是同步的
- 萬一消費者處理資料很慢,生產者就會白白糟蹋大好時光
-
支持忙閑不均
- 緩沖區還有另一個好處,如果制造資料的速度時快時慢,緩沖區的好處就體現出來了,
- 當資料制造快的時候,消費者來不及處理,未處理的資料可以暫時存在緩沖區中,
- 等生產者的制造速度慢下來,消費者再慢慢處理掉,
資料單元
-
關聯到業務物件
- 資料單元必須關聯到某種業務物件
-
完整性
- 就是在傳輸程序中,要保證該資料單元的完整
-
獨立性
- 就是各個資料單元之間沒有互相依賴
- 某個資料單元傳輸失敗不應該影響已經完成傳輸的單元;也不應該影響尚未傳輸的單元,
-
顆粒度
- 資料單元需要關聯到某種業務物件,那么資料單元和業務物件應該處于的關系(一對一?一對多)
- 如果顆粒度過小會增加資料傳輸的次數
- 如果顆粒度過大會增加單個資料傳輸的時間,影響后期消費
訊息系統原理
點對點訊息傳遞
- 在點對點訊息系統中,訊息持久化到一個佇列中一條訊息只能被消費一次
- 當一個消費者消費了佇列中的某條資料之后,該條資料則從訊息佇列中洗掉
- 該模式即使有多個消費者同時消費資料,也能保證資料處理的順序
- 基于推送模型的訊息系統,由訊息代理記錄消費狀態
發布訂閱訊息傳遞
- 在發布-訂閱訊息系統中,訊息被持久化到一個topic中
- 消費者可以訂閱一個或多個topic,消費者可以消費該topic中所有的資料,同一條資料可以被多個消費者消費,資料被消費后不會立馬洗掉
- Kafka 采取拉取模型(Poll),由自己控制消費速度,消費者可以按照任意的偏移量進行消費
Kafka簡介
概述
-
流處理平臺,本質上是一個 MQ(Message Queue)
-
使用訊息佇列的好處
- 解耦:允許我們獨立的擴展或修改佇列兩邊的處理程序,
- 可恢復性:即使一個處理訊息的行程掛掉,加入佇列中的訊息仍然可以在系統恢復后被處理,
- 緩沖:有助于解決生產訊息和消費訊息的處理速度不一致的情況,
- 靈活性&峰值處理能力:不會因為突發的超負荷的請求而完全崩潰,訊息佇列能夠使關鍵組件頂住突發的訪問壓力,
- 異步通信:訊息佇列允許用戶把訊息放入佇列但不立即處理它
設計目標
- 以時間復雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間的訪問性能,
- 高吞吐率,即使在非常廉價的商用機器上也能做到單機支持每秒100K條訊息的傳輸,
- 支持Kafka Server間的訊息磁區,及分布式消費,同時保證每個partition內的訊息順序傳輸,
- 同時支持離線資料處理和實時資料處理,
- 支持在線水平擴展
Kafka的優點
-
解耦
-
冗余
-
擴展性
- 因為訊息佇列解耦了你的處理程序,所以增大訊息入隊和處理的頻率是很容易的,只要另外增加處理程序即可
-
靈活性&峰值處理能力
- 在訪問量劇增的情況下,應用仍然需要繼續發揮作用
-
可恢復性
- 系統的一部分組件失效時,不會影響到整個系統
- 訊息佇列降低了行程間的耦合度,所以即使一個處理訊息的行程掛掉,加入佇列中的訊息仍然可以在系統恢復后被處理
-
順序保證
- Kafka保證一個Partition內的訊息的有序性
-
緩沖
- 訊息佇列通過一個緩沖層來幫助任務最高效率的執行
-
異步通信
- 訊息佇列提供了異步處理機制,允許用戶把一個訊息放入佇列,但并不立即處理它
Kafka系統架構
Broker
- Kafka 集群包含一個或多個服務器,服務器節點稱為broker
Topic
- 每條發布到Kafka集群的訊息都有一個類別,這個類別被稱為Topic,
- 類似于資料庫的table或者ES的Index
- 物理上不同Topic的訊息分開存盤
- 邏輯上一個Topic的訊息雖然保存于一個或多個broker上但用戶只需指定訊息的Topic即可生產或消費資料而不必關心資料存于何處)
Partition
-
topic中的資料分割為一個或多個partition
-
每個topic至少有一個partition,當生產者產生資料的時候,根據分配策略,選擇磁區,然后將訊息追加到指定的磁區的末尾(佇列)
-
每條訊息都會有一個自增的編號
- 標識順序
- 用于標識訊息的偏移量
- 每個Partition都有自己獨立的編號
-
每個partition中的資料使用多個segment檔案存盤,
-
partition中的資料是有序的,不同partition間的資料丟失了資料的順序,
-
如果topic有多個partition,消費資料時就不能保證資料的順序,嚴格保證訊息的消費順序的場景下,需要將partition數目設為1
Leader
- 每個partition有多個副本,其中有且僅有一個作為Leader,Leader是當前負責資料的讀寫的partition
Follower
- Follower跟隨Leader,所有寫請求都通過Leader路由,資料變更會廣播給所有Follower,Follower與Leader保持資料同步,
- 如果Leader失效,則從Follower中選舉出一個新的Leader,
- 當Follower掛掉、卡住或者同步太慢,leader會把這個follower從“in sync replicas”(ISR)串列中洗掉,重新創建一個Follower
replication
-
資料會存放到topic的partation中,但是有可能磁區會損壞
-
我們需要對磁區的資料進行備份(備份多少取決于你對資料的重視程度)
-
我們將磁區的分為Leader(1)和Follower(N)
- Leader負責寫入和讀取資料
- Follower只負責備份
- 保證了資料的一致性
-
備份數設定為N,表示主+備=N(參考HDFS)
producer
- 生產者即資料的發布者,該角色將訊息發布到Kafka的topic中,
- broker接收到生產者發送的訊息后,broker將該訊息追加到當前用于追加資料的segment檔案中,
- 生產者發送的訊息,存盤到一個partition中,生產者也可以指定資料存盤的partition
consumer
- 消費者可以從broker中讀取資料,消費者可以消費多個topic中的資料,
Consumer Group
- 每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬于默認的group),
- 將多個消費者集中到一起去處理某一個Topic的資料,可以更快的提高資料的消費能力
- 整個消費者組共享一組偏移量(防止資料被重復讀取),因為一個Topic有多個磁區
offset偏移量
- 可以唯一的標識一條訊息
- 偏移量決定讀取資料的位置,不會有執行緒安全的問題,消費者通過偏移量來決定下次讀取的訊息
- 訊息被消費之后,并不被馬上洗掉,這樣多個業務就可以重復使用kafka的訊息
- 我們某一個業務也可以通過修改偏移量達到重新讀取訊息的目的,偏移量由用戶控制
- 訊息最侄訓是會被洗掉的,默認生命周期為1周(7*24小時)
Zookeeper
- kafka 通過 zookeeper 來存盤集群的 meta 資訊
- 新版本由partition自己保存
- 輔助選舉
Kafka資料存盤
topic在物理層面以partition為分組,一個topic可以分成若干個partition
partition還可以細分為Segment,一個partition物理上由多個Segment組成
-
segment 的引數有兩個
- log.segment.bytes:單個segment可容納的最大資料量,默認為1GB
- log.segment.ms:Kafka在commit一個未寫滿的segment前,所等待的時間(默認為7天)
LogSegment 檔案由兩部分組成,分別為“.index”檔案和“.log”檔案,分別表示為 Segment 索引檔案和資料檔案
- partition全域的第一個segment從0開始,后續每個segment檔案名為上一個segment檔案最后一條訊息的offset值
- 數值大小為64位,20位數字字符長度,沒有數字用0填充
訊息都具有固定的物理結構,包括:offset(8 Bytes)、訊息體的大小(4 Bytes)、crc32(4 Bytes)、magic(1 Byte)、attributes(1 Byte)、key length(4 Bytes)、key(K Bytes)、payload(N Bytes)等等欄位,可以確定一條訊息的大小,即讀取到哪里截止
“.index” 檔案存盤大量的索引資訊,“.log” 檔案存盤大量的資料,索引檔案中的元資料指向對應資料檔案中 Message 的物理偏移量
- .index中順序存盤,存在硬碟中,預分配10M
其他檔案
- .index 位移索引
- .timeindex 時間戳索引
- .snapshot檔案,記錄了producer的事務資訊,(todo)
生產者資料安全
資料磁區
-
磁區原因
- 方便在集群中擴展,每個 Partition 可以通過調整以適應它所在的機器,
- 一個 Topic 又可以有多個 Partition 組成,因此可以以 Partition 為單位讀寫了,
- 可以提高并發,因此可以以 Partition 為單位讀寫了
資料可靠性保證
-
ACK機制
- 為保證 Producer 發送的資料,能可靠地發送到指定的 Topic
- Topic 的每個 Partition 收到 Producer 發送的資料后,都需要向 Producer 發送ACK(ACKnowledge 確認收到),
- 如果 Producer 收到 ACK,就會進行下一輪的發送,否則重新發送資料,
-
ACK時機
-
部分 Follower 與 Leader 同步完成,Leader 發送 ACK
-
全部 Follower 與 Leader 同步完成,Leader 發送 ACK,
-
ISR(部分)
-
關鍵詞
- AR : Assigned Replicas 用來標識副本的全集
- OSR :out -sync Replicas 離開同步佇列的副本
- ISR :in -sync Replicas 加入同步佇列的副本
- ISR = Leader + 沒有落后太多的副本;
- AR = OSR+ ISR,
-
Leader維護了一個動態的 in-sync replica set(ISR 和 Leader 保持同步的 Follower 集合)
-
當 ISR 集合中的 Follower 完成資料的同步之后,Leader 就會給 Follower 發送 ACK,
-
如果 Follower 長時間未向 Leader 同步資料,則該 Follower 將被踢出 ISR 集合,判斷標準
-
超過10秒鐘沒有同步資料
- replica.lag.time.max.ms=10000
-
主副節點差4000條資料
- rerplica.lag.max.messages=4000
-
-
Leader 發生故障后,就會從 ISR 中選舉出新的 Leader, 可設定從OSR中選舉
- kafka采用一種降級措施來處理:
- 選舉第一個恢復的node作為leader提供服務,以它的資料為基準,這個措施被稱為臟leader選舉
-
-
-
ACK應答機制
-
Kafka 為用戶提供了三種可靠性級別,用戶根據可靠性和延遲的要求進行權衡
-
acks=0
- 這意味著 Producer 無需等待來自 Leader的確認而繼續發送下一批訊息
- 當 Broker 故障時有可能丟失資料
- 最多一條
-
acks=1
- Producer 在 ISR 中的 Leader 已成功收到的資料并得到確認后發送下一條 Message
- 如果在 Follower 同步成功之前 Leader 故障,那么將會丟失資料
- 最少一條
-
acks=-1
- Producer 需要等待 ISR 中的所有 Follower 都確認接收到資料后才算一次發送完成,可靠性最高
- 不會出現臟選舉
- 在 Broker 發送 ACK 時,Leader 發生故障,則會造成資料重復
- 最少一條
-
-
-
故障處理
-
LEO:每個副本最大的 Offset
-
HW:消費者能見到的最大的 Offset,ISR 佇列中最小的 LEO
-
Follower 故障
- Follower 發生故障后會被臨時踢出 ISR 集合,待該 Follower 恢復后,Follower 會 讀取本地磁盤記錄的上次的 HW,并將 log 檔案高于 HW 的部分截取掉,從 HW 開始向 Leader 進行同步資料操作
- 等該 Follower 的 LEO 大于等于該 Partition 的 HW,即 Follower 追上 Leader 后,就可以重新加入 ISR 了
-
Leader 故障
- Leader 發生故障后,會從 ISR 中選出一個新的 Leader,之后,為保證多個副本之間的資料一致性,其余的 Follower 會先將各自的 log 檔案高于 HW 的部分截掉,然后從新的 Leader 同步資料
- 這只能保證副本之間的資料一致性,并不能保證資料不丟失或者不重復
- ack=-1可以避免資料丟失
-
-
Exactly Once 語意
-
將服務器的 ACK 級別設定為 -1或1,可以保證 Producer 到 Server 之間不會丟失資料,即 At LeastOnce 語意
-
將服務器 ACK 級別設定為 0,可以保證生產者每條訊息只會被發送一次,即 At Most Once 語意
- At Least Once 可以保證資料不丟失,但是不能保證資料不重復;
- At Most Once 可以保證資料不重復,但是不能保證資料不丟失,
-
Exactly Once
- 重要資料既不重復也不丟失
- 0.11 版本的 Kafka,引入了冪等性:Producer 不論向 Server 發送多少重復資料,Server 端都只會持久化一條
-
消費者資料安全
消費方式
- Consumer 采用 Pull(拉取)模式從 Broker 中讀取資料
- Pull 模式不足之處是,如果 Kafka 沒有資料,消費者可能會陷入回圈中,一直回傳空資料
- 因為消費者從 Broker 主動拉取資料,需要維護一個長輪詢,針對這一點, Kafka 的消費者在消費資料時會傳入一個時長引數 timeout
- 如果當前沒有資料可供消費,Consumer 會等待一段時間之后再回傳,這段時長即為 timeout
磁區分配策略
-
概述
-
將磁區的所有權從一個消費者移到另一個消費者稱為重新平衡(rebalance)
-
磁區分配的時機
- 同一個 Consumer Group 內新增消費者
- 消費者離開當前所屬的Consumer Group,包括shuts down 或 crashes
- 訂閱的主題新增磁區
-
Kafka 有三種分配策略
- 一個是RangeAssignor(默認)
- 一個是 RoundRobinAssignor
- 一個是StickyAssignor(0.11.x版本開始引入)
-
-
RangeAssignor分配策略
-
原理
- 按照消費者總數和磁區總數進行整除運算來獲得一個跨度,然后將磁區按照跨度進行平均分配
- 如果不夠平均分配,那么字典序靠前的消費者會被多分配一個磁區且同一個topic內分到的磁區連續
-
例
-
-
RoundRobinAssignor分配策略
-
策略的原理是將消費組內所有消費者以及消費者所訂閱的所有topic的partition按照字典序排序,然后通過輪詢消費者方式逐個將磁區分配給每個消費者
-
例
-
消費者訂閱相同 Topic
-
消費者訂閱不同 Topic
-
-
-
StickyAssignor分配策略
-
“sticky”這個單詞可以翻譯為“粘性的”,Kafka從0.11.x版本開始引入這種分配策略
-
目的
- ① 磁區的分配要盡可能的均勻;
- ② 磁區的分配盡可能的與上次分配的保持相同,
- ③ 當兩者發生沖突時,第一個目標優先于第二個目標,
-
OffSet
-
生產端offset
-
Kafka接收到生產者發送的訊息實際上是以日志檔案的形式保存在對應磁區的磁盤上,每條訊息都有一個offset值來表示它在磁區中的位置,每次寫入都是追加到檔案的末尾
-
如上圖所示,它代表一個日志檔案,這個日志檔案中有 9 條訊息
- 第一條訊息的 offset( logStartOffset)為 0,最后一條訊息的 offset 為 8,LEO(Log EndOffset)為 9 ,代表下一條待寫入的訊息,
- 日志檔案的 HW(Low Watermark)為 6,表示消費者只能拉取到 offset 在 0 至 5 之間的訊息, 而 offset 為 6 的訊息對消費者而言是不可見的,
- 每個磁區副本都會維護自身的LEO,而ISR集合中最小的LEO即為磁區的HW,
-
-
消費端offset
-
消費者在消費時,也維護一個offset,表示消費到磁區中的某個訊息所在的位置,
-
如上圖所示,ConsumerA的offset=9,表示ConsumerA已經消費完offset為8的那條資料,提交的offset值為9,下次消費從offset為9的資料開始消費
-
消費者提交的offset值維護在consumer_offsets這個Topic中,具體維護在哪個磁區中,是由消費者所在的消費者組groupid決定
- 計算方式是:groupid的hashCode值對50取余
-
消費者提交offset方式可以是手動提交也可以是自動提交,相關的引數設定是enable.auto.commit
- 引數默認為true,表示每5秒拉取磁區中最大的訊息位移進行提交,
- 引數設定為false時,需要手動提交offset
-
提交方式有同步提交(commitSync)和異步提交(commitAsync)兩種方式
- 同步提交會根據poll方法拉取最新位移進行提交,只要沒有發生不可恢復的錯誤,它就會阻塞消費執行緒直至提交完成
- 異步提交執行時不會阻塞消費執行緒,但有可能出現先提交的位移失敗了而后提交的位移成功了,
- 如果重試,就會發生重復消費,對此,可設定遞增的序號來維護異步提交順序,也可以在退出或者rebalance前使用同步提交,
-
消費者消費時
- 如果沒有對應的offset記錄會按auto.offset.reset的配置來消費,默認值為latest,表示從磁區末尾開始消費,
- 如果配置為earliest表示從磁區起始處開始消費,在代碼中也可以通過seek()方法指定磁區具體的offset處開始消費,
- 另外,我們也可以重置消費者組的offset
-
消費者消費提交的offset也會被定期清理,對應的引數是
-
offsets.retention.check.interval.ms:
- offset定期檢查資料過期周期
-
offsets.retention.minutes
- offset保留時長超過offsets.retention.minutes時間且offset沒有改變時,消費者提交的offset會被清理掉
- 再次消費時會按auto.offset.reset配置去消費,此時,會有資料丟失或者重復,可通過重置offset來解決
-
-
存盤位置
- Kafka 0.9 版本之前,Consumer 默認將 Offset 保存在 Zookeeper 中,
- 從 0.9 版本開始,Consumer 默認將 Offset 保存在 Kafka 一個內置的 Topic 中,該 Topic 為__consumer_offsets
-
-
checkpoint的offset
-
Consumer重置Offset
- 更新Offset由三個維度決定:Topic的作用域、重置策略、執行方案
Kafka的事務性
前言
- 冪等和事務是Kafka 0.11.0.0版本引入的兩個特性,以此來實作EOS(exactly once semantics,精確一次處理語意)
Kafka冪等性
-
冪等,就是指多介面的多次呼叫所產生的結果和只呼叫一次是一致的,沒有冪等性的情況下就會重復發送資料
-
Kafka的冪等性機制能保證單個磁區不會重復寫入資料,而實作冪等性的核心就是引入了producerid 和 sequence number這兩個概念
-
判斷流程
-
理解一
-
每個新的生產者實體在初始化的時候都會被分配一個PID,這個PID對用戶而言是完全透明的
-
對于每個PID,訊息發送到的每一個磁區都有對應的序列號,這些序列號從0開始單調遞增,生產者每發送一條訊息就會將對應的序列號的值加1
-
broker端會在記憶體中為每一對維護一個序列號
- 如果SN_new = SN_old + 1時,broker才會接收它,
- 如果SN_new< SN_old + 1,那么說明訊息被重復寫入,broker可以直接將其丟棄,
- 如果SN_new> SN_old + 1,那么說明中間有資料尚未寫入,出現了亂序,暗示可能有訊息丟失,這個例外是一個嚴重的例外,
-
-
理解二
- Kafka內部會自動為每個Producer分配一個producer id(PID),broker端會為producer每個Partition維護一個<PID,Partition> -> sequence number映射,sequence number時從0開始單調遞增的,
- 如果新訊息的sequence number正好是broker端維護的<PID,Partition> -> sequencenumber大1,說broker會接受處理這條訊息,
- 如果新訊息的sequence number比broker端維護的sequence number要小,說明時重復訊息,broker可以將其直接丟棄
- 如果新訊息的sequence number比broker端維護的sequence number要大過1,說明中間存在了丟資料的情況
-
-
開啟
- Properties.put(“enable.idempotence”,true);
Kafka事務
- Kafka事務性主要是為了解決冪等性無法跨Partition運作的問題,事務性提供了多個Partition寫入的原子性
- 即寫入多個Partition要么全部成功,要么全部失敗,不會出現部分成功部分失敗這種情況
Flume+Kafka
Kafka Eagle
Kafka 集群訊息監控系統
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/379432.html
標籤:其他
上一篇:大資料之Spark
下一篇:從零搭建秒殺系統
