主頁 >  其他 > 溪源的Java筆記—訊息佇列

溪源的Java筆記—訊息佇列

2020-12-10 18:37:27 其他

溪源的Java筆記—訊息佇列

前言

Java的分布式應用中有一個提升性能的利器——訊息佇列,通過訊息佇列我們可以讓很多操作“異步”地去執行,這種異步的操作可以幫助應用均勻地去處理大量請求涌入的情況,從而降低系統的訪問壓力,本篇博客將帶領大家去了解我們經常使用的兩種訊息佇列RabbitMQKafka的原理,

Redis服務器可參考我的博客:溪源的Java筆記之Redis服務器

正文

訊息佇列之RabbitMQ

RabbitMQ是基于下面兩種基礎來實作:

  • Erlang語言:面向電信行業的函式式編程語言,它為RabbitMQ提供了節點之間訊息通信輕量級執行緒,提供了狀態無關的高并發性,
  • AMQP協議規范:它讓RabbitMQ成為一個與供應商無關、平臺獨立的解決方案,使其可以實作靈活的訊息路由、配置化的訊息持久以及跨資料中心通信,

RabbitMQ的功能

  1. 應用解耦:應用架構不再受限于資料庫寫入的性能瓶頸,應用只需要發送訊息,不需要長時間占用執行緒等待回應;
  2. 資料庫解耦:將直接存在資料庫的資料發送到RabbitMQ,從而實作資料的異步處理,同時也可以通過消費者應用進行限流或者直接關閉,避免資料庫崩潰,
  3. 資料同步:通過RabbitMQ可以將同一個資料存盤到不同的資料庫中,同樣也可以把不同資料庫的資料應用到同一個系統,
  4. 流量削峰:可以借助訊息佇列異步特性來降低系統的訪問壓力,
  5. 日志處理 :借助訊息佇列來異步處理日志檔案

RabbitMQ與AMQP協議
AMQP把客戶端和代理服務器之間的通信資料拆分成一種叫做幀的塊狀結構,

低層AMQP幀的組成:

  • 幀頭:幀型別、信道編號、以位元組為單位的幀大小
  • 幀的有效載荷
  • 結束位元組標記

幀的型別

  • 協議頭幀:用于連接到RabbitMQ,僅使用一次,
  • 心跳幀:客戶端與RabbitMQ之間進行傳遞,作為一種校驗機制確保連接的兩端可以正常作業,
  • 方法幀:攜帶發給RabbitMQ或從RabbitMQ接收到的RPC請求或回應,(告訴RabbitMQ如何路由)
  • 內容頭幀:包含一條訊息的大小和屬性,
  • 訊息體幀:包含訊息的內容,

幀與訊息

  • 我們通常使用方法幀、內容頭幀和訊息幀向RabbitMQ發布訊息,
  • 一個訊息通常以:方法幀、內容頭幀以及一個或者多個訊息體幀,(每個幀都有一定的大小限制,超過限制就會被拆分成多個)
  • 傳輸程序中,方法幀和內容頭幀會被打包成二進制,訊息體幀不會進行任何打包或編碼,它可以是任何資料型別,

Rabbit的抽象組件

  • 交換機(Exchange):接收發送到RabbitMQ中的訊息并決定把他們投遞到那個佇列的組件,
  • 系結(Binding):一套規則,用于告訴交換器訊息應該被存盤到哪個佇列,
  • 佇列(Queue):用來存盤訊息的資料結構,位于硬碟或記憶體中,
  • 信道(Channel):多路復用連接中的一條獨立的雙向資料流通道,信道會占用大量資源,要合理使用,
    在這里插入圖片描述

交換機的類別

  • Direct交換機:是完全匹配、單播的模式,
  • Fanout交換機:類似子網廣播,所有與該互動機系結的佇列都會接受到一份訊息的副本,這種方式是最快的,
  • Topic交換機:通過 正則運算式的方式 實作一對多的系結,
  • Headers交換機:它通過采用訊息屬性中的headers表結合正則運算式的方式 實作一對多的系結(幾乎不使用)

