I want to use a Parquet table with the following typed fields:
name_process: String, id_session: String, time_write: String, key: String, value: String
"id_session" is the ID of the SparkSession.
The table is partitioned by the "name_process" column.
For example:
| name_process | id_session | time_write | key | value |
|---|---|---|---|---|
| OtherClass | sess000001 | 1639950466114000 | schema0.table0.csv | Success |
| OtherClass | sess000002 | 1639950466214000 | schema1.table1.csv | Success |
| OtherClass | sess000003 | 1639950466309000 | schema0.table0.csv | Success |
| OtherClass | sess000003 | 1639950466310000 | schema1.table1.csv | Failure |
| OtherClass | sess000003 | 1639950466311000 | schema2.table2.csv | Success |
| OtherClass | sess000003 | 1639950466312000 | schema3.table3.csv | Success |
| ExternalClass | sess000004 | 1639950466413000 | schema0.table0.csv | Success |
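The time_write values appear to be microseconds since the Unix epoch; this is an inference from their magnitude and from getCurrentTime in the code below, which goes through DateTimeUtils.fromMillis. Under that assumption, a plain-Scala sketch (TimeWrite is a hypothetical helper, not part of the question) turns them into readable instants:

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

object TimeWrite {
  // Assumption: time_write holds microseconds since 1970-01-01T00:00:00Z
  def toInstant(timeWrite: String): Instant =
    Instant.EPOCH.plus(timeWrite.toLong, ChronoUnit.MICROS)
}
```

For example, TimeWrite.toInstant("1639950466114000") gives 2021-12-19T21:47:46.114Z.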
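All values of the "key" column are unique only within one Spark session (the "id_session" column). This is because I use the same (CSV) files every time I start a Spark session. I intend to send these files to a server. Both the time of sending and the server's response will be recorded in the "time_write" and "value" columns. In other words, I want to see the latest sending status for every CSV file.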
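The design relies on one invariant: a key may repeat across sessions but never within a single session. A small in-memory check of that invariant, assuming the rows are already loaded as the RowProcessResult case class the question defines further down (repeated here so the snippet compiles on its own; JournalChecks is a hypothetical name):

```scala
case class RowProcessResult(nameProcess: String, idSession: String, timeWrite: String, key: String, value: String)

object JournalChecks {
  // True when no (id_session, key) pair occurs more than once,
  // i.e. every CSV file is logged at most once per Spark session
  def keysUniquePerSession(rows: Seq[RowProcessResult]): Boolean =
    rows.groupBy(r => (r.idSession, r.key)).values.forall(_.size == 1)
}
```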
This is the log of entries I will be working with. To work with this log, I want to implement several methods:

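All getter methods will return a filtered DataFrame with all columns, so the result still has 5 columns. I'm still having a hard time with the Spark API, and it will take me a while to learn how to write neat DataFrame operations. Here is what I have so far: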
import java.io.File
import org.apache.commons.net.ntp.TimeStamp
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions.{col, max}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

abstract class ProcessResultBook(processName: String, onlyPartition: Boolean = true)(implicit spark: SparkSession) {
  val pathTable = new File("/src/test/spark-warehouse/test_db.db/test_table").getAbsolutePath
  // either this process's partition directory or the whole table directory
  val path = new Path(s"$pathTable${if (onlyPartition) s"/name_process=$processName" else ""}").toString
  val df = spark.read.parquet(path)

  def getLastSession: Dataset[Row] = {
    // latest write time, then the session it belongs to, then all rows of that session
    val lastTime = df.select(max(col("time_write"))).collect()(0)(0).toString
    val lastSession = df.select(col("id_session")).where(col("time_write") === lastTime).collect()(0)(0).toString
    val dfByLastSession = df.filter(col("id_session") === lastSession)
    dfByLastSession.show()
    /* name_process is absent: reading a single partition directory does not add it as a column
    +----------+----------------+------------------+-------+
    |id_session|      time_write|               key|  value|
    +----------+----------------+------------------+-------+
    |alskdfksjd|1639950466414000|schema2.table2.csv|Failure|
    +----------+----------------+------------------+-------+
    */
    dfByLastSession
  }

  def add(df: DataFrame) = ???
  def add(processName: String, idSession: String, timeWrite: String, key: String, value: String) = ???
  def getSessionsByProcess(processName: String) = ???
  def getBySessionAndProcess(processName: String, idSession: String) = ???
  def getUnique(processName: String) = ???
  def delByTime(time: String) = ???
  def delByIdSession(idSession: String) = ???
  // Spark's internal timestamp representation: microseconds since the epoch
  def getCurrentTime: Long = DateTimeUtils.fromMillis(TimeStamp.getCurrentTime.getTime)
  def convertTime(time: Long): String = TimeStamp.getNtpTime(time).getDate.toString
}
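The logic of getLastSession above (find the latest time_write, look up its session, keep that session's rows) can be modeled on plain Scala collections. This is an in-memory sketch, not Spark code: it compares (time_write, id_session) pairs so that one pass finds both the latest write and its session, and it assumes time_write always holds numeric strings. LastSession is a hypothetical name.

```scala
case class RowProcessResult(nameProcess: String, idSession: String, timeWrite: String, key: String, value: String)

object LastSession {
  // Rows of the session that made the most recent write;
  // ties on time_write are broken by id_session
  def getLastSession(rows: Seq[RowProcessResult]): Seq[RowProcessResult] =
    if (rows.isEmpty) Seq.empty
    else {
      val lastId = rows.maxBy(r => (r.timeWrite.toLong, r.idSession)).idSession
      rows.filter(_.idSession == lastId)
    }
}
```

Comparing pairs removes the second lookup; in Spark the analogous trick is taking max over struct(time_write, id_session), which also saves one of the two collect() jobs.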
I have a case class:
case class RowProcessResult(
  nameProcess: String,
  idSession: String,
  timeWrite: String,
  key: String,
  value: String
)
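Please help me implement 2 methods:
- def add(data: List[RowProcessResult]): Unit
- def getUnique(nameProcess: String): DataFrame or List[RowProcessResult]
The add(..) method adds a collection of data to the Hive table.
The getUnique(nameProcess: String): DataFrame method returns a DataFrame with all columns and one row per unique value of the "key" column. For each unique "key" value, the row with the most recent time is selected.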
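Those semantics (within one process, one row per key, namely the one with the latest time_write) can be sketched on an in-memory List[RowProcessResult], i.e. the List return variant above. This is a plain-Scala model that assumes time_write holds numeric strings; in Spark the same selection is commonly written with row_number() over a window partitioned by key and ordered by time_write descending. UniqueKeys is a hypothetical name.

```scala
case class RowProcessResult(nameProcess: String, idSession: String, timeWrite: String, key: String, value: String)

object UniqueKeys {
  // For one process, keep the most recent row per key (sorted by key for a stable result)
  def getUnique(rows: Seq[RowProcessResult], nameProcess: String): List[RowProcessResult] =
    rows.filter(_.nameProcess == nameProcess)
      .groupBy(_.key)
      .values
      .map(_.maxBy(_.timeWrite.toLong))
      .toList
      .sortBy(_.key)
}
```

On the example table, getUnique(rows, "OtherClass") keeps the sess000003 row for every key, so schema1.table1.csv reports Failure even though sess000002 had sent it successfully earlier.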
P.S.: My test class that creates the Hive table:
import java.io.File
import org.apache.spark.sql.{SaveMode, SparkSession}

def createHiveTable(implicit spark: SparkSession): Unit = {
  val schema = "test_schema"
  val table = "test_table"
  val partitionName = "name_process"
  val columnNames = "name_process" :: "id_session" :: "time_write" :: "key" :: "value" :: Nil
  spark.sql(s"CREATE DATABASE IF NOT EXISTS test_db")
  //val createTableSql = s"CREATE TABLE IF NOT EXISTS $schema.$table ($columnNames) PARTITIONED BY $partitionName STORED AS parquet"
  val path = new File(".").getAbsolutePath + "/src/test/data-lineage/test_data_journal.csv"
  val df = spark.read.option("delimiter", ",")
    .option("header", true)
    .csv(path)
  df.show()
  df.write.mode(SaveMode.Append).partitionBy(partitionName).format("parquet").saveAsTable(s"test_db.$table")
}
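One detail of this setup matters for add: when saveAsTable is called with partitionBy, Spark places the partition column at the end of the table schema, so test_db.test_table ends up with the column order id_session, time_write, key, value, name_process. Since insertInto resolves columns by position rather than by name, a DataFrame built directly from RowProcessResult (whose first field is nameProcess) would silently land in the wrong columns unless it is reordered first. A plain-Scala sketch of that reordering (TableOrder and toTableRow are hypothetical names):

```scala
case class RowProcessResult(nameProcess: String, idSession: String, timeWrite: String, key: String, value: String)

object TableOrder {
  // Column order of the partitioned table: partition column last
  val tableColumns: List[String] = List("id_session", "time_write", "key", "value", "name_process")

  // Values of one row in the positional order insertInto expects
  def toTableRow(r: RowProcessResult): List[String] =
    List(r.idSession, r.timeWrite, r.key, r.value, r.nameProcess)
}
```

On a DataFrame, the equivalent is a select of the five columns in this order (renamed to the snake_case table names) before calling insertInto.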
Reply from a uj5u.com user:
It's been a long time. I'm leaving my solution here.
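import org.apache.commons.net.ntp.TimeStamp
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, max, row_number, struct}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

// class header taken from the question; the original answer shows only the body
abstract class ProcessResultBook(processName: String, onlyPartition: Boolean = true)(implicit spark: SparkSession) {
  import spark.implicits._
  val schema = "test_db"
  val table = "test_table"
  // rows of this process only, cached because every getter below reuses them
  val df = spark.read.table(s"$schema.$table").filter(col("name_process") === processName).persist()

  def getLastSession: Dataset[Row] = {
    // max over a struct compares time_write first, so this yields the id_session of the latest write
    val lastSessionId = df.select(max(struct(col("time_write"), col("id_session")))("id_session"))
      .first().getString(0)
    val dfByLastSession = df.filter(col("id_session") === lastSessionId)
    dfByLastSession.show()
    dfByLastSession
  }

  // Assumes RowProcessResult here has only (idSession, timeWrite, key, value);
  // name_process is appended last to match the table's column order
  def add(listRows: Seq[RowProcessResult]) = {
    val df = listRows.toDF().withColumn("name_process", lit(processName))
    df.show()
    addDfToTable(df)
  }

  def add(nameProcess: String, idSession: String, timeWrite: String, key: String, value: String) = {
    val df = Seq(RowProcessResult(idSession, timeWrite, key, value)).toDF()
      .withColumn("name_process", lit(nameProcess))
    addDfToTable(df)
  }

  def getSessionsByProcess(externalProcessName: String) = {
    spark.read.table(s"$schema.$table").filter(col("name_process") === externalProcessName)
  }

  def getSession(idSession: String, processName: String = this.processName) = {
    if (processName.equals(this.processName))
      df.filter(col("id_session") === idSession)
    else
      getSessionsByProcess(processName).filter(col("id_session") === idSession)
  }

  // sort + dropDuplicates does not guarantee which row survives,
  // so rank the rows of each key by time and keep the newest one
  def getUnique: DataFrame = {
    val newestFirst = Window.partitionBy(col("key")).orderBy(col("time_write").cast("long").desc)
    df.withColumn("rn", row_number().over(newestFirst)).filter(col("rn") === 1).drop("rn")
  }

  // insertInto matches columns by position, not by name
  def addDfToTable(df: DataFrame) =
    df.write.mode(SaveMode.Append).insertInto(s"$schema.$table")

  def getFullDf = df
  def getCurrentTime = TimeStamp.getCurrentTime
  def convertTime(time: Long): String = TimeStamp.getNtpTime(time).getDate.toString
}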
I ended up with a solution I can live with. It's not bad. Thank you, and Happy New Year!! =)
Please credit the source when reposting. Link to this article: https://www.uj5u.com/ruanti/398511.html
