Suppose I have a DataFrame like the following:
| ID | A | B | C | D |
|---|---|---|---|---|
| 1 | 100 | 10 | 20 | 5 |
| 2 | 0 | 5 | 10 | 5 |
| 3 | 0 | 7 | 2 | 3 |
| 4 | 0 | 1 | 3 | 7 |
It needs to be transformed into something like this:
| ID | A | B | C | D | E |
|---|---|---|---|---|---|
| 1 | 100 | 10 | 20 | 5 | 75 |
| 2 | 75 | 5 | 10 | 5 | 60 |
| 3 | 60 | 7 | 2 | 3 | 50 |
| 4 | 50 | 1 | 3 | 7 | 40 |
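The transformation works as follows:
- The DataFrame gets a new column E, which for row 1 is computed as col(A) - (max(col(B), col(C)) + col(D)) => 100 - (max(10, 20) + 5) = 75
- In the row with Id 2, the value of col E from row 1 is carried forward as the value of col A
- So for row 2, column E is 75 - (max(5, 10) + 5) = 60
- Similarly, in the row with Id 3, the value of A becomes 60 and the new value of col E is determined from it
The problem is that, for every row except the first, the value of col A depends on a value computed for the previous row.
Is it possible to solve this using window functions and lag?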
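To make the rule above concrete, here is a minimal plain-Scala sketch of the recurrence on ordinary collections, without Spark. The names `RecurrenceSketch`, `Row`, `Out` and `compute` are made up for illustration and do not come from the thread; the sketch only pins down what the expected output is, it is not a distributed solution.

```scala
// Plain-Scala sketch of the recurrence (no Spark). All names are illustrative.
object RecurrenceSketch {
  // One input row of the question's table.
  final case class Row(id: Int, a: Int, b: Int, c: Int, d: Int)
  // One output row: A is carried forward from the previous E, E is newly computed.
  final case class Out(id: Int, a: Int, b: Int, c: Int, d: Int, e: Int)

  def compute(rows: Seq[Row]): Seq[Out] = {
    // The accumulator holds the previous row's E (None before the first row)
    // together with the output rows built so far.
    val start = (Option.empty[Int], Vector.empty[Out])
    val (_, out) = rows.sortBy(_.id).foldLeft(start) {
      case ((prevE, acc), r) =>
        val a = prevE.getOrElse(r.a)            // the first row keeps its own A
        val e = a - (math.max(r.b, r.c) + r.d)  // E = A - (max(B, C) + D)
        (Some(e), acc :+ Out(r.id, a, r.b, r.c, r.d, e))
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      Row(1, 100, 10, 20, 5),
      Row(2, 0, 5, 10, 5),
      Row(3, 0, 7, 2, 3),
      Row(4, 0, 1, 3, 7)
    )
    compute(input).foreach(println)
  }
}
```

Each step needs the E computed in the previous step, which is why a plain lag over the original column A cannot express it.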
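A uj5u.com user replied:
You can use the collect_list function over a Window ordered by the Id column to get a cumulative array of structs that hold the values of A and max(B, C) + D (as field T). Then apply aggregate to compute column E.
Note that in this particular case you cannot use the lag window function, because the value you want to fetch is itself computed recursively.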
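import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df2 = df.withColumn(
  "tmp",
  // cumulative array of (A, T) structs, where T = greatest(B, C) + D
  collect_list(
    struct(col("A"), (greatest(col("B"), col("C")) + col("D")).as("T"))
  ).over(Window.orderBy("Id"))
).withColumn(
  "E",
  // the first element contributes A - T, every later element contributes -T
  expr("aggregate(transform(tmp, (x, i) -> IF(i=0, x.A - x.T, -x.T)), 0, (acc, x) -> acc + x)")
).withColumn(
  "A",
  // recover the carried-forward A from E
  col("E") + greatest(col("B"), col("C")) + col("D")
).drop("tmp")

df2.show(false)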
//+---+---+---+---+---+---+
//|Id |A  |B  |C  |D  |E  |
//+---+---+---+---+---+---+
//|1  |100|10 |20 |5  |75 |
//|2  |75 |5  |10 |5  |60 |
//|3  |60 |7  |2  |3  |50 |
//|4  |50 |1  |3  |7  |40 |
//+---+---+---+---+---+---+
You can show the intermediate column tmp to understand the logic behind the computation.
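The reason this works is that the recurrence unrolls: since each A is the previous E, the E of row n equals the first row's A minus the sum of T over rows 1 to n, where T = max(B, C) + D. The following plain-Scala sketch mirrors what the tmp column and the aggregate/transform expression compute per row. It uses ordinary collections rather than Spark, and the names `TmpColumnSketch`, `AT` and `eFromTmp` are hypothetical helpers introduced only for this illustration.

```scala
// Plain-Scala mirror of the tmp column logic (no Spark). All names are illustrative.
object TmpColumnSketch {
  // Mirrors struct(A, T), where T = greatest(B, C) + D.
  final case class AT(a: Int, t: Int)

  // Mirrors:
  //   aggregate(transform(tmp, (x, i) -> IF(i=0, x.A - x.T, -x.T)), 0, (acc, x) -> acc + x)
  def eFromTmp(tmp: Seq[AT]): Int =
    tmp.zipWithIndex
      .map { case (x, i) => if (i == 0) x.a - x.t else -x.t }
      .foldLeft(0)(_ + _)

  def main(args: Array[String]): Unit = {
    // (A, B, C, D) for the four rows of the question, already ordered by Id.
    val rows = Seq((100, 10, 20, 5), (0, 5, 10, 5), (0, 7, 2, 3), (0, 1, 3, 7))
    val structs = rows.map { case (a, b, c, d) => AT(a, math.max(b, c) + d) }

    // For row n, tmp is the prefix of structs up to and including row n.
    val tmps = (1 to structs.length).map(n => structs.take(n))

    tmps.foreach { tmp =>
      val e = eFromTmp(tmp)
      val a = e + tmp.last.t // the carried-forward A is recovered as E + T
      println(s"tmp=$tmp E=$e A=$a")
    }
  }
}
```

Note that the array for row n holds n elements, so the total work grows roughly quadratically with the number of rows; this is probably fine for small inputs but worth keeping in mind for large ones.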
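A uj5u.com user replied:
As blackbishop said, you cannot use the lag function to retrieve values of a column that change as they are computed. Since you are using the Scala API, you can write your own user-defined aggregate function.
First, create the following case classes, which represent the row currently being read and the aggregator's buffer: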
case class InputRow(A: Integer, B: Integer, C: Integer, D: Integer)
case class Buffer(var E: Integer, var A: Integer)
Then use them to define your custom RecursiveAggregator aggregator:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder

object RecursiveAggregator extends Aggregator[InputRow, Buffer, Buffer] {
  // empty buffer: nothing has been computed yet
  override def zero: Buffer = Buffer(null, null)

  override def reduce(buffer: Buffer, currentRow: InputRow): Buffer = {
    // first row keeps its own A; later rows take the previous E as A
    buffer.A = if (buffer.E == null) currentRow.A else buffer.E
    buffer.E = buffer.A - (math.max(currentRow.B, currentRow.C) + currentRow.D)
    buffer
  }

  // merging partial buffers is meaningless for an order-dependent recurrence
  override def merge(b1: Buffer, b2: Buffer): Buffer = {
    throw new NotImplementedError("should be used only over ordered window")
  }

  override def finish(reduction: Buffer): Buffer = reduction

  override def bufferEncoder: Encoder[Buffer] = ExpressionEncoder[Buffer]()

  override def outputEncoder: Encoder[Buffer] = ExpressionEncoder[Buffer]()
}
Finally, convert your RecursiveAggregator into a user-defined aggregate function with udaf (available in org.apache.spark.sql.functions since Spark 3.0) and apply it to the input DataFrame:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, udaf}

val recursiveAggregator = udaf(RecursiveAggregator)
val window = Window.orderBy("Id")

val result = input
  .withColumn("computed", recursiveAggregator(col("A"), col("B"), col("C"), col("D")).over(window))
  .select("Id", "computed.A", "B", "C", "D", "computed.E")
If you use the DataFrame from the question as the input DataFrame, you get the following result DataFrame:
+---+---+---+---+---+---+
|Id |A  |B  |C  |D  |E  |
+---+---+---+---+---+---+
|1  |100|10 |20 |5  |75 |
|2  |75 |5  |10 |5  |60 |
|3  |60 |7  |2  |3  |50 |
|4  |50 |1  |3  |7  |40 |
+---+---+---+---+---+---+