訊息佇列的匹配規則
1.訊息佇列模式
訊息佇列模式:發送者,接受者

2.主題訊息模式
主題訊息:發布者,訂閱者

訊息佇列如何確保其訊息的順序性
通常來說有以下的思路:

  • 單執行緒消費來確保訊息的順序性,
  • 對訊息進行編號,消費者處理時根據編號判斷順序,

RabbitMQ確保訊息有序性
拆分多個queue,每個queue對應一個consumer,然后這個consumer內部用記憶體佇列做排隊,然后分發給底部不同worker處理:

  • 防止使用同一個佇列,導致資料123進入不同的消費者,從而使得資料123沒有按指定的順序被執行
  • 通過拆分queue來保證每一個消費者都能獲得完整的資料123,然后消費者內部進行排隊,從而保證訊息的有序性,
  • 這里同時也要設計,保證訊息的冪等性,
    在這里插入圖片描述

訊息佇列如何保證其不會重復消費
簡單來說,如何實作訊息的冪等性,即訊息執行一次和執行多次的結果是一樣的,
保證資料不會重復消費,要結合業務來實作,比如:

  1. 基于資料庫的主鍵索引的來實作
  2. 基于Redis來實作,使用set操作具有天然的冪等性
  3. 通過先查一次資料,來判斷是新增操作還是更新操作
  4. 通過向資料庫前置一個布隆過濾器來判斷資料是新資料還是舊資料,再使用主鍵索引來實作

訊息佇列如何保證訊息不會丟失
訊息從生產到消費可以經歷三個階段:

  • 生產階段:在這個階段,從訊息在生產者創建出來,經過網路傳輸到訊息佇列服務器中,
  • 存盤階段:訊息在訊息佇列服務器中存盤,如果是集群,訊息會在這個階段被復制到其他的副本上,
  • 消費階段:消費者從訊息佇列服務器中拉取訊息,通過網路傳輸發送到消費者,

各階段保證資料的不丟失的方式:

  • 生產階段:失敗回呼機制、發布者確認、訊息持久化
  • 存盤階段:備用交換器、死信交換器、事務、高可用佇列、基于事務的高可用佇列、訊息持久化
  • 消費階段:消費者確認、訊息持久化

失敗回呼機制
mandatory設定為true,如果訊息不可路由那么rabbitmq會把完整的訊息退回到發布者中

public RabbitTemplate rabbitTemplate(){
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setMandatory(true); //設定發送訊息失敗重試
    return template;
    }

發布者確認
發布者確認,即Confirm機制具體的實作方式:

  1. spring.rabbitmq.template.mandatory = true 設定成true
  2. spring.rabbitmq.publisher-confirms = true 設定成true
  3. 撰寫一個 java 類,實作 RabbitTemplate.ConfirmCallback介面,在這個里面我們可以確認訊息是否到達了RabbitMQ服務器,

這里RabbitMq提供一個回呼函式可以將投遞失敗的訊息給輸出出來:

// 訊息是否從Exchange路由到Queue, 注意: 這是一個失敗回呼, 只有訊息從Exchange路由到Queue失敗才會回呼這個方法
  rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
         log.info("訊息從Exchange路由到Queue失敗: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
    });

消費者確認機制
消費者確認機制,即應答模式:

  • 消費者完成消費處理后,會發送一個消費應答,告訴訊息佇列服務器這個訊息已經處理完成可以洗掉這個訊息了
  • 如果一個消費者由于宕機,沒有發送訊息應答,那么訊息佇列服務器會認為訊息發送失敗,自動進行補償行為,即將這個訊息重新加入佇列,重新投遞,

