文章目錄
- 1. kafka是一個訊息佇列,
- 2. kafka中主要的術語
- 3. kafka broker
- 3.1 topic partition的資料冗余和leader partition
- 3.2 集群中的controller
- 3.3 partition中的leader,follower,in-sync-replica
- 1. leader如何判斷一個follower是否應該屬于in-sync-replica 集合呢
- 2. leader的選舉
- 3. 最小ISR設定
- 3.4 partition中訊息的High-Watermark
- 1. 什么是HighWatermark,有什么用
- 2. HW的更新機制:
- 1. LEO ,log-end-offset
- 4 producer
- 1. producer如何直接連接對應的topic所在節點
- 2. 相關配置
- 5 consumer
- 1. consumer的rebalance程序
- 1. 發生rebalance的時機
- 2. rebalance程序
- 2. 消費者的一些配置
- 6. 高性能讀寫的秘密
這一篇簡要總結一下kafka的原理和概念
1. kafka是一個訊息佇列,
對應的特性有
- 高吞吐量:吞吐量高達數十萬
- 高并發:支持數千個客戶端同時讀寫
- 低延遲:延遲最低只有幾毫秒
- 訊息持久性和可靠性:訊息被持久化到本地磁盤,同時支持資料備份
- 集群容錯性:允許部分節點失敗
- 可擴展性:支持集群動態擴展
2. kafka中主要的術語
- broker
- topic
- partition
- consumer
- producer
- 集群
- 訊息
??kafka 是一個分布式的訊息中間件,一條訊息就是類似我們往MYSQL中存盤中的任何一條資料一樣,kafka服務以集群的方式存在,集群中的每一個節點叫broker,訊息在kafka的歸檔方式是按照topic來進行歸檔的(就像是資料庫中一個表一樣),同時,每個topic又可以被分為一個或者多個partition(可以理解為MYSQL中的分表),用來提高并行度(生產者或者消費者的并行度),
??訊息的生產者被稱為producer,訊息的消費者被稱為consumer,生產者將訊息發送到broker中的摸個topic的某個partition當中,consumer指定從某一個或者多個topic的partion當中拉取資料,kafka可以保證保證資料的生產和消費的有序性(當然如果想要嚴格保證的話還需要進一步了解和設定它),kafka的一個簡單圖:

