I want a JSON-formatted column in a PySpark DataFrame.
Sample df:
| id | type | value |
|---|---|---|
| 1 | a | 11 |
| 1 | b | 12 |
| 2 | c | 21 |
Expected result:
| id | json |
|---|---|
| 1 | {"a":"11","b":"12","c":""} |
| 2 | {"a":"","b":"","c":"21"} |
I tried:
from pyspark.sql.functions import collect_list, create_map, to_json
df.groupBy(df.id) \
    .agg(collect_list(to_json(create_map(df.type, df.value))).alias('json'))
but it returns a nested JSON like {{"a":"11"},{"b":"12"}}
Can anyone help me? Thanks!!
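To pin down the target semantics, here is a plain-Python sketch (no Spark involved) of the transformation being asked for: group the rows by `id` and emit one JSON object per id that contains every known `type` key, with `""` for types that id does not have. The fixed key list `["a", "b", "c"]` and the helper name `rows_to_json` are assumptions for illustration only.

```python
import json
from collections import defaultdict

# Sample rows from the question: (id, type, value)
ROWS = [("1", "a", "11"), ("1", "b", "12"), ("2", "c", "21")]
# Assumed fixed set of keys that every JSON object should contain
KEYS = ["a", "b", "c"]


def rows_to_json(rows, keys):
    """Hypothetical helper: group (id, type, value) rows by id into compact JSON strings."""
    grouped = defaultdict(dict)
    for id_, type_, value in rows:
        grouped[id_][type_] = value
    result = {}
    for id_, found in grouped.items():
        # Types an id does not have become "" instead of being dropped
        obj = {k: found.get(k, "") for k in keys}
        # Compact separators give the {"a":"11",...} style shown in the expected result
        result[id_] = json.dumps(obj, separators=(",", ":"))
    return result


print(rows_to_json(ROWS, KEYS))
```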
Answer:
Sample df:
df = spark.createDataFrame(
[
('1','a','11'),
('1','b','12'),
('2','c','21')
], ['id','type','value']
)
from pyspark.sql import functions as F
df.groupBy("id")\
.agg(F.map_from_entries(F.collect_list(F.struct("type", "value"))).alias("type_value"))\
.withColumn('json', F.to_json('type_value'))\
.show(truncate=False)
+---+------------------+-------------------+
|id |type_value        |json               |
+---+------------------+-------------------+
|1  |{a -> 11, b -> 12}|{"a":"11","b":"12"}|
|2  |{c -> 21}         |{"c":"21"}         |
+---+------------------+-------------------+
Answer:
There is a simpler way to do this; see the logic below.
Input DF:
from pyspark.sql.functions import *
from pyspark.sql.types import *
schema = StructType([StructField("id", StringType(), True), StructField("type", StringType(), True), StructField("value", StringType(), True)])
df = spark.createDataFrame([('1','a','11'),('1','b','12'),('2','c','21')], schema)
df.show(truncate=False)
+---+----+-----+
| id|type|value|
+---+----+-----+
|  1|   a|   11|
|  1|   b|   12|
|  2|   c|   21|
+---+----+-----+
First, pivot the type column and fill each resulting cell with its corresponding value, as shown below:
df1 = df.groupBy("id").pivot("type").agg(first("value"))
df1.show()
+---+----+----+----+
| id|   a|   b|   c|
+---+----+----+----+
|  1|  11|  12|null|
|  2|null|null|  21|
+---+----+----+----+
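Conceptually, pivot followed by first turns each distinct `type` into its own column and leaves a null wherever an id has no row for that type. The plain-Python sketch below mimics that on the sample data, with `None` standing in for Spark's null; `pivot_first` is a hypothetical name, not a Spark API. Passing the pivot values explicitly, e.g. `pivot("type", ["a", "b", "c"])`, also fixes the column set, so the later `col("a")`, `col("b")`, `col("c")` references keep working even when some type is absent from the data.

```python
# Sample rows from the answer: (id, type, value)
ROWS = [("1", "a", "11"), ("1", "b", "12"), ("2", "c", "21")]


def pivot_first(rows, values=None):
    """Hypothetical helper mimicking groupBy(id).pivot(type, values).agg(first(value))."""
    if values is None:
        # Without explicit values, use the sorted distinct types as columns
        values = sorted({t for _, t, _ in rows})
    table = {}
    for id_, type_, value in rows:
        # Every id starts with all pivot columns set to None (null)
        row = table.setdefault(id_, {v: None for v in values})
        # Types not in the pivot values are ignored; keep the first value seen per cell
        if type_ in row and row[type_] is None:
            row[type_] = value
    return table


print(pivot_first(ROWS))
```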
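Once you have this, you must replace the null values with a string placeholder, because Spark drops fields whose value is null when it creates a JSON column from a struct. Fill the nulls with the string "null", serialize the struct with to_json, and then strip the "null" text with regexp_replace: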
df1.fillna("null") \
    .withColumn("json", regexp_replace(to_json(struct(col("a"), col("b"), col("c"))), "null", "")) \
    .drop("a", "b", "c") \
    .show(truncate=False)
+---+--------------------------+
|id |json                      |
+---+--------------------------+
|1  |{"a":"11","b":"12","c":""}|
|2  |{"a":"","b":"","c":"21"}  |
+---+--------------------------+
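One caveat with the fillna("null") plus regexp_replace(..., "null", "") trick: the regex removes every occurrence of the substring `null`, including inside genuine values. The plain-Python sketch below reproduces that with `re.sub` on a hypothetical value `"nullable"`, and contrasts it with filling missing cells with `""` before serializing (in Spark, roughly `df1.fillna("")` before `to_json`), which needs no regex at all. The sample row and both helper names are assumptions for illustration.

```python
import json
import re

# Hypothetical pivoted row where one genuine value contains the text "null"
row = {"a": "nullable", "b": None, "c": "21"}


def json_via_regex(r):
    """Mimic fillna("null") + to_json + regexp_replace(..., "null", "")."""
    filled = {k: ("null" if v is None else v) for k, v in r.items()}
    return re.sub("null", "", json.dumps(filled, separators=(",", ":")))


def json_via_empty_fill(r):
    """Fill missing cells with "" before serializing, so no regex is needed."""
    filled = {k: ("" if v is None else v) for k, v in r.items()}
    return json.dumps(filled, separators=(",", ":"))


print(json_via_regex(row))       # "nullable" is corrupted to "able"
print(json_via_empty_fill(row))  # "nullable" is preserved
```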
Answer:
df = spark.createDataFrame(
[
('1','a','11'),
('1','b','12'),
('2','c','21')
], ['id','type','value']
)
from pyspark.sql import functions as F
from pyspark.sql.types import *
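import json
def input_type(json_str):
    # Parse with json.loads rather than eval, which would execute arbitrary input
    dic_json = json.loads(json_str)
    # Add every expected key that is missing, with an empty-string value
    for x in ['a', 'b', 'c']:
        if x not in dic_json:
            dic_json[x] = ''
    return dic_json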
input_type_udf = F.udf(input_type, StringType())
df.groupBy("id")\
.agg(F.map_from_entries(F.collect_list(F.struct("type", "value"))).alias("type_value"))\
.withColumn('json', F.to_json('type_value'))\
.withColumn('dic_json', input_type_udf(F.col('json')))\
.show(truncate=False)
+---+------------------+-------------------+----------------+
|id |type_value        |json               |dic_json        |
+---+------------------+-------------------+----------------+
|1  |{a -> 11, b -> 12}|{"a":"11","b":"12"}|{a=11, b=12, c=}|
|2  |{c -> 21}         |{"c":"21"}         |{a=, b=, c=21}  |
+---+------------------+-------------------+----------------+
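The dic_json column above is printed as `{a=11, b=12, c=}`, which is not valid JSON, because the UDF hands Spark a Python dict rather than a JSON string. A sketch of a variant, assuming the same fixed key list `['a', 'b', 'c']`, serializes the padded dict back to a JSON string itself; `fill_missing_keys` is a hypothetical name. It is plain Python, so it can be checked without Spark and then wrapped the same way, e.g. `F.udf(fill_missing_keys, StringType())`.

```python
import json

# Assumed fixed set of keys that every output object should contain
KEYS = ["a", "b", "c"]


def fill_missing_keys(json_str):
    """Parse a JSON object string, add missing KEYS as "", and return compact JSON."""
    obj = json.loads(json_str)  # safe parsing, unlike eval()
    for key in KEYS:
        obj.setdefault(key, "")
    # sort_keys gives a stable a, b, c order regardless of the input order
    return json.dumps(obj, sort_keys=True, separators=(",", ":"))


print(fill_missing_keys('{"a":"11","b":"12"}'))  # {"a":"11","b":"12","c":""}
print(fill_missing_keys('{"c":"21"}'))           # {"a":"","b":"","c":"21"}
```

In the pipeline above, a UDF built from this function would take the place of input_type_udf in the .withColumn('dic_json', ...) step.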
Please credit the source when reposting. Original link: https://www.uj5u.com/net/442695.html
