我想定義一個寫入 MongoDB 的流,并且只有在成功時才會將 ID 寫入 Kafka。我正在使用 JavaDSL,我希望有一個FlowBuilder類可以在較高級別定義我的管道。我正在尋找使我能夠撰寫流程的功能,例如:
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.process(writeToMongo) // <-- Searching for this kind of function
.handle(writeToKafka)
.get();
}
我已經看到Apache Camel 的作業原理完全一樣,我想知道 Spring Integration 是否也有一個簡單而好的解決這個基本問題的方法。
uj5u.com熱心網友回復:
您正在尋找的是publishSubscribeChannel()具有多個訂閱者的能力。默認情況下,如果沒有在通道上配置執行器,下一個訂閱者只會在前一個訂閱者之后并且只有當這個訂閱者成功時才會被呼叫。
它可能看起來類似于您用 that 表達的內容process():
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf
.handle(MongoDb.reactiveOutboundChannelAdapter()))
.handle(writeToKafka)
.get();
}
另一種選擇是 a gateway(),但是您需要從那里回傳一些東西才能繼續。在 Spring Integration 中,如果沒有回復,則流程將停止。它沒有重用infor outif no的概念out。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/419253.html
標籤:
上一篇:將AuthorizationHeaderBearerAuthentication添加到SpringBootController
