我正在嘗試使用 pyspark 決議多個 xml 檔案。所有 xml 檔案都具有相同的已知架構。
首先,我將所有檔案作為文本加載以觸發 DF:
path = 'c:\\path\\to\\xml\\files\\*.xml'
df = spark.read.text(path)
此時我的 DF 看起來像這樣:
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| value
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|<Msg><Header><tag1>some str1</tag1><tag2>2</tag2><tag3>2022-02-16 10:39:26.730</tag3></Header><Body><Pair><N>N1</N><V>V1</V></Pair><Pair><N>N2</N><V>V2</V></Pair><Pair><N>N3</N><V>V3</V></Pair></Body></Msg>|
|<Msg><Header><tag1>some str2</tag1><tag2>5</tag2><tag3>2022-02-17 10:39:26.730</tag3></Header><Body><Pair><N>N4</N><V>V4</V></Pair><Pair><N>N5</N><V>V5</V></Pair></Body></Msg>|
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
xml 檔案架構如下:
df.printSchema()
root
|-- Header: struct (nullable = false)
| |-- tag1: string (nullable = false)
| |-- tag2: integer (nullable = false)
| |-- tag3: timestamp (nullable = false)
|-- Body: struct (nullable = false)
| |-- Pair: array (nullable = false)
| | |-- element: struct (containsNull = true)
| | | |-- N: string (nullable = false)
| | | |-- V: string (nullable = false)
所以決議后的最終輸出應該是這樣的:
--------- ----- ------------------------ --- --
|tag1 | tag2| tag3 | N |V |
--------- ----- ------------------------ --- --
|some str1| 2 |2022-02-16 10:39:26.730 |N1 |V1|
|some str1| 2 |2022-02-16 10:39:26.730 |N2 |V2|
|some str1| 2 |2022-02-16 10:39:26.730 |N3 |V3|
|some str2| 5 |2022-02-17 10:39:26.730 |N4 |V4|
|some str2| 5 |2022-02-17 10:39:26.730 |N5 |V5|
--------- ----- ------------------------ --- --
意思是“標題”元素應該為來自相同 xml 字串的所有 NV 對重復。
所以我想我找到了一種用xpathor提取所有標題標簽的方法,xml.etree.ElementTree但我的問題是我真的不明白如何將我的 NV 對提取到我以后可以爆炸的東西。
我錯過了什么?
----澄清----
我試圖加載我的xml檔案
path = 'c:\\path\\to\\xml\\files\\*.xml'
df = spark.read.format('xml').option('rowTag','Msg').schema(schema).load(path)
但是這個選項不能提供*.xml路徑,所以這就是我將檔案作為文本讀取的原因。
uj5u.com熱心網友回復:
試一試:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.format('xml').options(rowTag='book').load('books.xml')
請參閱https://github.com/databricks/spark-xml#python-api
uj5u.com熱心網友回復:
根據您的 spark 版本,您必須將其添加到環境中。我使用的是 spark 2.4.0,這個版本對我有用。資料塊 xml 版本
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.7.0 pyspark-shell'
input_path.xml 如下所示:
<Msg><Header><tag1>some str1</tag1><tag2>2</tag2><tag3>2022-02-16 10:39:26.730</tag3></Header><Body><Pair><N>N1</N><V>V1</V></Pair><Pair><N>N2</N><V>V2</V></Pair><Pair><N>N3</N><V>V3</V></Pair></Body></Msg>
<Msg><Header><tag1>some str2</tag1><tag2>5</tag2><tag3>2022-02-17 10:39:26.730</tag3></Header><Body><Pair><N>N4</N><V>V4</V></Pair><Pair><N>N5</N><V>V5</V></Pair></Body></Msg>
input_path = 'src/input/input.xml'
xmlDF = spark.read.format('xml').option('rowTag', 'Msg').load(input_path)
xmlDF.printSchema()
root
|-- Body: struct (nullable = true)
| |-- Pair: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- N: string (nullable = true)
| | | |-- V: string (nullable = true)
|-- Header: struct (nullable = true)
| |-- tag1: string (nullable = true)
| |-- tag2: long (nullable = true)
| |-- tag3: timestamp (nullable = true)
由于您不能在同一個查詢中分解 2 個串列,因此可以這樣劃分:
xmlDF.select(
'*',
explode("Body.Pair.N").alias('N')
).select(
'N',
explode("Body.Pair.V").alias('V'),
col("Header.tag1").alias('tag1'),
col("Header.tag2").alias('tag2'),
col("Header.tag3").alias('tag3'),
) \
.dropDuplicates() \
.show(truncate=False)
它將根據您的輸入給出以下結果:
--- --- --------- ---- ----------------------
|N |V |tag1 |tag2|tag3 |
--- --- --------- ---- ----------------------
|N2 |V1 |some str1|2 |2022-02-16 10:39:26.73|
|N4 |V5 |some str2|5 |2022-02-17 10:39:26.73|
|N1 |V3 |some str1|2 |2022-02-16 10:39:26.73|
|N5 |V5 |some str2|5 |2022-02-17 10:39:26.73|
|N5 |V4 |some str2|5 |2022-02-17 10:39:26.73|
|N4 |V4 |some str2|5 |2022-02-17 10:39:26.73|
|N1 |V1 |some str1|2 |2022-02-16 10:39:26.73|
|N3 |V3 |some str1|2 |2022-02-16 10:39:26.73|
|N2 |V2 |some str1|2 |2022-02-16 10:39:26.73|
|N3 |V2 |some str1|2 |2022-02-16 10:39:26.73|
|N1 |V2 |some str1|2 |2022-02-16 10:39:26.73|
|N3 |V1 |some str1|2 |2022-02-16 10:39:26.73|
|N2 |V3 |some str1|2 |2022-02-16 10:39:26.73|
--- --- --------- ---- ----------------------
uj5u.com熱心網友回復:
它似乎與我當地的火花版本有關。我已對其進行了更新,并將 HADOOP_HOME 添加到了我的 PATH 中。所以現在:
path = 'c:\\path\\to\\xml\\files\\*.xml'
df = spark.read.format('xml').option('rowTag','Msg').schema(schema).load(path)
完美運行!
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/436609.html
