我嘗試從 Hive 表創建 DataFrame。但是我不擅長使用 Spark API。
我需要幫助來優化 method 中的查詢getLastSession,將兩個任務合并為一個 Spark 任務:
val pathTable = new File("/src/test/spark-warehouse/test_db.db/test_table").getAbsolutePath
val path = new Path(s"$pathTable${if(onlyPartition) s"/name_process=$processName" else ""}").toString
val df = spark.read.parquet(path)
def getLastSession: Dataset[Row] = {
val lastTime = df.select(max(col("time_write"))).collect()(0)(0).toString
val lastSession = df.select(col("id_session")).where(col("time_write") === lastTime).collect()(0)(0).toString
val dfByLastSession = df.filter(col("id_session") === lastSession)
dfByLastSession.show()
/*
---------- ---------------- ------------------ -------
|id_session| time_write| key| value|
---------- ---------------- ------------------ -------
|alskdfksjd|1639950466414000|schema2.table2.csv|Failure|
*/
dfByLastSession
}
附注。我的源表(例如):
| 名稱_行程 | id_session | 寫時間 | 鑰匙 | 價值 |
|---|---|---|---|---|
| 其他類 | jsdfsdfsf | 43434883477 | schema0.table0.csv | 成功 |
| 其他類 | jksdfkjhka | 23212123323 | schema1.table1.csv | 成功 |
| 其他類 | alskdfksjd | 23343212234 | schema2.table2.csv | 失敗 |
| 外部類 | sdfjkhsdfd | 34455453434 | schema3.table3.csv | 成功 |
uj5u.com熱心網友回復:
您可以row_number像這樣與 Window 一起使用:
import org.apache.spark.sql.expressions.Window
val dfByLastSession = df.withColumn(
"rn",
row_number().over(Window.orderBy(desc("time_write")))
).filter("rn=1").drop("rn")
dfByLastSession.show()
但是,由于您沒有按任何欄位進行磁區,因此可能會降低性能。
您可以在代碼中更改的另一件事是使用結構排序來獲取id_session與最近time_write一次查詢相關聯的資訊:
val lastSession = df.select(max(struct(col("time_write"), col("id_session")))("id_session")).first.getString(0)
val dfByLastSession = df.filter(col("id_session") === lastSession)
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/393238.html
標籤:数据框 斯卡拉 阿帕奇火花 Hadoop apache-spark-sql
