溪源的Java筆記—訊息佇列
前言
在Java的分布式應用中有一個提升性能的利器——訊息佇列,通過訊息佇列我們可以讓很多操作“異步”地去執行,這種異步的操作可以幫助應用均勻地去處理大量請求涌入的情況,從而降低系統的訪問壓力,本篇博客將帶領大家去了解我們經常使用的兩種訊息佇列RabbitMQ和Kafka的原理,
Redis服務器可參考我的博客:溪源的Java筆記之Redis服務器
正文
訊息佇列之RabbitMQ
RabbitMQ是基于下面兩種基礎來實作:
- Erlang語言:面向電信行業的函式式編程語言,它為
RabbitMQ提供了節點之間訊息通信輕量級執行緒,提供了狀態無關的高并發性, - AMQP協議規范:它讓
RabbitMQ成為一個與供應商無關、平臺獨立的解決方案,使其可以實作靈活的訊息路由、配置化的訊息持久以及跨資料中心通信,
RabbitMQ的功能
- 應用解耦:應用架構不再受限于資料庫寫入的性能瓶頸,應用只需要發送訊息,不需要長時間占用執行緒等待回應;
- 資料庫解耦:將直接存在資料庫的資料發送到
RabbitMQ,從而實作資料的異步處理,同時也可以通過消費者應用進行限流或者直接關閉,避免資料庫崩潰, - 資料同步:通過
RabbitMQ可以將同一個資料存盤到不同的資料庫中,同樣也可以把不同資料庫的資料應用到同一個系統, - 流量削峰:可以借助訊息佇列異步特性來降低系統的訪問壓力,
- 日志處理 :借助訊息佇列來異步處理日志檔案
RabbitMQ與AMQP協議
AMQP把客戶端和代理服務器之間的通信資料拆分成一種叫做幀的塊狀結構,
低層AMQP幀的組成:
- 幀頭:幀型別、信道編號、以位元組為單位的幀大小
- 幀的有效載荷
- 結束位元組標記
幀的型別
- 協議頭幀:用于連接到
RabbitMQ,僅使用一次, - 心跳幀:客戶端與
RabbitMQ之間進行傳遞,作為一種校驗機制確保連接的兩端可以正常作業, - 方法幀:攜帶發給
RabbitMQ或從RabbitMQ接收到的RPC請求或回應,(告訴RabbitMQ如何路由) - 內容頭幀:包含一條訊息的大小和屬性,
- 訊息體幀:包含訊息的內容,
幀與訊息
- 我們通常使用方法幀、內容頭幀和訊息幀向
RabbitMQ發布訊息, - 一個訊息通常以:方法幀、內容頭幀以及一個或者多個訊息體幀,(每個幀都有一定的大小限制,超過限制就會被拆分成多個)
- 傳輸程序中,方法幀和內容頭幀會被打包成二進制,訊息體幀不會進行任何打包或編碼,它可以是任何資料型別,
Rabbit的抽象組件
- 交換機(Exchange):接收發送到
RabbitMQ中的訊息并決定把他們投遞到那個佇列的組件, - 系結(Binding):一套規則,用于告訴交換器訊息應該被存盤到哪個佇列,
- 佇列(Queue):用來存盤訊息的資料結構,位于硬碟或記憶體中,
- 信道(Channel):多路復用連接中的一條獨立的雙向資料流通道,信道會占用大量資源,要合理使用,

交換機的類別
- Direct交換機:是完全匹配、單播的模式,
- Fanout交換機:類似子網廣播,所有與該互動機系結的佇列都會接受到一份訊息的副本,這種方式是最快的,
- Topic交換機:通過 正則運算式的方式 實作一對多的系結,
- Headers交換機:它通過采用訊息屬性中的
headers表結合正則運算式的方式 實作一對多的系結(幾乎不使用)
訊息佇列的匹配規則
1.訊息佇列模式
訊息佇列模式:發送者,接受者
2.主題訊息模式
主題訊息:發布者,訂閱者
訊息佇列如何確保其訊息的順序性
通常來說有以下的思路:
- 單執行緒消費來確保訊息的順序性,
- 對訊息進行編號,消費者處理時根據編號判斷順序,
RabbitMQ確保訊息有序性
拆分多個queue,每個queue對應一個consumer,然后這個consumer內部用記憶體佇列做排隊,然后分發給底部不同worker處理:
- 防止使用同一個佇列,導致資料123進入不同的消費者,從而使得資料123沒有按指定的順序被執行
- 通過拆分
queue來保證每一個消費者都能獲得完整的資料123,然后消費者內部進行排隊,從而保證訊息的有序性, - 這里同時也要設計,保證訊息的冪等性,

