一、簡介
1.1 概述
Kafka是最初由Linkedin公司開發,是一個分布式、磁區的、多副本的、多訂閱者,基于zookeeper協調的分布式日志系統(也可以當做MQ系統),常見可以用于web/nginx日志、訪問日志,訊息服務等等,Linkedin于2010年貢獻給了Apache基金會并成為頂級開源專案,
主要應用場景是:日志收集系統和訊息系統,
Kafka主要設計目標如下:
- 以時間復雜度為O(1)的方式提供訊息持久化能力,即使對TB級以上資料也能保證常數時間的訪問性能,
- 高吞吐率,即使在非常廉價的商用機器上也能做到單機支持每秒100K條訊息的傳輸,
- 支持Kafka Server間的訊息磁區,及分布式消費,同時保證每個partition內的訊息順序傳輸,
- 同時支持離線資料處理和實時資料處理,
- 支持在線水平擴展
1.2 訊息系統介紹
一個訊息系統負責將資料從一個應用傳遞到另外一個應用,應用只需關注于資料,無需關注資料在兩個或多個應用間是如何傳遞的,分布式訊息傳遞基于可靠的訊息佇列,在客戶端應用和訊息系統之間異步傳遞訊息,有兩種主要的訊息傳遞模式:點對點傳遞模式、發布-訂閱模式,大部分的訊息系統選用發布-訂閱模式,Kafka就是一種發布-訂閱模式,
1.3 點對點訊息傳遞
在點對點訊息系統中,訊息持久化到一個佇列中,此時,將有一個或多個消費者消費佇列中的資料,但是一條訊息只能被消費一次,當一個消費者消費了佇列中的某條資料之后,該條資料則從訊息佇列中洗掉,該模式即使有多個消費者同時消費資料,也能保證資料處理的順序,這種架構描述示意圖如下:

生產者發送一條訊息到queue,只有一個消費者能收到,
1.4 發布-訂閱訊息傳遞
在發布-訂閱訊息系統中,訊息被持久化到一個topic中,與點對點訊息系統不同的是,消費者可以訂閱一個或多個topic,消費者可以消費該topic中所有的資料,同一條資料可以被多個消費者消費,資料被消費后不會立馬洗掉,在發布-訂閱訊息系統中,訊息的生產者稱為發布者,消費者稱為訂閱者,該模式的示例圖如下:

