分布式計算MapReduce編程Ⅰ
實驗目的:
1、理解集群分布式計算原理
2、熟悉MR程式中Mapper、Reducer函式的撰寫
3、實作倒排索引效果,統計每個單詞在不同檔案中的出現次數
實驗要求
- 有三個檔案a.txt,b.txt,c.txt 每個檔案的內容為若干行單詞,單詞之間以空格分開
- 撰寫程式實作單詞的倒排索引效果
- 以 A-M 字母開頭(包含小寫)的單詞出現在 0 區;以 N-Z 字母開頭的單詞出現在 1 區;其余開頭的單詞出現在 2 區
- 輸出形式:hadoop a.txt->2,b.txt->1
其中 hadoop 是單詞(也作為輸出的 key),“a.txt->2,b.txt->1” 表示輸出的 value,即表示 hadoop 單詞在 a.txt 檔案中出現次數為 2,在 b.txt檔案中出現次數為1
實驗思路
要想實作實驗效果,首先我們需要理清此次mapreduce的整體邏輯思路:
- 很顯然,一次MR是無法完成我們任務的,所以這次實驗需要兩次MR
- 第一次MR我們用正常的wordcount思路解決,即對每個檔案進行處理,得到每個單詞在該檔案的出現次數,只是在mapper的最后,context.write輸出的k2值并不是word,而是 filename->word
- reducer正常輸出 k2 和 v3(出現次數)
- 第二次MR,我們利用第一次MR輸出的檔案作為input,將一整行 filename->word(\t)count 作為v1輸入到mapper中
- 在map中以 "->" 和 "\t" 為分割點進行split,切割v1,重新組成 k2 = word,v2 = filename->count,輸出給reducer
- 因為我們最終輸出結果的格式為 word a.txt->count,b.txt->count,
而v2s就是同一個word對應的 filename->count 的集合,所以在reducer中將v2s以逗號串接后輸出即可
檔案內容
先將三個檔案上傳至分布式儲存hdfs的 input/DP 目錄下

三個檔案的內容:

第一次MR
- Mapper函式的撰寫
public class FirstMapper extends Mapper<Object, Text, Text, IntWritable> {
protected void map(Object k1, Text v1, Context context)
throws IOException, InterruptedException {
String line = v1.toString().trim();//提取內容
String[] words = line.split(" ");//提取word
FileSplit inputSplit = (FileSplit)context.getInputSplit();
Path path = inputSplit.getPath();
String filename = path.getName();//獲取單詞所在檔案名
for(String word : words) {
context.write(new Text(filename+"->"+word), new IntWritable(1));
}
}
}
- Reducer函式的撰寫
public class FirstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
protected void reduce(Text k2, Iterable<IntWritable> v2s, Context context)
throws IOException, InterruptedException {
int count = 0;
for(IntWritable val : v2s) {
count += val.get(); //累加v2s得出該單詞的出現總次數
}
context.write(k2, new IntWritable(count));
}
}
- 第一次輸出檔案

第二次MR
- Mapper函式的撰寫
public class SencondMapper extends Mapper<Object, Text, Text, Text> {
protected void map(Object k1, Text v1, Context context)
throws IOException, InterruptedException {
String line=v1.toString().trim();
String[] data=line.split("->");
String filename = data[0];
String[] wordcount = data[1].split("\t");
String word = wordcount[0];
String count = wordcount[1];
context.write(new Text(word), new Text(filename + "->" + count));
}
}
- Reducer函式的撰寫
public class SencondReducer extends Reducer<Text, Text, Text, Text> {
protected void reduce(Text k2, Iterable<Text> v2s, Context context)
throws IOException, InterruptedException {
String str = new String();
for(Text val : v2s) {
str = str + val.toString() + ",";
}
str = str.substring(0, str.length()-1);
context.write(new Text(k2), new Text(str));
}
}
- Partitioner函式撰寫
public class SencondPartitioner extends Partitioner<Text, Text> {
private static int PatitionNumber=0;
public int getPartition(Text k2, Text v2, int numPartitions) {
String word=k2.toString().trim();
if (word.length()==0) return 0;
char firstchar=Character.toUpperCase(word.charAt(0));
if (firstchar>='N'&&firstchar<='Z' || firstchar>='n'&&firstchar<='z')
PatitionNumber=1;
else if (firstchar>='A'&&firstchar<='M' || firstchar>='a'&&firstchar<='m')
PatitionNumber=0;
else PatitionNumber=2;
return PatitionNumber;
}
}
- 第二次輸出檔案

