我想使用 pyaspark 在 rdd(下面給出的示例)中進行排序。輸入rdd:
[('x', {3: [16, 11, 4532], 0: [5390, 3262]}),
('y', {2: [256, 128, 11], 5: [3262, 987], 3: [12]}),
('z', {17: [126, 54, 9], 0: [7654, 1768], 7: [3292, 1235, 7]})]
輸出rdd:
[('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]
正在發生兩次排序。
我不太確定如何使用 pyspark 解決上述問題。我已經嘗試了下面的代碼,但我猜它不正確(因為當資料很大時,我認為對所有進行排序不是一個好主意)。請幫忙。
def sort_values(x):
return (x[0], dict(sorted(x[1].items())))
rdd1 = input_rdd.map(lambda x: sort_values(x))
uj5u.com熱心網友回復:
這是使用可在 rdd 上使用的 mapPartitions 函式的示例。mapPartitions 將函式應用于 rdd 的每個磁區。
下面的代碼將有助于實作磁區級別排序,即內部字典將被排序 -
rdd = sc.parallelize([('x', {3: [16, 11, 4532], 0: [5390, 3262]}), ('y', {5: [3262, 987], 3: [12], 2: [256, 128, 11]}),('a', {2: [16, 4, 456], 7: [343, 3262]}),('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})],4)
def sortedpartition(iterator):
sorted_rdd_partition=[]
for item in iterator:
word=item[0]
values = item[1]
orderDict={}
for key in sorted(values.keys()):
orderDict[key]=sorted(values[key])
sorted_rdd_partition.append((word,orderDict))
return sorted_rdd_partition
rdd.mapPartitions(sortedpartition).collect()
最終結果如下所示
Output -
[('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
('a', {2: [4, 16, 456], 7: [343, 3262]}),
('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]
如果您需要 rdd 級別排序,請使用下面的代碼行
rdd.mapPartitions(sortedpartition).sortBy(lambda x: x[0]).collect()
or
rdd.mapPartitions(sortedpartition).sortByKey().collect()
輸出 -
[('a', {2: [4, 16, 456], 7: [343, 3262]}),
('x', {0: [3262, 5390], 3: [11, 16, 4532]}),
('y', {2: [11, 128, 256], 3: [12], 5: [987, 3262]}),
('z', {0: [1768, 7654], 7: [7, 1235, 3292], 17: [9, 54, 126]})]
uj5u.com熱心網友回復:
這是您可以做的事情
pdf = pd.DataFrame({'val': [('x', {3: [16, 11, 4532], 0: [5390, 3262]}), ('y', {5: [3262, 987], 3: [12], 2: [256, 128, 11]}), ('z',{17: [126, 54, 9], 7: [3292, 1235, 7], 0:[7654, 1768]})]})
val
0 (x, {3: [16, 11, 4532], 0: [5390, 3262]})
1 (y, {5: [3262, 987], 3: [12], 2: [256, 128, 11]})
2 (z, {17: [126, 54, 9], 7: [3292, 1235, 7], 0: [7654, 1768]})
我們不能在輸出中使用 Map,因此需要在排序后在內部轉換為 tuple()
df=spark.createDataFrame(pdf)
def sort_dict_f(x):
sorted_array = []
for key in sorted(x[1].keys(), reverse=False): #sort the key
sorted_array.append( (key, sorted(x[1][key]) )) #sort each internal list
return (x[0], sorted_array)
schema = StructType([
StructField("word", StringType(), False),
StructField("vals", ArrayType( StructType([StructField('key', IntegerType(), False), StructField('subs', ArrayType(IntegerType()), False)])), False)
])
SorterUDF = F.udf(sort_dict_f, schema)
df2 = df.withColumn('sorted', SorterUDF("val"))
df2.show(20, False)
最終結果如下所示:
------------------------------------------------------------------ ------------------------------------------------------------------
|val |sorted |
------------------------------------------------------------------ ------------------------------------------------------------------
|[x, [0 -> [5390, 3262], 2 -> [9, 8, 7], 3 -> [16, 11, 4532]]] |[x, [[0, [3262, 5390]], [2, [7, 8, 9]], [3, [11, 16, 4532]]]] |
|[y, [2 -> [256, 128, 11], 3 -> [12], 5 -> [3262, 987]]] |[y, [[2, [11, 128, 256]], [3, [12]], [5, [987, 3262]]]] |
|[z, [0 -> [7654, 1768], 17 -> [126, 54, 9], 7 -> [3292, 1235, 7]]]|[z, [[0, [1768, 7654]], [7, [7, 1235, 3292]], [17, [9, 54, 126]]]]|
------------------------------------------------------------------ ------------------------------------------------------------------
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/447322.html
