How to explode multiple array columns with variable lengths and potential nulls?
My input data looks like this:
+----+------------+--------------+--------------------+
|col1|        col2|          col3|                col4|
+----+------------+--------------+--------------------+
|   1|[id_1, id_2]|  [tim, steve]|       [apple, pear]|
|   2|[id_3, id_4]|       [jenny]|           [avocado]|
|   3|        null|[tommy, megan]| [apple, strawberry]|
|   4|        null|          null|[banana, strawberry]|
+----+------------+--------------+--------------------+
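I need to explode it such that:
- Array items with the same index are mapped to the same row
- If a column has only 1 entry, that entry applies to every exploded row
- If an array is null, null applies to every row
My output should look like this: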
+----+----+-----+----------+
|col1|col2|col3 |col4      |
+----+----+-----+----------+
|1   |id_1|tim  |apple     |
|1   |id_2|steve|pear      |
|2   |id_3|jenny|avocado   |
|2   |id_4|jenny|avocado   |
|3   |null|tommy|apple     |
|3   |null|megan|strawberry|
|4   |null|null |banana    |
|4   |null|null |strawberry|
+----+----+-----+----------+
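To make the three rules concrete, here is a minimal plain-Python sketch (no Spark involved) of what should happen to a single row. The function name `explode_row` and the list-of-tuples representation are assumptions made for illustration only; they are not part of the question.

```python
def explode_row(key, *arrays):
    """Model the desired explode rules for one row (plain Python, not Spark)."""
    # The longest non-null array decides how many output rows there are.
    max_len = max((len(a) for a in arrays if a is not None), default=0)
    rows = []
    for i in range(max_len):
        values = []
        for a in arrays:
            if a is None:
                values.append(None)   # null array: null on every row
            elif len(a) == 1:
                values.append(a[0])   # single entry: repeated on every row
            else:
                # same index, same row; missing positions become None
                values.append(a[i] if i < len(a) else None)
        rows.append((key, *values))
    return rows


print(explode_row(2, ["id_3", "id_4"], ["jenny"], ["avocado"]))
# [(2, 'id_3', 'jenny', 'avocado'), (2, 'id_4', 'jenny', 'avocado')]
```

Any Spark solution can be checked against this model row by row.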
I have been able to achieve this with the code below, but I feel there must be a more straightforward approach:
df = spark.createDataFrame(
    [
        (1, ["id_1", "id_2"], ["tim", "steve"], ["apple", "pear"]),
        (2, ["id_3", "id_4"], ["jenny"], ["avocado"]),
        (3, None, ["tommy", "megan"], ["apple", "strawberry"]),
        (4, None, None, ["banana", "strawberry"])
    ],
    ["col1", "col2", "col3", "col4"]
)
df.createOrReplaceTempView("my_table")
spark.sql("""
with cte as (
    SELECT
        col1,
        col2,
        col3,
        col4,
        greatest(size(col2), size(col3), size(col4)) as max_array_len
    FROM my_table
), arrays_extended as (
    select
        col1,
        case
            when col2 is null then array_repeat(null, max_array_len)
            else col2
        end as col2,
        case
            when size(col3) = 1 then array_repeat(col3[0], max_array_len)
            when col3 is null then array_repeat(null, max_array_len)
            else col3
        end as col3,
        case
            when size(col4) = 1 then array_repeat(col4[0], max_array_len)
            when col4 is null then array_repeat(null, max_array_len)
            else col4
        end as col4
    from cte
), arrays_zipped as (
    select *, explode(arrays_zip(col2, col3, col4)) as zipped
    from arrays_extended
)
select
    col1,
    zipped.col2,
    zipped.col3,
    zipped.col4
from arrays_zipped
""").show(truncate=False)
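Answer from a uj5u.com user:
You can use inline_outer together with selectExpr, wrapping each array in coalesce so that a null array becomes an empty array and size mismatches between the arrays are handled. Note that shorter arrays are padded with null rather than having their single entry repeated, as row 2 of the output below shows.
Data preparation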
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

inp_data = [
    (1, ['id_1', 'id_2'], ['tim', 'steve'], ['apple', 'pear']),
    (2, ['id_3', 'id_4'], ['jenny'], ['avocado']),
    (3, None, ['tommy', 'megan'], ['apple', 'strawberry']),
    (4, None, None, ['banana', 'strawberry'])
]
inp_schema = StructType([
    StructField('col1', IntegerType(), True),
    StructField('col2', ArrayType(StringType(), True)),
    StructField('col3', ArrayType(StringType(), True)),
    StructField('col4', ArrayType(StringType(), True))
])
# `sql` is assumed to be the active SparkSession (the question calls it `spark`)
sparkDF = sql.createDataFrame(data=inp_data, schema=inp_schema)
sparkDF.show(truncate=False)
+----+------------+--------------+--------------------+
|col1|col2        |col3          |col4                |
+----+------------+--------------+--------------------+
|1   |[id_1, id_2]|[tim, steve]  |[apple, pear]       |
|2   |[id_3, id_4]|[jenny]       |[avocado]           |
|3   |null        |[tommy, megan]|[apple, strawberry] |
|4   |null        |null          |[banana, strawberry]|
+----+------------+--------------+--------------------+
Inline outer
sparkDF.selectExpr(
    "col1",
    """inline_outer(arrays_zip(
           coalesce(col2, array()),
           coalesce(col3, array()),
           coalesce(col4, array())
       ))"""
).show(truncate=False)
+----+----+-----+----------+
|col1|0   |1    |2         |
+----+----+-----+----------+
|1   |id_1|tim  |apple     |
|1   |id_2|steve|pear      |
|2   |id_3|jenny|avocado   |
|2   |id_4|null |null      |
|3   |null|tommy|apple     |
|3   |null|megan|strawberry|
|4   |null|null |banana    |
|4   |null|null |strawberry|
+----+----+-----+----------+
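Row 2 of the output above shows null where the question expects jenny and avocado to repeat. That follows from arrays_zip padding shorter arrays with null. The plain-Python sketch below imitates that padding with `itertools.zip_longest`; it is an analogy rather than Spark's implementation, and `zip_like_arrays_zip` is a hypothetical name introduced here.

```python
from itertools import zip_longest


def zip_like_arrays_zip(*arrays):
    """Imitate coalesce(col, array()) followed by arrays_zip (plain Python)."""
    # coalesce(col, array()): a null array is treated as an empty one
    arrays = [a if a is not None else [] for a in arrays]
    # shorter arrays are padded with None up to the longest length
    return list(zip_longest(*arrays, fillvalue=None))


print(zip_like_arrays_zip(["id_3", "id_4"], ["jenny"], ["avocado"]))
# [('id_3', 'jenny', 'avocado'), ('id_4', None, None)]
```

To get the repeated values instead, single-element arrays would have to be extended before zipping, which is what the array_repeat step in the question does.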
Answer from a uj5u.com user:
You can use a UDF:
from pyspark.sql import functions as F, types as T

cols_of_interest = [c for c in df.columns if c != 'col1']

@F.udf(returnType=T.ArrayType(T.ArrayType(T.StringType())))
def get_sequences(*cols):
    """Equivalent of arrays_zip, but handling different lengths of the arrays.
    For an array shorter than the maximum length, the last element is repeated.
    """
    # Get the length of the longest array in the row
    max_len = max(map(len, filter(lambda x: x, cols)))
    return list(zip(*[
        # Create a list for each column with a length equal to max_len.
        # If the original column has fewer elements than needed, repeat the last one.
        # None values will be filled with a list of Nones with length max_len.
        [c[min(i, len(c) - 1)] for i in range(max_len)] if c else [None] * max_len
        for c in cols
    ]))

df2 = (
    df
    .withColumn('temp', F.explode(get_sequences(*cols_of_interest)))
    .select('col1',
            *[F.col('temp').getItem(i).alias(c) for i, c in enumerate(cols_of_interest)])
)
df2 is the following DataFrame:
+----+----+-----+----------+
|col1|col2| col3|      col4|
+----+----+-----+----------+
|   1|id_1|  tim|     apple|
|   1|id_2|steve|      pear|
|   2|id_3|jenny|   avocado|
|   2|id_4|jenny|   avocado|
|   3|null|tommy|     apple|
|   3|null|megan|strawberry|
|   4|null| null|    banana|
|   4|null| null|strawberry|
+----+----+-----+----------+
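The body of the UDF is ordinary Python, so its logic can be tried without Spark. The sketch below is a hedged variant, not the answer's exact code: `get_sequences_py` is a hypothetical name, and `default=0` is an addition so that a row where every array is null or empty returns an empty list instead of raising `ValueError` from `max()`.

```python
def get_sequences_py(*cols):
    """Plain-Python version of the UDF logic, guarded against all-null rows."""
    # Length of the longest non-empty array; 0 if there is none (added guard).
    max_len = max((len(c) for c in cols if c), default=0)
    return [
        # Clamp the index so shorter arrays repeat their last element;
        # null or empty arrays contribute None at every position.
        tuple(c[min(i, len(c) - 1)] if c else None for c in cols)
        for i in range(max_len)
    ]


print(get_sequences_py(["id_3", "id_4"], ["jenny"], ["avocado"]))
# [('id_3', 'jenny', 'avocado'), ('id_4', 'jenny', 'avocado')]
```

Two things to keep in mind: this repeats the last element of any shorter array, which is broader than the question's "only 1 entry" rule, and an empty result passed to `explode` yields no rows for that record (`explode_outer` would keep it).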
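Answer from a uj5u.com user:
After computing max_array_len, simply use the sequence function to iterate over the array indices, transform each index into a struct, and then explode the resulting array of structs. See the SQL below: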
spark.sql("""
with cte as (
    SELECT
        col1,
        col2,
        col3,
        col4,
        greatest(size(col2), size(col3), size(col4)) as max_array_len
    FROM my_table
)
SELECT inline_outer(
    transform(
        sequence(0, max_array_len - 1), i -> (
            col1 as col1,
            col2[i] as col2,
            coalesce(col3[i], col3[0]) as col3, /* fill null with the first array item of col3 */
            coalesce(col4[i], element_at(col4, -1)) as col4 /* fill null with the last array item of col4 */
        )
    )
)
FROM cte
""").show()
+----+----+-----+----------+
|col1|col2| col3|      col4|
+----+----+-----+----------+
|   1|id_1|  tim|     apple|
|   1|id_2|steve|      pear|
|   2|id_3|jenny|   avocado|
|   2|id_4|jenny|   avocado|
|   3|null|tommy|     apple|
|   3|null|megan|strawberry|
|   4|null| null|    banana|
|   4|null| null|strawberry|
+----+----+-----+----------+
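The index-driven approach above can be modeled in plain Python to see what each expression contributes. This is a sketch under assumptions: `element`, `coalesce` and `expand` are hypothetical helpers, and out-of-range lookups are modeled as returning None, which matches Spark only when ANSI mode is off (with ANSI mode on, an out-of-range index may raise an error instead).

```python
def coalesce(*values):
    """Return the first value that is not None, like SQL coalesce."""
    return next((v for v in values if v is not None), None)


def element(arr, i):
    """Index lookup that gives None for a null array or an out-of-range index."""
    if arr is None or not (-len(arr) <= i < len(arr)):
        return None
    return arr[i]


def expand(col1, col2, col3, col4):
    # greatest(size(...)): longest non-null array decides the row count
    max_len = max((len(a) for a in (col2, col3, col4) if a is not None), default=0)
    return [
        (
            col1,
            element(col2, i),                                  # col2[i]
            coalesce(element(col3, i), element(col3, 0)),      # fall back to first item
            coalesce(element(col4, i), element(col4, -1)),     # fall back to last item
        )
        for i in range(max_len)
    ]


print(expand(2, ["id_3", "id_4"], ["jenny"], ["avocado"]))
# [(2, 'id_3', 'jenny', 'avocado'), (2, 'id_4', 'jenny', 'avocado')]
```

Note that coalesce also replaces a genuine null element inside an array, not just a missing position, so this differs from the question's rules if the arrays can contain nulls.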
There is a similar question here.
Please credit the source when reposting. Link to this article: https://www.uj5u.com/net/522067.html
