目錄
- 一、Kafka 介紹
- 1.1 訊息佇列的介紹
- 1.2 Kafka 簡介
- 1.3 安裝 Kafka 集群
- 二、Kafka 使用初體驗
- 三、Kafka 核心擴展內容
- 3.1 Broker 擴展
- 3.2 Producer 擴展
- 3.3 Consumer 擴展
- 3.4 Kafka 核心之存盤和容錯機制
- 3.5 Kafka 高效讀寫資料
- 3.6 Zookeeper 在 Kafka 中的作用
- 3.7 Kafka 事務
一、Kafka 介紹
1.1 訊息佇列的介紹
在沒有使用訊息系統以前,我們對于許多的傳統業務,以及跨服務器傳遞訊息的時候,會采用 串行方式或者并行方法; 串行方式如下:用戶注冊實體:將注冊資訊寫入資料庫成功后,發送注冊郵件,再發送注冊短信,

并行方式:將注冊資訊寫入資料庫成功后,發送注冊郵件的同時,發送注冊短信,

以上三個任務完成之后,回應給客戶端,與串行的差別是并行的方式可以縮短程式整體處理的時間,
訊息系統: 訊息系統 負責將資料從一個應用程式傳送到另一個應用程式,因此應用程式可以專注于資料,但是不必擔心如何共享它,分布式訊息系統基于可靠的訊息佇列的概念,訊息在客戶端應用程式和訊息傳遞系統之間的異步排隊,有兩種型別的訊息模式可用
(1) 點對點模式(一對一,消費者主動拉取資料,訊息收到后訊息清除)
訊息生產者生產訊息發送到 Queue 中,然后訊息消費者從 Queue 中取出并且消費訊息,訊息被消費以后,Queue 中不再有存盤,所以訊息消費者不可能消費到已經被消費的訊息,Queue 支持存在多個消費者,但是對一個訊息而言,只會有一個消費者可以消費,

(2) 發布/訂閱模式(一對多,消費者消費資料之后不會清除訊息) 訊息生產者(發布)將訊息發布到 topic 中,同時有多個訊息消費者(訂閱) 消費該訊息,和點對點方式不同,發布到 topic 的訊息會被所有訂閱者消費,

使用訊息佇列的好處:
- 解耦,允許你獨立的擴展或修改兩邊的處理程序,只要確保它們遵守同樣的介面約束,
- 可恢復性,系統的一部分組件失效時,不會影響到整個系統,訊息佇列降低了行程間的耦合度,所以即使一個處理訊息的行程掛掉,加入佇列中的訊息仍然可以在系統恢復后被處理,
- 緩沖,有助于控制和優化資料流經過系統的速度,解決生產訊息和消費訊息的處理速度不一致的情況,
- 靈活性 & 峰值處理能力,在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見,如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費,使用訊息佇列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰,舉個例子: 秒殺活動當中,一般會因為流量過大,應用服務器掛掉,為了解決這個問題,一般需要在應用前端加上訊息佇列以控制訪問流量,1、可以控制活動的人數 可以緩解短時間內流量大使得服務器崩掉,2、可以通過佇列進行資料快取,后續再進行消費處理,
- 異步通信,很多時候,用戶不想也不需要立即處理訊息,訊息佇列提供了異步處理機制,允許用戶把一個訊息放入佇列,但并不立即處理它,想向佇列中放入多少訊息就放多少,然后在需要的時候再去處理它們,
1.2 Kafka 簡介
Kafka 是最初由 Linkedin 公司開發,是一個分布式、支持磁區的、多副本的、多訂閱者的,基于 ZooKeeper 協調的分布式日志系統,常見可以用于 web/nginx 日志、訪問日志、訊息服務等,Linkedin 公司于2010年將 Kafka 貢獻給了 Apache基金會并成為頂級開源專案,
說明:Jay Kreps,是 Linkedin 公司的一名在線資料架構技術高管,負責 Kafka 專案,Kafka 是以一位小說家的名字命名,因為 Kafka 是 a system optimized for writing(一個用于優化寫作的系統),同時 Jay Kreps 很喜歡 Kafka 的作品,
Kafka 最大的特性就是支持分布式,可以實時的處理大量資料以滿足各種需求場景,比如基于 Hadoop 的批處理系統、低延遲的實時系統、storm/Spark 流式處理引擎,web/nginx日志、訪問日志、訊息服務等,除此以外,Kafaka 還有以下特性:
- 高吞吐量,高性能,低延遲:Kafka 每秒可以處理幾十萬條訊息,它的延遲最低只有幾毫秒,
- 高容錯性,運行集群中節點失敗,若副本數量為 n,則運行 n-1 個節點失敗,
- 高持久性,高可靠性:訊息被持久化到本地磁盤,并且支持資料備份,防止資料丟失,
- 高并發:支持數千個客戶端同時讀寫,
- 有效地安全機制:Kafka 具有如下幾種安全措施,① 通過SSL和SASL(Kerberos),SASL/PLAIN驗證機制支持生產者、消費者與代理連接時的身份認證;② 支持代理與 ZooKeeper 連接身份驗證;③ 通信時資料加密;④ 客戶端讀、寫權限認證;⑤ Kafka 支持與外部其他認證授權服務的集成,
Kafka 的體系結構如下圖所示:

Kafka 中的名詞決議:
- 訊息中間件, 通過前面的介紹,讀者可能只看懂了 Kafka 是訊息中間件,但中間件具體是什么,有什么作用并不知道,下面通過一個例子來介紹什么是訊息中間件,Kafka 有兩個最重要的核心,就是生產者與消費者,例如,生產者生產包子,消費者消費包子;生產者生產一個包子,消費者就消費一個包子,假設消費者消費包子的時候噎住了,但生產者還在生產包子,那新生產的包子就浪費了,再比如另一種情況,生產者很厲害,1分鐘能生產100個包子,但消費者1分鐘只能吃10個包子,一段時間之后,消費者就吃撐了,拒絕再吃,那么剩下的包子又浪費了,這個時候如果在生產者和消費者之間放個盤子,生產出的包子都放到盤子中,消費者去盤子里拿包子,這樣剩下的包子就不會浪費了,那么這個盤子就是 Kafka,如下圖所示:

上面例子中出現的場景對應在 Kafka 集群的哪種情況,1、“消費者消費包子的時候噎住了”:系統宕機,2、“生產者很厲害,1分鐘能生產100個包子”:資料交易量巨大,3、“消費者就吃撐了,拒絕再吃”:訊息堵塞,導致系統超時,包子就相當于是資料流,系統之間的互動都是通過資料流來傳輸的,也叫做“訊息”, - 訊息, 即上面例子中的包子,訊息是 Kafka 通信的基本單位,由一個固定長度的訊息頭和一個可變長度的訊息體構成,在老版本中,每一條訊息稱為 Message;在 java 實作的客戶端中,每一條訊息稱之為 Record,
- 生產者(Producer), 生產包子的,即將訊息寫入 Kafka 集群,
- 消費者(Consumer), 消費生產出的包子,從 Kafka 集群中讀取訊息,
- 主題(topic), 主題,相當于每個生產者生產出的包子都有自己的品牌,消費者可不是誰生產的包子都吃的,這樣不同的生產者生產出來的包子,消費者就可以根據品牌的不同,選擇性地吃了,Kafka 將一組訊息抽象歸納為一個主題,也就是說,一個主題就是對訊息的分類,生產者將訊息發送到特定的主題,消費者訂閱主題或主題的某些磁區進行消費,主題有多個消費者,即一個主題可以有零個、一個或多個消費者來讀取資料,
- 磁區(Partition), Kafka 將一組訊息歸納為一個主題,而每個主題又被分成一個或多個磁區,每個磁區由一系列有序、不可變訊息組成,是一個有序佇列,每個磁區在物理上對應為一個檔案夾,磁區的命名規則為主題名稱后接“一”連接符,之后再接磁區編號,磁區編號從0開始,每個主題對應的磁區數可以在 Kafka 啟動時加載組態檔中配置、也可以在創建主題時指定、還可以在修改主題時修改磁區數,磁區使得 Kafka 在并發處理上變得更加容易,理論上來說,磁區數越多吞吐量越高,同時磁區也是 Kafka 保證訊息被順序消費以及對訊息進行負載均衡的基礎,Kafka 只能保證一個磁區內訊息的有序性,并不能保證跨磁區訊息的有序性,每條訊息被追加到相應的磁區中,是順序寫磁盤,因此效率非常高,
- 副本(Replication), 每個磁區又有一個至多個副本,磁區的副本分布在集群的不同代理上,以提高可用性,從存盤的角度上分析,磁區的每個副本在邏輯上可以抽象為一個日志(Log)物件,在創建主題時,可以指定副本數,通常設定為3,以防資料丟失,注意:磁區數可以多個,可以大于 Broker 數(Kafka 集群中 Kafka 的節點數),即一個 Broker 可以包含多個磁區;但磁區的副本數不能多于 Broker 數,
- 節點(Broker), Kafka 集群中,一個 Kafka 節點被稱為 Broker,多個 Broker 組成一個 Kafka 集群,
1.3 安裝 Kafka 集群
Kafka 可以安裝單機版,也可以安裝集群版,即安裝在集群上,本章主要介紹 Kafka 的集群安裝,Kafka 是由 Scala 寫成,Scala 運行在 Java 虛擬機上,并兼容現有的 Java 程式,因此部署 Kakfa 的時候,需要先部署 JDK 環境,筆者安裝 Kafka 集群前的必備基礎軟體如下表所示:
| 軟體名稱 | 版本 |
|---|---|
| JDK | 1.8.0_202 |
| Zookeeper | zookeeper-3.5.8 |
| Scala | 2.12.11 |
Linux 下 JDK 安裝:https://blog.csdn.net/xw1680/article/details/115434353
Zookeeper 集群安裝:https://blog.csdn.net/xw1680/article/details/118002073
Kafka 的官方下載網站為:http://kafka.apache.org/downloads,下載步驟如下:
(1) 選擇適合的 Kafka 版本進行下載,Kafka 的安裝包有兩種,一種 xxx-src.tgz,為 Kafka 的源代碼,需要自行編譯安裝,較為靈活;另一種 xxx.tgz,為已經編譯好的,可以直接使用的安裝包,本文中選擇第二種,已經被編譯過的安裝包進行下載安裝,如下圖所示:

以安裝檔案名 “kafka_2.12-2.4.1.tgz (asc, sha512)” 為例,其中 2.12 為 Scala 的版本,2.4.1 是 Kafka 的版本,由于筆者安裝的 Scala 的版本是 2.12.11,所以需要下載的是名為 “kafka_2.12-2.4.1.tgz (asc, sha512)” 的安裝檔案,
(2) 進行下載,確定了要下載的 kafka 版本之后,單擊 “kafka_2.12-2.4.1.tgz (asc, sha512)” 鏈接,直接下載,如下圖所示:

下載好的 Kafka 安裝包名為 “kafka_2.12-2.4.1.tgz”,將該安裝包復制到 bigdata01 節點的 “/data/soft” 目錄下,準備安裝,注意:Kafka 在啟動的時候不需要安裝 Scala 環境,只有在編譯原始碼的時候才需要,因為運行的時候是在 JVM 虛擬機上運行的,只需要有 JDK 環境就可以了,
注意:在安裝 Kafka 之前需要先確保 Zookeeper 集群是啟動狀態,分別在 bigdata01、bigdata02、bigdata03 上啟動:bin/zkServer.sh start,
安裝 Kafka:
- 集群節點規劃,使用三個節點搭建一個 Kafka 集群,分別為 bigdata01、bigdata02、bigdata03,注意:針對 Kafka 集群而言,沒有主從之分,所有節點都是一樣的,
- 首先在 bigdata01 節點上配置 Kafka,解壓:tar -zxvf kafka_2.12-2.4.1.tgz,修改組態檔,此時針對集群模式需要修改 broker.id、log.dirs、以及 zookeeper.connect,進入:cd kafka_2.12-2.4.1/config/、vi server.properties
- broker.id 的值默認是從0開始的,集群中所有節點的 broker.id 從 0 開始遞增即可,所以 bigdata01 節點的 broker.id 值為0,

- log.dirs 的值建議指定到一塊存盤空間比較大的磁盤上面,因為在實際作業中 Kafka 中會存盤很多資料,筆者虛擬機里面就一塊磁盤,所以就指定到 /data 目錄下面了,

- zookeeper.connect 的值是 zookeeper 集群的地址,可以指定集群中的一個節點或者多個節點地址,多個節點地址之間使用逗號隔開即可,

- broker.id 的值默認是從0開始的,集群中所有節點的 broker.id 從 0 開始遞增即可,所以 bigdata01 節點的 broker.id 值為0,
- 將修改好配置的 Kafka 安裝包拷貝到其它兩個節點, scp -rq kafka_2.12-2.4.1 bigdata02:/data/soft/、scp -rq kafka_2.12-2.4.1 bigdata03:/data/soft/,
- 修改 bigdata02 和 bigdata03 上 Kafka 中 broker.id 的值,修改 bigdata02 節點上的 broker.id 的值為 1、修改 bigdata03 節點上的 broker.id 的值為 2,


