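I have an application that connects to Pub/Sub and processes messages as they are published to a subscription. I would like to put these messages into a queue channel so that I don't process a large number of them at once. As I understand it, a message arrives at the inboundChannelAdapter, which outputs it to the QueueChannel, and messageReceiver then pulls messages from the QueueChannel and acts on them. Is that right? When I try to add the queue channel, I get the following error: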
java.lang.IllegalArgumentException: No poller has been defined for Annotation-based endpoint, and no default poller is available within the context.
    at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.configurePollingEndpoint(AbstractMethodAnnotationPostProcessor.java:435) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.doCreateEndpoint(AbstractMethodAnnotationPostProcessor.java:377) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.createEndpoint(AbstractMethodAnnotationPostProcessor.java:367) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.config.annotation.AbstractMethodAnnotationPostProcessor.postProcess(AbstractMethodAnnotationPostProcessor.java:172) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.postProcessMethodAndRegisterEndpointIfAny(MessagingAnnotationPostProcessor.java:230) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.lambda$processAnnotationTypeOnMethod$1(MessagingAnnotationPostProcessor.java:220) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) ~[na:na]
    at org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor.afterSingletonsInstantiated(MessagingAnnotationPostProcessor.java:141) ~[spring-integration-core-5.3.2.RELEASE.jar:5.3.2.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:914) ~[spring-beans-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:879) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:551) ~[spring-context-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at Application.main(Application.java:65) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) ~[spring-boot-devtools-2.3.3.RELEASE.jar:2.3.3.RELEASE]
Here is my implementation:
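    // Buffers up to 50 messages; a QueueChannel must be polled by its consumer.
    @Bean
    public MessageChannel inputMessageQueenChannel() {
        return new QueueChannel(50);
    }

    // Receives messages from the "sub-one" subscription and sends them to the queue channel.
    @Bean
    public PubSubInboundChannelAdapter inboundChannelAdapter(
            @Qualifier("inputMessageQueenChannel") MessageChannel messageChannel,
            PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter =
                new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
        adapter.setOutputChannel(messageChannel);
        adapter.setAckMode(AckMode.MANUAL);
        adapter.setPayloadType(String.class);
        return adapter;
    }

    // Annotation-based endpoint that consumes from the queue channel; no poller is configured here.
    @ServiceActivator(inputChannel = "inputMessageQueenChannel")
    public void messageReceiver(
            String payload,
            @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubSubMessage message) {
        log.info("Payload: " + payload);
        message.ack();
    }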
Answer:
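The QueueChannel does nothing by itself other than store the messages sent to it in an internal queue. To receive messages from that queue, something has to poll it periodically. Spring Integration uses the PollingConsumer pattern for this, but a polling consumer cannot work on its own either: you have to tell it how to poll the queue. So a poller must be provided. See the poller attribute of the @ServiceActivator annotation. Alternatively, you can provide a global default poller through a PollerMetadata bean definition whose name is PollerMetadata.DEFAULT_POLLER. See the docs for more information: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#endpoint-pollingconsumer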
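The two options described above could look roughly like the sketch below. It assumes Spring Integration 5.3.x (the version in the stack trace) and the channel name from the question. The class name PollerConfigSketch, the 1000 ms delay, and the limit of 10 messages per poll are illustrative assumptions, not values from the original post. The manual ack from the question is left out to keep the fragment short.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.scheduling.support.PeriodicTrigger;

// Hypothetical configuration class; only the poller-related parts are shown.
@Configuration
public class PollerConfigSketch {

    // Option 1: declare a poller on the endpoint itself.
    // fixedDelay is in milliseconds; both values are arbitrary examples.
    @ServiceActivator(
            inputChannel = "inputMessageQueenChannel",
            poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "10"))
    public void messageReceiver(String payload) {
        System.out.println("Payload: " + payload);
    }

    // Option 2: register a global default poller. Polling endpoints that
    // declare no poller of their own fall back to this bean.
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(1000)); // poll every 1000 ms
        pollerMetadata.setMaxMessagesPerPoll(10);             // at most 10 messages per cycle
        return pollerMetadata;
    }
}
```

Either option on its own should be enough to get past the startup error. When both are present, the endpoint-level poller applies to that endpoint and the default covers the remaining polled endpoints.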
Also see this question and its answers: "Spring Integration No poller has been defined for endpoint".
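To make the role of the poller more concrete, here is a plain-Java analogy that uses only java.util.concurrent. It is not Spring Integration code: the class PollingConsumerSketch and the method pollOnce are hypothetical names for this illustration. The point is that a queue only holds messages until some consumer runs a poll cycle, and that a per-poll limit caps how much work each cycle does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

public class PollingConsumerSketch {

    /**
     * Runs one poll cycle: removes at most maxMessagesPerPoll messages from
     * the queue, passes each to the handler, and returns how many were handled.
     */
    public static <T> int pollOnce(BlockingQueue<T> queue, int maxMessagesPerPoll, Consumer<T> handler) {
        int handled = 0;
        T message;
        // The limit is checked first, so no message is taken once it is reached.
        while (handled < maxMessagesPerPoll && (message = queue.poll()) != null) {
            handler.accept(message);
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // Bounded queue, analogous to new QueueChannel(50) in the question.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(50);
        for (int i = 1; i <= 5; i++) {
            queue.offer("message-" + i);
        }

        // Messages just sit in the queue until a poll cycle runs.
        List<String> processed = new ArrayList<>();
        int first = pollOnce(queue, 2, processed::add);
        int second = pollOnce(queue, 2, processed::add);

        System.out.println("handled " + first + " then " + second
                + ", processed=" + processed + ", remaining=" + queue.size());
    }
}
```

In a real polling consumer these cycles are run on a schedule by a trigger, which is the piece of configuration the error message says is missing.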
