我只是在嘗試這個當前有一個 TextMessage 作為源的示例流:
// print each incoming strict text message
val printSink: Sink[Message, Future[Done]] =
Sink.foreach {
case message: TextMessage.Strict =>
println(message.text)
case _ =>
// ignore other message types
}
val helloSource: Source[Message, NotUsed] =
Source.single(TextMessage("hello world!"))
// the Future[Done] is the materialized value of Sink.foreach
// and it is completed when the stream completes
val flow: Flow[Message, Message, Future[Done]] =
Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)
我想發送 2 條訊息,所以我嘗試了這個:
val source1 = Source.single(TextMessage("hello"))
val source2 = Source.single(TextMessage("world"))
val helloSource: Source[Message, NotUsed] =
Source.combine(source2)
但我收到此錯誤:
polymorphic expression cannot be instantiated to expected type;
[error] found : [U](strategy: Int => akka.stream.Graph[akka.stream.UniformFanInShape[akka.http.scaladsl.model.ws.TextMessage.Strict,U],akka.NotUsed]): akka.stream.scaladsl.Source[U,akka.NotUsed]
[error] required: akka.stream.scaladsl.Source[akka.http.scaladsl.model.ws.Message,akka.NotUsed]
[error] Source.combine(source1, source2)
[error] ^
[error] one error found
我到底應該做什么?
uj5u.com熱心網友回復:
Source.combine 是組合多個源的靈活方式,您需要指定組合它們的策略,如鏈接檔案中所述。
在這種情況下,如果您希望一個有限源緊跟另一個源,則可以使用該Concat策略。
val helloSource: Source[Message, NotUsed] =
Source.combine(source1, source2)(Concat(_))
作為更簡單的替代方法,您可以concat在第一個源上使用該方法:
val helloSource: Source[Message, NotUsed] =
source1.concat(source2)
然而,對于這個例子,你有一組固定的硬編碼元素,避免創建多個源并從Iterablewith只創建一個源更簡單Source.apply:
val helloSource: Source[Message, NotUsed] =
Source(Seq(TextMessage("hello"), TextMessage("world")))
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/318737.html