完整代碼
第一次MR
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FirstMapper extends Mapper<Object, Text, Text, IntWritable> {
protected void map(Object k1, Text v1, Context context)
throws IOException, InterruptedException {
String line = v1.toString().trim();
String[] words = line.split(" ");
FileSplit inputSplit = (FileSplit)context.getInputSplit();
Path path = inputSplit.getPath();
String filename = path.getName();
for(String word : words) {
context.write(new Text(filename+"->"+word), new IntWritable(1));
}
}
}
public class FirstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
protected void reduce(Text k2, Iterable<IntWritable> v2s, Context context)
throws IOException, InterruptedException {
int count = 0;
for(IntWritable val : v2s) {
count += val.get();
}
context.write(k2, new IntWritable(count));
}
}
/**
 * Driver for the first MapReduce pass: wires {@code FirstMapper} and
 * {@code FirstReducer} into a job named "FirstDP" and runs it.
 */
public class FirstMain {
    /**
     * Configures and runs the job, then exits the JVM with 0 on success and 1 on failure.
     * Exits with status 2 when the argument count is wrong.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Validate the command line before any job object is created.
        if (args.length != 2) {
            System.err.println("Usage: FirstMain <in> <out>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "FirstDP");
        job.setJarByClass(FirstMain.class);
        job.setMapperClass(FirstMapper.class);
        job.setReducerClass(FirstReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
輸出

第二次MR
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
public class SencondMapper extends Mapper<Object, Text, Text, Text> {
protected void map(Object k1, Text v1, Context context)
throws IOException, InterruptedException {
String line=v1.toString().trim();
String[] data=line.split("->");
String filename = data[0];
String[] wordcount = data[1].split("\t");
String word = wordcount[0];
String count = wordcount[1];
context.write(new Text(word), new Text(filename + "->" + count));
}
}
public class SencondReducer extends Reducer<Text, Text, Text, Text> {
protected void reduce(Text k2, Iterable<Text> v2s, Context context)
throws IOException, InterruptedException {
String str = new String();
for(Text val : v2s) {
str = str + val.toString() + ",";
}
str = str.substring(0, str.length()-1);
context.write(new Text(k2), new Text(str));
}
}
public class SencondPartitioner extends Partitioner<Text, Text> {
private static int PatitionNumber=0;
public int getPartition(Text k2, Text v2, int numPartitions) {
String word=k2.toString().trim();
if (word.length()==0) return 0;
char firstchar=Character.toUpperCase(word.charAt(0));
if (firstchar>='N'&&firstchar<='Z' || firstchar>='n'&&firstchar<='z')
PatitionNumber=1;
else if (firstchar>='A'&&firstchar<='M' || firstchar>='a'&&firstchar<='m')
PatitionNumber=0;
else PatitionNumber=2;
return PatitionNumber;
}
}
/**
 * Driver for the second MapReduce pass: wires {@code SencondMapper},
 * {@code SencondReducer} and {@code SencondPartitioner} into a job named
 * "SecondDP" with three reduce tasks and runs it.
 */
public class SecendMain {
    /**
     * Configures and runs the job, then exits the JVM with 0 on success and 1 on failure.
     * Exits with status 2 when the argument count is wrong.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Validate the command line before any job object is created.
        if (args.length != 2) {
            System.err.println("Usage: SecendMain <in> <out>");
            System.exit(2);
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "SecondDP");
        job.setJarByClass(SecendMain.class);
        job.setMapperClass(SencondMapper.class);
        job.setReducerClass(SencondReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setPartitionerClass(SencondPartitioner.class);
        // One reduce task per partition produced by SencondPartitioner (0, 1 and 2).
        job.setNumReduceTasks(3);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
輸出

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/260379.html
標籤:其他
