我有一個閱讀 Message 的 Kafka 消費者。每條訊息都有一個 ID 和內容。
case class Message(id: String, content: String)
根據 ID,我想將 Message 寫入單獨的接收器。具體到一個 MongoDB 集合中。Mongo 提供了一個 Sink 將其寫入 DB 到指定的集合中。
val sink: Sink[Document, Future[Done]] = MongoSink.insertOne(collection(id))
問題是,我需要在連接 Kafka 消費者源時指定接收器,但是每個元素都定義了它應該進入哪個接收器。有沒有辦法在元素到達時動態使用特定的接收器。或者這是不可能的,例如,我應該為每個 id 使用不同的 Kafka 主題并將每個源連接到單獨的接收器?
uj5u.com熱心網友回復:
這不是完全清楚的型別是如何在你的榜樣排隊(如之間的關系Document和Message),但也有幾個方法,你可以采取:
- 如果有很多可能的集合并且無法提前知道,那么 Akka Streams 中最不壞的選擇將是
Sink.foreachAsync[Message](parallelism) { msg =>
val document = documentFromMessage(msg)
val collection = collection(msg.id)
Source.single(document).runWith(MongoSink.insertOne(collection))
}
請注意,這將為每條訊息使用一個新的 Mongo 接收器,這可能會帶來效率問題。請注意,如果有一種更輕量級的方式(例如在 reactivemongo 驅動程式中?)Future在插入單個檔案后回傳 a ,但使用連接池之類的東西來減少單個檔案插入的開銷,那可能會更可取。
- 如果集合事先知道,你可以預生成每個收集和使用匯
Partition和GraphDSL定義其采用了預建匯匯
// collection0, etc. are predefined and encompass all of the collections which might be returned by collection(id)
val collections: Map[MongoCollection[Document], (Int, Sink[Document, Future[Done]])] = Map(
collection0 -> (0 -> MongoSink.insertOne(collection0)),
collection1 -> (1 -> MongoSink.insertOne(collection1)),
collection2 -> (2 -> MongoSink.insertOne(collection2)),
collection3 -> (3 -> MongoSink.insertOne(collection3))
)
val combinedSink = Sink.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val partition = builder.add(
Partition[Message](
collections.size,
{ msg => collections(collection(msg.id))._1 }
)
)
val toDocument = Flow[Message].map(documentFromMessage)
collections.foreach {
case (_, (n, sink)) =>
partition.out(n) ~> toDocument ~> sink
}
SinkShape.of(partition.in)
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/405127.html
標籤:
