感謝這里的一些幫助。使用 Pyspark(請不要使用 SQL)。所以我有一個存盤為 RDD 對的元組串列:
[(('City1', '2020-03-27', 'X1'), 44),
(('City1', '2020-03-28', 'X1'), 44),
(('City3', '2020-03-28', 'X3'), 15),
(('City4', '2020-03-27', 'X4'), 5),
(('City4', '2020-03-26', 'X4'), 4),
(('City2', '2020-03-26', 'X2'), 14),
(('City2', '2020-03-25', 'X2'), 4),
(('City4', '2020-03-25', 'X4'), 1),
(('City1', '2020-03-29', 'X1'), 1),
(('City5', '2020-03-25', 'X5'), 15)]
例如 ('City5', '2020-03-25', 'X5') 作為 Key,15 作為最后一對的值。
我想獲得以下結果:
City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X3, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5
請注意結果顯示:
每個城市具有最大值的鍵(這是最難的部分,如果它們在不同日期具有相似的最大值(值),則顯示兩次相同的城市,我假設不能使用 ReduceByKey(),因為 Key 不是唯一的,也許 GroupBy() 或 Filter() ?
在以下順序/排序順序中:
- 降序最大值
- 升序日期
- 降序城市名稱(例如:City1)
所以我嘗試了以下代碼:
res = rdd2.map(lambda x: ((x[0][0],x[0][2]), (x[0][1], x[1])))
rdd3 = res.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[1]))
rdd4 = rdd3.sortBy(lambda a: a[1][1], ascending=False)
rdd5 = rdd4.sortBy(lambda a: a[1][0])
雖然它確實給了我最大值的城市,但如果 2 個城市在 2 個不同的日期具有相似的最大值,它不會兩次回傳同一個城市(因為被 Key: City 減少)。
我希望它足夠清楚,任何精度請詢問!非常感謝!
uj5u.com熱心網友回復:
要保持所有城市的值等于最大值,您仍然可以使用reduceByKey但超過陣列而不是超過值:
- 您將行轉換為鍵/值,值是元組陣列而不是元組
- 您按鍵減少,合并包含相同值的陣列,否則保留具有最大值的陣列,
reduceByKey - 您將值陣列展平,將鍵與它們合并,與
flatMap - 最后你執行你的排序
完整的代碼如下:
def merge(array1, array2):
if array1[0][2] > array2[0][2]:
return array1
elif array1[0][2] == array2[0][2]:
return array1 array2
else:
return array2
res = rdd2.map(lambda x: (x[0][0], [(x[0][1], x[0][2], x[1])]))
rdd3 = res.reduceByKey(lambda x1, x2: merge(x1, x2))
rdd4 = rdd3.flatMap(lambda x: map(lambda y: (x[0], y[1], y[0], y[2]), x[1]))
rdd5 = rdd4.sortBy(lambda a: (-a[3], a[2], a[0]))
然后你可以列印你的 RDD:
[print(', '.join([row[0], row[1], row[2], str(row[3])])) for row in rdd5.collect()]
有了您的輸入,您將得到以下輸出:
City1, X1, 2020-03-27, 44
City1, X1, 2020-03-28, 44
City5, X5, 2020-03-25, 15
City3, X3, 2020-03-28, 15
City2, X2, 2020-03-26, 14
City4, X4, 2020-03-27, 5
uj5u.com熱心網友回復:
您可以使用 Dataframes 作業/輸出嗎?
List = [(('City1', '2020-03-27', 'X1'), 44),
(('City1', '2020-03-28', 'X1'), 44),
(('City3', '2020-03-28', 'X3'), 15),
(('City4', '2020-03-27', 'X4'), 5),
(('City4', '2020-03-26', 'X4'), 4),
(('City2', '2020-03-26', 'X2'), 14),
(('City2', '2020-03-25', 'X2'), 4),
(('City4', '2020-03-25', 'X4'), 1),
(('City1', '2020-03-29', 'X1'), 1),
(('City5', '2020-03-25', 'X5'), 15)]
rdd = sc.parallelize(List)
import pyspark.sql.functions as F
df = rdd\
.toDF()\
.select('_1.*', F.col('_2').alias('value'))\
.orderBy(F.desc('value'), F.asc('_2'), F.desc('_1'))
df.show(truncate=False)
----- ---------- --- -----
|_1 |_2 |_3 |value|
----- ---------- --- -----
|City1|2020-03-27|X1 |44 |
|City1|2020-03-28|X1 |44 |
|City5|2020-03-25|X5 |15 |
|City3|2020-03-28|X3 |15 |
|City2|2020-03-26|X2 |14 |
|City4|2020-03-27|X4 |5 |
|City2|2020-03-25|X2 |4 |
|City4|2020-03-26|X4 |4 |
|City4|2020-03-25|X4 |1 |
|City1|2020-03-29|X1 |1 |
----- ---------- --- -----
uj5u.com熱心網友回復:
您可以將 rdd 轉換為資料框,然后使用Spark 的視窗獲取每個城市的最大值,使用此值過濾行,最后根據需要對資料框進行排序:
from pyspark.sql import functions as F
from pyspark.sql import Window
window = Window.partitionBy('City').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df = rdd.toDF().select(
F.col('_1._1').alias('city'),
F.col('_1._2').alias('date'),
F.col('_1._3').alias('key'),
F.col('_2').alias('value'),
).withColumn('max_value', F.max('value').over(window))\
.filter(F.col('value') == F.col('max_value'))\
.drop('max_value')\
.orderBy(F.desc('value'), F.asc('date'), F.asc('city'))
并且您通過輸入 rdd 獲得以下資料框:
----- ---------- --- -----
|city |date |key|value|
----- ---------- --- -----
|City1|2020-03-27|X1 |44 |
|City1|2020-03-28|X1 |44 |
|City5|2020-03-25|X5 |15 |
|City3|2020-03-28|X3 |15 |
|City2|2020-03-26|X2 |14 |
|City4|2020-03-27|X4 |5 |
----- ---------- --- -----
如果在流程結束時需要 RDD,可以使用以下.rdd方法檢索它:
df.rdd
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/374919.html
