我們正在使用 Spring Cloud Stream 來監聽多個佇列。
spring:
cloud:
function:
definition: functionRouter;supplier
stream:
solace:
bindings:
functionRouter-in-0:
consumer:
...
bindings:
functionRouter-in-0:
destination: ${SOLACE_QUEUE},${SOLACE_DMQ}
由于我們期望佇列上有多種訊息格式,我們使用 functionRouter 并MessageRoutingCallback找到處理訊息的正確函式,因此我們可以利用 JSON 訊息的自動反序列化。簡化示例:
class MessageRouter : MessageRoutingCallback {
override fun functionDefinition(message: Message<*>): String {
val topic = message.headers[SolaceHeaders.DESTINATION]
...
val isDmqEligible = message.headers[SolaceHeaders.DMQ_ELIGIBLE] as Boolean
if (!isDmqEligible) {
return "receiveFromDMQ"
}
return "receiveAndSend"
}
}
由于兩個佇列使用相同的系結,我們不能像backOffMaxInterval只使用一個 functionRouter 那樣設定不同的消費者屬性。
有沒有使用類似方法(根據訊息頭選擇處理函式的路由器)但支持多個路由器作為入口點的解決方案?像這樣的東西:
spring:
cloud:
function:
definition: functionRouter1;functionRouter2;supplier
stream:
solace:
bindings:
functionRouter1-in-0:
consumer:
...
functionRouter2-in-0:
consumer:
...
bindings:
functionRouter1-in-0:
destination: ${SOLACE_QUEUE}
functionRouter2-in-0:
destination: ${SOLACE_DMQ}
我也愿意接受任何解決方案(使用spring cloud stream),我們可以從同一通道動態調度不同的訊息型別,${SOLACE_QUEUE}并且我們仍然可以利用自動反序列化。
uj5u.com熱心網友回復:
所以, RoutingFunction 并不是真正為這種情況設計的,因為正如你所說,它是單一的系結和路由是通過參考發生在其他函式上的(中間沒有佇列)。也就是說,您當然可以通過在您的背景關系中簡單地定義另一個具有不同名稱的路由器函式 bean 來執行您的建議:
@Bean
RoutingFunction functionRouter2(FunctionCatalog functionCatalog, FunctionProperties functionProperties,
BeanFactory beanFactory, @Nullable MessageRoutingCallback routingCallback) {
return new RoutingFunction(functionCatalog, functionProperties, new BeanFactoryResolver(beanFactory), routingCallback);
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/419249.html
標籤:
上一篇:多重分離的Spring屬性系結
