我有一個很小的 ??spark Dataframe,它基本上將字串推入 UDF。我期待,因為.repartition(3),它的長度與 相同targets,內部處理run_sequential將應用于可用的執行程式 - 即應用于 3 個不同的執行程式。
問題是只使用了 1 個執行器。如何并行化此處理以強制我的 pyspark 腳本將每個元素分配給target不同的執行程式?
import pandas as pd
import pyspark.sql.functions as F
def run_parallel(config):
def run_sequential(target):
#process with target variable
pass
return F.udf(run_sequential)
targets = ["target_1", "target_2", "target_3"]
config = {}
pdf = spark.createDataFrame(pd.DataFrame({"targets": targets})).repartition(3)
pdf.withColumn(
"apply_udf", run_training_parallel(config)("targets")
).collect()
uj5u.com熱心網友回復:
這里的問題是重新磁區 aDataFrame并不能保證所有創建的磁區都具有相同的大小。由于記錄數量如此之少,其中一些記錄很可能會映射到同一個磁區。Spark 并不意味著處理如此小的資料集,它的演算法是為高效處理大量資料而量身定制的——如果您的資料集有 300 萬條記錄,并且您將其分成 3 個磁區,每個磁區大約有100 萬條記錄,則每個磁區相差幾條記錄在大多數情況下,磁區將是微不足道的。重新磁區 3 條記錄時顯然不是這種情況。
您可以使用df.rdd.glom().map(len).collect()在重新磁區之前和之后檢查磁區的大小,以查看分布如何變化。
$ pyspark --master "local[3]"
...
>>> pdf = spark.createDataFrame([("target_1",), ("target_2",), ("target_3",)]).toDF("targets")
>>> pdf.rdd.glom().map(len).collect()
[1, 1, 1]
>>> pdf.repartition(3).rdd.glom().map(len).collect()
[0, 2, 1]
如您所見,生成的磁區不均勻,我的第一個磁區實際上是空的。具有諷刺意味的是,原始資料框具有所需的屬性,并且正在被repartition().
雖然您的特定情況不是 Spark 通常針對的目標,但仍然可以在三個磁區中強制分配三個記錄。您需要做的就是提供一個明確的磁區鍵。RDD 具有zipWithIndex()使用其 ID 擴展每條記錄的方法。ID 是完美的磁區鍵,因為它的值從 0 開始并增加 1。
>>> new_df = (pdf
.coalesce(1) # not part of the solution - see below
.rdd # Convert to RDD
.zipWithIndex() # Append ID to each record
.map(lambda x: (x[1], x[0])) # Make record ID come first
.partitionBy(3) # Repartition
.map(lambda x: x[1]) # Remove record ID
.toDF()) # Turn back into a dataframe
>>> new_df.rdd.glom().map(len).collect()
[1, 1, 1]
在上面的代碼中,coalesce(1)添加只是為了證明最終的磁區不受pdf每個磁區中最初有一個記錄的事實的影響。
僅 DataFrame 的解決方案是首先合并pdf到單個磁區,然后使用repartition(3). 在沒有提供磁區列的情況下,DataFrame.repartition()使用回圈磁區器,因此將實作所需的磁區。您不能簡單地這樣做,pdf.coalesce(1).repartition(3)因為 Catalyst(Spark 查詢優化引擎)優化了合并操作,因此必須在其間插入依賴于磁區的操作。添加包含的列F.monotonically_increasing_id()是此類操作的理想選擇。
>>> new_df = (pdf
.coalesce(1)
.withColumn("id", F.monotonically_increasing_id())
.repartition(3))
>>> new_df.rdd.glom().map(len).collect()
[1, 1, 1]
請注意,與基于 RDD 的解決方案不同,coalesce(1)它需要作為解決方案的一部分。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/465519.html
