我正在嘗試使用 Hadoop 從 JSON 檔案中取得一些統計資訊，例如評論數最多的類別，或每種語言的平均星級。為此我使用了 mrjob，並找到了以下代碼：
import re
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol
from mrjob.step import MRStep
# One or more word characters or apostrophes, so a contraction such as
# "can't" stays a single token.  (The scraped copy had lost the "+", which
# made the pattern match one character followed by a space.)
WORD_RE = re.compile(r"[\w']+")
class MRMostUsedWord(MRJob):
    """Find the most frequent non-stop-word across all input lines.

    Step 1 tokenizes each line with ``WORD_RE``, lower-cases the tokens,
    drops stop words and sums the per-word counts.  Step 2 sends every
    ``(count, word)`` pair to a single reducer (key ``None``) and emits the
    largest pair.
    """

    FILES = ['stop_words.txt']
    OUTPUT_PROTOCOL = JSONValueProtocol

    def configure_args(self):
        """Register ``--stop-words-file`` to override ``stop_words.txt``."""
        super(MRMostUsedWord, self).configure_args()
        # allow for alternate stop words file
        self.add_file_arg(
            '--stop-words-file',
            dest='stop_words_file',
            default=None,
            help='alternate stop words file. lowercase words, one per line',
        )

    def mapper_init(self):
        """Load the stop-word set once before the mapper processes lines."""
        stop_words_path = self.options.stop_words_file or 'stop_words.txt'
        # Explicit encoding: the platform default may not be UTF-8 on
        # worker nodes, which would break on non-ASCII stop words.
        with open(stop_words_path, encoding='utf-8') as f:
            # Lower-case to match the lower-cased tokens compared in
            # mapper_get_words; skip blank lines instead of storing ''.
            self.stop_words = {
                word for word in (line.strip().lower() for line in f) if word
            }

    def mapper_get_words(self, _, line):
        """Yield ``(word, 1)`` for every non-stop-word token in *line*."""
        for word in WORD_RE.findall(line):
            word = word.lower()
            if word not in self.stop_words:
                yield (word, 1)

    def combiner_count_words(self, word, counts):
        """Pre-sum the counts seen so far for *word* on the map side."""
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        """Emit ``(None, (total, word))`` so one reducer sees every pair.

        The count is placed first so that ``max()`` in the next step
        compares pairs by count (ties broken by the word itself).
        """
        yield None, (sum(counts), word)

    def reducer_find_max_word(self, _, word_count_pairs):
        """Yield the ``(count, word)`` pair with the highest count.

        The key is always ``None`` and is discarded.  Yielding the pair makes
        mrjob treat ``count`` as the key and ``word`` as the value.
        """
        best = max(word_count_pairs, default=None)
        if best is not None:
            yield best

    def steps(self):
        """Return the two-step pipeline: count words, then pick the max."""
        return [
            MRStep(mapper_init=self.mapper_init,
                   mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word),
        ]
# Run the job only when executed as a script (e.g. `python job.py input.txt`),
# not when the module is imported.
if __name__ == '__main__':
    MRMostUsedWord.run()
它可以找出最常用的單詞，但我不確定如何改為統計 JSON 屬性（而不是單詞）。
JSON 範例：
{"review_id": "en_0690095", "product_id": "product_en_0440378", "reviewer_id": "reviewer_en_0133349", "stars": "1", "review_body": "內閣點都從背襯上分離了......得到me", "review_title": "不能使用", "language": "en", "product_category": "home_improvement"}
{"review_id": "en_0311558", "product_id": "product_en_0399702", "reviewer_id": "reviewer_en_0152034", "stars": "1", "review_body": "I received my first order of this product and it was broke so I ordered it again. The second one was broke in more places than the first. I can't blame the shipping process as it's shrink wrapped and boxed.", "review_title": "The product is junk.", "language": "en", "product_category": "home"}
uj5u.com 熱心網友回覆：
對我來說，直接用 json.loads 解析每一行就可以了，然後把想統計的屬性（例如 review['product_category']）作為鍵 yield 出去，例如：
def mapper(self, _, line):
    """Parse one JSON review per input line and emit ``(product_category, 1)``.

    Pair this with a reducer that sums the counts to get the number of
    reviews per category.  To aggregate other attributes, change the key
    (e.g. ``review['language']``) or the value (e.g. ``int(review['stars'])``;
    in the sample data ``stars`` is a string).

    Args:
        _: Input key, unused.
        line: One line of input, expected to hold a single JSON object.

    Yields:
        ``(category, 1)`` for each non-blank line.
    """
    import json  # local import: the snippet has no module-level import

    line = line.strip()
    if not line:
        # Blank lines (e.g. a trailing newline) are not JSON records.
        return
    review = json.loads(line)
    yield review['product_category'], 1
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/360149.html
標籤:python json hadoop hdfs mrjob
上一篇：如何避免在雲上寫入 Hive 暫存區
