我來自這篇文章:pyspark:計算串列中不同元素的出現次數,其中 OP 詢問從陣列列中獲取不同專案的計數。如果我已經提前知道了詞匯表并想要計算一個預設長度的向量怎么辦?
所以假設我的詞匯是
vocab = ['A', 'B', 'C', 'D', 'E']
我的資料看起來像這樣(從其他帖子更改)
data = {'date': ['2014-01-01', '2014-01-02', '2014-01-03'],
'flat': ['A;A;B', 'D;B;E;B;B', 'B;A']}
data['date'] = pd.to_datetime(data['date'])
data = pd.DataFrame(data)
data['date'] = pd.to_datetime(data['date'])
spark = SparkSession.builder \
.master('local[*]') \
.config("spark.driver.memory", "500g") \
.appName('my-pandasToSparkDF-app') \
.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.sparkContext.setLogLevel("OFF")
df=spark.createDataFrame(data)
new_frame = df.withColumn("list", F.split("flat", "\;"))
最終這就是我想要的:
------------------- ----------- ---------------------
| date| flat | counts |
------------------- ----------- ---------------------
|2014-01-01 00:00:00|A;A;B |[2, 1, 0, 0, 0] |
|2014-01-02 00:00:00|D;B;E;B;B |[0, 3, 0, 1, 1] |
|2014-01-03 00:00:00|B;A |[1, 1, 0, 0, 0] |
------------------- ----------- ---------------------
這是一個似乎效率低下的可行解決方案,改編自上一篇文章的解決方案:
from pyspark.sql import functions as F
df=spark.createDataFrame(data)
df.withColumn("list", F.split("flat","\;"))\
.withColumn("distinct_items", F.array_distinct("list") \
.withColumn("occurrences", F.expr("""array_sort(transform(distinct_items, x-> aggregate(list, 0,(acc,t)->acc IF(t=x,1,0))))"""))\
.withColumn("count_map", F.map_from_arrays("distinct_items", "occurrences"))\
.withColumn(
"counts",
F.array(
[
F.when(
F.col("count_map")
.getItem(v)
.isNull(),
0,
)
.otherwise(
F.col("count_map").getItem(v)
)
for v in vocab
]
).drop("occurrences", "distinct_items").show()
我可以在不必創建地圖然后從地圖創建陣列的情況下執行此操作嗎?在實踐中,我需要在具有大量列的大表上執行此程序,因此我想避免必須執行groupBy,agg型別操作。
uj5u.com熱心網友回復:
非常好的問題。你的直覺是完全正確的:在這種情況下可以避免洗牌。
但是,我不確定我能否解釋其中的邏輯。這是我第一次使用嵌套transform。可能還有其他聰明的方法...
from pyspark.sql import functions as F
vocab = ['A', 'B', 'C', 'D', 'E']
df = spark.createDataFrame([('A;A;B',), ('D;B;E;B;B',), ('B;A',),], ['flat'])
voc_arr = F.array([F.lit(x) for x in vocab])
df = df.withColumn('count', F.transform(voc_arr, lambda v: F.size(F.array_remove(F.transform(F.split('flat', ';'), lambda f: f == v), False))))
df.show()
# --------- ---------------
# | flat| count|
# --------- ---------------
# | A;A;B|[2, 1, 0, 0, 0]|
# |D;B;E;B;B|[0, 3, 0, 1, 1]|
# | B;A|[1, 1, 0, 0, 0]|
# --------- ---------------
uj5u.com熱心網友回復:
非常有趣的問題。
雖然它可以在沒有更高階函式的洗牌的情況下作業,但我無法計算出低于 VocabularySize * flatSize 的復雜度。
我猜還是比洗牌好。
vocabulary = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P",
"Q", "R", "S", "U", "V", "W", "X", "Y", "Z"]
vocabulary_df = spark.createDataFrame(
[
[{k:0 for k in vocabulary}]
],
["vocab"]
)
df \
.crossJoin(vocabulary_df) \
.withColumn("count_distinct", aggregate(
"flat",
initValue="vocab",
merge=lambda acc, flat_value: transform_values(
acc,
lambda vocab_key, vocab_value: when(
flat_value == vocab_key,
vocab_value 1
).otherwise(vocab_value)
)
)) \
.select("flat", "count_distinct") \
.show(truncate=0)
--------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|flat |count_distinct |
--------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|[A, A, B] |{A -> 2, B -> 1, C -> 0, D -> 0, E -> 0, F -> 0, G -> 0, H -> 0, I -> 0, J -> 0, K -> 0, L -> 0, M -> 0, N -> 0, O -> 0, P -> 0, Q -> 0, R -> 0, S -> 0, U -> 0, V -> 0, W -> 0, X -> 0, Y -> 0, Z -> 0}|
|[D, B, E, B, B]|{A -> 0, B -> 3, C -> 0, D -> 1, E -> 1, F -> 0, G -> 0, H -> 0, I -> 0, J -> 0, K -> 0, L -> 0, M -> 0, N -> 0, O -> 0, P -> 0, Q -> 0, R -> 0, S -> 0, U -> 0, V -> 0, W -> 0, X -> 0, Y -> 0, Z -> 0}|
|[B, A] |{A -> 1, B -> 1, C -> 0, D -> 0, E -> 0, F -> 0, G -> 0, H -> 0, I -> 0, J -> 0, K -> 0, L -> 0, M -> 0, N -> 0, O -> 0, P -> 0, Q -> 0, R -> 0, S -> 0, U -> 0, V -> 0, W -> 0, X -> 0, Y -> 0, Z -> 0}|
--------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
uj5u.com熱心網友回復:
我有第一種方法的另一種變體,將整個詞匯放在每個陣列的前面......不確定相對優點可能是什么
from pyspark.sql import functions as F
vocab_arr = F.array([F.lit(v) for v in vocab])
df=spark.createDataFrame(data)
df.withColumn("list", F.split("flat","\;"))\
.withColumn("list_", F.concat(vocab_arr, "list")) \
.withColumn(
"counts",
F.expr("""transform(list_, x-> aggregate(list_, -1,(acc,t)->acc IF(t=x,1,0)))""")) \
.withColumn("counts", F.slice("counts", 1, len(vocab))) \
.drop("list_").show()
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/488041.html
標籤:数组 阿帕奇火花 pyspark apache-spark-sql 火花洗牌
上一篇:將函式應用于array<string>列中的所有元素
下一篇:有條件地合并陣列
