主頁 > 軟體設計 > kafka--生產者詳解

kafka--生產者詳解

2021-04-12 12:11:26 軟體設計

一、生產者發送訊息的程序

1.包裝 ProducerRecord 物件

Kafka 會將發送訊息包裝為 ProducerRecord 物件, ProducerRecord 物件包含了目標主題和要發送的內容,同時還可以指定鍵和磁區,在發送 ProducerRecord 物件前,生產者會先把鍵和值物件序列化成位元組陣列,這樣它們才能夠在網路上傳輸,

2.指定磁區

接下來,資料被傳給磁區器,如果之前已經在 ProducerRecord 物件里指定了磁區,那么磁區器就不會再做任何事情,如果沒有指定磁區 ,那么磁區器會根據 ProducerRecord 物件的鍵來選擇一個磁區,緊接著,這條記錄被添加到一個記錄批次里,這個批次里的所有訊息會被發送到相同的主題和磁區上,

3.放入快取

分好區的訊息不是直接被發送到服務端,而是放入了生產者的一個快取里面,在這個快取里面,多條訊息會被封裝成為一個批次(batch),默認一個批次的大小是 16K,

4.發送訊息

Sender 執行緒啟動以后會從快取里面去獲取可以發送的批次,把這些記錄批次發送到相應的 broker 上,

5.接識訓傳

服務器在收到這些訊息時會回傳一個回應,如果訊息成功寫入 Kafka,就回傳一個 RecordMetaData 物件,它包含了主題和磁區資訊,以及記錄在磁區里的偏移量,如果寫入失敗,則會回傳一個錯誤,生產者在收到錯誤之后會嘗試重新發送訊息,如果達到指定的重試次數后還沒有成功,則直接拋出例外,不再重試

二、生產者整體架構

