我有這樣的案例類。我正在使用 RateStreamSource 生成測驗資料。它給了我一個資料集。現在我正在對資料集進行分組groupByKey并呼叫mapGroupsWithState.
但是在狀態函式內部updateRateAnother有一些邏輯,我正在列印迭代器。迭代器在方法中總是以空的形式出現,我的邏輯不起作用。
以下是代碼的最小可重現示例
case class Employee(id: String, value: Long)
case class Rate(timestamp: Timestamp, value: Long)
case class Rate2(timestamp: Timestamp, value: Long, age: Int)
object ResourceConfigConsolidator {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession
.builder()
.appName("TestJob")
.getOrCreate()
import sparkSession.implicits._
val rate = 2
val randoms = List(10, 20, 30, 40, 50, 60, 70)
def randomElement = Random.shuffle(randoms).head
val rcConfigDS = sparkSession
.readStream
.format("rate") // <-- use RateStreamSource
.option("rowsPerSecond", rate)
.load()
.as[Rate].filter(_.value % 40 == 0).map {
r => Rate2(r.timestamp, r.value, randomElement)
}
def updateRateAnother(key: Int, values: Iterator[Rate2], state:
GroupState[Employee]): Option[Employee] = {
println("key is here ::" key)
if (state.hasTimedOut) {
// We've timed out, lets extract the state and send it down the stream
state.remove()
state.getOption
} else {
println("the iterating values ::::" values.toList.mkString(" , \n"))
println("hello length ::::" values.length)
if (!state.exists) {
if (values.length == 0) {
None
} else {
val latestValue = values.toList.maxBy(_.value)
val employee = Employee(latestValue.value.toString, latestValue.value)
state.update(employee)
Some(employee)
}
} else {
if (values.isEmpty) {
val currentState = state.get
Some(currentState)
} else {
val latestValue = values.toList.maxBy(_.value)
val currentState = state.get
val updated = currentState.copy(latestValue.value.toString, latestValue.value)
state.update(currentState.copy(latestValue.value.toString, latestValue.value))
Some(updated)
}
}
}
}
val res: Dataset[Employee] = rcConfigDS.groupByKey(_.age).
mapGroupsWithState(GroupStateTimeout.NoTimeout())(updateRateAnother).flatMap(emp =>
emp)
res.writeStream.format("console")
.outputMode(OutputMode.Update())
.option("truncate", value = false)
.option("checkpointLocation", "checkpoint1")
.start()
}
}
由于我與 進行分組age,因此迭代器中應該至少有一個物件。我這樣說對嗎?為什么迭代器是空的?
uj5u.com熱心網友回復:
您確定列印時它是空的嗎?因為這是唯一令人驚訝的事情。你只能通過一次迭代器,所以一旦你values.toList第一次這樣做,它就會變成空的。您應該將結果分配給toList變數,并丟棄迭代器。
更好的是,改變你的邏輯,這樣你只需要一次通過,然后擺脫toList(你可以直接呼叫maxBy......Iterator但只有一次)。這個想法是在處理大型資料集時不要一次將所有資料加載到記憶體中。
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/440180.html
