我正在嘗試使用 Spring Cloud僅向 Kafka發布一條訊息,而沒有任何棄用的類/方法或注釋。我還希望能夠輕松更改有效負載。
因此,為了清楚起見,我試圖不使用已棄用的 @Output注釋,也不使用任何KafkaTemplate.
我的配置:
spring:
cloud:
stream:
bindings:
message-out-0:
destination: ${spring.application.name}
producer:
key:
serializer:
type: string
format: utf-8
charset: utf-8
value:
serializer:
type: string
format: utf-8
charset: utf-8
我的代碼 - 到目前為止我嘗試過的:
@Component
@RequiredArgsConstructor
public class ApplicationAnnouncer implements CommandLineRunner {
private final MessageService messageService;
@Override
public void run(String... args) throws Exception {
messageService.value = "Application started...";
messageService.message();
}
}
一種嘗試:
@Configuration
public class MessageService {
public Object value;
@Bean
public Supplier<Message<?>> message () {
return () -> MessageBuilder.withPayload(value).build();
}
}
另一種嘗試:
@Configuration
public class MessageService {
public Object value;
@Bean
public Supplier<Flux<?>> message () {
return () -> Flux.fromStream(Stream.generate(() -> {
try {
Thread.sleep(1000);
return value;
} catch (Exception e) {
// ignore
}
return null;
})).subscribeOn(Schedulers.elastic()).share();
}
}
兩次嘗試在控制臺使用者中的輸出:
Hello World!
Hello World!
Hello World!
Hello World! // ... Repeated every second
該檔案指出:
該框架提供了一個默認的輪詢機制(回答“誰?”的問題),它將觸發供應商的呼叫,并且默認情況下它會每秒執行一次(回答“多久一次?”的問題)。
但是如果我不希望它每秒輪詢一次呢?
我向 MessageService 提供訊息的方式很奇怪......它是配置嗎?或者它是一種服務?
我還沒有找到將ONE CUSTOMIZABLE MESSAGE推送到 Kafka 的最基本示例。
uj5u.com熱心網友回復:
您可以使用StreamBridge來訪問云流系結:
@Component
@RequiredArgsConstructor
public class ApplicationAnnouncer implements CommandLineRunner {
private final StreamBridge streamBridge;
@Override
public void run(String... args) throws Exception {
streamBridge.send("message-out-0", "Application started...");
}
}
第一個字串是從提供函式的 bean 派生的應用程式設定中提供的系結名稱。
您甚至不需要從中派生系結名稱的實際 bean 。在這種情況下,任何名稱都可以。
您可以在此處找到一些示例。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/346836.html
下一篇:如何在URL中使用^讀取查詢引數