整個生 產者客戶端由兩個執行緒協調運行,這兩個執行緒分 別為主執行緒和 Sender 執行緒 (發送線 程),在主執行緒中由 kafkaProd ucer 創建訊息,然后通過可能的攔截器、序列化器和磁區器的作用之后快取到訊息累加器( RecordAccumulator ,也稱為訊息收 集器〉中, Sender 執行緒負責從 RecordAccumulator 獲取訊息并將其發送到 Ka fka

三、序列化

生產者需要用序列化器(Serializer)把物件轉換成位元組陣列才能通過網路發送給Kaflca, 而在對側, 消費者需要用反序列化器(Deserializer)把從Kaflca 中收到的位元組陣列轉換成相應的物件,訊息的key和value都使用了字串, 對應程式中的 序列化器也使用了客戶端自帶的org.apache.kaflca. common. serialization. StringSerializer, 除了用于 String 型別的序列化器,還有ByteArray、ByteBuffer、 Bytes、 Double、Integer、 Long這幾種類 型, 它們都實作了org.apache.kaflca. common. serialization. Serializer介面, 此介面有3個方法:

public void configure(Map<String, ?> configs, boolean isKey)
public byte[] serialize(String topic, T data)
public void close()

configure()方法用來配置當前類,serialize()方法用來執行序列化操作, 而close()方法用來關閉當前的序列化器, 一般情況下close()是個空方法, 如果實作了此方法, 則必須確保此方法的幕等性, 因為這個方法很可能會被KafkaProducer 呼叫多次, 生產者使用的序列化器和消費者使用的反序列化器是需要一一對應的, 如果生產者使用了 某種序列化器, 比如StringSerializer, 而消費者使用了另 一種序列化器, 比如IntegerSerializer,那么是無法決議出想要的資料的

kakfa支持配置自定義序列化:只需將KafkaProducer的value.serializer 引數設定為CompanySerializer類的全限定名即可,

四、磁區器

訊息在通過send()方法發往broker 的程序中, 有可能需要經過攔截器(Interceptor)、 序列 化器(Serializer)和磁區器(Partitioner)的一 系列作用之后才能被真正地發往 broker, 攔截器 一 般不是必需的, 而序列化器是必需的, 訊息 經過 序列化 之后就需要確定它發往的磁區 , 如果訊息ProducerRecord中指定了 partition欄位, 那么就不需要磁區器 的作用, 因為par巨巨on代表的就是所要發往的磁區號, 如果訊息ProducerRecord中沒有 指定par巨巨on欄位,那么就需要依賴磁區器,根據key 這個欄位來計算 partition 的值, 磁區器的作用 就是為訊息 分配磁區,
Kafka 中提供的默認磁區器是org.apache.kafka.clients.producer.intemals.DefaultPartitioner, 它 實作了org.apache.kafka.clients.producer.Partitioner 介面, 這個介面中定義了2個方法, 具體如下所示,
public int partition(S七ring topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster); 
public void close()
其中 partition ()方法用來計算磁區號,回傳值為 int 型別, partition ()方法中的引數分別表示 主題 、鍵、序列化后的 鍵、值、序列 后的值,以及集群的元資料資訊,通過這些資訊 可以實 現功能豐富的磁區器,c lose() 方法在關閉磁區器的時候用來回收 一些資源
在默認磁區器 DefaultPartitioner 的實作中, close() 是空方法,而在 partition ()方法中定義了 主要的磁區分配邏輯 如果 ke 不為 null ,那 么默認的磁區器會對 key 進行哈 希(采用 MurmurHash2 演算法 ,具備高運算性能及 低碰 撞率),最終根據得到 哈希值來 算磁區號,擁 有相同 key 的訊息會被寫入同一個磁區 如果 key為 null ,那么訊息將會以輪詢的方式發往主 題內的各個可用磁區,
如果key不為null,那么磁區號會是所有磁區中的任意一個,如果為null則僅會為可用磁區中的任意一個
除了使用 Kafka 提供的默認磁區器進行磁區分配,還可以使用自定義的磁區器,只需同 DefaultPartitioner 一樣實 Partitioner 介面 即可 ,默認 磁區器在 key null 時不 會選擇 非可用 的磁區,我們可以通過自 定義 的分 區器 DemoPartitioner 打破這 限制

五、生產者攔截器

攔截器( Interceptor )是早在 Kafka 0.10.0.0 中就 已經 引入的 個功能, Kafka 一共有 兩種攔 截器 生產者攔截器和消費者攔截器
生產者攔截器既 可以用 來在訊息發送前 一些準備作業 比如按照某 個規則過率 不符 合要 的消 息、修改訊息 的內容等, 也可以 用來在發送回 調邏輯前做一 些定 制化的需 求,比如統計 類作業,
生產者攔截器 使用 很方便,主要是自定義實作 org apache.kafka. clients . producer. Producerlnterceptor 介面, Producer Interceptor 口中包含三 個方法
public ProducerRecord<K, V> onSend (ProducerRecord<K, V> record ); 
public void onAcknowledgement(RecordMetadata metadata , Exception exception ); 
public void close() ;

KafkaProducer 在將訊息序列化和計算磁區 前會調 生產者攔截器 onSend() 方法來對消息進行相應 定制化操作,一般來說最好不要修改訊息 ProducerRecord 的topic和partition 等資訊,如果要修改,則需確保對其有準確的判斷,否則會與預想的效果出現偏 差,比如修改 key 不僅會影響磁區的計算,同樣會影響 broker 端日志壓縮( Log Compaction) 的功能

KafkaProducer 會在訊息被應 答( A cknowledgement )之前或訊息發送失 敗時呼叫生產者攔 截器的 onAcknowledgement () 方法,優先于用戶設定的 Ca llback 之前執行,這個方法運行在 Producer I/O 執行緒中,所以這個方法中實作的代碼邏輯越簡單越好 則會 影響訊息的發送 速度,
close () 方法主要用于在關閉攔截器時執行一些資源的清理作業,在這 個方 法中拋 出的異 常都會被捕獲并記錄到日志中,但并不會再向上傳遞,

六、RecordAccumulator訊息累加器(緩沖區)

主要用來快取訊息 Sender 執行緒可以批量發送,進 減少網路傳輸 的資源消耗以提升性能 RecordAccumulator 快取的大 小可以通過生產者客戶端引數 buffer. memory 配置,默認值為 33554432B ,即 32MB 如果生 產者發送 訊息的速度超過發 送到服務器的速度 ,則 會導致生產者空間不足,這個時候 KafkaPro ducer send () 方法呼叫要么 被阻塞,要么拋出例外,這個取決于引數 max block ms 的配置,此引數的默認值為 6 0000,及 60秒
內部結構
主執行緒中發送過來的訊息都會被迫加到 RecordAccumulator 的某個雙端佇列( Deque )中, 在RecordAccumulator 的內部為每個磁區都維護了 個雙端佇列,佇列中的內容就是 Prod uc e r Batch ,即 Deque ProducerBatch >,訊息寫入快取 時,追加到雙端佇列的尾部: Sender 讀取訊息時 ,從 雙端佇列的頭部讀取,注意 Producer Batch 不是 Producer Record, ProducerBatch 中可以包含一至多個 Producer Record 通俗地說, ProducerRecord 是生產者中創建的訊息,而 Producer Batch 是指一個訊息批次 ProducerRecord 會被包含在 Pro ducer Batch 中,這樣可以使字 節的使用更加緊湊,與此同時,將較小的 Producer Record 湊成一個較大 ProducerBatch ,也 可以減少網路請求的次數以提升整體的吞吐量 Producer Batch 和訊息的具體格式有關

七、kafka發送訊息

, 發送訊息主要有三種模式: 發 后即忘(fire-and-forget)、 同步(sync)及異步Casync)