發布者發送到topic的訊息,只有訂閱了topic的訂閱者才會收到訊息,
1.5 Kafka的優點
1)解耦:
在專案啟動之初來預測將來專案會碰到什么需求,是極其困難的,訊息系統在處理程序中間插入了一個隱含的、基于資料的介面層,兩邊的處理程序都要實作這一介面,這允許你獨立的擴展或修改兩邊的處理程序,只要確保它們遵守同樣的介面約束,
2)冗余:(副本)
有些情況下,處理資料的程序會失敗,除非資料被持久化,否則將造成丟失,訊息佇列把資料進行持久化直到它們已經被完全處理,通過這一方式規避了資料丟失風險,許多訊息佇列所采用的"插入-獲取-洗掉"范式中,在把一個訊息從佇列中洗掉之前,需要你的處理系統明確的指出該訊息已經被處理完畢,從而確保你的資料被安全的保存直到你使用完畢,
3)擴展性
因為訊息佇列解耦了你的處理程序,所以增大訊息入隊和處理的頻率是很容易的,只要另外增加處理程序即可,不需要改變代碼、不需要調節引數,擴展就像調大電力按鈕一樣簡單,
4)靈活性&峰值處理能力
在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見;如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費,使用訊息佇列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰,
5)可恢復性
系統的一部分組件失效時,不會影響到整個系統,訊息佇列降低了行程間的耦合度,所以即使一個處理訊息的行程掛掉,加入佇列中的訊息仍然可以在系統恢復后被處理,
6)順序保證
在大多使用場景下,資料處理的順序都很重要,大部分訊息佇列本來就是排序的,并且能保證資料會按照特定的順序來處理,Kafka保證一個Partition內的訊息的有序性,
7)緩沖
在任何重要的系統中,都會有需要不同的處理時間的元素,例如,加載一張圖片比應用過濾器花費更少的時間,訊息佇列通過一個緩沖層來幫助任務最高效率的執行———寫入佇列的處理會盡可能的快速,該緩沖有助于控制和優化資料流經過系統的速度,
8)異步通信
很多時候,用戶不想也不需要立即處理訊息,訊息佇列提供了異步處理機制,允許用戶把一個訊息放入佇列,但并不立即處理它,想向佇列中放入多少訊息就放多少,然后在需要的時候再去處理它們,
1.6 常用MQ對比
1)RabbitMQ
RabbitMQ是使用Erlang撰寫的一個開源的訊息佇列,本身支持很多的協議:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量級,更適合于企業級的開發,同時實作了Broker構架,這意味著訊息在發送給客戶端時先在中心佇列排隊,對路由,負載均衡或者資料持久化都有很好的支持,
2)Redis
Redis是一個基于Key-Value對的NoSQL資料庫,開發維護很活躍,雖然它是一個Key-Value資料庫存盤系統,但它本身支持MQ功能,所以完全可以當做一個輕量級的佇列服務來使用,對于RabbitMQ和Redis的入隊和出隊操作,各執行100萬次,每10萬次記錄一次執行時間,測驗資料分為128Bytes、512Bytes、1K和10K四個不同大小的資料,實驗表明:入隊時,當資料比較小時Redis的性能要高于RabbitMQ,而如果資料大小超過了10K,Redis則慢的無法忍受;出隊時,無論資料大小,Redis都表現出非常好的性能,而RabbitMQ的出隊性能則遠低于Redis,
3)ZeroMQ
ZeroMQ號稱最快的訊息佇列系統,尤其針對大吞吐量的需求場景,ZeroMQ能夠實作RabbitMQ不擅長的高級/復雜的佇列,但是開發人員需要自己組合多種技術框架,技術上的復雜度是對這MQ能夠應用成功的挑戰,ZeroMQ具有一個獨特的非中間件的模式,你不需要安裝和運行一個訊息服務器或中間件,因為你的應用程式將扮演這個服務器角色,你只需要簡單的參考ZeroMQ程式庫,可以使用NuGet安裝,然后你就可以愉快的在應用程式之間發送訊息了,但是ZeroMQ僅提供非持久性的佇列,也就是說如果宕機,資料將會丟失,其中,Twitter的Storm 0.9.0以前的版本中默認使用ZeroMQ作為資料流的傳輸(Storm從0.9版本開始同時支持ZeroMQ和Netty作為傳輸模塊),
4)ActiveMQ
ActiveMQ是Apache下的一個子專案, 類似于ZeroMQ,它能夠以代理人和點對點的技術實作佇列,同時類似于RabbitMQ,它少量代碼就可以高效地實作高級應用場景,
5)Kafka/Jafka
Kafka是Apache下的一個子專案,是一個高性能跨語言分布式發布/訂閱訊息佇列系統,而Jafka是在Kafka之上范訓而來的,即Kafka的一個升級版,具有以下特性:快速持久化,可以在O(1)的系統開銷下進行訊息持久化;高吞吐,在一臺普通的服務器上既可以達到10W/s的吞吐速率;完全的分布式系統,Broker、Producer、Consumer都原生自動支持分布式,自動實作負載均衡;支持Hadoop資料并行加載,對于像Hadoop的一樣的日志資料和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案,Kafka通過Hadoop的并行加載機制統一了在線和離線的訊息處理,Apache Kafka相對于ActiveMQ是一個非常輕量級的訊息系統,除了性能非常好之外,還是一個作業良好的分布式系統,
1.7 Kafka中的術語解釋
概述
在深入理解Kafka之前,先介紹一下Kafka中的術語,下圖展示了Kafka的相關術語以及之間的關系:

上圖中一個topic配置了3個partition,Partition1有兩個offset:0和1,Partition2有4個offset,Partition3有1個offset,副本的id和副本所在的機器的id恰好相同,
如果一個topic的副本數為3,那么Kafka將在集群中為每個partition創建3個相同的副本,集群中的每個broker存盤一個或多個partition,多個producer和consumer可同時生產和消費資料,

