我目前有一個spring cloud流應用,它有一個listener函式,主要是監聽某個topic,依次執行如下:
- 使用來自主題的訊息
- 將消費的訊息存盤在資料庫中
- 呼叫外部服務獲取一些資訊
- 處理資料
- 在DB中記錄結果
- 將訊息發送到另一個主題
- 確認訊息(我將確認模式設定為手動)
我們決定遷移到 Spring 云函式,并且我已經能夠使用Function介面完成上述幾乎所有步驟,源主題作為輸入,接收器主題作為輸出。
@Bean
public Function<Message<NotificationMessage>, Message<ValidatedEvent>> validatedProducts() {
return message -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
String status = restEndpoint.getStatusFor(message.getPayload());
ValidatedEvent event = getProcessingResult(message.getPayload(), status);
notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
return MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
.build();
}
}
我的問題與第 7 步中的例外處理有關(確認訊息)。如果我們確定訊息已成功發送到接收器佇列,我們??只會確認該訊息,否則我們不確認該訊息。
我的問題是,這樣的事情怎么能在Spring cloud函式中實作,特別是send方法完全依賴于Spring Framework(作為函式介面實作評估的結果)。
早些時候,我們可以通過 try/catch 來做到這一點
@StreamListener(value = NotificationMesage.INPUT)
public void onMessage(Message<NotificationMessage> message) {
try {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
String status = restEndpoint.getStatusFor(message.getPayload());
ValidatedEvent event = getProcessingResult(message.getPayload(), status);
Message message = MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
.build();
kafkaTemplate.send(message);
notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
}catch (Exception exception){
notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
}
}
是否有在函式介面成功回傳后觸發的監聽器,類似于KafkaSendCallback但沒有指定模板
uj5u.com熱心網友回復:
基于 Oleg 上面提到的內容,如果您想嚴格恢復StreamListener代碼中的行為,您可以嘗試以下方法。您可以切換到使用者,而不是使用函式,然后KafkaTemplate像以前一樣使用發送出站。
@Bean
public Consumer<Message<NotificationMessage>> validatedProducts() {
return message -> {
try{
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
String status = restEndpoint.getStatusFor(message.getPayload());
ValidatedEvent event = getProcessingResult(message.getPayload(), status);
Message message = MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
.build();
kafkaTemplate.send(message); //here, you make sure that the data was sent successfully by using some callback.
//only ack if the data was sent successfully.
Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
}
catch (Exception exception){
notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
}
};
}
另一件值得研究的事情是使用 Kafka 事務,在這種情況下,如果它不能端到端地作業,則不會發生確認。基于 Spring for Apache Kafka 的基礎,Spring Cloud Stream binder 對此提供了支持。更多細節在這里。 這是關于此的 Spring Cloud Stream 檔案。
uj5u.com熱心網友回復:
Spring Cloud Stream 對函式一無所知。它與之前的訊息處理程式相同,因此與您之前使用的回呼方法相同的方法也適用于函式。所以也許你可以分享一些可以澄清你的意思的代碼?我也不明白..send 方法完全依賴于 Spring Framework 是什么意思..
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/352834.html