訊息佇列如何保證其不會重復消費
簡單來說,如何實作訊息的冪等性,即訊息執行一次和執行多次的結果是一樣的,
保證資料不會重復消費,要結合業務來實作,比如:
- 基于資料庫的主鍵索引的來實作
- 基于
Redis來實作,使用set操作具有天然的冪等性 - 通過先查一次資料,來判斷是新增操作還是更新操作
- 通過向資料庫前置一個布隆過濾器來判斷資料是新資料還是舊資料,再使用主鍵索引來實作
訊息佇列如何保證訊息不會丟失
訊息從生產到消費可以經歷三個階段:
- 生產階段:在這個階段,從訊息在生產者創建出來,經過網路傳輸到訊息佇列服務器中,
- 存盤階段:訊息在訊息佇列服務器中存盤,如果是集群,訊息會在這個階段被復制到其他的副本上,
- 消費階段:消費者從訊息佇列服務器中拉取訊息,通過網路傳輸發送到消費者,
各階段保證資料的不丟失的方式:
- 生產階段:失敗回呼機制、發布者確認、訊息持久化
- 存盤階段:備用交換器、死信交換器、事務、高可用佇列、基于事務的高可用佇列、訊息持久化
- 消費階段:消費者確認、訊息持久化
失敗回呼機制
將 mandatory設定為true,如果訊息不可路由那么rabbitmq會把完整的訊息退回到發布者中
public RabbitTemplate rabbitTemplate(){
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMandatory(true); //設定發送訊息失敗重試
return template;
}
發布者確認
發布者確認,即Confirm機制具體的實作方式:
spring.rabbitmq.template.mandatory = true設定成truespring.rabbitmq.publisher-confirms = true設定成true- 撰寫一個
java類,實作RabbitTemplate.ConfirmCallback介面,在這個里面我們可以確認訊息是否到達了RabbitMQ服務器,
這里RabbitMq提供一個回呼函式可以將投遞失敗的訊息給輸出出來:
// 訊息是否從Exchange路由到Queue, 注意: 這是一個失敗回呼, 只有訊息從Exchange路由到Queue失敗才會回呼這個方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("訊息從Exchange路由到Queue失敗: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
});
消費者確認機制
消費者確認機制,即應答模式:
- 消費者完成消費處理后,會發送一個消費應答,告訴訊息佇列服務器這個訊息已經處理完成可以洗掉這個訊息了
- 如果一個消費者由于宕機,沒有發送訊息應答,那么訊息佇列服務器會認為訊息發送失敗,自動進行補償行為,即將這個訊息重新加入佇列,重新投遞,
應答模式可以分為:
- 自動應答:不在乎消費者對訊息處理是否成功,都會告訴佇列洗掉訊息,如果處理訊息失敗,實作自動補償(佇列投遞過去重新處理),
- 手動應答:消費者處理完業務邏輯,手動回傳ack(通知)告訴佇列處理完了,佇列進而洗掉訊息,
應答模式犧牲了訊息佇列的性能,從而提高了訊息的可靠性,
備用交換器
備用交換器用于處理無法路由的訊息,備用交換器在第一次宣告交換器時被指定,用來提供一種預先存在的交換器,即如果交換器無法路由訊息,那么訊息就會被路由到這個新的備用交換器,
死信交換器
過期的訊息、basic.nack或basic.reject且requeue引數為false或佇列滿的訊息將進入此交換器
事務
采用AMQP事務機制和發布者確認的機制,來解決服務器資料丟失的問題,
這種機制極大犧牲了性能從而換取訊息的可靠性,
高可用佇列
高可用佇列又稱為HA佇列,需要RabbitMQ集群環境,可以通過使用AMQP或者使用基于web的管理界面來設定,
RabbitMQ集群搭建結合Erlang來實作其內部通信,借助Haproxy實作請求的負載均衡,
基于事務的高可用佇列
在一個集群的環境下,采用的是事務或投遞確認機制,則訊息在被HA佇列定義的所有活動節點確認之后,RabbitMQ才會發送成功的回應,這種方式會造成很大的延遲,
訊息持久化
訊息持久化是解決訊息被投遞到RabbitMQ的記憶體中,還沒有投遞到消費者實體之前就宕機了,而導致訊息丟失的問題,
交換機的持久化:
new DirectExchange("log.user.exchange", true, false);- 第二個引數
durable: 是否持久化, - 第三個引數
autoDelete: 當所有系結佇列都不再使用時, 是否自動洗掉交換器,true: 洗掉,false: 不洗掉,
queue的持久化:
new Queue("log.user.queue.name", true);- 宣告佇列時指定持久化引數為
true即可
message的持久化:
- 將
delivery-mode設定為MessageDeliveryMode.PERSISTENT,即可以實作message的持久化, - 在默認的情況下
message都是持久化的,
知識點:
a.關于RabbitMQ的配置要根據實際的業務需要,在可靠性和性能之間進行抉擇:
- 使用
mandatory設定,RabbitMQ將不接受不可路由訊息 - 發布者確認作為事務的輕量級替代方法
- 使用備用交換器處理無法路由的訊息
- 基于事務的批量處理
- 使用
HA佇列避免節點故障 - 使用回推機制,拒絕接受發布者發布的過多訊息
消費訊息性能控制
RabbitMQ實作了兩個不同的AMQP RPC命令來獲取佇列中的訊息:
- Basic.Get:是一個輪詢模型(這種模式的性能通常比后低二倍以上)
- Basic.Cunsume:是一個推送模型(即 發布-訂閱模式)
所以使用推送模型,能很大程度提高RabbitMQ的消費能力,
a.提升消費者消費訊息的能力
RabbitMQ提供一下的方式來提升消費訊息的能力(性能依次降低)
1.基于no-ack模式進行消費
ack模式即為應答模式,即消費者確認,我們可以通過關閉ack模式(默認是打開的)的方式實作更快的吞吐量,
- 但是我們要知道
RabbitMQ將資料投遞到消費者的程序中會進過系統自帶的資料緩沖區, - 由于缺少消費者確認,當系統出現網路波動會導致當前作業系統的套接字接識訓沖區爆滿,從而影響程式的正常運行,
在linux系統可以通過增加net_core.rmem_default和net.core.rmem_max值,通常設定為16M即可,
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.rmem_default=16777216
2.基于確認和Qos>1進行消費
Qos服務質量設定,即在確認訊息接收之前,消費者可以預先要求接收一定數量的訊息:
QoS設定允許RabbitMQ通過為消費者預先分配一定數量的訊息來實作更高效地訊息發送Qos默認是1 ,默認每條訊息都會確認,這種方式不可以與no-ack同時設定,- 它本身可以視為
ack的一種優化,它不需要對每條訊息進行確認,可以通過設定multiple為true,對所有以前沒有進行確認的訊息進行確認,
3.使用事務來批量進行
事務可能會對訊息吞吐量產生負面影響,但有一個例外,如果你不適用QoS設定,那么在使用事務來批量確認訊息時,實際上可能會看到略微的性能提升,并且事務不適用于已禁止確認的消費者,
b.拒絕訊息
當訊息本身或訊息處理的程序中出現問題,RabbitMQ提供了兩種將訊息踢回代理服務器的機制:
- Basic.Reject: 一次只允許拒絕一個訊息
- Basic.Nack:一次可以拒絕多個訊息
死信交換器:
- 過期的訊息、
basic.nack或basic.reject且requeue引數為false或佇列滿的訊息將進入此交換器, RabbitMQ通過死信交換器將訊息路由到系結的佇列,就像正常發送給交換器的任何其他訊息一樣,- 死信功能還允許你使用預先指定的值覆寫路由鍵(死信交換器即可以系結一個非死信的佇列和一個死信的佇列),這樣可以允許你使用一個交換器同時處理死信訊息和非死信訊息,
- 但需要確保死信訊息不被投遞到非死信佇列中,需要在宣告佇列時指定一個額外的引數
x-dead-letter-routing-key
c.訊息佇列如何解決訊息堆積問題
訊息回推機制
訊息回推機制可以解決訊息堆積的問題
如果發布者應用程式因為發布訊息太快而開始對RabbitMQ造成壓力,那么RabbitMQ將:
- 發送
Channel.FlowRPC方法來讓使發布者阻塞 - 只有發送另一條
Channel.Flow命令,發布者才能解除阻塞狀態繼續發送訊息
但是,在RabbitMQ2.0之前存在發布者沒有監聽Channel.Flow方法的極端情況,
在RabbitMQ3.2之后
采用TCP背壓的機制來解決這種極端情況,即采用一個連接信用閾值的機制:
RabbitMQ將根據RPC請求的完成情況給每一個發布者打分,RabbitMQ只處理有足夠信用的發布者的訊息,- 同時借助
Connection.Blocked和Connection.Unblocked這兩個異步方法,來通知客戶端進行阻塞和取消阻塞,
知識點:
a.實際場景解決訊息堆積問題的流程:
- 修復現有
consumer的問題,并將其停掉, - 重新創建一個容量更大的
topic,比如patition是原來的10倍, - 撰寫一個臨時
consumer程式,消費原來積壓的佇列,該consumer不做任何耗時的操作,將訊息均勻寫入新創建的佇列里, - 將修復好的
consumer部署到原來10倍的機器上消費新佇列, - 訊息積壓解決后,恢復原有架構
核心思路,提高消費者的消費能力,
RabbitMQ集群
在RabbitMQ集群里,運行時狀態包含:
- 交換器
- 佇列
- 系結器
- 用戶
- 虛擬主機
- 策略
它們對所有節點都可用,這種共享運行時狀態的特性,使得集群中的每一個節點都能綁發布或者洗掉連接到第一個節點創建的交換器,因此當節點中出現節點宕機,其運行狀態以及資料都是可以轉移恢復的,
RabbitMQ提供了HA佇列,它可以
- 跨越多個集群節點并共享同步佇列狀態和訊息資料
HA佇列中的某個節點發生故障的話,集群中的其他節點仍然保存著訊息和佇列狀態,- 當故障的節點重新加入集群時,該節點會完全同步自節點故障以來所有被消費的訊息,
- 通過
HA佇列,我們可以實作集群中有節點專門服務發布者,有節點專門服務于消費者, - 一個訊息被發送到
RabbitMQ集群中的任何一個節點時,該訊息會被路由到佇列中去,無關佇列在集群中的位置,
HA佇列需要注意的地方
- 不能跨越
WAN或者互聯網來搭建RabbitMQ集群 - 集群中的節點不宜過多,通常上限在32-64個,太多的節點會增加內部共享運行時狀態的復雜性,
在RabbitMQ集群有三類節點
- 磁盤節點:將集群的運行時狀態會同時存盤在記憶體和磁盤中,
- 記憶體節點:只會把集群的運行時狀態存盤在記憶體中,
- 狀態節點:在任意時刻,一個集群只能有一個統計節點,用來負責收集集群中每一個節點的全部統計資料和狀態資料,
知識點:
- 磁盤節點和記憶體節點都可以進行資料持久化,
- 當節點或者集群崩潰時,在磁盤節點啟動并重新加入集群時,會被用來重建集群的運行狀態,而記憶體節點不會包含任何運行時資料,集群中其他節點會把佇列定義等資訊發送給它,
RabbitMQ集群中最少要有一個磁盤節點,但是過多磁盤節點也會導致共享狀態不一致的問題,
RabbitMQ集群間通訊
在RabbitMQ中提供了兩種方式可以實作跨越不同的RabbitMQ集群間的通訊:
- 聯合交換器:允許發往上游節點交換器的訊息被透明地發送至下游節點中相同名稱的交換器上,
- 聯合佇列:允許下游節點扮演上游節點中共享佇列的消費者角色,為下游節點提供了輪詢消費訊息的能力,
聯合交換器
在一個RabbitMQ集群中所有的節點之間要有一個低延遲的網路環境,
RabbitMQ捆綁的一個插件可以實作下游RabbitMQ可以從先前已經存在的RabbitMQ服務器上獲取訊息,
- 某種意義上,這個聯合插件的行為既像消費者又像訊息發布者,它在上游節點消費訊息,并在同一個節點上將這些訊息進行重新發送,
- 聯合交換器提供了一種簡單、可靠、健壯的方式來擴展
RabbitMQ的基礎設施,實作了RabbitMQ集群無法實作的跨網路延遲, - 聯合插件還能橋接邏輯上隔離的
RabbitMQ集群,使得一個資料中心可以有兩個不同版本的RabbitMQ集群,
聯合佇列
- 采用聯合佇列的方式可以,解決訊息的負載均衡,通過借助其他集群節點來消費佇列訊息,避免訊息堆積現象,
- 某種意義上聯合佇列與聯合交換器的配置沒有本質上的區別,很多時候上兩個是一起使用的,
通時借助聯合插件我們還能實作:
- 提供訊息處理的冗余(即容錯性)
- 實作基于地理分布的應用
- 無縫的實作
RabbitMQ集群的版本升級
訊息佇列之Kafka
Kafka是一個分布式流平臺,它具以下關鍵功能:
- 訊息傳遞系統:發布和訂閱記錄流,類似于訊息佇列或企業訊息傳遞系統,
- 存盤系統:以容錯的持久方式存盤記錄流,
- 流處理:處理發生的記錄流,
Kafka可以作為訊息佇列具有以下特性:
- 高可用:寫入
Kafka的資料將寫入磁盤并進行復制以實作容錯 - 持久性、可靠性:
Kafka提供可生產者確認機制保證了資料的可靠性,并且訊息都是以Record的結構寫入到本地檔案,節點關閉不會丟失資料 - 高吞吐量、低延遲:
Kafka每秒可以處理幾十萬條訊息,它的延遲最低只有幾毫秒 - 可擴展性:
Kafka集群支持熱擴展 - 高并發:支持數千個客戶端同時讀寫
- 高性能:存盤并允許客戶端控制其讀取位置(方便用戶定位資源),依靠使用檔案系統和依靠頁面快取,減少對記憶體的使用
知識點:
Kafka集群將記錄流存盤在稱為topic的類別中.一個topic可以由幾個Partition組成,- 每個記錄(
Record)由一個鍵,一個值和一個時間戳組成,
主題和日志
對于每個主題,Kafka群集都會維護一個磁區日志,如下所示:

每個磁區(Partition)都是有序的(所以每一個Partition內部都是有序的),不變的記錄序列,這些記錄連續地附加到結構化的提交日志中,
磁區中的每個記錄均分配有一個稱為偏移的順序ID號,該ID唯一地標識磁區中的每個記錄,
每個消費者保留的唯一元資料是該消費者在日志中的偏移量或位置,此偏移量由使用者控制:
- 通常,使用者在讀取記錄時會線性地推進其偏移量,但實際上,由于位置是由使用者控制的,因此它可以按喜歡的任何順序使用記錄,
- 例如,使用者可以重置到較舊的偏移量以重新處理過去的資料,或者跳到最近的記錄并從“現在”開始使用,(類似于游標指標的方式順序處理資料,并且該指標可以任意移動)
訊息的有序性
Kafka訊息的有序性,是采用訊息鍵保序策略來實作的,
一個topic,一個partition(分割),一個consumer,內部單執行緒消費,寫N個記憶體queue,然后N個執行緒分別消費一個記憶體queue,
- 通過指定
key的方式,具有相同key的訊息會分發到同一個partition partition會內部對其進行排序,保證其有序性,

磁區的設計結構
- 提供了負載均衡的能力,實作了系統的高伸縮性,
- 不同的磁區能夠被放置到不同節點的機器上,而資料的讀寫操作也都是針對磁區這個粒度而進行的,這樣每個節點的機器都能獨立地執行各自磁區的讀寫請求處理,
- 可以通過添加新的節點機器來增加整體系統的吞吐量,
Kafka磁區的設計邏輯和ES分片的設計邏輯是相同的,
生產者磁區策略
生產者磁區策略是 決定生產者將訊息發送到哪個磁區的演算法,即實作負載均衡,
主要有以下幾種:
- 輪詢策略:順序分配,輪詢策略有非常優秀的負載均衡表現,它總是能保證訊息最大限度地被平均分配到所有磁區上,故默認情況下它是最合理的磁區策略,(默認、常用)
- 隨機策略:所謂隨機就是我們隨意地將訊息放置到任意一個磁區上,
- 訊息鍵保序策略:
Kafka中每條訊息都會有自己的key,一旦訊息被定義了Key,那么你就可以保證同一個Key的所有訊息都進入到相同的磁區里面,由于每個磁區下的訊息處理都是有順序的,
生產者批量發送
Producer端可以在記憶體中合并多條訊息后,以一次請求的方式發送了批量的訊息給broker,從而大大減少broker存盤訊息的IO操作次數,- 但也一定程度上影響了訊息的實時性,相當于以時延代價,換取更好的吞吐量,
Kafka的訊息壓碩訓制
- 一般情況下壓碩訓制:在生產者端解壓、Broker端保持、消費者端解壓
Kafka支持 4 種壓縮演算法:GZIP、Snappy、LZ4,從 2.1.0 開始,Kafka正式支持Zstandard演算法(簡寫為zstd),- 壓碩訓制本質上是以消費者端
CPU性能換取節省網路傳輸帶寬以及Kafka Broker端的磁盤占用,
生產者端壓縮
- 生產者壓縮通常采用的
GZIP演算法,Producer啟動后生產的每個訊息集合都是經GZIP壓縮過的 - 生產者壓縮可以節省網路傳輸帶寬以及
Kafka Broker端的磁盤占用, - 開啟
GZIP壓縮:<entry key="compression.type" value="gzip"/>
Broker壓縮
大部分情況下 Broker 從 Producer 端接收到訊息后僅僅是原封不動地保存而不會對其進行任何修改,
但以下情況會引發Broker壓縮:
Broker端和Producer端采用了不同的壓縮演算法Broker端發生了訊息格式轉換
消費者端解壓
Kafka會將啟用了哪種壓縮演算法封裝進訊息集合中,在Consumer中進行解壓操作,
訊息可靠性機制
kafka提供以下特性來保證其訊息的不丟失,從而保證訊息的可靠性:
- 生產者確認機制
- 生產者失敗回呼機制
- 失敗重試機制
- 消費者確認機制
- 副本機制
- 限定
Broker選取Leader機制
生產者確認機制
- 當
Kafka的若干個Broker(根據配置策略,可以是一個,也可以是ALL)
成功地接收到一條訊息并寫入到日志檔案后,它們會告訴生產者程式這條訊息已成功提交, - 設定
acks = all,如果設定成all,則表明所有副本Broker都要接收到訊息,該訊息才算是“已提交”,
生產者失敗回呼機制
- 生產者不要使用
producer.send(msg),而要使用producer.send(msg,callback),記住,一定要使用帶有回呼通知的send方法, producer.send(msg, callback)采用異步的方式,當發生失敗時會呼叫callback方法,
失敗重試機制
- 設定
retries為一個較大的值,這里的retries同樣是Producer的引數,對應前面提到的Producer自動重試, - 當出現網路的瞬時抖動時,訊息發送可能會失敗,此時配置了
retries > 0的Producer能夠自動重試訊息發送,避免訊息丟失,
消費者確認機制
- 確保訊息消費完成再提交,確保如何消費失敗了,訊息還存在訊息佇列中,
Consumer端有個引數enable.auto.commit,最好把它設定成false,并采用手動提交位移的方式,
副本機制
- 設定
replication.factor >= 3,這也是Broker端的引數,其實這里想表述的是,最好將訊息多保存幾份,畢竟目前防止訊息丟失的主要機制就是冗余, - 設定
min.insync.replicas > 1,這依然是Broker端引數,控制的是訊息至少要被寫入到多少個副本才算是“已提交”,設定成大于 1 可以提升訊息持久性,在實際環境中千萬不要使用默認值 1, - 確保
replication.factor > min.insync.replicas,如果兩者相等,那么只要有一個副本掛機,整個磁區就無法正常作業了,我們不僅要改善訊息的持久性,防止資料丟失,還要在不降低可用性的基礎上完成,推薦設定成replication.factor = min.insync.replicas + 1,
限定Broker選取Leader機制
- 設定
unclean.leader.election.enable = false,這是Broker端的引數,它控制的是哪些
Broker有資格競選磁區的Leader, - 如果一個
Broker落后原先的Leader太多,那么它一旦成為新的Leader,必然會造成訊息的丟失, - 故一般都要將該引數設定成
false,即不允許這種情況的發生,
訊息冪等性和事務
由于kafka生產者確認機制、失敗重試機制的存在,kafka的訊息不會丟失但是存在由于網路延遲等原因造成重復發送的可能性:
kafka提供了冪等性Producer的方式來保證訊息冪等性,- 使用
<entry key="enable.idempotence" value="true"/>的方式開啟冪等性,
冪等性 Producer 的作用范圍:
- 只能保證單磁區上的冪等性,即一個冪等性
Producer能夠保證某個主題的一個磁區上不出現重復訊息,它無法實作多個磁區的冪等性, - 只能實作單會話上的冪等性,不能實作跨會話的冪等性,這里的會話,可以理解為
Producer行程的一次運行,當你重啟了Producer行程之后,這種冪等性保證就喪失了,
Kafka事務
- 事務型
Producer能夠保證將訊息原子性地寫入到多個磁區中,這批訊息要么全部寫入成功,要么全部失敗, - 事務型
Producer也不懼行程的重啟,Producer重啟回來后,Kafka依然保證它們發送訊息的精確一次處理, - 使用
<entry key="enable.idempotence" value="true"/>的方式開啟事務,
探究Kafka消費者的作業原理
消費者組
consumer group是kafka提供的可擴展且具有容錯性的消費者機制,它是由一個或者多個消費者組成,它們共享同一個Group ID.- 組內的所有消費者協調在一起來消費訂閱主題(
subscribed topics)的所有磁區(partition), - 每個磁區只能由同一個消費組內的一個
consumer來消費,
consummer group有以下的特性:
consumer group下可以有一個或多個consumer instance,consumer instance可以是一個行程,也可以是一個執行緒(所以消費者可以采用多執行緒的方式去消費訊息)group.id是一個字串,唯一標識一個consumer groupconsumer group下訂閱的topic下的每個磁區只能分配給某個group下的一個consumer(當然該磁區還可以被分配給其他group)
消費者位置
消費者位置,即位移, 消費者在消費的程序中需要記錄自己消費了多少資料,
位移提交有自動、手動兩種方式進行位移提交,
- 自動提交:在
kafka拉取到資料之后就直接提交,這樣很容易丟失資料 - 手動提交:成功拉取資料之后,對資料進行相應的處理之后再進行提交,如拉取資料之后進行寫入mysql這種 (存在資料處理失敗的可能性),所以這時我們就需要進行手動提交kafka的offset下標,
- 關閉自動提交,使用spring實作的提交方案:
<entry key="enable.auto.commit" value="false"/>
Kafka通過一個內置Topic(__consumer_offsets)來管理消費者位移,
Rebalance機制
Rebalance本質上是一種協議,規定了一個consumer group下的所有consumer如何達成一致來分配訂閱topic的每個磁區,
Kafka提供了一個角色:coordinator來執行對于consumer group的管理,
Group Coordinator是一個服務,每個Broker在啟動的時候都會啟動一個該服務,Group Coordinator的作用是用來存盤Group的相關Meta資訊,并將對應Partition的Offset資訊記錄到Kafka內置Topic(__consumer_offsets)中,
Rebalance 程序分為兩步:Join 和 Sync,
Join 顧名思義就是加入組:
- 所有成員都向
coordinator發送JoinGroup請求,請求加入消費組, - 一旦所有成員都發送了
JoinGroup請求,coordinator會從中選擇一個consumer擔任leader的角色,并把組成員資訊以及訂閱資訊發給leader
注意:leader和coordinator不是一個概念,leader負責消費分配方案的制定,
Sync,這一步leader開始分配消費方案:
- 分配哪個
consumer負責消費哪些topic的哪些partition, - 一旦完成分配,
leader會將這個方案封裝進SyncGroup請求中發給coordinator,非leader也會發SyncGroup請求,只是內容為空, coordinator接收到分配方案之后會把方案塞進SyncGroup的response中發給各個consumer,

CSDN認證博客專家
Java
Redis
架構
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/232664.html
標籤:其他
上一篇:成長筆記(個人心路歷程)
下一篇:關于 RocketMQ:The producer group has been created before, specify another name please.這個報錯的解決辦法
