我有一個看起來像這樣的資料集:
idx | attributes
--------------------------
101 | ['a','b','c']
102 | ['a','b','d']
103 | ['b','c']
104 | ['c','e','f']
105 | ['a','b','c']
106 | ['c','g','h']
107 | ['b','d']
108 | ['d','g','i']
我希望將上面的資料框轉換成這樣的:
idx | attributes
--------------------------
101 | [0,1,2]
102 | [0,1,3]
103 | [1,2]
104 | [2,4,5]
105 | [0,1,2]
106 | [2,6,7]
107 | [1,3]
108 | [3,6,8]
在這里,“a”被 0 替換,“b”被 1 替換,依此類推。本質上,我希望找到所有唯一元素并為它們分配數字,以便可以對它們進行整數運算。我目前的方法是使用 RDD 來維護單個集合并跨行回圈,但它是高度記憶體和時間密集型的。PySpark 中還有其他方法嗎?
提前致謝
uj5u.com熱心網友回復:
注釋代碼
from pyspark.ml.feature import StringIndexer
# Explode the dataframe by `attributes`
df1 = df.selectExpr('idx', "explode(attributes) as attributes")
# Create a StringIndexer to encode the labels
idx = StringIndexer(inputCol='attributes', outputCol='encoded', stringOrderType='alphabetAsc')
df1 = idx.fit(df1).transform(df1)
# group the encoded column by idx and aggregate using `collect_list`
df1 = df1.groupBy('idx').agg(F.collect_list(F.col('encoded').cast('int')).alias('attributes'))
結果
df1.show()
--- ----------
|idx|attributes|
--- ----------
|101| [0, 1, 2]|
|102| [0, 1, 3]|
|103| [1, 2]|
|104| [2, 4, 5]|
|105| [0, 1, 2]|
|106| [2, 6, 7]|
|107| [1, 3]|
|108| [3, 6, 8]|
--- ----------
uj5u.com熱心網友回復:
這可以在 spark 2.4 中作為一個內襯來完成。在 spark 3.0 中,這可以在沒有 expr 的情況下完成。
df = spark.createDataFrame(data=[(101,['a','b','c']),
(102,['a','b','d']),
(103,['b','c']),
(104,['c','e','f']),
(105,['a','b','c']),
(106,['c','g','h']),
(107,['b','d']),
(108,['d','g','i']),],schema = ["idx","attributes"])
df.select(df.idx, expr("transform( attributes, x -> ascii(x)-96)").alias("attributes") ).show()
--- ----------
|idx|attributes|
--- ----------
|101| [1, 2, 3]|
|102| [1, 2, 4]|
|103| [2, 3]|
|104| [3, 5, 6]|
|105| [1, 2, 3]|
|106| [3, 7, 8]|
|107| [2, 4]|
|108| [4, 7, 9]|
--- ----------
棘手的一點:expr("transform( attributes, x -> ascii(x)-96)")
expr用來說這是一個SQL運算式transform接受一列[即陣列]并將函式應用于陣列中的每個元素(x是陣列元素的 lambda 引數。->函式開始和)函式結束。ascii(x)-96)將ASCII碼轉換為整數。
如果您正在考慮性能,您可以考慮我的答案與迄今為止提供的另一個答案的解釋計劃:
df1.groupBy('idx').agg(collect_list(col('encoded').cast('int')).alias('attributes')).explain()
== Physical Plan ==
ObjectHashAggregate(keys=[idx#24L], functions=[collect_list(cast(encoded#140 as int), 0, 0)])
- Exchange hashpartitioning(idx#24L, 200)
- ObjectHashAggregate(keys=[idx#24L], functions=[partial_collect_list(cast(encoded#140 as int), 0, 0)])
- *(1) Project [idx#24L, UDF(attributes#132) AS encoded#140]
- Generate explode(attributes#25), [idx#24L], false, [attributes#132]
- Scan ExistingRDD[idx#24L,attributes#25]
我的答案:
df.select(df.idx, expr("transform( attributes, x -> ascii(x)-96)").alias("attributes") ).explain()
== Physical Plan ==
Project [idx#24L, transform(attributes#25, lambdafunction((ascii(lambda x#128) - 96), lambda x#128, false)) AS attributes#127]
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/491783.html
標籤:阿帕奇火花 pyspark apache-spark-sql
上一篇:加入資料框并合并/替換列值
下一篇:基于月份間隔的下一個未來日期
