嘗試使用 Kafka 活頁夾從云流中讀取 CloudEvents 的批處理訊息。如果我使用任何帶有自定義序列化器/反序列化器的自定義類,它作業正常,但使用 cloudevents 訊息不會出現。
spring
cloud:
function.definition: consumer
stream:
bindings:
producer-out-0:
destination: audit
group: audit-producer
producer:
useNativeEncoding: true
consumer-in-0:
destination: audit
group: audit-consumer
consumer:
batch-mode: true
useNativeDecoding: true
kafka:
binder:
brokers: localhost:9092
consumer-properties:
max.poll.records: 5
fetch.min.bytes: 10000
fetch.max.wait.ms: 10000
bindings:
producer-out-0:
producer:
configuration:
cloudevents:
serializer:
encoding: STRUCTURED
event_format: application/cloudevents json
key.serializer: org.apache.kafka.common.serialization.StringSerializer
# value.serializer: com.sagar.audit.watcher.domain.MessageSerializer
value.serializer: io.cloudevents.kafka.CloudEventSerializer
consumer-in-0:
consumer:
configuration:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value.deserializer: com.sagar.audit.watcher.domain.MessageDeserializer
value.deserializer: io.cloudevents.kafka.CloudEventDeserializer
而消費者我嘗試了 List<?> 和 single
@Bean
public Consumer<List<CloudEvent>> consumer() {
System.out.println("inside consumer");
//return auditMessage -> System.out.println("data at loop--" thread " -- " auditMessage);
return s -> s.forEach(auditMessage -> System.out.println("data at loop--" thread " -- " auditMessage));
}
如果我只使用消費者,我會收到以下錯誤,這意味著反序列化正在發生,但不知何故訊息沒有發送給消費者。
2022-10-26 20:31:24.070 WARN [,8289fada18f22581,831ea94d13ef311e] 64368 --- [container-0-C-1] s.c.f.c.c.SmartCompositeMessageConverter : Failure during type conversion by org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter@3bf97caf. Will try the next converter.
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:237) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:115) ~[spring-cloud-stream-3.2.5.jar:3.2.5]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:176) ~[spring-messaging-5.3.23.jar:5.3.23]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[jackson-databind-2.13.4.2.jar:2.13.4.2]
uj5u.com熱心網友回復:
您似乎正在使用 Cloud Events SDK,不確定您從中獲得了什么價值,因為沒有它您可以做任何事情(以及更多)。這是我討論的兩個帖子:
https://spring.io/blog/2020/12/10/cloud-events-and-spring-part-1 https://spring.io/blog/2020/12/23/cloud-events-and-spring -第2部分
無論如何,如果您仍想依賴 CloudEvent 等 SDK 型別,您可能會缺少依賴項:
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-spring</artifactId>
<version>2.3.0</version>
</dependency>
我們也有基于 SDK 的示例,因此可能會有所幫助。 https://github.com/spring-cloud/spring-cloud-function/blob/main/spring-cloud-function-samples/function-sample-cloudevent-sdk/pom.xml
否則,你能做的最好的事情就是創建一個重現問題的小應用程式,將其推送到 github 并發送鏈接,以便我們進行查看。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/522037.html
上一篇:FeignExceptioncom.fasterxml.jackson.databind.exc.InvalidDefinitionException:無法構造`org.springframework
