我對 spark 很陌生,我想知道這是否會改變有關記憶體消耗以及如何將任務分配給其作業人員的任何內容。請參閱下面的最小示例,以便您能夠理解我在問什么。
# import thing for the pandas udf
import pyspark.sql.functions as F
import pyspark.sql.types as T
# for creating minimal example
import pandas as pd
import numpy as np
#create minimal example
df_minimal_example = pd.DataFrame({"x":np.arange(0,50,1), "y":np.arange(50,100,1) })
# crate a random integer
df_minimal_example["PARTITION_ID"] = np.random.randint(0,2,size=len(df_minimal_example) )
sdf_minimal_example = spark.createDataFrame(df_minimal_example)
讓我們列印輸出
x y PARTITION_ID
0 0 50 1
1 1 51 0
2 2 52 1
3 3 53 1
4 4 54 0
現在我將執行pandas udf,以便能夠在spark中使用我的python函式
schema = T.StructType([T.StructField('xy', T.FloatType() ),
T.StructField('x2', T.FloatType() ),
T.StructField('y2', T.FloatType() ),
T.StructField('PARTITION_ID', T.LongType() )
]
)
@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def newfunction(pdf):
pdf["xy"] = pdf["x"]*pdf["y"]
pdf["x2"] = pdf["x"]*pdf["x"]
pdf["y2"] = pdf["y"]*pdf["y"]
cols2retrieve = ["PARTITION_ID","xy","x2","y2"]
newpdf = pdf[cols2retrieve].copy()
return newpdf
newpdf = sdf_minimal_example.groupby("PARTITION_ID").apply(newfunction)
# to see results
display(newpdf )
如您所見,我在應用 Pandas udf 函式時使用 .groupby("PARTITION_ID") ;并且“PARTITION_ID”列有 1 或 0。問題是:如果 PARTITION_ID 有 0 到 100 之間的整數怎么辦?例如:
#instead of this
df_minimal_example["PARTITION_ID"] = np.random.randint(0,2,size=len(df_minimal_example) )
# use this
df_minimal_example["PARTITION_ID"] = np.random.randint(0,100,size=len(df_minimal_example) )
這是否會改變有關記憶體問題以及如何將任務分配給每個作業人員的任何內容?如果有人可以提供更多有關此的資訊,那就太好了。
uj5u.com熱心網友回復:
groupby 是 Spark 中的 Wide 轉換,這意味著需要對資料進行打亂,并且此操作通常會消耗記憶體。
將聚合鍵從 2 更改為 100 會如何影響性能很難提前說清楚,因為這取決于資料的“物理”重新磁區。
您可以使用PARTITION_ID它重新磁區您的資料,如果您將此列用于joins或,它可以加快操作速度groupby。
我說“可能”是因為有一個權衡,并且有很多小檔案可能會影響其他活動的性能,所以它不像只是在右列上重新磁區以查看性能改進那么簡單。
有關更多詳細資訊,請參閱此帖子。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/360760.html
上一篇:如果檔案大小大于spark中的驅動程式大小會發生什么?
下一篇:如何在Databricks上讀取PySpark中的json檔案時跳過/忽略重復列。將運行時從7.3LTS(Spark3.0.1)升級到9.1LTS(Spark3.1.2)
