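1. Implementation Example
Implementing WordCount involves writing three classes: a Mapper, a Reducer, and a Driver.
Note: the input data is a .txt file containing assorted words, with the words on each line separated by spaces.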
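Before looking at the Hadoop classes, it may help to see the whole data flow in one place. The following is a minimal plain-Java sketch, not Hadoop code: it assumes the input lines are already in memory, and the names WordCountFlowSketch, map, shuffle, reduce and wordCount are hypothetical. A TreeMap stands in for the framework's sort-and-group (shuffle) step between map and reduce.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of WordCount's data flow; no Hadoop classes are used.
public class WordCountFlowSketch {

    // "Map" phase: each input line becomes a list of (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split(" ")) {
            if (!word.isEmpty()) {
                out.add(Map.entry(word, 1));
            }
        }
        return out;
    }

    // "Shuffle" phase: sort pairs by key and group all values of the same key.
    static TreeMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // "Reduce" phase: called once per distinct key with all of that key's values.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    static Map<String, Integer> wordCount(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> g : shuffle(pairs).entrySet()) {
            result.put(g.getKey(), reduce(g.getKey(), g.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> input = List.of("hello world", "hello hadoop");
        System.out.println(wordCount(input)); // prints {hadoop=1, hello=2, world=1}
    }
}
```

The point of the sketch is the middle step: by the time "reduce" runs, all the 1s for a given word have already been collected together, which is what Hadoop's shuffle does for the real Reducer.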

1. Writing the Mapper
In IDEA we view source code with Ctrl+Alt+left-click. Let's first look at the source of the Mapper class; I have added comments to it, as shown below:
```java
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Mapper() {
    }

    // Called exactly once at the start of the task, before any call to map()
    protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
            throws IOException, InterruptedException {
    }

    // Called once for each key/value pair in the input split. Most programs override
    // this method; the default simply passes each pair through unchanged
    protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }

    // Called exactly once at the end of the task
    protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
            throws IOException, InterruptedException {
    }

    // Ties all of the methods above together: setup, then map for every record, then cleanup
    public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
            throws IOException, InterruptedException {
        this.setup(context);
        try {
            while (context.nextKeyValue()) { // reads one record (one line of text) at a time
                this.map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
        } finally {
            this.cleanup(context);
        }
    }

    // The context object, which wraps most of the methods the program uses
    public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}
```
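Mapper.run() is a template method: it fixes the order setup(), then map() for each record, then cleanup() inside a finally block, and subclasses only fill in the individual steps. The sketch below imitates that structure in plain Java so the call order can be observed. MiniMapper and MapperLifecycleSketch are hypothetical names, and the offset passed to map() assumes one byte per character and '\n' line endings, a rough stand-in for the LongWritable byte-offset key that the default text input supplies.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical mini-framework mirroring the shape of Hadoop's Mapper.run().
// It does not use Hadoop; names are illustrative only.
abstract class MiniMapper {
    protected void setup(List<String> log) { }
    protected abstract void map(long offset, String line, List<String> log);
    protected void cleanup(List<String> log) { }

    // Same order as Mapper.run(): setup once, map per record, cleanup once (even on error)
    public final void run(Iterator<String> records, List<String> log) {
        setup(log);
        try {
            long offset = 0;
            while (records.hasNext()) {
                String line = records.next();
                map(offset, line, log);
                offset += line.length() + 1; // assumes 1 byte per char and '\n' endings
            }
        } finally {
            cleanup(log);
        }
    }
}

public class MapperLifecycleSketch {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        MiniMapper m = new MiniMapper() {
            @Override protected void setup(List<String> l) { l.add("setup"); }
            @Override protected void map(long off, String line, List<String> l) { l.add("map@" + off + ":" + line); }
            @Override protected void cleanup(List<String> l) { l.add("cleanup"); }
        };
        m.run(List.of("hello world", "hello hadoop").iterator(), log);
        System.out.println(log); // [setup, map@0:hello world, map@12:hello hadoop, cleanup]
    }
}
```

Because run() is what the framework actually calls, overriding only map() (as WordCount does) is enough; setup() and cleanup() are there for per-task initialization and teardown.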
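Based on this source, we write the Mapper that WordCount needs, as follows:
```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Now we write the WordCount example.
// Mapper's type parameters:
//   1. key type of the input data (LongWritable: byte offset of the line)
//   2. value type of the input data (Text: the line itself)
//   3. key type of the output data
//   4. value type of the output data
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Output objects are reused across calls (as Hadoop's own WordCount example does);
    // context.write() serializes them immediately, so reuse is safe
    private final Text k = new Text();
    private final IntWritable v = new IntWritable(1); // the value is always 1

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Get one line
        String line = value.toString();
        // 2. Split the line into words on spaces
        String[] words = line.split(" ");
        // 3. Emit each word (not to the console, but on to the Reducer for processing)
        for (String word : words) {
            if (word.isEmpty()) {
                continue; // consecutive spaces produce empty strings
            }
            k.set(word); // output key must be Text, matching the 3rd type parameter above
            context.write(k, v);
        }
    }
}
```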
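WordcountMapper splits each line with split(" "). The plain-Java sketch below (SplitSketch and emittedPairs are hypothetical names) shows what that call returns when a line contains two consecutive spaces, and one possible alternative that trims the line and splits on runs of whitespace. Which behaviour you want depends on how clean the input file is; that is an assumption about the data, not something the original specifies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Hadoop): what a WordCount map() call would emit for one line.
public class SplitSketch {

    // Hypothetical helper: trims the line, splits on runs of whitespace and
    // returns the "(word, 1)" pairs a map() call would write.
    static List<String> emittedPairs(String line) {
        List<String> pairs = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) { // a blank line still yields one empty string
                pairs.add("(" + word + ", 1)");
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        String line = "hello  world"; // two spaces between the words
        System.out.println(Arrays.toString(line.split(" "))); // [hello, , world]
        System.out.println(emittedPairs(line));               // [(hello, 1), (world, 1)]
    }
}
```

With plain split(" "), the empty string in the middle would otherwise be counted as a "word", which is why a Mapper either skips empty tokens or splits on a whitespace regex.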
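2. Writing the Reducer
The Reducer source is shown below. It closely mirrors the Mapper: setup() and cleanup() have empty bodies, the default reduce() simply writes every value back out unchanged, and run() calls reduce() once for each distinct key: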
```java
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.mapreduce.ReduceContext.ValueIterator;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

@Checkpointable
@Public
@Stable
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Reducer() {
    }

    // Called once at the start of the task (empty by default)
    protected void setup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
            throws IOException, InterruptedException {
    }

    // Called once per key; the default writes every value out unchanged (identity)
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
            throws IOException, InterruptedException {
        Iterator i$ = values.iterator();
        while (i$.hasNext()) {
            VALUEIN value = i$.next();
            context.write(key, value);
        }
    }

    // Called once at the end of the task (empty by default)
    protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
            throws IOException, InterruptedException {
    }

    // Calls reduce() once for each distinct key, passing all values grouped under that key
    public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
            throws IOException, InterruptedException {
        this.setup(context);
        try {
            while (context.nextKey()) {
                this.reduce(context.getCurrentKey(), context.getValues(), context);
                // If a backup store is in use, reset it
                Iterator<VALUEIN> iter = context.getValues().iterator();
                if (iter instanceof ValueIterator) {
                    ((ValueIterator) iter).resetBackupStore();
                }
            }
        } finally {
            this.cleanup(context);
        }
    }

    public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}
```
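Reducer.run() can call reduce() once per distinct key because the map output reaches the reducer already sorted by key, so equal keys sit next to each other. The plain-Java sketch below imitates that loop on an in-memory, pre-sorted list; it illustrates the idea under that assumption and is not Hadoop's implementation. GroupingSketch and reduceSorted are hypothetical names, and the tab between key and count mirrors the default separator of Hadoop's TextOutputFormat.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Hadoop) of how a reducer walks key-sorted input.
public class GroupingSketch {

    // Walks a key-sorted list and performs one "reduce" (a sum) per distinct key.
    static List<String> reduceSorted(List<Map.Entry<String, Integer>> sorted) {
        List<String> output = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {              // roughly what context.nextKey() does
            String key = sorted.get(i).getKey();
            int sum = 0;
            // consume every value that belongs to the current key
            while (i < sorted.size() && sorted.get(i).getKey().equals(key)) {
                sum += sorted.get(i).getValue();
                i++;
            }
            output.add(key + "\t" + sum);       // roughly what context.write(key, v) produces
        }
        return output;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> sorted = List.of(
                Map.entry("hadoop", 1), Map.entry("hello", 1),
                Map.entry("hello", 1), Map.entry("world", 1));
        reduceSorted(sorted).forEach(System.out::println);
    }
}
```

Running main prints three tab-separated lines: hadoop 1, hello 2, world 1. The grouping only works because the input is sorted, which is exactly the guarantee the shuffle provides.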
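Our WordCount Reducer is as follows:
```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text; // Hadoop's Text, not javax.xml.soap.Text
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Do NOT call super.reduce(key, values, context): the default implementation writes
        // every value out unchanged and consumes the values, which can only be iterated once.
        //
        // The mapper emits pairs like:
        //   atguigu, 1
        //   atguigu, 1
        // Before reduce() runs, the framework sorts and groups them by key, so each call
        // receives ONE word with all of its values, e.g. atguigu, [1, 1]. reduce() is thus
        // called once per distinct word, and the loop below counts each word separately.
        int sum = 0;
        // 1. Accumulate the sum
        for (IntWritable value : values) {
            sum += value.get(); // convert the IntWritable to an int
        }
        v.set(sum);
        // 2. Write out, e.g. "atguigu 2"
        context.write(key, v);
    }
}
```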
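Calling super.reduce(key, values, context) at the top of an overridden reduce(), as IDE-generated overrides often do, breaks the count. The plain-Java sketch below imitates why: the Iterable that Hadoop passes to reduce() can in general be traversed only once, and the default reduce() (an identity function) walks all of it, leaving nothing for the summing loop. OneShotIterable, identityReduce and sum are hypothetical stand-ins, not Hadoop classes.

```java
import java.util.Iterator;
import java.util.List;

// Plain-Java sketch (no Hadoop) of a single-pass values Iterable.
public class OneShotSketch {

    // Hands out the SAME iterator every time, so it can only be walked once.
    static final class OneShotIterable implements Iterable<Integer> {
        private final Iterator<Integer> it;
        OneShotIterable(List<Integer> values) { this.it = values.iterator(); }
        @Override public Iterator<Integer> iterator() { return it; }
    }

    // Imitates the default Reducer.reduce(): visits (writes) every value unchanged.
    static int identityReduce(Iterable<Integer> values) {
        int written = 0;
        for (int v : values) {
            written++;
        }
        return written;
    }

    static int sum(Iterable<Integer> values) {
        int s = 0;
        for (int v : values) {
            s += v;
        }
        return s;
    }

    public static void main(String[] args) {
        OneShotIterable values = new OneShotIterable(List.of(1, 1));
        identityReduce(values);          // what super.reduce(...) would do first
        System.out.println(sum(values)); // 0: the values were already consumed
        System.out.println(sum(new OneShotIterable(List.of(1, 1)))); // 2
    }
}
```

The fix in a real Reducer is simply to drop the super.reduce(...) call and iterate the values exactly once.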
3. Writing the Driver: getting MapReduce running!
The code is as follows:
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// The class that launches the Mapper and Reducer; a driver is essentially fixed boilerplate
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 1. Get the Job object
        Job job = Job.getInstance(conf);
        // 2. Tell Hadoop which jar to ship, by locating the jar that contains this class
        job.setJarByClass(WordCountDriver.class);
        // 3. Wire up the Mapper and Reducer classes
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. Key and value types of the map output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Key and value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Input path (args[0]) and output path (args[1])
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. Submit the job and wait for it; waitForCompletion() submits it if needed
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
```
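That's all it takes to run the job! You can also try running WordCount on a distributed cluster: package this code into a jar, copy it to a Linux machine, and run it there with the hadoop jar command. When you do, the input and output paths you pass in are HDFS paths, and the output directory must not exist yet, otherwise the job fails.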
Please credit the source when reposting. Article link: https://www.uj5u.com/qita/249377.html
