我是Hadoop和Linux的初學者。
問題所在
2021-08-08 22:53:12,350 INFO mapreduce.Job: map 100% reduce 67%.成功運行的內容
2021-08-08 19:44:13,350 INFO mapreduce.Job: map 100% reduce 100%。Mapper (Python)
#!/usr/bin/env python3。
import sys
from itertools import islice
from operator import itemgetter
def read_input(file)。
# read file except first line.
for line in islice(file, 1, None):
# 將該行分割成單詞。
yield line.split(' , ')
def main(separator=' ') 。
# 輸入來自STDIN(標準輸入)。
data = read_input(sys.stdin)
for words in data:
# 對于每一行我們只取需要的列。
data_row = list( itemgetter(*[1, 2, 4, 5, 6, 9, 10, 18] ( words))
data_row[7] = data_row[7].replace('
', '')
# 從第一列中抽取年份和月份號碼來創建。
# key that will send to reducer[/span].
date = data_row[0].split(' '/span>)[0].split('-'/span>)
key = str(date[0]) '_' str(date[1] )
# value that will send to reducer[/span].
value = ','.join(data_row)
# print here will send the output pair (key, value)
print('%s%s%s' % (key, separator, value)
if __name__ == "__main__"/span>:
main()
Reducer (Python)
#!/usr/bin/env python3。
from itertools import groupby
from operator import itemgetter
import sys
import pandas as pd
import numpy as np
import time
def read_mapper_output(file)。
for line in file:
yield行
def main(separator=' ') 。
all_rows_2015 = []
all_rows_2016 = []
start_time = time.time()
names = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'traffic_distance',
'pickup_longitude'/span>, 'pickup_latitude'/span>, 'dropoff_longitude'/span>,
'dropoff_latitude', 'total_amount']
df = pd.DataFrame(columns=names)
# 輸入來自STDIN(標準輸入)。
data = read_mapper_output(sys.stdin)
for words in data:
# get key & value from Mapper
key, value = words.split(separator)
row = value.split(',')
# 將屬于2015年的資料與屬于2016年的資料進行分割。
if key in '2015_01 2015_02 2015_03':
all_rows_2015.append(row)
if len(all_rows_2015) >=10:
df=df.append(pd.DataFrame(all_rows_2015, columns=names))
all_rows_2015 = []
elif key in '2016_01 2016_02 2016_03':
all_rows_2016.append(row)
if len(all_rows_2016) >=10:
df=df.append(pd.DataFrame(all_rows_2016, columns=names))
all_rows_2016 = []
print(df.to_string())
print("-- %s seconds ---" % (time.time() - start_time)
if __name__ == "__main__"/span>:
main()
更多資訊
我在安裝在VMware上的Hadoop v3.2.1上使用Linux,在Python中運行MapReduce作業。數字的Reduce作業:
| 輸入資料大小 | 行數。減少作業時間 | |||
|---|---|---|---|---|
| ~98 Kb | 600行 | |||
| 600行? | ~0.1秒 | ~0.1秒 | 良好 | |
| 良好
| ~953 Kb | |||
| ~953 Kb | 6,000行 | |||
| 6,000行 | ~1秒 | ~1秒 | 良好 | |
| 良好
| ~9.5 Mb | |||
| 9.5 Mb | 60,000行 | |||
| 60,000行 | ~52秒 | ~52秒
| ~94 Mb | |
| ~94 Mb | 60萬行 | |||
| 60萬行? | ~5647 sec (~94 min) | 非常慢 | ||
| 非常慢? | ||||
| ~11 Gb | ~11 Gb?
| 76,000,000行 | 76,000,000行 | ?? | <
impossible | <
目標是在 ~76M 行的輸入資料上運行,如果還有這個問題,那是不可能的。
uj5u.com熱心網友回復:
"當減少到67%時,只有一個CPU保持100%的運行,其余的都在睡覺"--你有偏差。一個鍵的值遠遠多于其他鍵的值。
uj5u.com熱心網友回復:
我在這里看到了一些問題。
在reduce階段,你沒有做任何總結,只是適合2015Q1和2015Q2 - reduce應該用于總結,比如按鍵分組或基于鍵做一些計算。
如果您只需要過濾資料,請在地圖階段進行過濾以節省周期(假設您對所有資料進行了計費):
你在資料框架內的 RAM 中存盤了很多東西。由于你不知道鍵有多大,你正在經歷trashing。這與重鍵相結合,將使您的行程在一段時間后對每個 DataFrame.append 進行頁面錯誤處理。
有一些修復方法:
你真的需要一個減少階段嗎?由于你只是在過濾2015年和2016年的前三個月,你可以在地圖階段做這個。如果你稍后需要還原,這將使程序進行得更快一些,因為還原階段需要的資料更少。
def main(separator=' '/span>)。
# 輸入來自STDIN(標準輸入)。
data = read_input(sys.stdin)
for words in data:
# 對于每一行我們只取需要的列。
data_row = list( itemgetter(*[1, 2, 4, 5, 6, 9, 10, 18] ( words))
# 首先找出你是否在過濾這個資料。
# 從第一列中抽取年份和月號來創建。
# key that will send to reducer[/span]。
date = data_row[0].split(' '/span>)[0].split('-'/span>)
# 篩選出。
if (date[1] in [1,2, 3]) and (date[0] in[2015, 2016] )。)
# 我們保留這個資料。計算key并清理data_row[7]
key = str(date[0]) '_' str(date[1] )
data_row[7] = data_row[7].replace('
', '')
# value that will send to reducer[/span]。
value = ','.join(data_row)
# print here will send the output pair (key, value)
print('%s%s%s' % (key, separator, value)
在還原程序中,盡量不要在記憶體中存盤資料。因為你正在過濾,所以一旦有了結果,就立即列印()。如果你的源資料沒有被排序,reduce將作為一種方式,將來自同一個月的所有資料放在一起。
你的 reduce 階段有一個錯誤:你正在失去 number_of_records_per_key modulo 10,因為你沒有將結果追加到資料框架中。不要追加到資料框架中,并盡快列印結果。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/330233.html
標籤:
