賞金將在 4 天后到期。此問題的答案有資格獲得 50聲望賞金。 約翰康斯坦丁想要引起更多的關注這個問題。
我有一個增量表,其中包含來自 kafka 的節儉資料,并且我正在使用 UDF 對其進行反序列化。使用常規 UDF 時沒有問題,但嘗試使用 Pandas UDF 時出現錯誤。
這運行良好,即 ruglar UDF
def decoder(thrift_data):
schema_file = thriftpy2.load("/dbfs/FileStore/schema_file.thrift")
schema = schema_file.SchemaClass()
decoded_payload = deserialize(schema, thrift_data, TCyBinaryProtocolFactory())
json_data = proto.struct_to_json(decoded_payload)
return json.dumps(json_data)
decoder_udf = udf(decoder, StringType())
data = spark.sql("""SELECT value FROM data_table""")
data = data.withColumn('decoded_json', decoder_udf(data.value))
但是當我使用 Pandas UDF
def decoder(thrift_data: pd.Series) -> pd.Series:
schema_file = thriftpy2.load("/dbfs/FileStore/schema_file.thrift")
schema = schema_file.SchemaClass()
decoded_payload = deserialize(schema, thrift_data, TCyBinaryProtocolFactory())
json_data = proto.struct_to_json(decoded_payload)
return json.dumps(json_data)
decoder_udf = pandas_udf(decoder, returnType=StringType())
data = spark.sql("""SELECT value FROM data_table""")
data = data.withColumn('decoded_json', decoder_udf(data.value))
我收到一個錯誤 PythonException:'RuntimeError: Result vector from pandas_udf was not the required length: expected 5000, got 651'.
uj5u.com熱心網友回復:
找出解決方案,我們必須將輸出作為一個系列回傳
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/453289.html
