根據函式的輸入和輸出型別,有不同種類的 pandasUDFType。
有:
系列到系列PandasUDFType.SCALAR:
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(v):
return v 1
spark.range(10).select(pandas_plus_one("id")).show()
并且還有系列迭代器到系列迭代器PandasUDFType.SCALAR_ITER:
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('long', PandasUDFType.SCALAR_ITER)
def pandas_plus_one(iterator):
return map(lambda s: s 1, iterator)
spark.range(10).select(pandas_plus_one("id")).show()
您能否給我一個簡單的用例,該用例無法通過系列到系列PandasUDFType.SCALAR來解決,并且可以通過系列的迭代器到系列的迭代器來解決PandasUDFType.SCALAR_ITER。我似乎無法理解在另一個仍然存在時需要擁有一個
uj5u.com熱心網友回復:
根據官方檔案和Databricks 檔案,這兩種 Pandas UDF 非常相似,但在某些方面有所不同。除了輸入和輸出型別的不同,Iterator of Series to Iterator of Series UDF只能將單個列作為輸入,而Scalar UDF可以將多個輸入列。為了使迭代器UDF需要多火花列一樣,您需要使用多個系列,以系列UDF的Iterator的迭代器是基本相同,Iterator of Series to Iterator of Series UDF但需要的迭代元組的p.Series作為引數。
迭代器 UDF在以下情況下很有用:
- 您需要預取輸入迭代器
@pandas_udf("long")
def do_something(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
threading.Thread(consume, args=(iterator, q)) # prefetch the iterator
for s in q:
yield func(s)
- 在處理每個批次之前,您需要進行一些昂貴的狀態初始化:
@pandas_udf("long")
def do_something(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
s = some_initialization() # initialize states
for x in iterator:
yield func(x, s) # use the state for the whole iterator
但是,檔案中有這樣的參考,這引起了一些混亂,因為它在內部表示它與系列到系列的作業方式相同:
當 UDF 執行需要初始化某些狀態時,它也很有用,盡管在內部它的作業方式與系列到系列的 情況相同
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/405150.html
標籤:
上一篇:如何將陣列陣列轉換為火花中的列?
