I have two tables:
+-----+-----+
|store|sales|
+-----+-----+
|    F| 4000|
|    M| 3000|
|    A| 4000|
+-----+-----+

+-----+------+
|  upc| store|
+-----+------+
|40288|[F, M]|
|42114|   [M]|
|39192|[F, A]|
+-----+------+
I want to end up with this final table:
+-----+------+-----+
|  upc| store|sales|
+-----+------+-----+
|40288|[F, M]| 7000|
|42114|   [M]| 3000|
|39192|[F, A]| 8000|
+-----+------+-----+
Use this code to create the DataFrames:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkByExamples.com").getOrCreate()

# Sales per store
data2 = [
    ("F", 4000),
    ("M", 3000),
    ("A", 4000),
]
schema = StructType(
    [
        StructField("store", StringType(), True),
        StructField("sales", IntegerType(), True),
    ]
)
df11 = spark.createDataFrame(data=data2, schema=schema)

# Stores per upc
data3 = [
    ("40288", ["F", "M"]),
    ("42114", ["M"]),
    ("39192", ["F", "A"]),
]
schema = StructType(
    [
        StructField("upc", StringType(), True),
        # Note: declared as StringType although the values are lists
        StructField("store", StringType(), True),
    ]
)
df22 = spark.createDataFrame(data=data3, schema=schema)
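I can do this with a loop, but that is inefficient for big data. I have code that loops over a pandas DataFrame, but I am now migrating to PySpark and need the equivalent PySpark code. Is there a better way to get the final_table shown above without a loop?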
for i, row in df22.iterrows():
    new_sales = df11[df11.store.isin(df22[df22.upc == row.upc]["store"].values[0])][
        "sales"
    ].sum()
    df22.at[i, "sales"] = new_sales
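Answer from a uj5u.com user:
You can join the two DataFrames on an array_contains condition. After the join, group by upc and store from df22 and sum the sales. This requires df22["store"] to be an array column.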
from pyspark.sql import functions as F
df11_with_df22 = df11.join(df22, F.array_contains(df22["store"], df11["store"]))
df11_with_df22.groupBy(df22["upc"], df22["store"]).agg(F.sum("sales").alias("sales")).show()
Output:
+-----+------+-----+
|  upc| store|sales|
+-----+------+-----+
|40288|[F, M]| 7000|
|39192|[F, A]| 8000|
|42114|   [M]| 3000|
+-----+------+-----+
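To make the join condition concrete, here is a plain-Python sketch of what the array_contains join followed by groupBy and sum computes. It is not Spark code: sales_rows, upc_rows, and join_and_sum are hypothetical names used only for illustration, and the lists stand in for df11 and df22.

```python
# Plain-Python model of the array_contains join followed by groupBy/sum.
# Not Spark code: the lists below stand in for df11 and df22.
sales_rows = [("F", 4000), ("M", 3000), ("A", 4000)]  # df11: (store, sales)
upc_rows = [  # df22: (upc, store array)
    ("40288", ["F", "M"]),
    ("42114", ["M"]),
    ("39192", ["F", "A"]),
]


def join_and_sum(sales_rows, upc_rows):
    """Pair each sales row with every upc row whose store list contains it,
    then sum the sales per (upc, store list) group."""
    totals = {}
    for upc, stores in upc_rows:
        for store, sales in sales_rows:
            if store in stores:  # models array_contains(df22.store, df11.store)
                key = (upc, tuple(stores))  # models groupBy(upc, store)
                totals[key] = totals.get(key, 0) + sales
    return totals


result = join_and_sum(sales_rows, upc_rows)
for (upc, stores), total in result.items():
    print(upc, list(stores), total)
```

The nested loop mirrors the fact that the join condition is not an equality: conceptually every pair of rows is compared, which may be costly on large tables unless one side is small.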
Answer from a uj5u.com user:
I modified your input slightly, because df22["store"] in your example should be of type Array(String):
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

data3 = [
    ("40288", ["F", "M"]),
    ("42114", ["M"]),
    ("39192", ["F", "A"]),
]
schema = StructType(
    [
        StructField("upc", StringType(), True),
        StructField("store", ArrayType(StringType()), True),
    ]
)
df22 = spark.createDataFrame(data=data3, schema=schema)
df22 is now consistent with the data:
df22.show()
+-----+------+
|  upc| store|
+-----+------+
|40288|[F, M]|
|42114|   [M]|
|39192|[F, A]|
+-----+------+
df22.printSchema()
root
 |-- upc: string (nullable = true)
 |-- store: array (nullable = true)
 |    |-- element: string (containsNull = true)
With this data, I explode the store column, join on it, and then aggregate on upc to rebuild store as a list and sum the sales.
from pyspark.sql import functions as F
df = (
    df22.withColumn("store", F.explode("store"))
    .join(df11, on="store")
    .groupBy("upc")
    .agg(F.collect_list("store").alias("store"), F.sum("sales").alias("sales"))
)
Result:
df.show()
+-----+------+-----+
|  upc| store|sales|
+-----+------+-----+
|42114|   [M]| 3000|
|40288|[F, M]| 7000|
|39192|[F, A]| 8000|
+-----+------+-----+
df.printSchema()
root
 |-- upc: string (nullable = true)
 |-- store: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- sales: long (nullable = true)
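The three steps (explode, equi-join, aggregate) can be traced in plain Python as a rough model. This is not Spark code: sales_by_store, upc_rows, and explode_join_aggregate are hypothetical names, and the row with upc "99999" and store "Z" is an invented example added only to show what an inner join does with a store that has no sales entry.

```python
from collections import defaultdict

# Plain-Python model of explode -> inner join on store -> groupBy(upc).
sales_by_store = {"F": 4000, "M": 3000, "A": 4000}  # df11 as a lookup table
upc_rows = [
    ("40288", ["F", "M"]),
    ("42114", ["M"]),
    ("39192", ["F", "A"]),
    ("99999", ["Z"]),  # hypothetical row: store "Z" has no sales entry
]


def explode_join_aggregate(upc_rows, sales_by_store):
    # Step 1: explode, producing one (upc, store) row per array element.
    exploded = [(upc, store) for upc, stores in upc_rows for store in stores]
    # Step 2: inner equi-join on store; rows without a match are dropped.
    joined = [
        (upc, store, sales_by_store[store])
        for upc, store in exploded
        if store in sales_by_store
    ]
    # Step 3: group by upc, collecting the stores and summing the sales.
    grouped = defaultdict(lambda: {"store": [], "sales": 0})
    for upc, store, sales in joined:
        grouped[upc]["store"].append(store)
        grouped[upc]["sales"] += sales
    return dict(grouped)


final = explode_join_aggregate(upc_rows, sales_by_store)
for upc, row in final.items():
    print(upc, row["store"], row["sales"])
```

Two things follow from this model. A upc whose stores all lack a sales entry disappears from the result, as it would with Spark's default inner join; a left join would keep it with a null total. And the rebuilt store list contains only matched stores; in Spark, the element order produced by collect_list is not guaranteed after a shuffle.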