send()本身是異步的,但是呼叫send()后可以通過代碼實作同步還是異步,異步:一旦訊息被保存在等待發送的訊息快取中,此方法就立即回傳,這樣并行發送多條訊息而不阻塞去等待每一條訊息的回應,當然也可以使用同步發送但是性能差,不推薦

簡單同步發送實作方法:

在呼叫send方法后直接呼叫get方法強行堵塞

 RecordMetadata metadata = producer.send(record).get();

異步實作

通常我們并不關心發送成功的情況,更多關注的是失敗的情況,因此 Kafka 提供了異步發送和回呼函式, 代碼如下:

producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.out.println("進行例外處理");
            } else {
                System.out.printf("topic=%s, partition=%d, offset=%s \n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        }
    });

八、重要的生產者引數

在kafka生產者中大部分的引數都有合理的默認值,一般不需要修改它們

1.acks

這個引數用來指定磁區中必須要有多少個副本收到這條訊息,之后生產者才會認為這條訊息是成功寫入的, acks 是生產者客戶端中一個非常 重要的 引數 ,它涉及 訊息的可靠 性和吞吐量 之間的權衡acks 引數有3 種型別的值(都是字串型別)

(1)acks=1

默認值即為1 ,生產者發送訊息之后,只要磁區的 leader 副本成功寫入消 息,那么它就會收到來自服務端的成功回應,如果訊息無法寫入 leader 副本,比如在 leader副本崩潰、重新選舉新的 leader 副本的程序中,那么生產者就會收到一個錯誤的回應,為了避免訊息丟失,生產者可以選擇重發訊息,如果訊息寫入 leader 副本并 回傳成功回應給生產者,且在被其他 fo llo wer 副本拉取之前 leader 副本崩潰,那么此時訊息還是會丟失,因為新選舉的 leader 副本中并沒有這條對應的訊息 acks 設定為1,是訊息可靠性和吞吐量之間的折中方

(2)acks = 0

生產者發送消 息之后不需要等待任何服務端的回應 ,如果在訊息從發送到 寫入 Kafka 的程序中出現某些例外,導致 Kafka 并沒有收到這條訊息,那么生產者也無從得知,訊息也就丟失了,在其他配置環境相同的情況下, acks 設定為0 可以達到最大的吞吐量,

(3)acks = -1或acks =all

生產者在消 息發送之后,需要等待 ISR 中的所有副本都成功寫入訊息之后才能夠收到來自服務端的成功回應,在其他配置環境相同的情況下, acks 設定為 (all )可以達到最強的可靠性,但這并不意味著訊息就一定可靠,因為 JSR 中可能只有 leader 副本,這樣就退化成了 acks=1 的情況,要獲得更高的訊息 可靠性需要配合 min.insync.replicas 引數的聯動