1、Broker
Kafka 集群包含一個或多個服務器,服務器節點稱為broker,
broker存盤topic的資料,如果某topic有N個partition,集群有N個broker,那么每個broker存盤該topic的一個partition,
如果某topic有N個partition,集群有(N+M)個broker,那么其中有N個broker存盤該topic的一個partition,剩下的M個broker不存盤該topic的partition資料,
如果某topic有N個partition,集群中broker數目少于N個,那么一個broker存盤該topic的一個或多個partition,在實際生產環境中,盡量避免這種情況的發生,這種情況容易導致Kafka集群資料不均衡,
2、Topic
每條發布到Kafka集群的訊息都有一個類別,這個類別被稱為Topic,(物理上不同Topic的訊息分開存盤,邏輯上一個Topic的訊息雖然保存于一個或多個broker上但用戶只需指定訊息的Topic即可生產或消費資料而不必關心資料存于何處)
類似于資料庫的表名
3、Partition
topic中的資料分割為一個或多個partition,每個topic至少有一個partition,每個partition中的資料使用多個segment檔案存盤,partition中的資料是有序的,不同partition間的資料丟失了資料的順序,如果topic有多個partition,消費資料時就不能保證資料的順序,在需要嚴格保證訊息的消費順序的場景下,需要將partition數目設為1,
4、Producer
生產者即資料的發布者,該角色將訊息發布到Kafka的topic中,broker接收到生產者發送的訊息后,broker將該訊息追加到當前用于追加資料的segment檔案中,生產者發送的訊息,存盤到一個partition中,生產者也可以指定資料存盤的partition,
5、Consumer
消費者可以從broker中讀取資料,消費者可以消費多個topic中的資料,
6、Consumer Group
每個Consumer屬于一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬于默認的group),這是kafka用來實作一個topic訊息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段,一個topic可以有多個CG,topic的訊息會復制-給consumer,如果需要實作廣播,只要每個consumer有一個獨立的CG就可以了,要實作單播只要所有的consumer在同一個CG,用CG還可以將consumer進行自由的分組而不需要多次發送訊息到不同的topic,
7、Leader
每個partition有多個副本,其中有且僅有一個作為Leader,Leader是當前負責資料的讀寫的partition,
8、Follower
Follower跟隨Leader,所有寫請求都通過Leader路由,資料變更會廣播給所有Follower,Follower與Leader保持資料同步,如果Leader失效,則從Follower中選舉出一個新的Leader,當Follower與Leader掛掉、卡住或者同步太慢,leader會把這個follower從“in sync replicas”(ISR)串列中洗掉,重新創建一個Follower,
9、Offset
kafka的存盤檔案都是按照offset.kafka來命名,用offset做名字的好處是方便查找,例如你想找位于2049的位置,只要找到2048.kafka的檔案即可,當然the first offset就是00000000000.kafka
一、Kafka的架構

如上圖所示,一個典型的Kafka集群中包含若干Producer(可以是web前端產生的Page View,或者是服務器日志,系統CPU、Memory等),若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干Consumer Group,以及一個Zookeeper集群,Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發生變化時進行rebalance,Producer使用push模式將訊息發布到broker,Consumer使用pull模式從broker訂閱并消費訊息,
2.1 分布式模型
Kafka每個主題的多個磁區日志分布式地存盤在Kafka集群上,同時為了故障容錯,每個磁區都會以副本的方式復制到多個訊息代理節點上,其中一個節點會作為主副本(Leader),其他節點作為備份副本(Follower,也叫作從副本),主副本會負責所有的客戶端讀寫操作,備份副本僅僅從主副本同步資料,當主副本出現故障時,備份副本中的一個副本會被選擇為新的主副本,因為每個磁區的副本中只有主副本接受讀寫,所以每個服務器端都會作為某些磁區的主副本,以及另外一些磁區的備份副本,這樣Kafka集群的所有服務端整體上對客戶端是負載均衡的,
Kafka的生產者和消費者相對于服務器端而言都是客戶端,
Kafka生產者客戶端發布訊息到服務端的指定主題,會指定訊息所屬的磁區,生產者發布訊息時根據訊息是否有鍵,采用不同的磁區策略,訊息沒有鍵時,通過輪詢方式進行客戶端負載均衡;訊息有鍵時,根據磁區語意(例如hash)確保相同鍵的訊息總是發送到同一磁區,
Kafka的消費者通過訂閱主題來消費訊息,并且每個消費者都會設定一個消費組名稱,因為生產者發布到主題的每一條訊息都只會發送給消費者組的一個消費者,所以,如果要實作傳統訊息系統的“佇列”模型,可以讓每個消費者都擁有相同的消費組名稱,這樣訊息就會負責均衡到所有的消費者;如果要實作“發布-訂閱”模型,則每個消費者的消費者組名稱都不相同,這樣每條訊息就會廣播給所有的消費者,
磁區是消費者現場模型的最小并行單位,如下圖(圖1)所示,生產者發布訊息到一臺服務器的3個磁區時,只有一個消費者消費所有的3個磁區,在下圖(圖2)中,3個磁區分布在3臺服務器上,同時有3個消費者分別消費不同的磁區,假設每個服務器的吞吐量時300MB,在下圖(圖1)中分攤到每個磁區只有100MB,而在下圖(圖2)中,集群整體的吞吐量有900MB,可以看到,增加服務器節點會提升集群的性能,增加消費者數量會提升處理性能,
同一個消費組下多個消費者互相協調消費作業,Kafka會將所有的磁區平均地分配給所有的消費者實體,這樣每個消費者都可以分配到數量均等的磁區,Kafka的消費組管理協議會動態地維護消費組的成員串列,當一個新消費者加入消費者組,或者有消費者離開消費組,都會觸發再平衡操作,

