I want to create a Parquet table with fields of specific types:
name_process: String
id_session: Int
time_write: LocalDate or Timestamp
key: String
value: String
| name_process | id_session | time_write | key | value |
|---|---|---|---|---|
| OtherClass | jsdfsdfsf | 43434883477 | schema0.table0.csv | success |
| OtherClass | jksdfkjhka | 23212123323 | schema1.table1.csv | success |
| OtherClass | alskdfksjd | 23343212234 | schema2.table2.csv | failure |
| ExternalClass | sdfjkhsdfd | 34455453434 | schema3.table3.csv | success |
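I want to write a table like this correctly, using the proper data types, and then read individual partitions from it. I am trying to implement both the write and the read, but so far the results are poor.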
import java.io.File
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, max}
import org.apache.spark.sql.types.{LongType, StringType}

def createHiveTable(implicit spark: SparkSession): Unit = {
  val schema = "test_schema"
  val table = "test_table"
  val partitionName = "name_process"
  val columnNames = "name_process" :: "id_session" :: "time_write" :: "key" :: "value" :: Nil
  spark.sql(s"CREATE DATABASE IF NOT EXISTS test_db")
  //val createTableSql = s"CREATE TABLE IF NOT EXISTS $schema.$table ($columnNames) PARTITIONED BY $partitionName STORED AS parquet"
  val path = new File(".").getAbsolutePath + "/src/test/data-lineage/test_data_journal.csv"
  // Read the CSV with a header row; without an explicit schema every column is read as a string
  val df = spark.read.option("delimiter", ",")
    .option("header", true)
    .csv(path)
  df.show()
  // Write as a Parquet table partitioned by name_process
  df.write.mode(SaveMode.Append).partitionBy(partitionName).format("parquet").saveAsTable(s"test_db.$table")
}

def getLastSession(processName: String)(implicit spark: SparkSession): Unit = {
  // Reads a single partition directory directly (this is the call that fails, see the log)
  val df = spark.read.parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")
    .select(
      col("name_process").cast(StringType),
      col("id_session").cast(StringType),
      col("time_write").cast(LongType),
      col("key").cast(StringType),
      col("value").cast(StringType)
    )
  val lastTime = df.select(col("time_write")).select(max("time_write")).collect()(0).get(0)
  val lastSession = df.filter(col("time_write").equalTo(lastTime)).select("id_session").head().getString(0)
  println(lastSession)
  // TimeStamp is not defined in this snippet; its import is not shown in the question
  println(TimeStamp.getCurrentTime)
}
Log from Spark:
21/12/16 14:51:19 INFO TaskSchedulerImpl: Killing all running tasks in stage 3: Stage finished
21/12/16 14:51:19 INFO DAGScheduler: Job 3 finished: parquet at DataLineageJournal.scala:28, took 0,076899 s
org.apache.spark.sql.AnalysisException: cannot resolve '`name_process`' given input columns: [id_session, key, time_write, value];
'Project [unresolvedalias(cast('name_process as string), None), cast(id_session#78 as string) AS id_session#86, cast(time_write#79 as bigint) AS time_write#87L, cast(key#80 as string) AS key#88, cast(value#81 as string) AS value#89]
+- Relation[id_session#78,time_write#79,key#80,value#81] parquet
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:155)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:342)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:342)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:339)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:339)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:339)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
Answer:
Problem:
When you do this:
spark.read.parquet(s"spark-warehouse/test_db.db/test_table/name_process=$processName")
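you are reading data from one specific partition directory. Spark stores the value of a partition column in the directory name (name_process=...), not inside the Parquet files, so when that directory is read on its own the name_process column is missing. That is why the select fails with "cannot resolve 'name_process'".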
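To make the layout concrete, here is a small sketch in plain Scala (no Spark required) of how a partition value can be recovered from a Hive-style path. It is a simplified illustration of the idea behind partition discovery, not Spark's actual implementation; the helper `partitionValues` and the file name `part-00000.snappy.parquet` are hypothetical.

```scala
object PartitionPathSketch {
  /** Extracts `column=value` pairs from the segments of a Hive-style partitioned path. */
  def partitionValues(path: String): Map[String, String] =
    path.split('/').toSeq.flatMap { segment =>
      // Split on the first '=' only, so values containing '=' stay intact
      segment.split("=", 2) match {
        case Array(column, value) if column.nonEmpty => Some(column -> value)
        case _                                       => None
      }
    }.toMap

  def main(args: Array[String]): Unit = {
    // Hypothetical data file inside one partition directory of the table
    val file = "spark-warehouse/test_db.db/test_table/name_process=OtherClass/part-00000.snappy.parquet"
    println(partitionValues(file)) // prints Map(name_process -> OtherClass)
  }
}
```

The value `OtherClass` appears only in the path, which is why a reader that starts below the `name_process=...` directory has no way to see that column unless it is told where the table root is.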
Solution:
You can read the table's root directory and filter on the partition column instead:
spark.read.parquet(s"spark-warehouse/test_db.db/test_table").filter(col("name_process") === processName)
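For reference, the following is a minimal end-to-end sketch of two ways to keep the partition column when reading: filtering on the table root, and reading a single partition directory with Spark's `basePath` option. It assumes a local SparkSession and a temporary directory rather than the `spark-warehouse` path from the question; the object name, the `JournalRow` case class, the helper functions, and the sample rows are all hypothetical. `id_session` is typed as a string here because the sample values in the question are not numeric.

```scala
import java.nio.file.Files

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.col

object PartitionReadSketch {
  // Hypothetical typed row mirroring the columns in the question
  final case class JournalRow(name_process: String, id_session: String,
                              time_write: Long, key: String, value: String)

  /** Writes two sample rows as Parquet, partitioned by name_process. */
  def writeSample(root: String)(implicit spark: SparkSession): Unit = {
    import spark.implicits._
    Seq(
      JournalRow("OtherClass", "jsdfsdfsf", 43434883477L, "schema0.table0.csv", "success"),
      JournalRow("ExternalClass", "sdfjkhsdfd", 34455453434L, "schema3.table3.csv", "success")
    ).toDF().write.mode(SaveMode.Overwrite).partitionBy("name_process").parquet(root)
  }

  /** Reads the table root and filters; name_process is part of the schema. */
  def readByFilter(root: String, processName: String)(implicit spark: SparkSession): DataFrame =
    spark.read.parquet(root).filter(col("name_process") === processName)

  /** Reads one partition directory; basePath tells Spark where the table root is,
    * so name_process is still derived from the directory name. */
  def readByBasePath(root: String, processName: String)(implicit spark: SparkSession): DataFrame =
    spark.read.option("basePath", root).parquet(s"$root/name_process=$processName")

  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession =
      SparkSession.builder().master("local[1]").appName("partition-read-sketch").getOrCreate()

    val root = Files.createTempDirectory("journal").resolve("test_table").toString
    writeSample(root)

    // Reading the partition directory directly: name_process is absent from the schema
    spark.read.parquet(s"$root/name_process=OtherClass").printSchema()
    // Both alternatives keep name_process
    readByFilter(root, "OtherClass").printSchema()
    readByBasePath(root, "OtherClass").printSchema()

    spark.stop()
  }
}
```

Filtering on the root is usually the simpler choice, since Spark can prune the other partition directories based on the filter. Because the table in the question was created with `saveAsTable`, reading it by name with `spark.table("test_db.test_table")` and applying the same filter should also work and avoids hard-coding the warehouse path.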
