我是 Spark 和 Scala 的新手。我正在嘗試從 Spark 表中決議嵌套的 JSON 格式列。這是表格的預覽(我只展示了 Spark 表格的第一行,其余部分看起來都一樣)
doc.show(1)
doc_content object_id object_version
{"id":"lni001","pub_date".... 20220301 7098727
每行的“doc_content”列的結構如下所示(某些行可能在“內容”欄位記憶體儲更多資訊):
{
"id":"lni001",
"pub_date":"20220301",
"doc_id":"7098727",
"unique_id":"64WP-UI-POLI",
"content":[
{
"c_id":"002",
"p_id":"P02",
"type":"org",
"source":"internet"
},
{
"c_id":"003",
"p_id":"P03",
"type":"org",
"source":"internet"
},
{
"c_id":"005",
"p_id":"K01",
"type":"people",
"source":"news"
}
]
}
我嘗試explode在“doc_content”列上使用
doc.select(explode($"doc_content") as "doc_content")
.withColumn("id", col("doc_info.id"))
.withColumn("pub_date", col("doc_info.pub_date"))
.withColumn("doc_id", col("doc_info.doc_id"))
.withColumn("unique_id", col("doc_info.unique_id"))
.withColumn("content", col("doc_info.content"))
.withColumn("content", explode($"content"))
.withColumn("c_id", col("content.c_id"))
.withColumn("p_id", col("content.p_id"))
.withColumn("type", col("content.type"))
.withColumn("source", col("content.source"))
.drop(col("doc_content"))
.drop(col("content"))
.show()
但我得到了這個錯誤org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`doc_content`)' due to data type mismatch: input to function explode should be array or map type, not string;。我正在努力將列轉換為 Array 或 Map 型別(可能是 Scala LOL 的新手)。
決議“doc_content”列后,我希望表格看起來像這樣。
id pub_date doc_id unique_id c_id p_id type source oject_id object_version
lni001 20220301 7098727 64WP-UI-POLI 002 P02 org internet 20220301 7098727
lni001 20220301 7098727 64WP-UI-POLI 003 P03 org internet 20220301 7098727
lni001 20220301 7098727 64WP-UI-POLI 005 K01 people news 20220301 7098727
我想知道如何做到這一點,如果我能得到一些關于如何做到這一點的想法或方法,那就太好了。或者也許比我的方法更好,因為我在 Spark 表中有數百萬行,如果我能讓它運行得更快的話。
謝謝!
uj5u.com熱心網友回復:
您可以使用from_json將 JSON 字串決議為 MapType,然后explode在陣列列上使用來創建新行,這意味著您應該在doc_content.contentthan上展開doc_content。
指定用于決議 json 字串的架構:
import org.apache.spark.sql.types._
val schema = new StructType()
.add("id", StringType)
.add("pub_date", StringType)
.add("doc_id", StringType)
.add("unique_id", StringType)
.add("content", ArrayType(MapType(StringType, StringType)))
然后決議json字串并爆炸它
df.select(
$"object_id",
$"object_version",
from_json($"doc_content", schema).alias("doc_content")
).select(
$"object_id",
$"object_version",
col("doc_content.id").alias("id"),
col("doc_content.pub_date").alias("pub_date"),
col("doc_content.doc_id").alias("doc_id"),
col("doc_content.unique_id").alias("unique_id"),
explode(col("doc_content.content")).alias("content")
).select(
$"id",
$"pub_date",
$"doc_id",
$"unique_id",
col("content.c_id").alias("c_id"),
col("content.p_id").alias("p_id"),
col("content.type").alias("type"),
col("content.source").alias("source"),
$"object_id",
$"object_version"
)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/455006.html