- 啟動集群,分別在 bigdata01、bigdata02、bigdata03 上啟動 Kafka 行程,[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties、[root@bigdata02 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties、[root@bigdata03 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties
- 驗證,分別在 bigdata01、bigdata02、bigdata03 上執行 jps 命令驗證是否有 Kafka 行程,如果都有就說明 Kafka 集群啟動正常了,

二、Kafka 使用初體驗
Kafka 中最重要的就是主題(topic),在搭建好 Kafka 集群后,首先要做的就是創建訊息主題,主題相當于檔案系統的目錄,就是用來保存訊息內容的物體,像目錄通過目錄名標識一樣,主題也是通過主題名來進行標識的,例如,在 bigdata01 節點上創建一個名為 demo 的主題(指定2個磁區,2個副本),進入 Kafka 安裝目錄,執行如下代碼:
bin/kafka-topics.sh --create --zookeeper bigdata01:2181 --partitions 2 --replication-factor 2 --topic demo
注意:副本數不能大于集群中 broker 的數量,因為每個 partition 的副本必須保存在不同的 broker,否則沒有意義,如果 partition 的副本都保存在同一個 broker,那么這個 broker 掛了,則 partition 資料依然會丟失,在這里筆者使用的是3個節點的 Kafka 集群,所以副本數我就暫時設定為2,最大可以設定為3,如果讀者用的是單機 Kafka 的話,這里的副本數就只能設定為1了,最后,當結果中出現 “Created topic demo.” 時,則說明創建主題成功了,
查看已創建的主題: 可以通過 list 命令來顯示主題,在 bigdata01 節點的終端中輸入如下命令:
bin/kafka-topics.sh --list --zookeeper localhost:2181
其實,還可以通過顯示主題名的方式來驗證集群,例如在 bigdata02 節點和 bigdata03 節點上分別執行 “查看 bigdata01 節點的 Kafka 主題名” 命令,如果結果顯示為 demo,則說明 Kafka 集群已成功搭建,
查看指定topic的詳細資訊:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic demo
查詢結果如下圖所示:

- 第一行顯示指定 topic 所有 partitions 的一個總結,PartitionCount:表示這個 Topic 一共有多少個 Partition,ReplicationFactor:表示這個 topic 中 Partition 的副本因子是幾,Config:這個表示創建 Topic 時動態指定的配置資訊,在這沒有額外指定配置資訊,
- 下面每一行給出的是一個 Partition 的資訊,如果只有一個 Partition,則只顯示一行,Topic:顯示當前的 topic 名稱、Partition:顯示當前 topic 的 partition 編號、Leader:Leader partition 所在的節點編號,這個編號其實就是 broker.id 的值,來看這個圖:

這個圖里面的 hello 這個 topic 有兩個 Partition,其中 Partition1 的 leader 所在的節點是 Broker1,Partition2 的 Leader 所在的節點是 Broker2、Replicas:當前 partition 所有副本所在的節點編號【包含 Leader 所在的節點】,如果設定多個副本的話,這里會顯示多個,不管該節點是否是 Leader 以及是否存活,Isr:當前 partition 處于同步狀態的所有節點,這里顯示的所有節點都是存活狀態的,并且跟 Leader 同步的(包含 Leader 所在的節點),所以說 Replicas 和 Isr 的區別就是:如果某個 partition 的副本所在的節點宕機了,在 Replicas 中還是會顯示那個節點,但是在 Isr 中就不會顯示了,Isr 中顯示的都是處于正常狀態的節點,
修改Topic:修改 Topic 的 partition 數量,只能增加, 因為資料是存盤在 partition 中的,如果可以減少 partition 的話,那么 partition 中的資料就丟了,
bin/kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 5 --topic demo
修改之后再來查看一下 topic 的詳細資訊:

洗掉Topic:洗掉 Kafka 中的指定 Topic:
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic demo
洗掉操作是不可逆的,洗掉 Topic 會洗掉它里面的所有資料,注意:Kafka 從 1.0.0 開始默認開啟了洗掉操作,之前的版本只會把 Topic 標記為洗掉狀態,需要設定 delete.topic.enable 為 true 才可以真正洗掉,如果不想開啟洗掉功能,可以設定 delete.topic.enable 為 false,這樣洗掉 topic 的時候只會把它標記為洗掉狀態,此時這個 topic 依然可以正常使用,delete.topic.enable 可以配置在 server.properties 檔案中,

前面我們學習了 Kafka 中的 topic 的創建方式,下面我們可以向 topic 中生產資料以及消費資料了,生產資料需要用到生產者,消費資料需要用到消費者,Kafka 默認提供了基于控制臺的生產者和消費者,方便測驗使用,如下:

建一個訊息生產者,利用它來產生訊息,在 Kafka 的安裝目錄下,執行如下命令:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello
執行結果如下圖所示:

在執行上面的命令后,系統會等待用戶輸入資訊,在創建了生產者之后,終端會一直處于生產訊息的狀態,用戶可以一直輸入訊息,輸入的訊息會保存到訊息主題當中,在創建了生產者生產了訊息之后,需要在另一個終端上創建訊息消費者,用來接收訊息,命令如下:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello
發現消費不到剛才生產的資料,為什么呢?因為 kafka 的消費者默認是消費最新生產的資料,如果想消費之前生產的資料需要添加一個引數 --from-beginning,表示從頭消費的意思,如下:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning
三、Kafka 核心擴展內容
3.1 Broker 擴展
Broker 的引數可以配置在 server.properties 這個組態檔中,Broker 中支持的完整引數在官方檔案中有體現:

具體鏈接為:https://kafka.apache.org/documentation/#brokerconfigs
針對 Broker 的引數,主要分析兩塊:
- Log Flush Policy:設定資料 flush 到磁盤的時機,為了減少磁盤寫入的次數,Broker 會將訊息暫時快取起來,當訊息的個數達到一定閥值或者過了一定的時間間隔后,再 flush 到磁盤,這樣可以減少磁盤 IO 呼叫的次數,這塊主要通過兩個引數控制:log.flush.interval.messages

一個磁區的訊息數閥值,達到該閾值則將該磁區的資料 flush 到磁盤,注意這里是針對磁區,因為 topic 是一個邏輯概念,磁區是真實存在的,每個磁區會在磁盤上產生一個目錄,如下圖所示:

這個引數的默認值為 9223372036854775807,long 的最大值
默認值太大了,所以建議修改,可以使用 server.properties 中針對這個引數指定的值 10000,需要去掉注釋之后這個引數才生效,

第二個引數:log.flush.interval.ms,間隔指定時間,默認間隔指定的時間將記憶體中快取的資料 flush 到磁盤中,由檔案可知,這個引數的默認值為 null,此時會使用 log.flush.scheduler.interval.ms 引數的值,

log.flush.scheduler.interval.ms 引數的值默認是 9223372036854775807,long 的最大值,所以這個值也建議修改,可以使用 server.properties 中針對這個引數指定的值1000,單位是毫秒,表示每1秒寫一次磁盤,這個引數也需要去掉注釋之后才生效,

- Log Retention Policy:設定資料保存周期,默認7天,Kafka 中的資料默認會保存7天,如果 Kafka 每天接收的資料量過大,這樣是很占磁盤空間的,建議修改資料保存周期,之前在實際作業中是將資料保存周期改為了1天,資料保存周期主要通過這幾個引數控制:log.retention.hours,這個引數默認值為168,單位是小時,就是7天,可以在這調整資料保存的時間,超過這個時間資料會被自動洗掉,

log.retention.bytes:這個引數表示當磁區的檔案達到一定大小的時候會洗掉它,如果設定了按照指定周期洗掉資料檔案,這個引數不設定也可以,這個引數默認是沒有開啟的,

log.retention.check.interval.ms:這個引數表示檢測的間隔時間,單位是毫秒,默認值是300000,就是5分鐘,表示每5分鐘檢測一次檔案看是否滿足洗掉的時機,

3.2 Producer 擴展
Kafka 磁區的原因:
- 方便在集群中擴展,每個 Partition 可以通過調整以適應它所在的機器,而一個 topic 又可以有多個 Partition 組成,因此整個集群就可以適應任意大小的資料了;
- 可以提高并發,因為可以以 Partition 為單位讀寫了,
Kafka 磁區的原則: 我們需要將 producer 發送的資料封裝成一個 ProducerRecord 物件,

- 指明 partition 的情況下,直接將指明的值直接作為 partiton 值;
- 沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數進行取余得到 partition 值;
- 既沒有 partition 值又沒有 key 值的情況下, kafka 采用 Sticky Partition(黏性磁區器),會隨機選擇一個磁區,并盡可能一直使用該磁區,待該磁區的 batch 已滿或者已完成,kafka 再隨機一個磁區進行使用,
針對 producer 的資料通訊方式:同步發送和異步發送, 同步是指:生產者發出資料后,等接收方發回回應以后再發送下個資料的通訊方式,異步是指:生產者發出資料后,不等接收方發回回應,接著發送下個資料的通訊方式,具體的資料通訊策略是由 acks引數 控制的,
acks 默認為1,表示需要 Leader 節點 回復收到訊息,這樣生產者才會發送下一條資料,如果在 follower 同步成功之前 Leader 故障,那么將會丟失資料,

acks:all(-1),表示需要所有 Leader+副本節點 回復收到訊息(acks=-1),這樣生產者才會發送下一條資料,但是如果在 follower 同步完成后,broker 發送 acks 之前,Leader 發生故障,那么會造成資料重復,

acks:0,表示不需要任何節點回復,生產者會繼續發送下一條資料,再來看一下這個圖:

在向 hello 這個 topic 生產資料的時候,可以在生產者中設定 acks 引數,acks 設定為1,表示我們在向 hello 這個 topic 的 partition1 這個磁區寫資料的時候,只需要讓 leader 所在的 Broker1 這個節點回復確認收到的訊息就可以了,這樣生產者就可以發送下一條資料了,如果 acks 設定為 all,則需要 partition1 的這兩個副本所在的節點(包含Leader)都回復收到訊息,生產者才會發送下一條資料,如果 acks 設定為0,表示生產者不會等待任何 partition 所在節點的回復,它只管發送資料,不管你有沒有收到,所以這種情況丟失資料的概率比較高,

副本資料同步策略如下:
| 方案 | 優點 | 缺點 |
|---|---|---|
| 半數以上完成同步,就發送acks | 延遲低 | 選舉新的leader時,容忍n臺節點的故障,需要2n+1個副本 |
| 全部完成同步,才發送acks | 選舉新的leader時,容忍n臺節點的故障,需要n+1個副本 | 延遲高 |
Kafka 選擇了第二種方案,原因如下:
- 同樣為了容忍 n 臺節點的故障,第一種方案需要 2n+1個副本,而第二種方案只需要 n+1個副本,而 Kafka 的每個磁區都有大量的資料,第一種方案會造成大量資料的冗余,
- 雖然第二種方案的網路延遲會比較高,但網路延遲對 Kafka 的影響較小,
ISR: 采用第二種方案之后,設想以下情景:leader 收到資料,所有 follower 都開始同步資料,但有一個 follower,因為某種故障,遲遲不能與 leader 進行同步,那 leader 就要一直等下去,直到它完成同步,才能發送 acks,這個問題怎么解決呢?Leader 維護了一個動態的 in-sync replica set (ISR),意為和 leader 保持同步的 follower 集合,當 ISR 中的 follower 完成資料的同步之后,leader 就會給 producer 發送 acks,如果 follower 長時間未向 leader 同步資料,則該 follower 將被踢出 ISR,該時間閾值由 replica.lag.time.max.ms 引數設定,Leader 發生故障之后,就會從 ISR 中選舉新的 leader,

Leader和Follower故障處理細節:

LEO:指的是每個副本最大的 offset;
HW:指的是消費者能見到的最大的 offset,ISR 佇列中最小的 LEO,
- Follower 故障,Follower 發生故障后會被臨時踢出 ISR,待該 Follower 恢復后,Follower 會讀取本地磁盤記錄的上次的 HW,并將 log 檔案高于 HW 的部分截取掉,從 HW 開始向 Leader 進行同步,等該 Follower 的 LEO 大于等于該 Partition 的 HW,即 Follower 追上 Leader 之后,就可以重新加入 ISR 了,
- Leader 故障,Leader 發生故障之后,會從 ISR 中選出一個新的 Leader,之后,為保證多個副本之間的資料一致性,其余的 Follower 會先將各自的 log 檔案高于 HW 的部分截掉,然后從新的 Leader 同步資料,注意:這只能保證副本之間的資料一致性,并不能保證資料不丟失或者不重復,
Exactly Once 語意: 將服務器的 acks 級別設定為-1,可以保證 Producer 到 Server 之間不會丟失資料,即 At Least Once 語意,相對的,將服務器 acks 級別設定為0,可以保證生產者每條訊息只會被發送一次,即 At Most Once 語意,At Least Once 可以保證資料不丟失,但是不能保證資料不重復;相對的,At Most Once 語意可以保證資料不重復,但是不能保證資料不丟失,但是,對于一些非常重要的資訊,比如說交易資料,下游資料消費者要求資料既不重復也不丟失,即 Exactly Once 語意,在 0.11 版本以前的 Kafka,對此是無能為力的,只能保證資料不丟失,再在下游消費者對資料做全域去重,對于多個下游應用的情況,每個都需要單獨做全域去重,這就對性能造成了很大影響,0.11 版本的 Kafka,引入了一項重大特性:冪等性,所謂的冪等性就是指 Producer 不論向 Server 發送多少次重復資料,Server 端都只會持久化一條,冪等性結合 At Least Once 語意,就構成了 Kafka 的 Exactly Once 語意,即:At Least Once + 冪等性 = Exactly Once 要啟用冪等性,只需要將 Producer 的引數中 enable.idempotence 設定為 true 即可,Kafka 的冪等性實作其實就是將原來下游需要做的去重放在了資料上游,開啟冪等性的 Producer 在初始化的時候會被分配一個PID,發往同一 Partition 的訊息會附帶 Sequence Number,而 Broker 端會對 <PID, Partition, SeqNumber> 做快取,當具有相同主鍵的訊息提交時,Broker 只會持久化一條,但是 PID 重啟就會變化,同時不同的 Partition 也具有不同主鍵,所以冪等性無法保證跨磁區跨會話的 Exactly Once,
3.3 Consumer 擴展
我們再來看看 Kafka 基礎架構:

名詞解釋如下:
- Producer :訊息生產者,就是向 kafka broker 發訊息的客戶端;
- Consumer :訊息消費者,向 kafka broker 取訊息的客戶端;
- Consumer Group(CG):消費者組,由多個 Consumer 組成,消費者組內每個消費者負責消費不同磁區的資料,一個磁區只能由一個組內消費者消費;消費者組之間互不影響,所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者,
- Broker :一臺 kafka 服務器就是一個 broker,一個集群由多個 broker 組成,一個 broker 可以容納多個 topic,
- Topic :可以理解為一個佇列,生產者和消費者面向的都是一個 topic;
- Partition:為了實作擴展性,一個非常大的 topic 可以分布到多個 broker (即服務器)上,一個 topic 可以分為多個 partition,每個 partition 是一個有序的佇列;
- Replication:副本,為保證集群中的某個節點發生故障時,該節點上的 partition 資料不丟失,且 kafka 仍然能夠繼續作業,kafka 提供了副本機制,一個 topic 的每個磁區都有若干個副本,一個 leader 和若干個 follower
- Leader:每個磁區多個副本的“主”,生產者發送資料的物件,以及消費者消費資料的物件都是 leader,
- Follower:每個磁區多個副本中的“從”,實時從 leader 中同步資料,保持和 leader 資料的同步,leader 發生故障時,某個 follower 會成為新的 leader,
消費方式: consumer 采用 pull(拉)模式 從 broker 中讀取資料,push(推)模式 很難適應消費速率不同的消費者,因為訊息發送速率是由 broker 決定的,它的目標是盡可能以最快速度傳遞訊息,但是這樣很容易造成 consumer 來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞,而 pull 模式則可以根據 consumer 的消費能力以適當的速率消費訊息,pull 模式不足之處是,如果 Kafka 沒有資料,消費者可能會陷入回圈中,一直回傳空資料,針對這一點,Kafka 的消費者在消費資料時會傳入一個時長引數 timeout,如果當前沒有資料可供消費,consumer 會等待一段時間之后再回傳,這段時長即為 timeout,
磁區分配策略: 一個 consumer group 中有多個 consumer,一個 topic 有多個 partition,所以必然會涉及到 partition 的分配問題,即確定那個 partition 由哪個 consumer 來消費,Kafka 有三種分配策略:RoundRobin,Range,Sticky,
磁區分配策略之 RoundRobin:

磁區分配策略之 Range :

總結:在同一個消費者組中,一個 partition 同時只能有一個消費者消費資料如果消費者的個數小于磁區的個數,一個消費者會消費多個磁區的資料,
如果消費者的個數大于磁區的個數,則多余的消費者不消費資料,所以,對于一個topic,同一個消費者組中推薦不能有多于磁區個數的消費者,否則將意味著某些消費者將無法獲得訊息,組間:多個消費者組消費相同的資料,互不影響,
offset的維護: 由于 consumer 在消費程序中可能會出現斷電宕機等故障,consumer 恢復后,需要從故障前的位置的繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復后繼續消費,Kafka 0.9 版本之前,consumer 默認將 offset 保存在 Zookeeper 中,因為頻繁操作 Zookeeper 性能不高,所以 kafka 在自己的 topic 中負責維護消費者的 offset 資訊,從0.9版本開始,consumer 默認將 offset 保存在 Kafka 一個內置的 topic 中,該 topic為 __consumer_offsets,
消費者組案例:測驗同一個消費者組中的消費者,同一時刻只能有一個消費者消費,
- 在 bigdata01 上修改 /data/soft/kafka_2.12-2.4.1/config/consumer.properties 組態檔中的 group.id 屬性為任意組名,

- 克隆三個會話視窗,一個作為生產者,剩下兩個作為消費者,分別啟動:bin/kafka-console-producer.sh --topic demo --broker-list bigdata01、bin/kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic demo --consumer.config config/consumer.properties
- 查看不同會話視窗中消費者的消費情況,發現同一時刻只能有一個消費者消費,
消費offset案例:
- 思想: __consumer_offsets 為 kafka 中的 topic, 那就可以通過消費者進行消費,
- 修改組態檔 consumer.properties,vi consumer.properties、添加:exclude.internal.topics=false(不排除內部的topic)
- 創建一個topic,bin/kafka-topics.sh --create --topic demo --zookeeper bigdata01:2181 --partitions 2 --replication-factor 2
- 啟動生產者和消費者,分別往 demo 生產資料和消費資料,bin/kafka-console-producer.sh --topic demo --broker-list bigdata01:9092、bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --topic demo --bootstrap-server bigdata01:9092
- 消費offset,bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server bigdata01:9092 --formatter “kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter” --consumer.config config/consumer.properties --from-beginning
- 消費到的資料,
Consumer 消費順序: 當一個消費者消費一個 partition 時候,消費的資料順序和此 partition 資料的生產順序是一致的,當一個消費者消費多個 partition 時候,消費者按照 partition 的順序,首先消費一個 partition,當消費完一個 partition 最新的資料后再消費其它 partition 中的資料,總之:如果一個消費者消費多個 partiton,只能保證消費的資料順序在一個 partition 內是有序的,也就是說消費 kafka 中的資料只能保證消費 partition 內的資料是有序的,多個 partition 之間是無序的,
3.4 Kafka 核心之存盤和容錯機制
Topic、Partition 擴展: Kafka 中訊息是以 topic 進行分類的,生產者生產訊息,消費者消費訊息,都是面向 topic 的,topic 是邏輯上的概念,而 partition 是物理上的概念,每個 partition 對應于一個 log 檔案,該 log 檔案中存盤的就是 Producer 生產的資料,Producer 生產的資料會被不斷追加到該 log 檔案末端,且每條資料都有自己的 offset(每條訊息在log檔案中的位置),消費者組中的每個消費者,都會實時記錄自己消費到了哪個 offset,以便出錯恢復時,越多 partition 可以容納更多的 consumer,有效提升并發消費的能力,具體什么時候增加 topic 的數量?什么時候增加 partition 的數量呢?業務型別增加需要增加 topic、資料量大需要增加 partition,

針對 Kafka 中 Topic 命名的小技巧: 建議在給 Topic 命名的時候在后面跟上 r2p10 之類的內容
r2:表示Partition的副本因子是2
p10:表示這個Topic的磁區數是10
這樣的好處是后期我們如果要寫消費者消費指定 topic 的資料,通過 topic 的名稱我們就知道應該設定多少個消費者消費資料效率最高,因為一個 partition 同時只能被一個消費者消費,所以效率最高的情況就是消費者的數量和 topic 的磁區數量保持一致,在這里通過 topic 的名稱就可以直接看到,一目了然,但是也有一個缺點,就是后期如果我們動態調整了 topic 的 partiton,那么這個 topic 名稱上的 partition 數量就不準了,針對這個 topic,建議大家一開始的時候就提前預估一下,可以多設定一些 partition,我們在作業中的時候針對一些資料量比較大的 topic 一般會設定 40~50 個 partition,資料量少的 topic 一般設定 5~10 個 partition,這樣后期調整 topic partition 數量的場景就比較少了,
由于生產者生產的訊息會不斷追加到 log 檔案末尾,為防止 log 檔案過大導致資料定位效率低下,Kafka 采取了分片和索引機制,將每個 partition 分為多個 segment,每個 segment 對應兩個檔案——“.index”檔案和“.log”檔案, 這些檔案位于一個檔案夾下,該檔案夾的命名規則為:topic 名稱+磁區序號,例如,hello這個 topic 有兩個磁區,則其對應的檔案夾為 hello-0、hello-1,

分別進入到 hello-0 以及 hello-1 目錄查看,如下:

index 和 log 檔案以當前 segment 的第一條訊息的 offset 命名,下圖為 index 檔案和 log 檔案的結構示意圖:

這個圖其實不是太準確,kafka 不會每條訊息都會維護一個索引,“.index”檔案存盤大量的索引資訊,“.log”檔案存盤大量的資料, 索引檔案中的元資料指向對應資料檔案中 message 的物理偏移地址,
Message 擴展: 每條 Message 包含了以下三個屬性:
- offset 對應型別:long 表示此訊息在一個 partition 中的起始的位置,可以認為 offset 是 partition 中 Message 的 id,自增的,
- MessageSize 對應型別:int32 此訊息的位元組大小,
- data 是 Message 的具體內容,

在 kafka 中每個 topic 包含1到多個 partition,每個 partition 存盤一部分 Message,每條 Message 包含三個屬性,其中有一個是 offset,offset 相當于 partition 中這個 message 的唯一id,那么如何通過id高效的找到message?兩大法寶:分段+索引, kafak 中資料的存盤方式是這樣的:每個 partition 由多個 segment【片段】組成,每個 segment 中存盤多條訊息,每個 partition 在記憶體中對應一個index,記錄每個 segment 中的第一條訊息偏移量,
總結:Kafka 中資料的存盤流程是這樣的,生產者生產的訊息會被發送到 topic 的多個 partition 上,topic 收到訊息后往對應 partition 的最后一個 segment 上添加該訊息,segment 達到一定的大小后會創建新的 segment,
容錯機制:當Kafka集群中的一個Broker節點宕機,會出現什么現象?
下面來演示一下,使用 kill -9 殺掉 bigdata01 中的 broker 行程測驗:

我們可以先通過 zookeeper 來查看一下,因為當 kafka 集群中的 broker 節點啟動之后,會自動向 zookeeper 中進行注冊,保存當前節點資訊:[root@bigdata01 hello-1]# cd /data/soft/zookeeper-3.5.8/、[root@bigdata01 zookeeper-3.5.8]# bin/zkCli.sh

此時發現 zookeeper 的 /brokers/ids 下面只有2個節點資訊,可以通過get命令查看節點資訊,這里面會顯示對應的主機名和埠號:

然后再使用 describe 查詢 topic 的詳細資訊,會發現此時的磁區的 leader 全部變成了目前存活的另外兩個節點,此時可以發現 Isr 中的內容和 Replicas 中的不一樣了,因為 Isr 中顯示的是目前正常運行的節點,所以當 Kafka 集群中的一個 Broker 節點宕機之后,對整個集群而言沒有什么特別的大影響,此時集群會給 partition 重新選出來一些新的 Leader 節點,

當 Kafka 集群中新增一個 Broker 節點,會出現什么現象?新加入一個 broker 節點,zookeeper 會自動識別并在適當的機會選擇此節點提供服務,再次啟動 bigdata01 節點中的 broker 行程測驗,命令:[root@bigdata01 kafka_2.12-2.4.1]# bin/kafka-server-start.sh -daemon config/server.properties,此時到 zookeeper 中查看一下:

發現 broker.id 為0的這個節點資訊也有了,在通過 describe 查看 topic 的描述資訊,Isr 中的資訊和 Replicas 中的內容是一樣的了:

但是啟動后有個問題:發現新啟動的這個節點不會是任何磁區的 leader?怎么重新均勻分配呢?Broker 中的自動均衡策略(默認已經有):
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds 默認值:300
手動執行:
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions
執行后的效果如下,這樣就實作了均勻分配:

3.5 Kafka 高效讀寫資料
- 順序寫磁盤,Kafka 的 producer 生產資料,要寫入到 log 檔案中,寫的程序是一直追加到檔案末端,為順序寫,官網有資料表明,同樣的磁盤,順序寫能到 600M/s,而隨機寫只有 100K/s,這與磁盤的機械機構有關,順序寫之所以快,是因為其省去了大量磁頭尋址的時間,
- 應用 PageCache ,Kafka 資料持久化是直接持久化到 PageCache 中,這樣會產生以下幾個好處: I/O Scheduler 會將連續的小塊寫組裝成大塊的物理寫從而提高性能、I/O Scheduler 會嘗試將一些寫操作重新按順序排好,從而減少磁盤頭的移動時間、充分利用所有空閑記憶體(非 JVM 記憶體),如果使用應用層 Cache(即 JVM 堆記憶體),會增加 GC 負擔、讀操作可直接在 PageCache 內進行,如果消費和生產速度相當,甚至不需要通過物理磁盤(直接通過 PageCache)交換資料、如果行程重啟,JVM 內的 Cache 會失效,但 PageCache 仍然可用,盡管持久化到 PageCache 上可能會造成宕機丟失資料的情況,但這可以被 Kafka 的 Replication 機制解決,如果為了保證這種情況下資料不丟失而強制將 PageCache 中的資料 Flush 到磁盤,反而會降低性能,
- 零復制技術,

3.6 Zookeeper 在 Kafka 中的作用
Kafka 集群中有一個 broker 會被選舉為 Controller,負責管理集群 broker 的上下線,所有 topic 的磁區副本分配和 leader 選舉等作業,Controller 的管理作業都是依賴于 Zookeeper 的,以下為 partition 的 leader 選舉程序:

3.7 Kafka 事務
Kafka 從 0.11 版本開始引入了事務支持,事務可以保證 Kafka 在 Exactly Once 語意的基礎上,生產和消費可以跨磁區和會話,要么全部成功,要么全部失敗,
- Producer 事務,為了實作跨磁區跨會話的事務,需要引入一個全域唯一的 Transaction ID,并將 Producer 獲得的 PID 和 Transaction ID 系結,這樣當 Producer 重啟后就可以通過正在進行的 Transaction ID 獲得原來的 PID,為了管理 Transaction,Kafka 引入了一個新的組件 Transaction Coordinator,Producer 就是通過和 Transaction Coordinator 互動獲得 Transaction ID 對應的任務狀態,Transaction Coordinator 還負責將事務所有寫入 Kafka 的一個內部 Topic,這樣即使整個服務重啟,由于事務狀態得到保存,進行中的事務狀態可以得到恢復,從而繼續進行,
- Consumer 事務(精準一次性消費),上述事務機制主要是從 Producer 方面考慮,對于 Consumer 而言,事務的保證就會相對較弱,尤其時無法保證 Commit 的資訊被精確消費,這是由于 Consumer 可以通過 offset 訪問任意資訊,而且不同的 Segment File 生命周期不同,同一事務的訊息可能會出現重啟后被洗掉的情況,如果想完成 Consumer 端的精準一次性消費,那么需要 kafka 消費端將消費程序和提交 offset 程序做原子系結,此時我們需要將 kafka 的 offset 保存到支持事務的自定義介質(比如mysql),
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/292171.html
標籤:其他