應答模式可以分為:

  • 自動應答:不在乎消費者對訊息處理是否成功,都會告訴佇列洗掉訊息,如果處理訊息失敗,實作自動補償(佇列投遞過去重新處理),
  • 手動應答:消費者處理完業務邏輯,手動回傳ack(通知)告訴佇列處理完了,佇列進而洗掉訊息,

應答模式犧牲了訊息佇列的性能,從而提高了訊息的可靠性,

備用交換器
備用交換器用于處理無法路由的訊息,備用交換器在第一次宣告交換器時被指定,用來提供一種預先存在的交換器,即如果交換器無法路由訊息,那么訊息就會被路由到這個新的備用交換器,

死信交換器
過期的訊息、basic.nackbasic.rejectrequeue引數為false或佇列滿的訊息將進入此交換器

事務
采用AMQP事務機制和發布者確認的機制,來解決服務器資料丟失的問題,
這種機制極大犧牲了性能從而換取訊息的可靠性,

高可用佇列
高可用佇列又稱為HA佇列,需要RabbitMQ集群環境,可以通過使用AMQP或者使用基于web的管理界面來設定,
RabbitMQ集群搭建結合Erlang來實作其內部通信,借助Haproxy實作請求的負載均衡,

基于事務的高可用佇列
在一個集群的環境下,采用的是事務或投遞確認機制,則訊息在被HA佇列定義的所有活動節點確認之后,RabbitMQ才會發送成功的回應,這種方式會造成很大的延遲,

訊息持久化
訊息持久化是解決訊息被投遞到RabbitMQ的記憶體中,還沒有投遞到消費者實體之前就宕機了,而導致訊息丟失的問題,

交換機的持久化:

  • new DirectExchange("log.user.exchange", true, false);
  • 第二個引數durable: 是否持久化,
  • 第三個引數autoDelete: 當所有系結佇列都不再使用時, 是否自動洗掉交換器, true: 洗掉, false: 不洗掉,

queue的持久化:

  • new Queue("log.user.queue.name", true);
  • 宣告佇列時指定持久化引數為true即可

message的持久化:

  • delivery-mode設定為MessageDeliveryMode.PERSISTENT,即可以實作message的持久化,
  • 在默認的情況下message都是持久化的,

知識點:
a.關于RabbitMQ的配置要根據實際的業務需要,在可靠性和性能之間進行抉擇:

  1. 使用mandatory設定,RabbitMQ將不接受不可路由訊息
  2. 發布者確認作為事務的輕量級替代方法
  3. 使用備用交換器處理無法路由的訊息
  4. 基于事務的批量處理
  5. 使用HA佇列避免節點故障
  6. 使用回推機制,拒絕接受發布者發布的過多訊息

消費訊息性能控制

RabbitMQ實作了兩個不同的AMQP RPC命令來獲取佇列中的訊息:

  • Basic.Get:是一個輪詢模型(這種模式的性能通常比后低二倍以上)
  • Basic.Cunsume:是一個推送模型(即 發布-訂閱模式)

所以使用推送模型,能很大程度提高RabbitMQ的消費能力,

a.提升消費者消費訊息的能力
RabbitMQ提供一下的方式來提升消費訊息的能力(性能依次降低)
1.基于no-ack模式進行消費

ack模式即為應答模式,即消費者確認,我們可以通過關閉ack模式(默認是打開的)的方式實作更快的吞吐量,

  • 但是我們要知道RabbitMQ將資料投遞到消費者的程序中會進過系統自帶的資料緩沖區,
  • 由于缺少消費者確認,當系統出現網路波動會導致當前作業系統的套接字接識訓沖區爆滿,從而影響程式的正常運行,

linux系統可以通過增加net_core.rmem_defaultnet.core.rmem_max值,通常設定為16M即可,

sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.rmem_default=16777216

