I have a column in a PySpark df that contains an array of maps, like this:
[{"address": "Fadden", "city": "", "country": "", "note": "", "stateProvince": "Queensland"}]
df.printSchema() returns the following for the column:
 |-- constituencies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- note: string (nullable = true)
 |    |    |-- stateProvince: string (nullable = true)
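I want to turn all of these empty strings into nulls, so I thought this would be a perfect problem to solve with F.transform(col, f).
So I created this function and then used it in the transform expression, like this: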
def nullify_vals(d):
    def nullify_string(str_):
        if str_.strip() == "":
            return None
        return str_.strip()
    return (
        dict((k, nullify_string(v)) for k, v in d.items())
    )
Note that the function above works when tested on a dictionary:
dd = {"my": "map", "is": "", "not": " ", "entierly": " empty , right?"}
d_cln = nullify_vals(dd)
d_cln["not"] is None # returns True
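For reference, here is the same logic applied element by element to the sample value from the top of the question, in plain Python. It is a sketch that uses json only to parse the sample string and rewrites the dict(...) call as an equivalent dict comprehension. It shows the intended result for the column, but it still operates on Python dicts, not on the Spark Column objects that F.transform passes to its function.

```python
import json


def nullify_vals(d):
    """Return a copy of d with blank or whitespace-only strings replaced by None."""
    def nullify_string(str_):
        if str_.strip() == "":
            return None
        return str_.strip()
    return {k: nullify_string(v) for k, v in d.items()}


# The array-of-objects value shown at the top of the question.
raw = '[{"address": "Fadden", "city": "", "country": "", "note": "", "stateProvince": "Queensland"}]'
cleaned = [nullify_vals(d) for d in json.loads(raw)]
print(cleaned)
# [{'address': 'Fadden', 'city': None, 'country': None, 'note': None, 'stateProvince': 'Queensland'}]
```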
But when I use it in PySpark, it gives me an error:
import pyspark.sql.functions as F
result = kyclean.select(F.transform("constituencies", nullify_vals))
TypeError: 'Column' object is not callable
These are the last few lines of the stack trace:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
File <command-899394298900126>:1, in <module>
----> 1 result = kyclean.select(F.transform("constituencies", nullify_vals))
File /databricks/spark/python/pyspark/sql/functions.py:4260, in transform(col, f)
4214 def transform(col, f):
4215 """
4216 Returns an array of elements after applying a transformation to each element in the input array.
4217
(...)
4258 --------------
4259 """
-> 4260 return _invoke_higher_order_function("ArrayTransform", [col], [f])
File /databricks/spark/python/pyspark/sql/functions.py:4209, in _invoke_higher_order_function(name, cols, funs)
4206 expr = getattr(expressions, name)
4208 jcols = [_to_java_column(col).expr() for col in cols]
-> 4209 jfuns = [_create_lambda(f) for f in funs]
4211 return Column(sc._jvm.Column(expr(*jcols + jfuns)))
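Answer:
Your function nullify_vals should take a Column object of type StructType, because the array elements are structs. F.transform passes each element to your function as a Column, but your function is written as if it receives a plain Python dict.
Try rewriting it like this: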
from typing import List

from pyspark.sql import functions as F, Column

def nullify_vals(struct_col: Column, fields: List[str]) -> Column:
    # Rewrite each struct field: blank or whitespace-only -> null, else unchanged
    for f in fields:
        struct_col = struct_col.withField(
            f,
            F.when(F.trim(struct_col[f]) == "", None).otherwise(struct_col[f])
        )
    return struct_col
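For each field of the inner struct, we update it with the Column method withField, setting it to null if it equals the empty string after trimming (so whitespace-only values become null too; non-empty values are kept as they are, untrimmed). Note that Column.withField and the Python F.transform both require Spark 3.1 or later.
Applied to your input example: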
json_str = '{"constituencies":[{"address":"Fadden","city":"","country":"","note":"","stateProvince":"Queensland"}]}'
df = spark.read.json(spark.sparkContext.parallelize([json_str]))
You can get the list of struct fields of constituencies from the DataFrame schema:
constituencies_fields = df.selectExpr("inline(constituencies)").columns
df1 = df.withColumn(
    "constituencies",
    F.transform("constituencies", lambda x: nullify_vals(x, constituencies_fields))
)
df1.show(truncate=False)
#+----------------------------------------+
#|constituencies                          |
#+----------------------------------------+
#|[{Fadden, null, null, null, Queensland}]|
#+----------------------------------------+
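Answer:
I'm still investigating the error you're getting and will update the post once I find out what's wrong. In the meantime, you can work around it with something like this: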
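from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Return type of the UDF: the same array<struct<...>> type as the input column
schema = ArrayType(
    StructType([
        StructField('address', StringType()),
        StructField('city', StringType()),
        StructField('country', StringType()),
        StructField('note', StringType()),
        StructField('stateProvince', StringType()),
    ]), True)

# Python UDF applied to the whole array column (not inside F.transform).
# Each struct arrives as a Row; blank strings become None, and the
# None checks avoid errors on null arrays or null fields.
nullify_udf = udf(
    lambda arr: None if arr is None else [
        tuple(None if v is None or v.strip() == "" else v for v in area)
        for area in arr
    ],
    schema,
)

result = kyclean.withColumn('constituencies', nullify_udf('constituencies'))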
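Because the UDF body is ordinary Python, it can be pulled out into a named function and unit-tested without starting Spark. nullify_elements below is a hypothetical name for that logic; Spark hands each struct to a Python UDF as a Row, which is a tuple subclass, so plain tuples stand in for Rows in this sketch.

```python
from typing import List, Optional, Sequence, Tuple


def nullify_elements(
    arr: Optional[Sequence[Sequence[Optional[str]]]],
) -> Optional[List[Tuple[Optional[str], ...]]]:
    """Replace blank or whitespace-only strings with None in every struct of the array."""
    if arr is None:  # a null array column value stays null
        return None
    return [
        tuple(None if v is None or v.strip() == "" else v for v in area)
        for area in arr
    ]


print(nullify_elements([("Fadden", "", "", "", "Queensland")]))
# [('Fadden', None, None, None, 'Queensland')]
```

It could then be registered with udf(nullify_elements, schema) in place of the lambda.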
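The specific error you get is because you can't call d.items() as a function. Inside F.transform, d is a Column (the current array element), not a dict: on a Column, d.items is treated as access to a struct field named items and returns another Column, and calling that Column raises TypeError: 'Column' object is not callable. So the input function really has to work with the Column object passed in as d.
The description of pyspark.sql.functions.transform says: "Returns an array of elements after applying a transformation to each element in the input array."
But the description of the function argument f says: "...and can use methods of Column, functions defined in pyspark.sql.functions and Scala UserDefinedFunctions. Python UserDefinedFunctions are not supported (SPARK-27052)." So f cannot (yet) run custom Python code on the element values, which is what you are trying to do.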
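Since f may only use Column methods and built-in functions, one alternative that avoids Python functions entirely is to write the lambda in Spark SQL, using the SQL transform higher-order function, and pass it through F.expr. The sketch below only builds the expression string from a list of field names; build_nullify_expr is a hypothetical helper, and it assumes the field names are plain identifiers (no backticks or quotes). nullif(trim(...), '') yields null for blank values and the trimmed value otherwise, which mirrors the strip() behaviour of the question's nullify_vals.

```python
from typing import List


def build_nullify_expr(array_col: str, fields: List[str]) -> str:
    """Build a Spark SQL `transform` expression that rebuilds each struct
    element, turning blank or whitespace-only strings into null.

    Assumes every name is a plain identifier (no backticks or quotes).
    """
    # One "'name', nullif(trim(x.`name`), '')" pair per struct field
    parts = [f"'{name}', nullif(trim(x.`{name}`), '')" for name in fields]
    return f"transform(`{array_col}`, x -> named_struct({', '.join(parts)}))"


print(build_nullify_expr("constituencies", ["address", "city"]))
```

Usage would look like df.withColumn("constituencies", F.expr(build_nullify_expr("constituencies", constituencies_fields))). The struct is rebuilt field by field in the order given, so pass the fields in schema order. Spark's trim is documented as removing space characters, so tabs or newlines may be handled differently than by Python's strip().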
Please credit the source when reposting. Article link: https://www.uj5u.com/net/429916.html
