我正在嘗試連接兩個 Flow,但我無法解釋我的實作的輸出。
val source = Source(1 to 10)
val sink = Sink.foreach(println)
val flow1 = Flow[Int].map(s => s 1)
val flow2 = Flow[Int].map(s => s * 10)
val flowGraph = Flow.fromGraph(
GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val concat = builder.add(Concat[Int](2))
val broadcast = builder.add(Broadcast[Int](2))
broadcast ~> flow1 ~> concat.in(0)
broadcast ~> flow2 ~> concat.in(1)
FlowShape(broadcast.in, concat.out)
}
)
source.via(flowGraph).runWith(sink)
我希望此代碼有以下輸出。
2
3
4
.
.
.
11
10
20
.
.
.
100
相反,我只看到列印了“2”。你能解釋一下我的實作有什么問題,我應該如何更改程式以獲得所需的輸出。
uj5u.com熱心網友回復:
來自 Akka Stream 的 API 檔案:
Concat:
當當前流有可用元素時發出;如果當前輸入完成,它會嘗試下一個
Broadcast:
當所有輸出停止反壓并且有可用的輸入元素時發出
這兩個運算子不會一起作業,因為它們的作業方式存在沖突 -Concat嘗試Broadcast在切換到另一個之前從一個的輸出中提取所有元素,而Broadcast除非需要所有元素,否則不會發出它的輸出。
對于您需要的內容,您可以concat按照評論者的建議使用連接:
source.via(flow1).concat(source.via(flow2)).runWith(sink)
或等效地,使用Source.combine如下:
Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/333173.html
上一篇:ScalaXML決議多個欄位
下一篇:Scala得到一個子串
