我有一個包含幾個字串值/欄位名稱的串列。我也有一個 Spark RDD,我想迭代 rdd 并洗掉串列中存在的任何欄位名稱。例如:
field_list = ["name_1", "name_2"]
RDD 看起來像這樣:
[Row(field_1=1, field_2=Row(field_3=[Row(field_4=[Row(name_1='apple', name_2='banana', name_3='F'), Row(name_1='tomato', name_2='eggplant', name_3='F')])]))]
我對RDD不是很熟悉,我知道我可以map()用來執行迭代,但是如何添加條件,如果找到"name_1"或"name_2"存在于 中field_list,則洗掉值和欄位,因此預期結果是新的 RDD 看起來像:
[Row(field_1=1, field_2=Row(field_3=[Row(field_4=[Row(name_3='F'), Row(name_3='F')])]))]
uj5u.com熱心網友回復:
您可以重新創建整個結構,但沒有不需要的欄位。我不確定,也許有更好的方法,但查看Row檔案我們發現它受限于方法。
輸入:
from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([
Row(field_1=1, field_2=Row(field_3=[Row(field_4=[Row(name_1='apple', name_2='banana', name_3='F'), Row(name_1='tomato', name_2='eggplant', name_3='F')])]))
])
print(rdd.collect())
# [Row(field_1=1, field_2=Row(field_3=[Row(field_4=[Row(name_1='apple', name_2='banana', name_3='F'), Row(name_1='tomato', name_2='eggplant', name_3='F')])]))]
field_list = ["name_1", "name_2"]
腳本:
F4 = Row('field_4')
F3 = Row('field_3')
F2 = Row('field_1', 'field_2')
def transform(row):
f3 = []
for x in row['field_2']['field_3']:
f4 = []
for y in x['field_4']:
Names = Row(*(set(y.asDict()) - set(field_list)))
f4.append(Names(*[y[n] for n in Names]))
f3.append(F4(f4))
return F2(row['field_1'], F3(f3))
rdd = rdd.map(transform)
print(rdd.collect())
# [Row(field_1=1, field_2=Row(field_3=[Row(field_4=[Row(name_3='F'), Row(name_3='F')])]))]
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/528198.html
下一篇:從字典中動態呼叫不同型別的集合
