使用下面的代碼,我試圖輸出每個人的姓名和年齡總和:
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
object CalculateMeanInStream extends App {
implicit val actorSystem = ActorSystem()
case class Person(name: String, age: Double)
val personSource = Source(List(Person("1", 30),Person("1", 20),Person("1", 20),Person("1", 30),Person("2", 2)))
val meanPrintSink = Sink.foreach[Double](println)
val printSink = Sink.foreach[Double](println)
def calculateMean(values: List[Double]): Double = {
values.sum / values.size
}
personSource.groupBy(maxSubstreams = 2 , s => s.name)
.map(m => m.age)
.reduce(_ _ )
.mergeSubstreams
.runForeach(println)
}
輸出是:
2.0
100.0
有沒有辦法將人名保留為 reduce 的一部分,以便在輸出中生成以下內容:
(2.0 , 2)
(100.0 , 1)
我試過了 :
personSource.groupBy(maxSubstreams = 2 , s => s.name)
.reduce((x , y) => x.age y.age)
.mergeSubstreams
.runForeach(println)
但拋出編譯器錯誤:
type mismatch;
found : Double
required: CalculateMeanInStream.Person
.reduce((x , y) => x.age y.age)
uj5u.com熱心網友回復:
可能有更優雅的方式,但我會這樣做:
personSource
.groupBy(maxSubstreams = 2, s => s.name)
.map(x => x.name -> x.age)
.reduce { case ((a, b) , (_, d)) => (a, b d) }
.mergeSubstreams
.runForeach(println)
uj5u.com熱心網友回復:
personSource
.groupBy(maxSubstreams = 2, s => s.name)
.reduce((person1, person2) => Person(person1.name, person1.age person2.age))
.mergeSubstreams
.runForeach(println)
uj5u.com熱心網友回復:
您可以使用fold(),它可以讓您跳過地圖。而不是 map 和 reduce 線。
只需寫如下:
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
object CalculateMeanInStream extends App {
implicit val actorSystem: ActorSystem = ActorSystem()
case class Person(name: String, age: Double)
val personSource = Source(List(Person("1", 30),Person("1", 20),Person("1", 20),Person("1", 30),Person("2", 2)))
val meanPrintSink = Sink.foreach[Double](println)
val printSink = Sink.foreach[Double](println)
def calculateMean(values: List[Double]): Double = {
values.sum / values.size
}
val reducer: ((String, Double), (String, Double)) => (String, Double) =
(person, accPerson) => (person._1, person._2 accPerson._2)
personSource.groupBy(maxSubstreams = 2 , s => s.name)
.fold((0D, "")){ case ((sum, _), x) => (sum x.age, x.name )}
.mergeSubstreams
.runForeach(println)
.onComplete(_ => actorSystem.terminate())(actorSystem.dispatcher)
}
您的輸出將是:
(2.0, 2)
(100.0, 1)
根據您的要求。
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/515722.html
標籤:斯卡拉阿卡流
上一篇:如何實作以下順序?
下一篇:如何總結一個類的多個欄位?
