我正在嘗試使用 AKKA 從 kafka 主題中消費并使用 KPL 發送到 Kinesis Stream。
我已經為 kafka 主題創建了一個來源。
ActorSystem actorSystem = ActorSystemFactory.getClassicActorSystem();
KafkaService kafkaService = new kafkaService(actorSystem);
Source<Message, Consumer.Control> kafkaSource = kakfaService.createSource(runtimeConfiguration.getSourceKafkaTopic());
現在我想使用某種型別的接收器將記錄發送到 Kinesis Stream。“本機”akka kinesis sink 的問題在于它不支持 KPL。我正在嘗試使用此依賴項
<!-- https://mvnrepository.com/artifact/com.github.j5ik2o/akka-kinesis-kpl -->
<dependency>
<groupId>com.github.j5ik2o</groupId>
<artifactId>akka-kinesis-kpl_2.13</artifactId>
<version>1.0.252</version>
</dependency>
但它只有三個類可能與我的目的相關。KPLFlow、KPLFlowSettings 和 KPLFlowStage。
我從來沒有在akka中使用過Flows,有沒有辦法從流中創建一個接收器,或者最多能夠使用流來設定一種發送到正確運動流的方法?
KPLFlowStage
extends GraphStageWithMaterializedValue[FlowShape[UserRecord, UserRecordResult], Future[KinesisProducer]]
從搞砸它,看起來 KPLFlow 正在期待 UserRecord,這意味著記錄已經準備好發送到 kinesis 流。
uj5u.com熱心網友回復:
沒有使用 Kinesis 的經驗,但您可以將 a Flow<In, Out, Mat>(例如 a Flow<UserRecord, UserRecordResult, Future<KinesisProducer>>)轉換為Sink<In, Pair<Out, CompletionStage<Done>>with
Sink<UserRecord, Pair<scala.concurrent.Future<KinesisProducer>, CompletionStage<Done>>> sink =
flow.toMat(Sink.ignore(), Keep.both())
由于KPLFlowStage似乎具體化為 ScalaFuture并引入了 Scala 2.13 的庫,這應該給你一對CompletionStages:
import scala.jdk.javaapi.FutureConverters
Sink<UserRecord, Pair<CompletionStage<KinesisProducer>, CompletionStage<Done>>> sink =
flow.mapMaterializedValue(scalaFuture -> FutureConverters.asJava(scalaFuture))
.toMat(Sink.ignore(), Keep.both())
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/510550.html
