我是Spring Boot @kafkaListener的新手。我的應用程式每秒收到近20萬條訊息的主題。我想把訊息監聽器和訊息的處理分開。
我如何將java.util.concurrent.BlockingQueue與@kafkaListener一起使用?我可以通過使用CompletableFuture來使用它嗎?
任何示例代碼都會有更多幫助。
uj5u.com熱心網友回復:
這可能不是你要找的答案,但你問的是資訊傳遞中的一個反模式。將資料從代理處轉移到記憶體中的意義何在?你完全消除了訊息傳遞中間件的一個目的,即內部佇列的轉移。另一方面,你只是把問題推得更遠一些,但當你最終無法從內部佇列中消費它們時,太多的訊息仍然會存在。而最終,它將會出現記憶體不足的錯誤。
最好的辦法是考慮以最快的速度處理它們,并且在當前批次完成之前不要從Kafka主題中提取更多資料。這樣一來,記錄就不會丟失,因為它們仍然保留在代理上,而且當記憶體不足時也不會出現任何故障。
這確實是訊息中間件最初的重點:區分生產者和消費者,讓他們以自己的節奏做自己的事情。未被處理的內容被存盤在代理上,并可在以后被消費。
uj5u.com熱心網友回復:
我相信你想讓你的消費者實施管道化。在像你這樣的情況下,實作這一點并不罕見。為什么呢?好吧,KafkaConsumer的不足之處在于,在不考慮處理時間的情況下,解壓/反序列化會很耗時。由于這些操作都堆積在一個執行緒后面,因此理想的做法是將輪詢與處理分開,而這是通過幾個緩沖區實作的。
一種方法是將輪詢與處理分開。
一種方法可以做到這一點。
您的 EventReceiver 為輪詢啟動了一個執行緒。該執行緒將做你一直在做的事情,但不是為每個事件啟動監聽器,而是將事件傳遞給一個receivedEvents緩沖區,該緩沖區可以是BlockingQueue<RecieveEvent>。所以在for回圈中,你把每條記錄都傳遞給阻塞佇列。一旦for回圈結束,這個執行緒將利用另一個緩沖區,比如Queen<Map<TopicPartition, OffsetAndMetadata>>--它將提交proccessingThread成功處理的偏移量。
接下來,你的EventReceiver將啟動另一個執行緒--processingThread。這將處理從緩沖區中提取記錄,向該接收器的所有監聽器發射事件,然后更新 Queues 狀態,以便 pollingThread 提交。
為什么處理執行緒不直接提交事件,而要將其傳回給輪詢執行緒?這是因為KafkaConsumer要求呼叫.poll()的執行緒應該是呼叫consumer.commitAsync(...)的執行緒,否則你會得到一個并發例外。
這種方法在啟用自動提交的情況下并不適用。
關于如何使用Spring Kafka實作這一目標,我并不完全確定。然而,我知道Spring Kafka將EventReceiver與EventListener (@KafkaListener)分開,這是將底層的kafka作業與業務邏輯分開。從理論上講,你必須調整他們的實作,但我認為在沒有Spring Kafka庫的情況下實作這個庫會更容易。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/332559.html
標籤:
下一篇:如何擴展TextButton?