注意 acks 引數配置的值是一個字串型別,而不是整數型別

2.max.request.size

該引數用于控制生產者發送的請求大小,它可以指發送的單個訊息的最大值,kafka默認的發送一條訊息的大小是1M

3.retries 和 retry.backo.ms

發生錯誤后,訊息重發的次數,如果達到設定值,生產者就會放棄重試并回傳錯誤,默認是0,即在發生例外的時候不進行任何重試動作,訊息在從生產者發出到成功寫入服務器之前可能發生一些臨時性的例外, 比如網路抖動、 le der 副本的選舉等,這種例外往往是可以自行恢復的,生產者可以通過配置 retries 大于0值,以此通過內部重試來恢復而不是一昧地將例外拋給生產者的應用程式,但是不是所有例外都能處理,比如超過訊息最大值的例外

retry.backoff.ms用來設定兩次重試之間的間隔

4.compression.type

這個引數用來指定訊息的壓縮方式,默認值為“ none ”,即默認情況下,訊息不會被壓縮, 該引數還可以配置為“ gzip snappy 和“ z4 對訊息進行壓縮可以極大地減少網路傳輸 、降低網路I/O,從而提 高整 體的性能 ,訊息壓 縮是 種使用時間換 間的優化 方式 ,如 果對 時延有一定的要求,則不推薦對訊息進行壓縮

5.linge .ms

這個引數用來指定生產者發送 ProducerBatch 之前等待更多訊息( ProducerRecord )加入 Producer Batch 時間,默認值為 ,生產者客戶端會在 ProducerBatch 填滿或等待時間超過 linger.ms 值時發迭出去,增大這個引數的值會增加訊息的延遲,但是同時能提升一定的吞吐量,

6. receive.buffer.bytes & send.buffer.byte

這個引數用來設定 Socket 接收訊息緩沖區( SO RE CBUF )的大小(滑動視窗協議),默認值接收視窗為 32KB,發生視窗為128KB,如果設定為 -1 ,則使用作業系統的默認值,如果 Producer Kafka 于不同的機房 則可以 適地調大這個引數值

7.timeout.ms, request.timeout.ms & metadata.fetch.timeout.ms

timeout.ms 指定了 borker 等待同步副本回傳訊息的確認時間;
request.timeout.ms 指定了生產者在發送資料時等待服務器回傳回應的時間;
metadata.fetch.timeout.ms 指定了生產者在獲取元資料(比如磁區首領是誰)時等待服務器回傳回應的時間,

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/275130.html

標籤:其他

上一篇:K8S基礎及單節點服務的部署(etcd)

