I have a .csv source file in the following format:
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,30.95,1,MATT,MORAL,CUREPIPE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1, MATT,MORAL, CUREPIPE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,89.95,1,LELA,SMI,HASSEE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,54.50,1,LELA,SMI,HASSEE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,2,TOM,SON,FLACQ
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1,DYDY,ARD,PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,22.00,1,DYDY,ARD, PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1,DYDY,ARD, PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,22.00,2,TAY,ANA,VACOAS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,35.00,3,TAY,ANA,VACOAS
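I want to use a combiner in MapReduce to compute each person's average cost (sum of price * quantity divided by total quantity), with the following result: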
MATT MORAL 25.45
LELA SMI 72.225
TOM SON 19.95
DYDY ARD 20.63
TAY ANA 29.8
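As a quick sanity check of the formula, here is a plain-Python sketch (no MapReduce involved) that computes sum(price * qty) / sum(qty) per person from the sample rows. It assumes the stray spaces that appear around some name fields in the sample are noise, so it strips every field before grouping; without that, " MATT MORAL" and "MATT MORAL" would be treated as different people.

```python
from collections import defaultdict

# The ten sample rows from the question, copied verbatim (including stray spaces).
SAMPLE = """\
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,30.95,1,MATT,MORAL,CUREPIPE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1, MATT,MORAL, CUREPIPE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,89.95,1,LELA,SMI,HASSEE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,54.50,1,LELA,SMI,HASSEE
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,2,TOM,SON,FLACQ
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1,DYDY,ARD,PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,22.00,1,DYDY,ARD, PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,19.95,1,DYDY,ARD, PLOUIS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,22.00,2,TAY,ANA,VACOAS
Xxx,yyy,zzz,uuuu,iii,www,qqq,aaa,rrr,35.00,3,TAY,ANA,VACOAS
"""


def weighted_averages(lines):
    """Return {full_name: sum(price * qty) / sum(qty)} without any MapReduce machinery."""
    totals = defaultdict(lambda: [0.0, 0])  # name -> [sum of price*qty, sum of qty]
    for line in lines:
        words = [w.strip() for w in line.split(',')]  # strip stray spaces around fields
        full_name = words[-3] + ' ' + words[-2]
        price, qty = float(words[-5]), int(words[-4])
        totals[full_name][0] += price * qty
        totals[full_name][1] += qty
    return {name: weighted / count for name, (weighted, count) in totals.items()}


for name, avg in weighted_averages(SAMPLE.splitlines()).items():
    print(name, avg)
```

The printed values are unrounded, so some may show floating-point noise; DYDY ARD, for instance, is 61.9 / 3, roughly 20.633.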
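So I came up with the following code, which does not work (it gives me double the average). I feel I need to add an if/else statement in the reducer to handle the combiner's output (unique keys) differently from the mapper's output (duplicate keys):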
from mrjob.job import MRJob


class Job(MRJob):

    def mapper(self, key, value):
        words = value.strip().split(',')
        full_name = words[-3] + ' ' + words[-2]
        price, qty = float(words[-5]), int(words[-4])
        yield full_name, (price, qty)

    def combiner(self, key, values):
        totalprice, totalqty = 0, 0
        for value in values:
            totalprice += (value[0] * value[1])
            totalqty += value[1]
        yield key, (totalprice, totalqty)

    def reducer(self, key, values):
        totalprice, totalqty = 0, 0
        for value in values:
            totalprice += (value[0] * value[1])
            totalqty += value[1]
        average = round(totalprice / totalqty, 2)
        yield key, average


if __name__ == '__main__':
    Job.run()
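To see where the doubling comes from, the sketch below replays the question's combiner and reducer logic in plain Python. The run_question_pipeline helper, and the assumption that the combiner runs exactly once per key, are simplifications for illustration; this is not how Hadoop or mrjob actually schedules work. With a single combiner output per key, the reducer multiplies the already-weighted total by the quantity again and then divides by that same quantity, so it returns sum(price * qty) itself: the true average times the person's total quantity (2 for TOM SON, hence "double"; 3 for DYDY ARD).

```python
from collections import defaultdict

# Hypothetical mapper output (full_name, (price, qty)) taken from the sample rows.
MAPPED = [
    ("TOM SON", (19.95, 2)),
    ("DYDY ARD", (19.95, 1)),
    ("DYDY ARD", (22.00, 1)),
    ("DYDY ARD", (19.95, 1)),
]


def question_combiner(values):
    totalprice, totalqty = 0, 0
    for price, qty in values:
        totalprice += price * qty  # first weighting by quantity
        totalqty += qty
    return (totalprice, totalqty)


def question_reducer(values):
    totalprice, totalqty = 0, 0
    for price, qty in values:
        totalprice += price * qty  # second weighting: price is already price * qty here
        totalqty += qty
    return round(totalprice / totalqty, 2)


def run_question_pipeline(mapped):
    groups = defaultdict(list)
    for key, value in mapped:
        groups[key].append(value)
    # Simplifying assumption: one combiner call per key, whose single output goes to the reducer.
    return {key: question_reducer([question_combiner(vals)]) for key, vals in groups.items()}


print(run_question_pipeline(MAPPED))
```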
Many thanks for any guidance on the reducer!
Answer:
You should not weight totalprice again in the reducer, because you have already done that in the combiner:
def reducer(self, key, values):
    totalprice, totalqty = 0, 0
    for value in values:
        totalprice += value[0]  # already price * qty, summed by the combiner
        totalqty += value[1]
    average = round(totalprice / totalqty, 2)
    yield key, average
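A small plain-Python check of this fix follows. The simulate helper and its use_combiner flag are hypothetical stand-ins for the framework, not mrjob APIs. With the combiner in place, the corrected reducer returns 19.95 for TOM SON and 20.63 for DYDY ARD. One caveat: Hadoop treats the combiner as an optimization and does not guarantee that it runs. If the reducer receives raw (price, qty) pairs straight from the mapper, this reducer computes sum(price) / sum(qty), which is wrong whenever a quantity is not 1 (TOM SON drops to about 9.98).

```python
from collections import defaultdict

# Hypothetical mapper output (full_name, (price, qty)) taken from the sample rows.
MAPPED = [
    ("TOM SON", (19.95, 2)),
    ("DYDY ARD", (19.95, 1)),
    ("DYDY ARD", (22.00, 1)),
    ("DYDY ARD", (19.95, 1)),
]


def combiner(values):
    """Question's combiner: raw (price, qty) -> (sum of price*qty, sum of qty)."""
    totalprice, totalqty = 0, 0
    for price, qty in values:
        totalprice += price * qty
        totalqty += qty
    return (totalprice, totalqty)


def fixed_reducer(values):
    """Answer's reducer: assumes value[0] was already weighted by the combiner."""
    totalprice, totalqty = 0, 0
    for weighted_price, qty in values:
        totalprice += weighted_price
        totalqty += qty
    return round(totalprice / totalqty, 2)


def simulate(mapped, use_combiner=True):
    groups = defaultdict(list)
    for key, value in mapped:
        groups[key].append(value)
    if use_combiner:
        return {k: fixed_reducer([combiner(v)]) for k, v in groups.items()}
    return {k: fixed_reducer(v) for k, v in groups.items()}  # combiner skipped


print(simulate(MAPPED))
print(simulate(MAPPED, use_combiner=False))
```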
Further explanation
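Here is what the Hadoop documentation says about using a combiner:
Users can optionally specify a combiner, via Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.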
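Bringing a combiner into the mix is only viable if your reduce operation can be broken down into several "mini reduces" without changing the final result.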
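In this problem the "mini reduce" is the pair (sum of price * qty, sum of qty). Partial pairs can be added together in any grouping before the final division, whereas averaging partial averages gives a different answer. A plain-Python sketch using the TAY ANA rows (the helper names partial, merge, and finish are illustrative only):

```python
def partial(records):
    """Mini reduce: collapse (price, qty) records into (sum of price*qty, sum of qty)."""
    return (sum(p * q for p, q in records), sum(q for _, q in records))


def merge(a, b):
    """Merging two partials is element-wise addition, so grouping and order do not matter."""
    return (a[0] + b[0], a[1] + b[1])


def finish(pair):
    """Final step: the only division happens once, at the very end."""
    return pair[0] / pair[1]


tay_ana = [(22.00, 2), (35.00, 3)]

all_at_once = finish(partial(tay_ana))
split = finish(merge(partial(tay_ana[:1]), partial(tay_ana[1:])))
naive = (finish(partial(tay_ana[:1])) + finish(partial(tay_ana[1:]))) / 2  # average of averages

print(all_at_once, split, naive)  # prints: 29.8 29.8 28.5
```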
If you want your combiner and reducer functions to work the same way, you may need to change the mapper function instead, something like this:
from mrjob.job import MRJob


class Job(MRJob):

    def mapper(self, key, value):
        words = value.strip().split(',')
        full_name = words[-3] + ' ' + words[-2]
        price, qty = float(words[-5]), int(words[-4])
        price = price * qty  # This is the change
        yield full_name, (price, qty)

    def combiner(self, key, values):
        totalprice, totalqty = 0, 0
        for value in values:
            totalprice += value[0]  # And change here
            totalqty += value[1]
        yield key, (totalprice, totalqty)

    def reducer(self, key, values):
        totalprice, totalqty = 0, 0
        for value in values:
            totalprice += value[0]  # Change here
            totalqty += value[1]
        average = round(totalprice / totalqty, 2)
        yield key, average


if __name__ == '__main__':
    Job.run()
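Because the mapper now emits (price * qty, qty), the combiner's input and output have the same shape, so the result should not depend on whether the framework runs the combiner zero, one, or several times. The plain-Python sketch below checks that on a subset of the sample data; run and its combiner_chunks parameter are hypothetical stand-ins for Hadoop's scheduling, not part of mrjob.

```python
from collections import defaultdict

ROWS = [  # (full_name, price, qty) taken from the sample data
    ("TOM SON", 19.95, 2),
    ("DYDY ARD", 19.95, 1),
    ("DYDY ARD", 22.00, 1),
    ("DYDY ARD", 19.95, 1),
    ("TAY ANA", 22.00, 2),
    ("TAY ANA", 35.00, 3),
]


def mapper(name, price, qty):
    # Revised mapper: emit the already-weighted price.
    return name, (price * qty, qty)


def combine(values):
    # Summing step shared by the combiner and the reducer.
    totalprice, totalqty = 0, 0
    for weighted, qty in values:
        totalprice += weighted
        totalqty += qty
    return (totalprice, totalqty)


def reduce_avg(values):
    totalprice, totalqty = combine(values)
    return round(totalprice / totalqty, 2)


def run(rows, combiner_chunks):
    """combiner_chunks=0 skips the combiner; n > 0 combines each key's values in chunks of n."""
    groups = defaultdict(list)
    for row in rows:
        key, value = mapper(*row)
        groups[key].append(value)
    out = {}
    for key, values in groups.items():
        if combiner_chunks:
            values = [combine(values[i:i + combiner_chunks])
                      for i in range(0, len(values), combiner_chunks)]
        out[key] = reduce_avg(values)
    return out


for chunks in (0, 1, 2):
    print(chunks, run(ROWS, chunks))
```

To run the real mrjob job locally instead, mrjob jobs are normally started as a script with the input file as an argument, for example python job.py input.csv (both file names are placeholders).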
Please credit the source when reposting. Link to this article: https://www.uj5u.com/qiye/367040.html