Kafka的消費者消費訊息時,只保證在一個磁區內的訊息的完全有序性,并不保證同一個主題匯中多個磁區的訊息順序,而且,消費者讀取一個磁區訊息的順序和生產者寫入到這個磁區的順序是一致的,比如,生產者寫入“hello”和“Kafka”兩條訊息到磁區P1,則消費者讀取到的順序也一定是“hello”和“Kafka”,如果業務上需要保證所有訊息完全一致,只能通過設定一個磁區完成,但這種做法的缺點是最多只能有一個消費者進行消費,一般來說,只需要保證每個磁區的有序性,再對訊息假設鍵來保證相同鍵的所有訊息落入同一磁區,就可以滿足絕大多數的應用,
二、Topics 和 Partition
Topic在邏輯上可以被認為是一個queue,每條消費都必須指定它的Topic,可以簡單理解為必須指明把這條訊息放進哪個queue里,為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個或多個Partition,每個Partition在物理上對應一個檔案夾,該檔案夾下存盤這個Partition的所有訊息和索引檔案,創建一個topic時,同時可以指定磁區數目,磁區數越多,其吞吐量也越大,但是需要的資源也越多,同時也會導致更高的不可用性,kafka在接收到生產者發送的訊息之后,會根據均衡策略將訊息存盤到不同的磁區中,因為每條訊息都被append到該Partition中,屬于順序寫磁盤,因此效率非常高(經驗證,順序寫磁盤效率比隨機寫記憶體還要高,這是Kafka高吞吐率的一個很重要的保證),

對于傳統的message queue而言,一般會洗掉已經被消費的訊息,而Kafka集群會保留所有的訊息,無論其被消費與否,當然,因為磁盤限制,不可能永久保留所有資料(實際上也沒必要),因此Kafka提供兩種策略洗掉舊資料,一是基于時間,二是基于Partition檔案大小,例如可以通過配置$KAFKA_HOME/config/server.properties,讓Kafka洗掉一周前的資料,也可在Partition檔案超過1GB時洗掉舊資料,配置如下所示:
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
因為Kafka讀取特定訊息的時間復雜度為O(1),即與檔案大小無關,所以這里洗掉過期檔案與提高Kafka性能無關,選擇怎樣的洗掉策略只與磁盤以及具體的需求有關,另外,Kafka會為每一個Consumer Group保留一些metadata資訊——當前消費的訊息的position,也即offset,這個offset由Consumer控制,正常情況下Consumer會在消費完一條訊息后遞增該offset,當然,Consumer也可將offset設成一個較小的值,重新消費一些訊息,因為offet由Consumer控制,所以Kafka broker是無狀態的,它不需要標記哪些訊息被哪些消費過,也不需要通過broker去保證同一個Consumer Group只有一個Consumer能消費某一條訊息,因此也就不需要鎖機制,這也為Kafka的高吞吐率提供了有力保障,
三、Producer訊息路由
Producer發送訊息到broker時,會根據Paritition機制選擇將其存盤到哪一個Partition,如果Partition機制設定合理,所有訊息可以均勻分布到不同的Partition里,這樣就實作了負載均衡,如果一個Topic對應一個檔案,那這個檔案所在的機器I/O將會成為這個Topic的性能瓶頸,而有了Partition后,不同的訊息可以并行寫入不同broker的不同Partition里,極大的提高了吞吐率,可以在$KAFKA_HOME/config/server.properties中通過配置項num.partitions來指定新建Topic的默認Partition數量,也可在創建Topic時通過引數指定,同時也可以在Topic創建之后通過Kafka提供的工具修改,
在發送一條訊息時,可以指定這條訊息的key,Producer根據這個key和Partition機制來判斷應該將這條訊息發送到哪個Parition,Paritition機制可以通過指定Producer的paritition. class這一引數來指定,該class必須實作kafka.producer.Partitioner介面,
四、Consumer Group
使用Consumer high level API時,同一Topic的一條訊息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一訊息,

這是Kafka用來實作一個Topic訊息的廣播(發給所有的Consumer)和單播(發給某一個Consumer)的手段,一個Topic可以對應多個Consumer Group,如果需要實作廣播,只要每個Consumer有一個獨立的Group就可以了,要實作單播只要所有的Consumer在同一個Group里,用Consumer Group還可以將Consumer進行自由的分組而不需要多次發送訊息到不同的Topic,
實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理,根據這一特性,可以使用Storm這種實時流處理系統對訊息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還可以同時將資料實時備份到另一個資料中心,只需要保證這三個操作所使用的Consumer屬于不同的Consumer Group即可,
五、Push vs. Pull
作為一個訊息系統,Kafka遵循了傳統的方式,選擇由Producer向broker push訊息并由Consumer從broker pull訊息,一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式,事實上,push模式和pull模式各有優劣,
push模式很難適應消費速率不同的消費者,因為訊息發送速率是由broker決定的,push模式的目標是盡可能以最快速度傳遞訊息,但是這樣很容易造成Consumer來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞,而pull模式則可以根據Consumer的消費能力以適當的速率消費訊息,
對于Kafka而言,pull模式更合適,pull模式可簡化broker的設計,Consumer可自主控制消費訊息的速率,同時Consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實作不同的傳輸語意,
六、Kafka delivery guarantee
有這么幾種可能的delivery guarantee:
At most once 訊息可能會丟,但絕不會重復傳輸
At least one 訊息絕不會丟,但可能會重復傳輸
Exactly once 每條訊息肯定會被傳輸一次且僅傳輸一次,很多時候這是用戶所想要的,
當Producer向broker發送訊息時,一旦這條訊息被commit,因數replication的存在,它就不會丟,但是如果Producer發送資料給broker后,遇到網路問題而造成通信中斷,那Producer就無法判斷該條訊息是否已經commit,雖然Kafka無法確定網路故障期間發生了什么,但是Producer可以生成一種類似于主鍵的東西,發生故障時冪等性的重試多次,這樣就做到了Exactly once,
接下來討論的是訊息從broker到Consumer的delivery guarantee語意,(僅針對Kafka consumer high level API),Consumer在從broker讀取訊息后,可以選擇commit,該操作會在Zookeeper中保存該Consumer在該Partition中讀取的訊息的offset,該Consumer下一次再讀該Partition時會從下一條開始讀取,如未commit,下一次讀取的開始位置會跟上一次commit之后的開始位置相同,當然可以將Consumer設定為autocommit,即Consumer一旦讀到資料立即自動commit,如果只討論這一讀取訊息的程序,那Kafka是確保了Exactly once,但實際使用中應用程式并非在Consumer讀取完資料就結束了,而是要進行進一步處理,而資料處理與commit的順序在很大程度上決定了訊息從broker和consumer的delivery guarantee semantic,
Kafka默認保證At least once,并且允許通過設定Producer異步提交來實作At most once,而Exactly once要求與外部存盤系統協作,幸運的是Kafka提供的offset可以非常直接非常容易得使用這種方式,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/290827.html
標籤:其他
