在 Scala 中,當我有一個像 : 這樣的 RDD 串列時
List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),我想計算avg每個字符的數量。
因為a它顯示了 3 次,值計數為 1 2 6 = 9,所以我期望的結果是(a, 3).
在 Scala 中,我可以撰寫如下代碼:
val newRdd = rdd.aggregateByKey((0,0))((t,v) => {(t._1 v, t._2 1)}, (t1, t2) => {(t1._1 t2._1, t1._2 t2._2)})
val result = newRdd.mapValues{
case(num, count) => {
num/count
}
}
所以結果 RDD 將回傳我期望的那個。
但是,我如何向 pyspark 解釋 case(num/count)?
我試過了:
avg_rdd_2 = avg_rdd_1.mapValues(lambda x, y : x / y)
但我會得到下面的錯誤。
21/12/24 01:27:02 錯誤執行程式:7.0 階段任務 0.0 中的例外(TID 6)org.apache.spark.api.python.PythonException:回溯(最近一次呼叫):檔案“/root/PycharmProjects /pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main process() File "/root/PycharmProjects/pythonProject/venv /lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process serializer.dump_stream(out_iter, outfile) File "/root/PycharmProjects/pythonProject/ venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py”,第 259 行,在 dump_stream vs = list(itertools.islice(iterator, batch)) 檔案“/ root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util。py”,第 74 行,在包裝器中回傳 f(*args, **kwargs) 檔案“/root/PycharmProjects/pythonProject/venv/lib/python3.6/site-packages/pyspark/rdd.py”,第 2278 行,在map_values_fn = lambda kv: (kv[0], f(kv[1])) TypeError: () 缺少 1 個必需的位置引數:'y'
uj5u.com熱心網友回復:
假設我們有一個 RDD:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)])
在您首先按鍵聚合的 Scala 示例中,可以使用以下方法完成相同的操作:
groupByKey
new_rdd = rdd.groupByKey().mapValues(lambda x: sum(x) / len(x))
print(new_rdd.collect())
# [('b', 4.0), ('a', 3.0)]
aggregateByKey
new_rdd = rdd.aggregateByKey(
(0, 0),
lambda x, y: (x[0] y, x[1] 1),
lambda x, y: (x[0] y[0], x[1] y[1]),
)
result = new_rdd.mapValues(lambda x: x[0] / x[1])
print(result.collect())
# [('b', 4.0), ('a', 3.0)]
reduceByKey
result = (
rdd.mapValues(lambda x: (x, 1))
.reduceByKey(lambda x, y: (x[0] y[0], x[1] y[1]))
.mapValues(lambda x: x[0] / x[1])
)
print(result.collect())
# [('b', 4.0), ('a', 3.0)]
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/393191.html
