我想定義一個使用 Reactor Kafka 消耗 kafka 并寫入 MongoDB 的流,并且只有在成功時才會將 ID 寫入 Kafka。我正在將 Project Reactor 與 Spring Integration JavaDSL 一起使用,并且我希望有一個FlowBuilder類可以在較高級別上定義我的管道。我目前有以下方向:
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf
.handle(MongoDb.reactiveOutboundChannelAdapter()))
.handle(writeToKafka)
.get();
}
我在檔案中看到支持另一種方法,該方法也適用于 Project Reactor。這種方法不包括使用IntegrationFlows. 這看起來像這樣:
@MessagingGateway
public static interface TestGateway {
@Gateway(requestChannel = "promiseChannel")
Mono<Integer> multiply(Integer value);
}
...
@ServiceActivator(inputChannel = "promiseChannel")
public Integer multiply(Integer value) {
return value * 2;
}
...
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(integers -> ...);
我想知道在使用這兩個庫時更推薦的處理方式是什么。我想知道如何在第二個示例中使用 Reactive MongoDB 配接器。我不確定如果沒有IntegrationFlows包裝器,第二種方法是否可行。
uj5u.com熱心網友回復:
專為高級最終用戶 API 設計,@MessagingGateway以盡可能隱藏訊息傳遞。因此,當您開發其邏輯時,目標服務不受任何訊息傳遞抽象的影響。
可以使用這樣的介面配接器IntegrationFlow,您應該將其視為常規服務激活器,因此它看起來像這樣:
.handle("testGateway", "multiply", e -> e.async(true))
使async(true)此服務激活器訂閱回傳的Mono. 您可以省略它,然后您自己在下游訂閱它,因為這Mono將是payload流中下一條訊息的確切訊息。
如果你想要相反的東西:IntegrationFlow從 中呼叫 an Flux,就像 that flatMap(),然后考慮使用toReactivePublisher()流定義中的運算子回傳 aPublisher<?>并將其宣告為 bean。在這種情況下,最好不要使用 that MongoDb.reactiveOutboundChannelAdapter(),而只是ReactiveMongoDbStoringMessageHandler讓它回傳Mono以傳播到 that Publisher。
另一方面,如果您想將其@MessagingGateway與Monoreturn 一起使用,但仍從中呼叫 a ReactiveMongoDbStoringMessageHandler,則將其宣告為 bean 并用 that 標記@ServiceActivator。
我們還可以ExpressionEvaluatingRequestHandlerAdvice在特定端點上捕獲錯誤(或成功)并分別處理它們:https ://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#expression-建議
我認為您正在尋找的是這樣的:
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(reactiveKafkaConsumerTemplate)
.handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
.handle(writeToKafka)
.get();
}
注意.handle(reactiveMongoDbStoringMessageHandler)-它不是關于一個MongoDb.reactiveOutboundChannelAdapter()。因為這個包裝ReactiveMessageHandler成一個ReactiveMessageHandlerAdapter自動訂閱。您需要的是看起來更像是您希望將其Mono<Void>回傳到您自己的控制中,因此您可以將其用作服務的輸入writeToKafka并自己在那里訂閱并按照您的解釋處理成功或錯誤。關鍵是,使用 Reactive Stream,我們無法提供命令式錯誤處理。該方法與任何異步 API 使用相同。因此,我們也將錯誤發送到errorChannelfor Reactive Streams。
我們可能可以通過讓像您這樣的用例開箱即用來改進這MongoDb.reactiveOutboundChannelAdapter()一點。returnMono(true/false)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/419246.html
標籤:
