我在嘗試理解使用 Spring Boot webflux 用 Kotlin 撰寫的代碼庫并使用 spring-kafka 從 kafka 主題中讀取訊息時遇到了一些困難。
控制器的簽名如下:
import org.springframework.kafka.annotation.KafkaListener
import reactor.core.Disposable
interface CreationEventConsumer {
@KafkaListener(
topics = ["\${topic.east}", "\${topic.west}"],
containerFactory = "creationFactory"
)
fun readEvent(record: String): Disposable
}
正如您在簽名中看到的,方法 readEvent 回傳一個Disposable。
所以我不明白為什么考慮到這只是一種單向流,函式必須回傳一些東西,spring 會對此做些什么嗎?有什么好處嗎?
該方法的實作如下:
override fun readEvent(record: String): Disposable {
logger.info("$INCOMING")
return creationEventService.process(record)
.doOnError {
logger.error("error")
}.retry(3)
.subscribe()
}
creationEventService.process ( record)呼叫回傳一個 Flux
我還有更多問題,因為有一個特殊的API增加了對 kafka 反應式編程的支持,但這個專案使用的是spring-kafka。
謝謝!
uj5u.com熱心網友回復:
對我來說也沒有意義;Spring 對Disposable.
如果沒有注釋,從 a 回傳一個值(任何型別的)@KafkaListener都不會做任何事情@SendTo——在這種情況下,“回復”被發送到 replyTo 標頭。如果沒有@SendTo,則忽略回傳的物件(帶有 DEBUG 日志)。
在這種情況下,將提交記錄的偏移量,而不管異步操作的成功/失敗。
你是對的; reactor-kafka 應該用于回應式作業負載,而不是 Spring 用于 Apache Kafka。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/531730.html