2.基于確認和Qos>1進行消費
Qos服務質量設定,即在確認訊息接收之前,消費者可以預先要求接收一定數量的訊息:

  • QoS設定允許RabbitMQ通過為消費者預先分配一定數量的訊息來實作更高效地訊息發送
  • Qos 默認是1 ,默認每條訊息都會確認,這種方式不可以與no-ack同時設定,
  • 它本身可以視為ack的一種優化,它不需要對每條訊息進行確認,可以通過設定multipletrue,對所有以前沒有進行確認的訊息進行確認,

3.使用事務來批量進行
事務可能會對訊息吞吐量產生負面影響,但有一個例外,如果你不適用QoS設定,那么在使用事務來批量確認訊息時,實際上可能會看到略微的性能提升,并且事務不適用于已禁止確認的消費者,

b.拒絕訊息
當訊息本身或訊息處理的程序中出現問題,RabbitMQ提供了兩種將訊息踢回代理服務器的機制:

  • Basic.Reject: 一次只允許拒絕一個訊息
  • Basic.Nack:一次可以拒絕多個訊息

死信交換器:

  • 過期的訊息、basic.nackbasic.rejectrequeue引數為false或佇列滿的訊息將進入此交換器,
  • RabbitMQ通過死信交換器將訊息路由到系結的佇列,就像正常發送給交換器的任何其他訊息一樣,
  • 死信功能還允許你使用預先指定的值覆寫路由鍵(死信交換器即可以系結一個非死信的佇列和一個死信的佇列),這樣可以允許你使用一個交換器同時處理死信訊息和非死信訊息,
  • 但需要確保死信訊息不被投遞到非死信佇列中,需要在宣告佇列時指定一個額外的引數 x-dead-letter-routing-key

c.訊息佇列如何解決訊息堆積問題

訊息回推機制
訊息回推機制可以解決訊息堆積的問題
如果發布者應用程式因為發布訊息太快而開始對RabbitMQ造成壓力,那么RabbitMQ將:

  • 發送Channel.FlowRPC方法來讓使發布者阻塞
  • 只有發送另一條Channel.Flow命令,發布者才能解除阻塞狀態繼續發送訊息

但是,在RabbitMQ2.0之前存在發布者沒有監聽Channel.Flow方法的極端情況,

RabbitMQ3.2之后
采用TCP背壓的機制來解決這種極端情況,即采用一個連接信用閾值的機制:

  • RabbitMQ將根據RPC請求的完成情況給每一個發布者打分,RabbitMQ只處理有足夠信用的發布者的訊息,
  • 同時借助Connection.BlockedConnection.Unblocked這兩個異步方法,來通知客戶端進行阻塞和取消阻塞,

知識點:
a.實際場景解決訊息堆積問題的流程:

  1. 修復現有consumer的問題,并將其停掉,
  2. 重新創建一個容量更大的topic,比如patition是原來的10倍,
  3. 撰寫一個臨時consumer程式,消費原來積壓的佇列,該consumer不做任何耗時的操作,將訊息均勻寫入新創建的佇列里,
  4. 將修復好的consumer部署到原來10倍的機器上消費新佇列,
  5. 訊息積壓解決后,恢復原有架構

核心思路,提高消費者的消費能力,

RabbitMQ集群

RabbitMQ集群里,運行時狀態包含:

  • 交換器
  • 佇列
  • 系結器
  • 用戶
  • 虛擬主機
  • 策略

它們對所有節點都可用,這種共享運行時狀態的特性,使得集群中的每一個節點都能綁發布或者洗掉連接到第一個節點創建的交換器,因此當節點中出現節點宕機,其運行狀態以及資料都是可以轉移恢復的,

RabbitMQ提供了HA佇列,它可以

  • 跨越多個集群節點并共享同步佇列狀態和訊息資料
  • HA佇列中的某個節點發生故障的話,集群中的其他節點仍然保存著訊息和佇列狀態,
  • 當故障的節點重新加入集群時,該節點會完全同步自節點故障以來所有被消費的訊息,
  • 通過HA佇列,我們可以實作集群中有節點專門服務發布者,有節點專門服務于消費者,
  • 一個訊息被發送到RabbitMQ集群中的任何一個節點時,該訊息會被路由到佇列中去,無關佇列在集群中的位置,

