我有一個 PySpark 資料框:
| 用戶身份 | 庫存單位 | 行動 |
|---|---|---|
| 123 | 2345 | 2 |
| 123 | 2345 | 0 |
| 123 | 5422 | 0 |
| 123 | 7622 | 0 |
| 231 | 4322 | 2 |
| 231 | 4322 | 0 |
| 231 | 8342 | 0 |
| 231 | 5342 | 0 |
輸出應該是這樣的:
| 用戶身份 | sku_pos | sku_neg |
|---|---|---|
| 123 | 2345 | 5422 |
| 123 | 2345 | 7622 |
| 231 | 4322 | 8342 |
| 231 | 4322 | 5342 |
對于每個不同的“userid”,沒有“action”>0 的“sku”將進入“sku_neg”列,而“action”>0 的“sku”將進入“sku_pos”列。
uj5u.com熱心網友回復:
需要幾個聚合:
- 首先,將pos/neg狀態分配給“sku”
- 然后在第二個聚合中使用此狀態將所有“sku”收集到串列中
最后,分解串列。
輸入:
from pyspark.sql import functions as F
df = spark.createDataFrame(
[('123', '2345', 2),
('123', '2345', 0),
('123', '5422', 0),
('123', '7622', 0),
('231', '4322', 2),
('231', '4322', 0),
('231', '8342', 0),
('231', '5342', 0)],
['userid', 'sku', 'action'])
腳本:
df = df.groupBy('userid', 'sku').agg(
F.when(F.max('action') > 0, 'p').otherwise('n').alias('_flag')
)
df = (df
.groupBy('userid').pivot('_flag', ['p', 'n']).agg(F.collect_list('sku'))
.withColumn('sku_pos', F.explode('p'))
.withColumn('sku_neg', F.explode('n'))
.drop('p', 'n')
)
df.show()
# ------ ------- -------
# |userid|sku_pos|sku_neg|
# ------ ------- -------
# | 231| 4322| 5342|
# | 231| 4322| 8342|
# | 123| 2345| 7622|
# | 123| 2345| 5422|
# ------ ------- -------
uj5u.com熱心網友回復:
通過過濾 pos/neg 記錄并按“userid”分組來創建正負資料幀:
df_pos = df \
.filter(F.col("action") > 0) \
.groupBy("userid") \
.agg(F.collect_set("sku").alias("sku_pos_list")) \
.withColumnRenamed("userid", "userid_pos")
[Out]:
---------- ------------
|userid_pos|sku_pos_list|
---------- ------------
| 123| [2345]|
| 231| [4322]|
---------- ------------
df_neg = df \
.filter(F.col("action") <= 0) \
.groupBy("userid") \
.agg(F.collect_set("sku").alias("sku_neg_list")) \
.withColumnRenamed("userid", "userid_neg")
[Out]:
---------- ------------------
|userid_neg| sku_neg_list|
---------- ------------------
| 123|[2345, 5422, 7622]|
| 231|[8342, 4322, 5342]|
---------- ------------------
加入正負資料幀并分解 pos/neg 記錄:
df_joined = df_pos.join(df_neg, (F.col("userid_pos")==F.col("userid_neg")), how="full")
# Clean up null, empty
df_joined = df_joined \
.withColumn("userid", F.when(F.col("userid_pos").isNotNull(), F.col("userid_pos")).otherwise(F.col("userid_neg"))).drop("userid_pos", "userid_neg") \
.withColumn("sku_pos_list", F.when(F.col("sku_pos_list").isNull(), F.array([F.lit(-1)])).otherwise(F.col("sku_pos_list"))) \
.withColumn("sku_neg_list", F.when(F.col("sku_neg_list").isNull(), F.array([F.lit(-1)])).otherwise(F.col("sku_neg_list")))
[Out]:
------------ ------------------ ------
|sku_pos_list|sku_neg_list |userid|
------------ ------------------ ------
|[2345] |[2345, 5422, 7622]|123 |
|[4322] |[8342, 4322, 5342]|231 |
------------ ------------------ ------
df_joined = df_joined \
.withColumn("sku_pos", F.explode("sku_pos_list")) \
.withColumn("sku_neg", F.explode("sku_neg_list")) \
.drop("sku_pos_list", "sku_neg_list") \
.filter(F.col("sku_pos") != F.col("sku_neg"))
[Out]:
------ ------- -------
|userid|sku_pos|sku_neg|
------ ------- -------
| 123| 2345| 5422|
| 123| 2345| 7622|
| 231| 4322| 8342|
| 231| 4322| 5342|
------ ------- -------
使用的資料集:
df = spark.createDataFrame([
(123,2345,2),
(123,2345,0),
(123,5422,0),
(123,7622,0),
(231,4322,2),
(231,4322,0),
(231,8342,0),
(231,5342,0),
], ["userid", "sku", "action"])
uj5u.com熱心網友回復:
另一個提議的解決方案似乎非常好,但以防萬一,另一種不需要連接的方法。請注意,我假設sku_pos每個userid. 如果不是這種情況,這將行不通。
spark.read.option("header", "true").csv("sku")\
.withColumn("action", f.col("action") > 0)\
.groupBy("userid", "sku")\
.agg(f.max("action").alias("action"))\
.groupBy("userid", "action")\
.agg(f.collect_set("sku").alias("skus"))\
.withColumn("sku_pos", f.col("skus").getItem(0))\
.withColumn("sku_neg", f.when(~ f.col("action"), f.col("skus")))\
.groupBy("userid")\
.agg(f.first("sku_pos").alias("sku_pos"), f.first("sku_neg", ignorenulls=True).alias("sku_neg"))\
.withColumn("sku_neg", f.explode("sku_neg"))\
.show()\
------ ------- -------
|userid|sku_pos|sku_neg|
------ ------- -------
| 123| 5422| 5422|
| 123| 5422| 7622|
| 231| 4322| 5342|
| 231| 4322| 8342|
------ ------- -------
基本上這個想法是首先使用 agroupBy來分別收集正面和負面sku。然后我f.col("skus").getItem(0)只選擇一個sku_pos,使用另一個groupBy每行一行userid,最后炸開sku_neg陣列。
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/519595.html
上一篇:hadoop在datamechanics中安裝在哪里
下一篇:我可以通過AzureDatabricks將訊息作為批處理作業發送到KAFKA集群嗎(一旦我發送的訊息用完就關閉我的連接)?
