訊息在通過 send() 方法發往 broker 的程序中,有可能需要經過攔截、序列化器 和 磁區器 的一系列作用之后才能被真正地發往 broker,
攔截器是早在 Kafka 0.10.0.0 中就已經引入的一個功能,Kafka 一共有兩種攔截器:生產者攔截器和消費者攔截器,這里主要講述生產者攔截器的相關內容
生產者攔截器既可以用來在訊息發送前做一些準備作業,比如按照某個規則過濾不符合要求的訊息、修改訊息的內容等,也可以用來在發送回呼邏輯前做一些定制化的需求,比如統計類作業,
生產者攔截器的使用也很方便,主要是自定義實作 org.apache.kafka.clients.producer. ProducerInterceptor 介面,ProducerInterceptor 介面中包含3個方法:

KafkaProducer 在將訊息序列化和計算磁區之前會呼叫生產者攔截器的onSend() 方法來對訊息進行相應的定制化操作,一般來說最好不要修改訊息 ProducerRecord 的 topic、key 和 partition 等資訊,如果要修改,則需確保對其有準確的判斷,否則會與預想的效果出現偏差,比如修改 key 不僅會影響磁區的計算,同樣會影響 broker 端日志壓縮(Log Compaction)的功能,
KafkaProducer 會在訊息被應答(Acknowledgement)之前或訊息發送失敗時呼叫生產者攔截器的 onAcknowledgement() 方法,優先于用戶設定的 Callback 之前執行,這個方法運行在 Producer 的I/O執行緒中,所以這個方法中實作的代碼邏輯越簡單越好,否則會影響訊息的發送速度,
close() 方法主要用于在關閉攔截器時執行一些資源的清理作業,在這3個方法中拋出的例外都會被捕獲并記錄到日志中,但并不會再向上傳遞,
ProducerInterceptor 介面與 Partitioner 介面一樣,它也有一個同樣的父介面 Configurable,具體的內容可以參見 Partitioner 介面的相關介紹,
下面通過一個示例來演示生產者攔截器的具體用法,ProducerInterceptorPrefix 中通過 onSend() 方法來為每條訊息添加一個前綴“prefix1-”,并且通過 onAcknowledgement() 方法來計算發送訊息的成功率,ProducerInterceptorPrefix 類的具體實作如代碼

實作自定義的 ProducerInterceptorPrefix 之后,需要在 KafkaProducer 的配置引數 interceptor.classes 中指定這個攔截器,此引數的默認值為“”,示例如下:

然后使用指定了 ProducerInterceptorPrefix 的生產者連續發送10條內容為“kafka”的訊息,在發送完之后客戶端列印出如下資訊:

如果消費這10條訊息,會發現消費了的訊息都變成了“prefix1-kafka”,而不是原來的“kafka”,
KafkaProducer 中不僅可以指定一個攔截器,還可以指定多個攔截器以形成攔截鏈,攔截鏈會按照 interceptor.classes 引數配置的攔截器的順序來一一執行(配置的時候,各個攔截器之間使用逗號隔開),下面我們再添加一個自定義攔截器 ProducerInterceptorPrefixPlus,它只實作了 Interceptor 介面中的 onSend() 方法,主要用來為每條訊息添加另一個前綴“prefix2-”,具體實作如下:


此時生產者再連續發送10條內容為“kafka”的訊息,那么最終消費者消費到的是10條內容為“prefix2-prefix1-kafka”的訊息,如果將 interceptor.classes 配置中的兩個攔截器的位置互換:

那么最終消費者消費到的訊息為“prefix1-prefix2-kafka”,
如果攔截鏈中的某個攔截器的執行需要依賴于前一個攔截器的輸出,那么就有可能產生“副作用”,設想一下,如果前一個攔截器由于例外而執行失敗,那么這個攔截器也就跟著無法繼續執行,在攔截鏈中,如果某個攔截器執行失敗,那么下一個攔截器會接著從上一個執行成功的攔截器繼續執行,

CSDN認證博客專家
軟體架構師
B站網紅UP
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/257067.html
標籤:其他
上一篇:編輯安裝MySQL
