我有一個可以參考 status_code 并回傳正文的 udf。
def Api(a):
path = endpoint
headers = {'sample-Key': sample}
body = [{'text': body }]
res = None
try:
req = requests.post(path, params=params, headers=headers, json=body)
req = req.json()
dumps=json.dumps(req)
except Exception as e:
return e
if res != None and req.status_code == 200:
return json.loads(dumps)
return None
udf_Api = udf(Api)
newDF=df.withColumn("output", udf_Api(col("input")))
我可以回傳 json.loads 并將其放入資料框。但是,我的問題是我還需要將 status_code 保留在單獨的列中。所以輸出看起來像:
--------- ----------- ----------
| input|status_code| output|
--------- ----------- ----------
|inputText| 200|outputText|
--------- ----------- ----------
那么如何同時回傳 req.status_code 和 json.loads(),但將它們放入資料幀中的單獨列中?我想回傳一個陣列然后拆分它,但不知道該怎么做。
uj5u.com熱心網友回復:
您可以修改您的 UDF 以回傳一個 dict 而不是字串或整數,然后定義輸出模式。
from pyspark.sql import functions as F
from pyspark.sql import types as T
def Api(a):
return {
'status': 200,
'data': '{"a": 1}'
}
schema = T.StructType([
T.StructField('status', T.IntegerType()),
T.StructField('data', T.StringType())
])
(df
.withColumn('output', F.udf(Api, schema)('col'))
.select('col', 'output.*')
.show()
)
# --- ------ --------
# |col|status| data|
# --- ------ --------
# | 10| 200|{"a": 1}|
# | 20| 200|{"a": 1}|
# | 30| 200|{"a": 1}|
# --- ------ --------
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/339895.html
