我對 Scala 和 Akka Stream 有點陌生,我試圖從 websocket 獲取 JSON 字串訊息并將它們推送到 Kafka 主題。
現在我只在“從 ws 獲取訊息”部分作業。
來自 websocket 的訊息如下所示:
{
"bitcoin":"6389.06534240",
"ethereum":"192.93111286",
"monero":"108.90302506",
"litecoin":"52.25484165"
}
我想將此 JSON 訊息拆分為多條訊息:
{"coin": "bitcoin", "price": "6389.06534240"}
{"coin": "ethereum", "price": "192.93111286"}
{"coin": "monero", "price": "108.90302506"}
{"coin": "litecoin", "price": "52.25484165"}
然后將這些訊息中的每一條推送到 kafka 主題。
這是我迄今為止取得的成就:
val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
msg => msg.toString.replaceAll("[{})(]", "").split(",")
).map( msg => {
val splitted = msg.split(":")
s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
})
val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)
val flow: Flow[Message, Message, Promise[Option[Message]]] =
Flow.fromSinkAndSourceMat(
message_decomposition.to(sink),
Source.maybe[Message])(Keep.right)
val (upgradeResponse, promise) = Http().singleWebSocketRequest(
WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
flow)
我正在獲取預期的輸出 Json 訊息,但我想知道我是否可以以更“Akka-ish”的風格撰寫這個生產者,比如使用 GraphDSL。所以我有幾個問題:
- 是否可以使用 GraphDSL 連續使用 WebSocket?如果是的話,你能給我舉個例子嗎?
- 使用 GraphDSL 使用 WS 是個好主意嗎?
- 在將接收到的 Json 訊息發送到 kafka 之前,我應該像我一樣分解它嗎?或者最好發送它以降低延遲?
- 在向 Kafka 生成訊息后,我計劃使用 Apache Storm 使用它,這是個好主意嗎?還是我應該堅持使用 Akka?
感謝閱讀我,問候,Arès
uj5u.com熱心網友回復:
這段代碼非常具有 Akka 風格:scaladsl就像 AkkaGraphDSL或實作自定義GraphStage. IMO/E 轉到 的唯一原因GraphDSL是,如果圖形的實際形狀在scaladsl.
我會親自去定義一個CoinPrice類來使模型顯式
case class CoinPrice(coin: String, price: BigDecimal)
然后Flow[Message, CoinPrice, NotUsed]將 1 個傳入訊息決議為零個或多個CoinPrices。一些東西(在此處使用 Play JSON),例如:
val toCoinPrices =
Flow[Message]
.mapConcat { msg =>
Json.parse(msg.toString)
.asOpt[JsObject]
.toList
.flatMap { json =>
json.underlying.flatMap { kv =>
import scala.util.Try
kv match {
case (coin, JsString(priceStr)) =>
Try(BigDecimal(priceStr)).toOption
.map(p => CoinPrice(coin, p))
case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
case _ => None
}
}
}
}
根據訊息中 JSON 的大小,您可能希望將其分解為不同的流階段,以允許 JSON 決議和提取到CoinPrices之間的異步邊界。例如,
Flow[Message]
.mapConcat { msg =>
Json.parse(msg.toString).asOpt[JsObject].toList
}
.async
.mapConcat { json =>
json.underlying.flatMap { kv =>
import scala.util.Try
kv match {
case (coin, JsString(priceStr)) =>
Try(BigDecimal(priceStr)).toOption
.map(p => CoinPrice(coin, p))
case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
case _ => None
}
}
}
在上面,async邊界任一側的階段將在單獨的 actor 中執行,因此可能并發(如果有足夠的 CPU 內核可用等),代價是 actor 協調和交換訊息的額外開銷。只有當 JSON 物件足夠大并且輸入足夠快(在前一個物件完成處理之前始終輸入)時,額外的協調/通信開銷(參見 Gunther 的通用可擴展性定律)才值得。
如果您打算在程式停止之前使用 websocket,您可能會發現使用Source.never[Message].
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/362474.html
