我有一個帶有此架構的 DataFrame:
root
|-- id: string (nullable = false)
|-- data_zone_array: array (nullable = true)
| |-- element: string (containsNull = true)
它實際上包含一個data_zone_array包含幾個或多或少可預測的字串值(或根本沒有)的陣列,其中它們的鍵和值由:; 看起來像這樣的show(5)輸出:
id | data_zone_array
1 | ['name:john', 'surname:wick', 'group:a', 'group:b', 'group:c']
2 | ['name:joe', 'surname:linda', 'surname:boss', 'group:d', 'group:b']
3 | ['name:david', 'group:a', 'age:7']
4 | ['name:mary', 'surname:gilles']
5 | ['name:charles', 'surname:paul', 'group:d', 'group:b', 'group:c', 'age:6', 'unplanned_attribute_165:thisvalue']
我想要 :
- 根據鍵串列(例如
name和surname)提取其中一些值- 知道它們的目標型別是可預測的(name將始終是唯一的字串和字串surname陣列) - 將所有其他找到的屬性放在包含字串陣列的結構中。請注意,會有不可預測的屬性,例如
unplanned_attribute_165.
它會給出這種模式:
root
|-- id: string (nullable = false)
|-- name: string (nullable = true)
|-- surname: array (nullable = true)
| |-- element: string (containsNull = true)
|-- other_attributes: struct (nullable = true)
| |-- <attrx>: array (containsNull = true)
| | |-- element: string(containsNull = true)
| |-- <attry>: array (containsNull = true)
| | |-- element: string(containsNull = true)
| |-- ......
有如下記錄:
id | name | surname | other_attributes
1 | 'john' | ['wick'] | {group:['a','b','c']}
2 | 'joe' | ['boss', 'linda'] | {group:['b', 'd']}
3 | 'david' | <null> | {group: ['a'], age:['7']}
4 | 'mary' | ['gilles'] | <null>
5 | 'charles' | ['paul'] | {group: ['b','c','d'], age:['6'], unplanned_attribute_165:['thisvalue']}
關于如何執行此類操作的任何想法?
uj5u.com熱心網友回復:
這是一種方法。
首先,data_zone_array分解列key并將鍵和值提取到單獨的列中,并value在 上拆分:。
然后,按id和key收集與每個鍵關聯的值串列。并再次 group byid以創建 map key -> [values]。
最后,選擇鍵要用作列和使用地圖篩選鍵的reste map_keys filter transform創建other_attributes列。
import pyspark.sql.functions as F
df1 = (df.withColumn("data_zone_array", F.explode("data_zone_array"))
.withColumn("key", F.split("data_zone_array", ":")[0])
.withColumn("value", F.split("data_zone_array", ":")[1])
.groupBy("id", "key").agg(F.collect_list("value").alias("values"))
.groupBy("id").agg(F.map_from_arrays(F.collect_list("key"), F.collect_list("values")).alias("attributes"))
.select("id",
F.col("attributes")["name"].alias("name"),
F.col("attributes")["surname"].alias("surname"),
F.expr("""transform(
filter(map_keys(attributes), k -> k not in('name', 'surname')),
x -> struct(x as key, attributes[x] as value)
)""").alias("other_attributes")
)
)
df1.show(truncate=False)
# --- --------- ------------- ------------------------------------------------------------------------
# |id |name |surname |other_attributes |
# --- --------- ------------- ------------------------------------------------------------------------
# |5 |[charles]|[paul] |[{group, [d, b, c]}, {age, [6]}, {unplanned_attribute_165, [thisvalue]}]|
# |1 |[john] |[wick] |[{group, [a, b, c]}] |
# |3 |[david] |null |[{group, [a]}, {age, [7]}] |
# |2 |[joe] |[linda, boss]|[{group, [d, b]}] |
# |4 |[mary] |[gilles] |[] |
# --- --------- ------------- ------------------------------------------------------------------------
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/360769.html
標籤:Python 阿帕奇火花 火花 分裂 apache-spark-sql
上一篇:在SparkScala中的用例
