我正在嘗試使用 pyspark 對每個磁區中的 spark 資料幀和元素求和進行磁區。但我無法在呼叫函式“sumByHour”中執行此操作。基本上,我無法訪問“sumByHour”中的資料框列。
基本上,我按“小時”列進行磁區,并嘗試根據“小時”磁區對元素求和。因此預期輸出為:0、1、2 小時分別為 6、15、24。在下面嘗試沒有運氣。
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
def sumByHour(ip):
print(ip)
pandasDF = pd.DataFrame({'hour': [0,0,0,1,1,1,2,2,2], 'numlist': [1,2,3,4,5,6,7,8,9]})
myschema = StructType(
[StructField('hour', IntegerType(), False),
StructField('numlist', IntegerType(), False)]
)
myDf = spark.createDataFrame(pandasDF, schema=myschema)
mydf = myDf.repartition(3, "hour")
myDf.foreachPartition(sumByHour)
我可以用“window.partitionBy”解決這個問題。但我想知道它是否可以通過“foreachPartition”來解決。
提前致謝,
斯里蘭卡
uj5u.com熱心網友回復:
感謝您提供的代碼示例,它讓這一切變得簡單。這是一個非常簡單的示例,它修改了 sumByHour 代碼:
def sumByHour(ip):
mySum = 0
myPartition = ""
for x in ip:
mySum = x.numlist
myPartition = x.hour
myString = '{}_{}'.format(mySum, myPartition)
print(myString)
mydf = myDf.repartition(5,"hour") #wait 5 I wanted 3!!!
你幾乎得到了預期的結果:
>>> mydf.foreachPartition(sumByHour)
0_
0_
24_2
6_0
15_1
>>>
您可能會問為什么按“5”而不是“3”磁區?事實證明,用于 3 個磁區的哈希公式與 (0,1) 沖突進入同一磁區,然后有一個空磁區。(運氣不好)所以這會起作用,但是你只想在一個陣列上使用它適合記憶。
uj5u.com熱心網友回復:
您可以使用 aWindow來執行此操作并將其添加sumByHour為新列。
from pyspark.sql import functions, Window
w = Window.partitionBy("hour")
myDf = myDf.withColumn("sumByHour", functions.sum("numlist").over(w))
myDf.show()
---- ------- ---------
|hour|numlist|sumByHour|
---- ------- ---------
| 1| 4| 15|
| 1| 5| 15|
| 1| 6| 15|
| 2| 7| 24|
| 2| 8| 24|
| 2| 9| 24|
| 0| 1| 6|
| 0| 2| 6|
| 0| 3| 6|
---- ------- ---------
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/421892.html
標籤:
