我正在使用 Spark RDD API 收集資料并創建了一個配對的 RDD,如下所示:
spark = SparkSession.builder.master('local').appName('app').getOrCreate()
sc = spark.sparkContext
raw_rdd = sc.textFile("data.csv")
paired_rdd = raw_rdd\
.map(lambda x: x.split(","))\
.map(lambda x: (x[2], [x[1], x[3],x[5]]))
這是配對 RDD 的示例摘錄:
[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
('VXIO456XLBB630221', ['A', '', '']),
('VXIO456XLBB630221', ['R', '', '']),
('VXIO456XLBB630221', ['R', '', ''])]
如您所見,此配對 RDD 中的鍵對于所有元素都是相同的,但只有一個元素完成了所有欄位。
我們想要完成什么?我們想用具有完整欄位的元素的值替換空欄位。所以我們會有這樣的預期輸出:
[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
('VXIO456XLBB630221', ['A', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003'])]
我知道第一步是做一個groupByKey,即
paired_rdd.groupByKey().map(lambda kv: ____)
我只是不確定如何遍歷這些值以及如何將其放入一個 lambda 函式中。
uj5u.com熱心網友回復:
最好的方法可能是使用資料框和視窗函式。使用 RDD,您也可以使用聚合 ( reduceByKey) 來解決問題,該聚合將填充空白并將串列的第一個元素的串列保留在記憶體中。然后我們可以根據該記憶體重新展平以創建與以前相同數量的行,但填充了值。
# let's define a function that selects the none empty values between two strings
def select_value(a, b):
if a is None or len(a) == 0:
return b
else:
return a
# let's use mapValues to separate the first element of the list and the rest
# Then we use reduceByKey to aggregate the list of all first elements (first
# element of the tuple). For the other elements, we only keep non empty values
# (second element of the tuple).
# Finally, we use flatMapValues to recreate the rows based on the memorized
# first elements of the lists.
paired_rdd\
.mapValues(lambda x: ([x[0]], x[1:]))\
.reduceByKey(lambda a, b: (
a[0] b[0],
[select_value(a[1][i], b[1][i]) for i in range(len(a[1])) ]
) )\
.flatMapValues(lambda x: [[k] x[1] for k in x[0]])\
.collect()
其中產生:
[('VXIO456XLBB630221', ['I', 'Nissan', '2003']),
('VXIO456XLBB630221', ['A', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003']),
('VXIO456XLBB630221', ['R', 'Nissan', '2003'])
]
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/319107.html
下一篇:如何聚合多列并輸出為行?