HA佇列需要注意的地方

  • 不能跨越WAN或者互聯網來搭建RabbitMQ集群
  • 集群中的節點不宜過多,通常上限在32-64個,太多的節點會增加內部共享運行時狀態的復雜性,

在RabbitMQ集群有三類節點

  • 磁盤節點:將集群的運行時狀態會同時存盤在記憶體和磁盤中,
  • 記憶體節點:只會把集群的運行時狀態存盤在記憶體中,
  • 狀態節點:在任意時刻,一個集群只能有一個統計節點,用來負責收集集群中每一個節點的全部統計資料和狀態資料,

知識點:

  1. 磁盤節點和記憶體節點都可以進行資料持久化,
  2. 當節點或者集群崩潰時,在磁盤節點啟動并重新加入集群時,會被用來重建集群的運行狀態,而記憶體節點不會包含任何運行時資料,集群中其他節點會把佇列定義等資訊發送給它,
  3. RabbitMQ集群中最少要有一個磁盤節點,但是過多磁盤節點也會導致共享狀態不一致的問題,

RabbitMQ集群間通訊
RabbitMQ中提供了兩種方式可以實作跨越不同的RabbitMQ集群間的通訊:

  • 聯合交換器:允許發往上游節點交換器的訊息被透明地發送至下游節點中相同名稱的交換器上,
  • 聯合佇列:允許下游節點扮演上游節點中共享佇列的消費者角色,為下游節點提供了輪詢消費訊息的能力,

聯合交換器
在一個RabbitMQ集群中所有的節點之間要有一個低延遲的網路環境,

RabbitMQ捆綁的一個插件可以實作下游RabbitMQ可以從先前已經存在的RabbitMQ服務器上獲取訊息,

  • 某種意義上,這個聯合插件的行為既像消費者又像訊息發布者,它在上游節點消費訊息,并在同一個節點上將這些訊息進行重新發送,
  • 聯合交換器提供了一種簡單、可靠、健壯的方式來擴展RabbitMQ的基礎設施,實作了RabbitMQ集群無法實作的跨網路延遲,
  • 聯合插件還能橋接邏輯上隔離的RabbitMQ集群,使得一個資料中心可以有兩個不同版本的RabbitMQ集群,

聯合佇列

  • 采用聯合佇列的方式可以,解決訊息的負載均衡,通過借助其他集群節點來消費佇列訊息,避免訊息堆積現象,
  • 某種意義上聯合佇列與聯合交換器的配置沒有本質上的區別,很多時候上兩個是一起使用的,

通時借助聯合插件我們還能實作:

  • 提供訊息處理的冗余(即容錯性)
  • 實作基于地理分布的應用
  • 無縫的實作RabbitMQ集群的版本升級

訊息佇列之Kafka

Kafka是一個分布式流平臺,它具以下關鍵功能:

  • 訊息傳遞系統:發布和訂閱記錄流,類似于訊息佇列或企業訊息傳遞系統,
  • 存盤系統:以容錯的持久方式存盤記錄流,
  • 流處理:處理發生的記錄流,

Kafka可以作為訊息佇列具有以下特性:

  • 高可用:寫入Kafka的資料將寫入磁盤并進行復制以實作容錯
  • 持久性、可靠性Kafka提供可生產者確認機制保證了資料的可靠性,并且訊息都是以Record的結構寫入到本地檔案,節點關閉不會丟失資料
  • 高吞吐量、低延遲Kafka每秒可以處理幾十萬條訊息,它的延遲最低只有幾毫秒
  • 可擴展性Kafka集群支持熱擴展
  • 高并發:支持數千個客戶端同時讀寫
  • 高性能:存盤并允許客戶端控制其讀取位置(方便用戶定位資源),依靠使用檔案系統和依靠頁面快取,減少對記憶體的使用

