我有兩個使用 RabbitMQ 進行通信的 Spring Boot 服務。Service1 向 Service2 發送創建會話的請求。Service2 處理請求并回傳回應。Service1 應該處理回應。
請求會話的 Service1 方法:
public void startSession()
{
ListenableFuture<SessionCreationResponseDTO> sessionCreationResponse = sessionGateway.requestNewSession();
sessionCreationResponse.addCallback(response -> {
//handle success
}, ex -> {
// handle exception
});
}
在 Service1 上,我定義了 AsyncOutboundGateway,例如:
@Bean
public IntegrationFlow requestSessionFlow(MessageChannel requestNewSessionChannel,
AsyncRabbitTemplate amqpTemplate,
SessionProperties sessionProperties)
{
return flow -> flow.channel(requestNewSessionChannel)
.handle(Amqp.asyncOutboundGateway(amqpTemplate)
.exchangeName(sessionProperties.getRequestSession().getExchangeName())
.routingKey(sessionProperties.getRequestSession().getRoutingKey()));
}
在 Service2 上,我有接收這些訊息的流程:
@Bean
public IntegrationFlow requestNewSessionFlow(ConnectionFactory connectionFactory,
SessionProperties sessionProperties,
MessageConverter messageConverter,
RequestNewSessionHandler requestNewSessionHandler)
{
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory,
sessionProperties.requestSessionProperties().queueName())
.handle(requestNewSessionHandler)
.get();
Service2 處理那里的請求:
@ServiceActivator(async = "true")
public ListenableFuture<SessionCreationResponseDTO> handleRequestNewSession()
{
SettableListenableFuture<SessionCreationResponseDTO> settableListenableFuture = new SettableListenableFuture<>();
// Goes through asynchronous process of creating session and sets value in listenable future
return settableListenableFuture;
}
問題是 Service2 立即將 ListenableFuture 作為訊息負載回傳給 Service1,而不是等待 future 的結果并回傳結果。
如果我理解正確的檔案,檔案通過設定async引數@ServiceActivator為true,成功的結果應回傳以及例外的情況下,錯誤通道將被使用。
Probably I misunderstood documentation, so that I need to unpack ListenableFuture in flow of Service2 before returning it as response, but I am not sure how to achieve that.
I tried something with publishSubscribeChannel but without much luck.
uj5u.com熱心網友回復:
你的問題在這里:
.handle(requestNewSessionHandler)
這樣的配置看不到您@ServiceActivator(async = "true"),并將其用作常規阻塞服務激活器。
讓我們看看這是否對您有幫助:
.handle(requestNewSessionHandler, "handleRequestNewSession", e -> e.async(true))
最好這樣考慮:或僅注釋配置。或僅通過 Java DSL 編程。
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/352365.html
標籤:spring spring-integration spring-integration-amqp