3. kafka broker
kafka broker層面要做的東西可以主要分為兩個比較大的方面,一方面是集群的管理,一方面是資料的管理,
3.1 topic partition的資料冗余和leader partition
??因為kafka是分布式的,所以他會通過一定的資料冗余來對抗集群中的部分節點失敗的情況,通過設定每個topic的replica.factor=3(或其他),每個topic就會有3份存盤,這樣的話當其中的一個或者連個副本所在的broker宕機的話,服務依然能夠提供(根據不同的配置,有可能導致少許資料丟失,后面會聊聊),因為每個partition有多個副本,而kafka規定了多個副本中只能有一個leader 副本,所有的讀寫都是對這個leader進行的,
??在整個kafka集群中有一個大部分分布式服務都有的master角色,他負責了整個集群的topic的partition的leader選舉,叫controller,
3.2 集群中的controller
??像所有的分布式集群都有master節點一樣(zookeeper有master節點,es也有),kafka也有一個master角色,只是在kafka當中叫controller,controller可以認為承擔了集群的可用性管理,他主要負責每個partition的leader選舉作業,
??那么kafka中的controller是如何選舉的的呢,因為有了zookeeper,所以選舉的功能得到了很大簡化,在啟動的時候大家都是去zookeeper那里搶占式的創建一個相同的臨時節點(/controller),zookeeper的分布式一致性保證了只有一個請求能夠創建成功,那么創建成功的就成為了controller,其他失敗的節點則在這個節點上添加監聽,在當前controller發生故障失敗的時候這個臨時節點(/controller)會被zookeeper洗掉,然后大家就可以再進行一次競爭了,
??controller在每次選舉得到的新的控制器會通過zookeeper確認自己是新的controller,然后會把epoch+=1,這里的epoch是什么呢,就是一個數值,可以簡單理解為皇帝的年號一樣的東西,每個皇帝都有自己的epoch,然后下一任的皇帝的epoch比當前這一任的大,controller每次發訊息的時候也會帶上這個epoch,這樣主要是為了防止腦裂,也就是新的controller選出來之后,舊的卡住的controller又活過來了,這個時候他可能還認為自己是controller,進而給其他的節點發號施令,但是他的epoch比較舊,這樣的資訊會被其他的節點忽略,
??epoch是分布式中避免腦裂常用的一種手段,在zookeeper和raft當中都有應用,
3.3 partition中的leader,follower,in-sync-replica
leader,follower,in-sync-replica都是運行時的概念,而且會動態變化的,在靜態的存盤中一般都叫replica
??在上面講到,kafka為了提升容錯能力,每個partition會有多個副本(replica)多個副本當中有一個會成為leader, 每次接收producer和consumer的請求總是由leader partition來處理,
??在producer發送訊息往leader時,leader會將message存入自己所在的partition當中,同時follower也在源源不斷的從leader拉取資料,也就是同步leader中的資料,在這里面kafka允許部分副本比較慢,這樣可以提升服務的服務性能,kafka維護了一個follower的子集,叫in-sync-replica叫同步副本集,這個集合里面也包括leader本身,也就是leader會認為這些副本是一直在保持和leader進行同步的,
1. leader如何判斷一個follower是否應該屬于in-sync-replica 集合呢
- 在更早的版本中可以通過設定
replica.lag.max.messages=5(或其他)來決定訊息滯后leader中訊息達到5條的follower將被踢出 in-sync-replica集合 - 從0.9開始,廢棄了這個引數,轉而用了
replica.lag.time.max.ms=10000(默認是10s),這個配置表示,如果in-sync-replica中的follower在10s內沒有向leader發送拉取資料的請求,那么這個followe將被踢出in-sync-replica, - 這樣的一個優化的好處是,假如遇到訊息瞬時流量高峰,in-sync-replica中的follower的資料很容易落后leader較多,然后會被踢出去,在跟上leader后又會加入回來,這種情況的踢出-加入本身是沒有必要的,如果采用
replica.lag.time.max.ms就可以有效避免這個問題
2. leader的選舉
??那么當一個leader掛掉之后,新的leader又是如何被controller選中的呢,他會從 in-sync-replica中選取一個座位leader,是不是有點粗暴,而且,我們也可以看到,因為 in-sync-replica中的follower是有可能落后于leader的,這樣,新的leader選出來以后,其資料是有可能落后于原來的leader的,這就有可能造成資料的丟失(已經給producer回傳了成功訊息,但是最終訊息卻沒有完成真正的持久化,consumer消費不到這條訊息),當然,我們通過一些配置是能夠達到資料不丟失的,需要broker端和producer端的配合,
3. 最小ISR設定
??在broker端還可以配置min.insync.replicas,如果min.insync.replicas=2,那么至少要存在兩個同步副本才能向磁區寫入資料,這個時候如果只有一個同步副本,那么Broker就會停止接受生產者的請求,此時Broker變成了只讀,嘗試發送資料的生產者會收到NotEnoughReplicasException例外,但是消費者仍然可以繼續讀取已有的資料,
??這是為了避免發生不完全選舉時資料的寫入和讀取出現非預期的行為,可以看出來,這個引數也是實作高可用的重要一環,假如設定min.insync.replicas=1,那么leader掛了,就無法選出leader了,
3.4 partition中訊息的High-Watermark
1. 什么是HighWatermark,有什么用
??kafka中的訊息在partition當中是按照先來后到的順序持續存入的,High-Watermark ,高水位,他標識了截止到哪條訊息是consumer可以看到的,因為要盡可能滿足資料的一致性,有可能訊息只是在partition中的leader存在,還沒有復制給其他的follower,這個時候讓consumer看到這條訊息是不安全的,因為這條訊息有可能還沒有答應producer已經成功持久化,這個時候如果leader宕機,也會導致資料的不一致,因為這個時候可能給producer回傳的是fail,但是實際上consumer卻消費到了這條資料,所以kafka就設計了high-water-mark,標識截止到哪條資料是consumer可以消費的,
2. HW的更新機制:
1. LEO ,log-end-offset
??在聊HW的更新機制之前,需要先了解LEO(log-end-offset),這個是partition的每個replica中存盤的日志的最后一條日志的offset(最后存進來的),offset其實就是日志的進來的順序編號,可以理解為陣列的下標,
??producer每發進來一條訊息,server端(broker)對應的都會放到某個指定的partition下,每個訊息都會產生一個offset,
??同時,leader當中不僅會有自己的LEO,也會有其他follower的LEO資訊(這個資訊也就是follower在拉取leader的資料的時候傳入的fetch offset ),leader會根據這些LEO資訊來完成HW的更新,HW=min(LEO)我們可以通以下幾個問題來回答HW的更新
-
follower 何時更新LEO
- follower 副本專屬執行緒不斷地向leader副本所在broker發送FETCH請求會攜帶自己的fetch-offset資料(也就是是自己的LEO),
- leader 副本發送 FETCH response 給follower副本,
- Follower 拿到response之后取出資料寫入到本地底層日志中,在該程序中其LEO值會被更新,
-
leader 如何更新自己記錄的follower的LEO
leader 端非自己副本物件 LEO值是在leader端處理follower的FETCH請求程序中被更新的, -
follower 何時更新HW
- Follower 副本物件更新HW是在其更新本地LEO之后,
- 一旦follower向本地日志寫完資料后它就會嘗試更新其HW值,
- 演算法為取本地LEO與FETCH response中leader的HW值的較小值,也就是說follower在fetch的時候leader會把自己的HW也傳過去
-
leader 何時更新HW
- Leader 副本物件處理 Follower FETCH請求是在更新完leader 端非自己副本物件的LEO后將嘗試更新其自己HW值
- producer 端寫入訊息會更新leader Replica的LEO
- 副本被踢出ISR時
- 某磁區變更為leader副本后
-
leader 在正常同步時的更新機制 HW的更新程序
- leader會根據所有follower的LEO來更新自己的HW
4 producer
??這里主要是要了解producer的發送機制,以及一些比較重要的配置,
1. producer如何直接連接對應的topic所在節點
Producers直接發送訊息到broker上的leader partition,不需要經過任何中介一系列的路由轉發,為了實作這個特性,
- kafka集群中的每個broker都可以回應producer的請求,并回傳topic的一些元資訊,這些元資訊包括哪些機器是存活的,topic的leader partition都在哪,現階段哪些leader partition是可以直接被訪問的,
- Producer客戶端自己控制著訊息被推送到哪些partition,實作的方式可以是隨機分配、實作一類隨機負載均衡演算法,或者指定一些磁區演算法,
- Kafka提供了介面供用戶實作自定義的磁區,用戶可以為每個訊息指定一個partitionKey,通過這個key來實作一些hash磁區演算法,比如,把userid作為partitionkey的話,相同userid的訊息將會被推送到同一個磁區,
- 以Batch的方式推送資料可以極大的提高處理效率,kafka Producer 可以將訊息在記憶體中累計到一定數量后作為一個batch發送請求,Batch的數量大小可以通過Producer的引數控制,引數值可以設定為累計的訊息的數量(如500條)、累計的時間間隔(如100ms)或者累計的資料大小(64KB),通過增加batch的大小,可以減少網路請求和磁盤IO的次數,當然具體引數設定需要在效率和時效性方面做一個權衡,
2. 相關配置
acks:
- acks=0 ,生產者把訊息發送到broker即認為成功,不等待broker的處理結果,這種情況下,下面的retries的配置也是無效的,這種方式的吞吐最高,但也是最容易丟失訊息的
- acks=1:生產者會在該磁區的群首(leader)寫入訊息并回傳成功后,認為訊息發送成功,如果群首寫入訊息失敗,生產者會收到錯誤回應并進行重試,這種方式能夠一定程度避免訊息丟失,但如果群首宕機時該訊息沒有復制到其他副本,那么該訊息還是會丟失,
- acks=all:生產者會等待所有副本成功寫入該訊息,這種方式是最安全的,能夠保證訊息不丟失,但是延遲也是最大的,這種一般是用在對資料的持久化和一致性比較高的場景,但是對資料吞吐量要求不是特別高,
retries
- 當生產者發送訊息收到一個可恢復例外時,會進行重試,這個引數指定了重試的次數,在實際情況中,這個引數需要結合retry.backoff.ms(重試等待間隔)來使用,建議總的重試時間比集群重新選舉群首的時間長,這樣可以避免生產者過早結束重試導致失敗
batch.size
- 當多條訊息發送到一個磁區時,生產者會進行批量發送,這個引數指定了批量訊息的大小上限(以位元組為單位),當批量訊息達到這個大小時,生產者會一起發送到broker;但即使沒有達到這個大小,生產者也會有定時機制來發送訊息,避免訊息延遲過大,
linger.ms
- 這個引數指定生產者在發送批量訊息前等待的時間,當設定此引數后,即便沒有達到批量訊息的指定大小,到達時間后生產者也會發送批量訊息到broker,默認情況下,生產者的發送訊息執行緒只要空閑了就會發送訊息,即便只有一條訊息,設定這個引數后,發送執行緒會等待一定的時間,這樣可以批量發送訊息增加吞吐量,但同時也會增加延遲,
buffer.memory
- 這個引數設定生產者緩沖發送的訊息的記憶體大小
client.id
- 這個引數可以是任意字串,它是broker用來識別訊息是來自哪個客戶端的,在broker進行列印日志、衡量指標或者配額限制時會用到,
max.in.flight.requests.per.connection
- 這個引數指定生產者可以發送多少訊息到broker并且等待回應,設定此引數較高的值可以提高吞吐量,但同時也會增加記憶體消耗,如果想要保證訊息的有序性,只能設定為1,2.0中默認為5,kafka保證了單個producer的嚴格exactly-once,也保證了有序性,比較牛叉
max.request.size
- 這個引數限制生產者發送資料包的大小,資料包的大小與訊息的大小、訊息數相關,如果我們指定了最大資料包大小為1M,那么最大的訊息大小為1M,或者能夠最多批量發送1000條訊息大小為1K的訊息,另外,broker也有message.max.bytes引數來控制接收的資料包大小,在實際中,建議這些引數值是匹配的,避免生產者發送了超過broker限定的資料大小,
5 consumer
??kafka的消費者,消費者,顧名思義,就是從kafka broker 上拉取生產者producer產生的資料,消費的資料粒度可以到達topic.partition,同時,也可以指定的起始位置offset值,或者是按照時間查找offset,然后進行消費,
??每個consumer會有一個group-id,多個consumer可以屬于一個group-id,進而分享一個topic的多個partition(提高并行能力)
??consumer可以使用自動提交消費點位offset,也可以使用手動提交的方式,他其實沒有ack機制,每次提交offset,就是往broker的__consumer_offset__ 這個topic生產訊息而已,然后下次啟動的時候又默認會從這個topic中取到相關的offset資訊,使用這個offset從broker中拉取資料,
1. consumer的rebalance程序
1. 發生rebalance的時機
1.正常情況
組成員個數發生變化,例如有新的 consumer 實體加入該消費組或者離開組,
訂閱的 Topic 個數發生變化,
訂閱 Topic 的磁區數發生變化,
2.消費者意外情況
session 過期
max.poll.interval 到期,在這個時間值達到時,心跳執行緒會自動停止發送heartbeats 然后 發送leave-group request
這個時候會觸發rebalance,
2. rebalance程序
下面以新增一個consumer來闡述
-
Consumer Client 發送 join-group 請求,如果 Group 不存在,創建該 Group,Group 的狀態為 Empty;
-
由于 Group 的 member 為空,將該 member 加入到 Group 中,并將當前 member (client)設定為 Group 的 leader,進行 rebalance 操作,Group 的狀態變為 preparingRebalance,等待 rebalance.timeout.ms 之后(為了等待其他 member 重新發送 join-group,如果 Group 的狀態變為 preparingRebalance,Consumer Client 在進行 poll 操作時,needRejoin() 方法結果就會回傳 true,也就意味著當前 Consumer Client 需要重新加入 Group),Group 的 member 更新已經完成,此時 Group 的狀態變為 AwaitingSync,并向 Group 的所有 member 回傳 join-group 回應;
-
client 在收到 join-group 結果之后,如果發現自己的角色是 Group 的 leader,就進行 assignment,該 leader 將 assignment 的結果通過 sync-group 請求發送給 GroupCoordinator,而 follower 也會向 GroupCoordinator 發送一個 sync-group 請求(只不過對應的欄位為空);
-
當 GroupCoordinator 收到這個 Group leader 的請求之后,獲取 assignment 的結果,將各個 member 對應的 assignment 發送給各個 member,而如果該 Client 是 follower 的話就不做任何處理,此時 group 的狀態變為 Stable(也就是說,只有當收到的 Leader 的請求之后,才會向所有 member 回傳 sync-group 的結果,這個是只發送一次的,由 leader 請求來觸發),
2. 消費者的一些配置
fetch.min.bytes
- 這個引數允許消費者指定從broker讀取訊息時最小的資料量,當消費者從broker讀取訊息時,如果資料量小于這個閾值,broker會等待直到有足夠的資料,然后才回傳給消費者,對于寫入量不高的主題來說,這個引數可以減少broker和消費者的壓力,因為減少了往返的時間,而對于有大量消費者的主題來說,則可以明顯減輕broker壓力,
fetch.max.wait.ms
- 上面的fetch.min.bytes引數指定了消費者讀取的最小資料量,而這個引數則指定了消費者讀取時最長等待時間,從而避免長時間阻塞,這個引數默認為500ms,
max.partition.fetch.bytes
-
這個引數指定了每個磁區回傳的最多位元組數,默認為1M,也就是說,KafkaConsumer.poll()回傳記錄串列時,每個磁區的記錄位元組數最多為1M,如果一個主題有20個磁區,同時有5個消費者,那么每個消費者需要4M的空間來處理訊息,實際情況中,我們需要設定更多的空間,這樣當存在消費者宕機時,其他消費者可以承擔更多的磁區,
-
需要注意的是,max.partition.fetch.bytes必須要比broker能夠接收的最大的訊息(由max.message.size設定)大,否則會導致消費者消費不了訊息,另外,在上面的樣例可以看到,我們通常回圈呼叫poll方法來讀取訊息,如果max.partition.fetch.bytes設定過大,那么消費者需要更長的時間來處理,可能會導致沒有及時poll而會話過期,對于這種情況,要么減小max.partition.fetch.bytes,要么加長會話時間,
session.timeout.ms
- 這個引數設定消費者會話過期時間,默認為3秒,也就是說,如果消費者在這段時間內沒有發送心跳,那么broker將會認為會話過期而進行磁區重平衡,這個引數與heartbeat.interval.ms有關,heartbeat.interval.ms控制KafkaConsumer的poll()方法多長時間發送一次心跳,這個值需要比session.timeout.ms小,一般為1/3,也就是1秒,更小的session.timeout.ms可以讓Kafka快速發現故障進行重平衡,但也加大了誤判的概率(比如消費者可能只是處理訊息慢了而不是宕機),
auto.offset.reset
- 這個引數指定了當消費者第一次讀取磁區或者上一次的位置太老(比如消費者下線時間太久)時的行為,可以取值為latest(從最新的訊息開始消費)或者earliest(從最老的訊息開始消費),
enable.auto.commit
- 這個引數指定了消費者是否自動提交消費位移,默認為true,如果需要減少重復消費或者資料丟失,你可以設定為false,如果為true,你可能需要關注自動提交的時間間隔,該間隔由auto.commit.interval.ms設定,
partition.assignment.strategy
-
我們已經知道當消費組存在多個消費者時,主題的磁區需要按照一定策略分配給消費者,這個策略由PartitionAssignor類決定,默認有兩種策略:
- 范圍(Range):對于每個主題,每個消費者負責一定的連續范圍磁區,假如消費者C1和消費者C2訂閱了兩個主題,這兩個主題都有3個磁區,那么使用這個策略會導致消費者C1負責每個主題的磁區0和磁區1(下標基于0開始),消費者C2負責磁區2,可以看到,如果消費者數量不能整除磁區數,那么第一個消費者會多出幾個磁區(由主題數決定),
- 輪詢(RoundRobin):對于所有訂閱的主題磁區,按順序一一的分配給消費者,用上面的例子來說,消費者C1負責第一個主題的磁區0、磁區2,以及第二個主題的磁區1;其他磁區則由消費者C2負責,可以看到,這種策略更加均衡,所有消費者之間的磁區數的差值最多為1,
-
partition.assignment.strategy設定了分配策略,默認為org.apache.kafka.clients.consumer.RangeAssignor(使用范圍策略),你可以設定為org.apache.kafka.clients.consumer.RoundRobinAssignor(使用輪詢策略),或者自己實作一個分配策略然后將partition.assignment.strategy指向該實作類,
client.id
- 這個引數可以為任意值,用來指明訊息從哪個客戶端發出,一般會在列印日志、衡量指標、分配配額時使用,
max.poll.records
- 這個引數控制一個poll()呼叫回傳的記錄數,這個可以用來控制應用在拉取回圈中的處理資料量,
max.poll.interval.ms
- 兩次 poll 之間的最大時間間隔,設定大一點可以處理訊息的時間,在到達這個時間沒有進行poll()操作的話會自動停止發送心跳,并且發送一個leave-group的請求,
假如兩次poll()之間處理請求比較大的話應該放到異步去做,因為服務器同時會使用這個引數作為等待其他consumer相應rejion的最大時長,假如其他consumer也把max.poll.interval.ms設定的比較長的話,那么整個rebalance可能耗時會很長,
receive.buffer.bytes、send.buffer.bytes
- 這兩個引數控制讀寫資料時的TCP緩沖區,設定為-1則使用系統的默認值,如果消費者與broker在不同的資料中心,可以一定程度加大緩沖區,因為資料中心間一般的延遲都比較大,
partition.assignment.strategy
- 這個設定了consumer的partition在consumer中的分配機制
kafka的壓縮和解壓縮,理論上broker是不會再處理的,除非單獨配置了,這樣有可能會導致cpu升高
http://zhongmingmao.me/2019/08/02/kafka-compression/
6. 高性能讀寫的秘密
- 順序讀寫
- 零拷貝
參考
https://juejin.im/post/5d9944e9f265da5b6a169271
https://juejin.im/post/5bf6b0acf265da612d18e931#heading-5
https://juejin.im/post/5c0683b1f265da614f701441
https://juejin.im/post/5c46e729e51d452c8e6d5679
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/264458.html
標籤:其他
上一篇:大資料平臺搭建 | Hive
下一篇:錯誤: 找不到或無法加載主類 org.apache.hadoop.mapreduce.v2.app.MRAppMaster