知識點:

  • Kafka集群將記錄流存盤在稱為topic的類別中.一個topic可以由幾個Partition組成,
  • 每個記錄(Record)由一個鍵,一個值和一個時間戳組成,

主題和日志
對于每個主題,Kafka群集都會維護一個磁區日志,如下所示:
在這里插入圖片描述

每個磁區(Partition)都是有序的(所以每一個Partition內部都是有序的),不變的記錄序列,這些記錄連續地附加到結構化的提交日志中,
磁區中的每個記錄均分配有一個稱為偏移的順序ID號,該ID唯一地標識磁區中的每個記錄,

每個消費者保留的唯一元資料是該消費者在日志中的偏移量或位置,此偏移量由使用者控制:

  • 通常,使用者在讀取記錄時會線性地推進其偏移量,但實際上,由于位置是由使用者控制的,因此它可以按喜歡的任何順序使用記錄,
  • 例如,使用者可以重置到較舊的偏移量以重新處理過去的資料,或者跳到最近的記錄并從“現在”開始使用,(類似于游標指標的方式順序處理資料,并且該指標可以任意移動)

訊息的有序性
Kafka訊息的有序性,是采用訊息鍵保序策略來實作的,
一個topic,一個partition(分割),一個consumer,內部單執行緒消費,寫N個記憶體queue,然后N個執行緒分別消費一個記憶體queue

  • 通過指定key的方式,具有相同key的訊息會分發到同一個partition
  • partition會內部對其進行排序,保證其有序性,
    在這里插入圖片描述

磁區的設計結構

  • 提供了負載均衡的能力,實作了系統的高伸縮性,
  • 不同的磁區能夠被放置到不同節點的機器上,而資料的讀寫操作也都是針對磁區這個粒度而進行的,這樣每個節點的機器都能獨立地執行各自磁區的讀寫請求處理,
  • 可以通過添加新的節點機器來增加整體系統的吞吐量,
  • Kafka磁區的設計邏輯和ES分片的設計邏輯是相同的,

生產者磁區策略
生產者磁區策略是 決定生產者將訊息發送到哪個磁區的演算法,即實作負載均衡,
主要有以下幾種:

  • 輪詢策略:順序分配,輪詢策略有非常優秀的負載均衡表現,它總是能保證訊息最大限度地被平均分配到所有磁區上,故默認情況下它是最合理的磁區策略,(默認、常用)
  • 隨機策略:所謂隨機就是我們隨意地將訊息放置到任意一個磁區上,
  • 訊息鍵保序策略Kafka 中每條訊息都會有自己的key,一旦訊息被定義了 Key,那么你就可以保證同一個 Key的所有訊息都進入到相同的磁區里面,由于每個磁區下的訊息處理都是有順序的,

生產者批量發送

  • Producer 端可以在記憶體中合并多條訊息后,以一次請求的方式發送了批量的訊息給 broker,從而大大減少 broker 存盤訊息的IO 操作次數,
  • 但也一定程度上影響了訊息的實時性,相當于以時延代價,換取更好的吞吐量,

Kafka的訊息壓碩訓制

  • 一般情況下壓碩訓制:在生產者端解壓、Broker端保持、消費者端解壓
  • Kafka 支持 4 種壓縮演算法:GZIPSnappyLZ4,從 2.1.0 開始,Kafka 正式支持 Zstandard演算法(簡寫為 zstd),
  • 壓碩訓制本質上是以消費者端CPU性能換取節省網路傳輸帶寬以及Kafka Broker端的磁盤占用,

生產者端壓縮

  • 生產者壓縮通常采用的GZIP演算法, Producer 啟動后生產的每個訊息集合都是經 GZIP 壓縮過的
  • 生產者壓縮可以節省網路傳輸帶寬以及 Kafka Broker 端的磁盤占用,
  • 開啟GZIP壓縮:<entry key="compression.type" value="gzip"/>

Broker壓縮
大部分情況下 BrokerProducer 端接收到訊息后僅僅是原封不動地保存而不會對其進行任何修改,
但以下情況會引發Broker壓縮:

  • Broker端和Producer端采用了不同的壓縮演算法
  • Broker端發生了訊息格式轉換

消費者端解壓

  • Kafka 會將啟用了哪種壓縮演算法封裝進訊息集合中,在Consumer中進行解壓操作,

訊息可靠性機制

kafka提供以下特性來保證其訊息的不丟失,從而保證訊息的可靠性:

  1. 生產者確認機制
  2. 生產者失敗回呼機制
  3. 失敗重試機制
  4. 消費者確認機制
  5. 副本機制
  6. 限定Broker選取Leader機制

生產者確認機制

  • Kafka 的若干個 Broker(根據配置策略,可以是一個,也可以是ALL
    成功地接收到一條訊息并寫入到日志檔案后,它們會告訴生產者程式這條訊息已成功提交,
  • 設定 acks = all,如果設定成 all,則表明所有副本 Broker 都要接收到訊息,該訊息才算是“已提交”,

生產者失敗回呼機制

  • 生產者不要使用 producer.send(msg),而要使用 producer.send(msg,callback),記住,一定要使用帶有回呼通知的 send 方法,
  • producer.send(msg, callback) 采用異步的方式,當發生失敗時會呼叫callback方法,

失敗重試機制

  • 設定 retries 為一個較大的值,這里的 retries 同樣是 Producer 的引數,對應前面提到的 Producer自動重試,
  • 當出現網路的瞬時抖動時,訊息發送可能會失敗,此時配置了 retries > 0Producer 能夠自動重試訊息發送,避免訊息丟失,

消費者確認機制

  • 確保訊息消費完成再提交,確保如何消費失敗了,訊息還存在訊息佇列中,
  • Consumer 端有個引數 enable.auto.commit,最好把它設定成 false,并采用手動提交位移的方式,

副本機制

  • 設定 replication.factor >= 3,這也是 Broker 端的引數,其實這里想表述的是,最好將訊息多保存幾份,畢竟目前防止訊息丟失的主要機制就是冗余,
  • 設定 min.insync.replicas > 1,這依然是 Broker 端引數,控制的是訊息至少要被寫入到多少個副本才算是“已提交”,設定成大于 1 可以提升訊息持久性,在實際環境中千萬不要使用默認值 1,
  • 確保 replication.factor > min.insync.replicas,如果兩者相等,那么只要有一個副本掛機,整個磁區就無法正常作業了,我們不僅要改善訊息的持久性,防止資料丟失,還要在不降低可用性的基礎上完成,推薦設定成 replication.factor = min.insync.replicas + 1

限定Broker選取Leader機制

  • 設定 unclean.leader.election.enable = false,這是 Broker 端的引數,它控制的是哪些
    Broker 有資格競選磁區的 Leader
  • 如果一個 Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會造成訊息的丟失,
  • 故一般都要將該引數設定成 false,即不允許這種情況的發生,

訊息冪等性和事務

由于kafka生產者確認機制、失敗重試機制的存在,kafka的訊息不會丟失但是存在由于網路延遲等原因造成重復發送的可能性:

  • kafka提供了冪等性Producer的方式來保證訊息冪等性,
  • 使用 <entry key="enable.idempotence" value="true"/>的方式開啟冪等性,

冪等性 Producer 的作用范圍:

  • 只能保證單磁區上的冪等性,即一個冪等性 Producer 能夠保證某個主題的一個磁區上不出現重復訊息,它無法實作多個磁區的冪等性,
  • 只能實作單會話上的冪等性,不能實作跨會話的冪等性,這里的會話,可以理解為 Producer 行程的一次運行,當你重啟了 Producer行程之后,這種冪等性保證就喪失了,

Kafka事務

  • 事務型 Producer 能夠保證將訊息原子性地寫入到多個磁區中,這批訊息要么全部寫入成功,要么全部失敗,
  • 事務型 Producer 也不懼行程的重啟,Producer 重啟回來后,Kafka 依然保證它們發送訊息的精確一次處理,
  • 使用 <entry key="enable.idempotence" value="true"/>的方式開啟事務,

探究Kafka消費者的作業原理

消費者組

  • consumer groupkafka提供的可擴展且具有容錯性的消費者機制,它是由一個或者多個消費者組成,它們共享同一個Group ID.
  • 組內的所有消費者協調在一起來消費訂閱主題(subscribed topics)的所有磁區(partition),
  • 每個磁區只能由同一個消費組內的一個consumer來消費,

consummer group有以下的特性:

  • consumer group下可以有一個或多個consumer instanceconsumer instance可以是一個行程,也可以是一個執行緒(所以消費者可以采用多執行緒的方式去消費訊息)
  • group.id是一個字串,唯一標識一個consumer group
  • consumer group下訂閱的topic下的每個磁區只能分配給某個group下的一個consumer(當然該磁區還可以被分配給其他group)

消費者位置
消費者位置,即位移, 消費者在消費的程序中需要記錄自己消費了多少資料,
位移提交有自動、手動兩種方式進行位移提交,

  • 自動提交:在kafka拉取到資料之后就直接提交,這樣很容易丟失資料
  • 手動提交:成功拉取資料之后,對資料進行相應的處理之后再進行提交,如拉取資料之后進行寫入mysql這種 (存在資料處理失敗的可能性),所以這時我們就需要進行手動提交kafka的offset下標,
  • 關閉自動提交,使用spring實作的提交方案:<entry key="enable.auto.commit" value="false"/>

Kafka通過一個內置Topic(__consumer_offsets)來管理消費者位移,

Rebalance機制

Rebalance本質上是一種協議,規定了一個consumer group下的所有consumer如何達成一致來分配訂閱topic的每個磁區,

Kafka提供了一個角色:coordinator來執行對于consumer group的管理,

  • Group Coordinator是一個服務,每個Broker在啟動的時候都會啟動一個該服務,
  • Group Coordinator的作用是用來存盤Group的相關Meta資訊,并將對應PartitionOffset資訊記錄到Kafka內置Topic(__consumer_offsets)中,

Rebalance 程序分為兩步:Join 和 Sync,

Join 顧名思義就是加入組:

  • 所有成員都向coordinator發送JoinGroup請求,請求加入消費組,
  • 一旦所有成員都發送了JoinGroup請求,coordinator會從中選擇一個consumer擔任leader的角色,并把組成員資訊以及訂閱資訊發給leader

注意:leadercoordinator不是一個概念,leader負責消費分配方案的制定,

Sync,這一步leader開始分配消費方案:

  • 分配哪個consumer負責消費哪些topic的哪些partition
  • 一旦完成分配,leader會將這個方案封裝進SyncGroup請求中發給coordinator,非leader也會發SyncGroup請求,只是內容為空,
  • coordinator接收到分配方案之后會把方案塞進SyncGroupresponse中發給各個consumer

在這里插入圖片描述

溪源的奇思妙想 CSDN認證博客專家 Java Redis 架構
微信公眾號:溪源的奇思妙想
溪源,一個在IT技術圈和經濟學之間的求知者——既對人工智能、物聯網等前沿技術興致勃勃,又對機會成本、邊際收益等經濟學理論流連忘返,人生是一場孤獨的旅行,只是我還是僥幸期待有同路人,我希望認識同樣熱愛技術、迷戀經濟學的你,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/232664.html

標籤:其他

上一篇:成長筆記(個人心路歷程)

下一篇:關于 RocketMQ:The producer group has been created before, specify another name please.這個報錯的解決辦法

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more