下一篇:Kubernetes(k8s)基礎簡介

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 面試突擊第一季,第二季,第三季

    第一季必考 https://www.bilibili.com/video/BV1FE411y79Y?from=search&seid=15921726601957489746 第二季分布式 https://www.bilibili.com/video/BV13f4y127ee/?spm_id_fro ......

    uj5u.com 2020-09-10 05:35:24 more
  • 第三單元作業總結

    1.前言 這應該是本學期最后一次寫作業總結了吧。總體來說,對作業的節奏也差不多掌握了,作業做起來的效率也更高了。雖然和之前的作業一樣,作業中都要用到新的知識,但是相比之前,更加懂得了如何利用工具以及資料。雖然之間卡過殼,但總體而言,這幾次作業還算完成的比較好。 2.作業程序總結 相比前兩個單元,此單 ......

    uj5u.com 2020-09-10 05:35:41 more
  • 北航OO(2020)第四單元博客作業暨課程總結博客

    北航OO(2020)第四單元博客作業暨課程總結博客 本單元作業的架構設計 在本單元中,由于UML圖具有比較清晰的樹形結構,因此我對其中需要進行查詢操作的元素進行了包裝,在樹的父節點中存盤所有孩子的參考。考慮到性能問題,我采用了快取機制,一次查詢后盡可能快取已經遍歷過的資訊,以減少遍歷次數。 本單元我 ......

    uj5u.com 2020-09-10 05:35:48 more
  • BUAA_OO_第四單元

    一、UML決議器設計 ? 先看下題目:第四單元實作一個基于JDK 8帶有效性檢查的UML(Unified Modeling Language)類圖,順序圖,狀態圖分析器 MyUmlInteraction,實際上我們要建立一個有向圖模型,UML中的物件(元素)可能與同級元素連接,也可與低級元素相連形成 ......

    uj5u.com 2020-09-10 05:35:54 more
  • 6.1邏輯運算子

    邏輯運算子 1. && 短路與 運算式1 && 運算式2 01.運算式1為true并且運算式2也為true 整體回傳為true 02.運算式1為false,將不會執行運算式2 整體回傳為false 03.只要有一個運算式為false 整體回傳為false 2. || 短路或 運算式1 || 運算式2 ......

    uj5u.com 2020-09-10 05:35:56 more
  • BUAAOO 第四單元 & 課程總結

    1. 第四單元:StarUml檔案決議 本單元采用了圖模型決議UML。 UML檔案可以抽象為圖、子圖、邊的邏輯結構。 在實作中,圖的節點包括類、介面、屬性,子圖包括狀態圖、順序圖等。 采用了三次遍歷UML元素的方法建圖,第一遍遍歷建點,第二、三次遍歷設定屬性、連邊,實作圖物件的初始化。這里借鑒了一些 ......

    uj5u.com 2020-09-10 05:36:06 more
  • 談談我對C# 多型的理解

    面向物件三要素:封裝、繼承、多型。 封裝和繼承,這兩個比較好理解,但要理解多型的話,可就稍微有點難度了。今天,我們就來講講多型的理解。 我們應該經常會看到面試題目:請談談對多型的理解。 其實呢,多型非常簡單,就一句話:呼叫同一種方法產生了不同的結果。 具體實作方式有三種。 一、多載 多載很簡單。 p ......

    uj5u.com 2020-09-10 05:36:09 more
  • Python 資料驅動工具:DDT

    背景 python 的unittest 沒有自帶資料驅動功能。 所以如果使用unittest,同時又想使用資料驅動,那么就可以使用DDT來完成。 DDT是 “Data-Driven Tests”的縮寫。 資料:http://ddt.readthedocs.io/en/latest/ 使用方法 dd. ......

    uj5u.com 2020-09-10 05:36:13 more
  • Python里面的xlrd模塊詳解

    那我就一下面積個問題對xlrd模塊進行學習一下: 1.什么是xlrd模塊? 2.為什么使用xlrd模塊? 3.怎樣使用xlrd模塊? 1.什么是xlrd模塊? ?python操作excel主要用到xlrd和xlwt這兩個庫,即xlrd是讀excel,xlwt是寫excel的庫。 今天就先來說一下xl ......

    uj5u.com 2020-09-10 05:36:28 more
  • 當我們創建HashMap時,底層到底做了什么?

    jdk1.7中的底層實作程序(底層基于陣列+鏈表) 在我們new HashMap()時,底層創建了默認長度為16的一維陣列Entry[ ] table。當我們呼叫map.put(key1,value1)方法向HashMap里添加資料的時候: 首先,呼叫key1所在類的hashCode()計算key1 ......

    uj5u.com 2020-09-10 05:36:38 more
最新发布
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:20:47 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:20:25 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:20:17 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:20:10 more
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:19:44 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:19:07 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:18:57 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:18:49 more
  • 05單件模式

    #經典的單件模式 public class Singleton { private static Singleton uniqueInstance; //一個靜態變數持有Singleton類的唯一實體。 // 其他有用的實體變數寫在這里 //構造器宣告為私有,只有Singleton可以實體化這個類! ......

    uj5u.com 2023-04-19 08:42:51 more
  • 【架構與設計】常見微服務分層架構的區別和落地實踐

    軟體工程的方方面面都遵循一個最基本的道理:沒有銀彈,架構分層模型更是如此,每一種都有各自優缺點,所以請根據不同的業務場景,并遵循簡單、可演進這兩個重要的架構原則選擇合適的架構分層模型即可。 ......

    uj5u.com 2023-04-19 08:42:41 more