I have a DataFrame like this:
columns = ['manufacturer', 'product_id']
data = [("Factory", "AE222"), ("Sub-Factory-1", "0"), ("Sub-Factory-2", "0"),("Factory", "AE333"), ("Sub-Factory-1", "0"), ("Sub-Factory-2", "0")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
+-------------+----------+
| manufacturer|product_id|
+-------------+----------+
|      Factory|     AE222|
|Sub-Factory-1|         0|
|Sub-Factory-2|         0|
|      Factory|     AE333|
|Sub-Factory-1|         0|
|Sub-Factory-2|         0|
+-------------+----------+
I want to turn it into this:
+-------------+----------+
| manufacturer|product_id|
+-------------+----------+
|      Factory|     AE222|
|Sub-Factory-1|     AE222|
|Sub-Factory-2|     AE222|
|      Factory|     AE333|
|Sub-Factory-1|     AE333|
|Sub-Factory-2|     AE333|
+-------------+----------+
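That is, each Sub-Factory row should take its product_id from the nearest Factory row above it. I could solve this with nested for loops, but that would not be efficient, since there may be millions of rows. I looked into the PySpark Window functions but couldn't really make sense of them. Any ideas?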
Answer:
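You can use the first function with ignorenulls=True over a Window. But you first need to identify which group each manufacturer row belongs to, so that you can partition by that group.
Since you didn't provide any ID column, I'm using monotonically_increasing_id and a cumulative conditional sum to create the group column: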
from pyspark.sql import Window
from pyspark.sql import functions as F

df1 = df.withColumn(
    "row_id",
    F.monotonically_increasing_id()  # unique, increasing ID used as the ordering column
).withColumn(
    "group",
    # running count of "Factory" rows: every Factory row starts a new group
    F.sum(F.when(F.col("manufacturer") == "Factory", 1)).over(Window.orderBy("row_id"))
).withColumn(
    "product_id",
    F.when(
        F.col("product_id") == "0",  # product_id is a string column, so compare against "0"
        F.first("product_id", ignorenulls=True).over(Window.partitionBy("group").orderBy("row_id"))
    ).otherwise(F.col("product_id"))
).drop("row_id", "group")

df1.show()
#+-------------+----------+
#| manufacturer|product_id|
#+-------------+----------+
#|      Factory|     AE222|
#|Sub-Factory-1|     AE222|
#|Sub-Factory-2|     AE222|
#|      Factory|     AE333|
#|Sub-Factory-1|     AE333|
#|Sub-Factory-2|     AE333|
#+-------------+----------+
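If the window logic is hard to follow, it may help to see the same two steps in plain Python on a small in-memory list. This is only a sketch of the idea, not Spark code: the names rows, groups, first_in_group and filled are made up for illustration, and it assumes the rows are already in the intended order.

```python
from itertools import accumulate

# Same sample data as in the question, as plain (manufacturer, product_id) tuples.
rows = [
    ("Factory", "AE222"), ("Sub-Factory-1", "0"), ("Sub-Factory-2", "0"),
    ("Factory", "AE333"), ("Sub-Factory-1", "0"), ("Sub-Factory-2", "0"),
]

# Step 1: cumulative conditional sum. Count the "Factory" rows seen so far,
# so every Factory row and the Sub-Factory rows after it share one group number.
groups = list(accumulate(1 if m == "Factory" else 0 for m, _ in rows))

# Step 2: remember the first product_id seen in each group
# (this is the Factory row's value, because it opens the group).
first_in_group = {}
for g, (_, pid) in zip(groups, rows):
    first_in_group.setdefault(g, pid)

# Step 3: replace the "0" placeholder with the first value of its group.
filled = [
    (m, first_in_group[g] if pid == "0" else pid)
    for g, (m, pid) in zip(groups, rows)
]

for row in filled:
    print(row)
```

Here groups corresponds to the "group" column and first_in_group to F.first(...) over the window partitioned by group. One caveat on the Spark side: Window.orderBy("row_id") without a partitionBy moves all rows into a single partition to compute the running sum, so that step may become a bottleneck on very large inputs.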
