MapReduce作業原理及基礎編程(代碼見文章后半部分)
JunLeon——go big or go home

目錄
MapReduce作業原理及基礎編程(代碼見文章后半部分)
一、MapReduce概述
1、什么是MapReduce?
2、WordCount案例決議MapReduce計算程序
(1)運行hadoop自帶的樣例程式
(2)MapReduce作業程序
3、Shuffle程序詳解
二、MapReduce編程基礎
1、Hadoop資料型別
2、資料輸入格式InputFormat
3、輸入資料分塊InputSplit和資料記錄讀入RecordReader
4、資料輸出格式OutputFormat
5、資料記錄輸出類RecordWriter
6、Mapper類
7、Reduce類
三、MapReduce專案案例
1、經典案例——WordCount
2、計算考試平均成績
3、網站日志分析
前言:
Google于2003年在SOSP上發表了《The Google File System》,于2004年在OSDI上發表了《MapReduce: Simplified Data Processing on Large Clusters》,于2006年在OSDI上發表了《Bigtable: A Distributed Storage System for Structured Data》,這三篇論文為大資料及云計算的發展奠定了基礎,
一、MapReduce概述
1、什么是MapReduce?
MapReduce是一個分布式、并行處理的計算框架,
MapReduce 把任務分為 Map 階段和 Reduce 階段,開發人員使用存盤在HDFS 中資料(可實作快速存盤),撰寫 Hadoop 的 MapReduce 任務,由于 MapReduce作業原理的特性, Hadoop 能以并行的方式訪問資料,從而實作快速訪問資料,
表1 map函式和rudece函式
| 函式 | 輸入 | 輸出 | 說明 |
| map | <k1,v1> <0,helle world> <12,hello hadoop> | List<k2,v2> <hello,1> <world,1> <hello,1> <hhadoop,1> | 將獲取到的資料集進一步決議成<key,value>,通過Map函式計算生成中間結果,進過shuffle處理后作為reduce的輸入 |
| reduce | <k2,List(v2)> <hadoop,1> <hello,{1,1}> <world,1> | <k3,v3> <hadoop,1> <hello,2> <world,1> | reduce得到map輸出的中間結果,合并計算將最終結果輸出HDFS,其中List(v2),指同一k2的value |
MapReduce體系結構主要由四個部分組成,分別是:Client、JobTracker、TaskTracker以及Task

1)Client
用戶撰寫的MapReduce程式通過Client提交到JobTracker端 用戶可通過Client提供的一些介面查看作業運行狀態,
2)JobTracker
JobTracker負責資源監控和作業調度 JobTracker 監控所有TaskTracker與Job的健康狀況,一旦發現失敗,就將相應的任務轉移到其他節點 JobTracker 會跟蹤任務的執行進度、資源使用量等資訊,并將這些資訊告訴任務調度器(TaskScheduler),而調度器會在資源出現空閑時,選擇合適的任務去使用這些資源,
3)TaskTracker
TaskTracker 會周期性地通過“心跳”將本節點上資源的使用情況和任務的運行進度匯報給JobTracker,同時接收JobTracker 發送過來的命令并執行相應的操作(如啟動新任務、殺死任務等) TaskTracker 使用“slot”等量劃分本節點上的資源量(CPU、記憶體等),一個Task 獲取到一個slot 后才有機會運行,而Hadoop調度器的作用就是將各個TaskTracker上的空閑slot分配給Task使用,slot 分為Map slot 和Reduce slot 兩種,分別供MapTask 和Reduce Task 使用,
4)Task
Task 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動,
MapReduce各個執行階段:

MapReduce應用程式執行程序:

可以參考大佬黎先生的博客:MapReduce基本原理及應用 - 黎先生 - 博客園
2、WordCount案例決議MapReduce計算程序
(1)運行hadoop自帶的樣例程式
WordCount案例是一個經典案例,是Hadoop自帶的樣例程式,
作用:統計單詞數量(出現的次數)
應用:求和、求平均值、求最值,
jar包存盤在$HADOOP_HOME/share/hadoop/mapreduce/:
$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar
例如:
步驟:
1.在本地創建一個檔案

輸入以下內容:

2.上傳到HDFS指定目錄
在HDFS中創建指定檔案:

上傳檔案:

3.使用hadoop jar命令運行jar程式,統計單詞數量

4.輸出結果
執行部分程序:

查看生成的檔案:

查看計算結果:

(2)MapReduce作業程序
作業流程是Input從HDFS里面并行讀取文本中的內容,經過MapReduce模型,最終把分析出來的結果用Output封裝,持久化到HDFS中,
1.Mapper作業程序:

附上Mapper階段代碼:
public static class WorldCount_Mapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
System.out.println("split:<" + key + ","+ value + ">" );
String[] strs = value.toString().split(" ");
for (String string : strs) {
System.out.println("map:<" + key + ","+ value + ">" );
context.write(new Text(string),new IntWritable(1));
}
}
}
KEYIN--LongWritable:輸入key型別,記錄資料分片的偏移位置
VALUEIN—Text:輸入的value型別,對應分片中的文本資料
KEYOUT--Text:輸出的key型別,對應map方法中計算結果的key值
VALUEOUT—IntWritable:輸出的value型別,對應map方法中計算結果的value值
Mapper類從分片后傳出的背景關系中接收資料,資料以型別<LongWritable,Text>的鍵值對接收過來,通過重寫map方法默認一行一行的讀取資料并且以<key,value>形式進行遍歷賦值,
2.Reducer作業程序:

附上Reducer階段代碼:
public static class WorldCount_Reducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int index = 0;
for (IntWritable intWritable : values) {
System.out.println("reduce:<" + key + ","+ intWritable + ">" );
index += intWritable.get();
}
context.write(key,new IntWritable(index));
}
}
Reducer任務繼承Reducer類,主要接收的資料來自Map任務的輸出,中間經過Shuffle磁區、排序、分組,最終以<key,value>形式輸出給用戶,
Job提交代碼:
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance();
job.setJarByClass(WorldCount.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(WorldCount_Mapper.class);
job.setReducerClass(WorldCount_Reducer.class);
FileInputFormat.addInputPath(job,new Path("hdfs://192.168.100.123:8020/input"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.100.123:8020/output"));
job.waitForCompletion(true);
}
JobClients是用戶提交的作業與ResourceManager互動的主要介面,JobClients提供提交作業、追蹤行程、訪問子任務的日志記錄、獲取的MapReduce集群狀態資訊等功能,
3、Shuffle程序詳解
Hadoop運行機制中,將map輸出進行磁區、分組、排序、和合并等處理后作為輸入傳給Reducer的程序,稱為shuffle程序,

shuffle階段又可以分為Map端的shuffle和Reduce端的shuffle,
一、Map端的shuffle
寫磁盤:Map端會處理輸入資料并產生中間結果,這個中間結果會寫到本地磁盤,而不是HDFS,每個Map的輸出會先寫到記憶體緩沖區中,當寫入的資料達到設定的閾值時,系統將會啟動一個執行緒將緩沖區的資料寫到磁盤,這個程序叫做spill,
磁區、分組、排序:在spill寫入之前,會先進行二次排序,首先根據資料所屬的partition進行排序,然后每個磁區(partition)中的資料再按key來排序,partition的目是將記錄劃分到不同的Reducer上去,以期望能夠達到負載均衡,以后的Reducer就會根據partition來讀取自己對應的資料,接著運行combiner(如果設定了的話),combiner的本質也是一個Reducer,其目的是對將要寫入到磁盤上的檔案先進行一次處理,這樣,寫入到磁盤的資料量就會減少,最后將資料寫到本地磁盤產生spill檔案(spill檔案保存在{mapred.local.dir}指定的目錄中,Map任務結束后就會被洗掉),
檔案合并:最后,每個Map任務可能產生多個溢寫檔案(spill file),在每個Map任務完成前,會通過多路歸并演算法將這些spill檔案歸并成一個已經磁區和排序的輸出檔案,至此,Map的shuffle程序就結束了,
壓縮:在shuffle程序中如果壓縮被啟用,在map傳出資料傳入Reduce之前可執行壓縮,默認情況下壓縮是關閉的,可以將mapred.compress.map.output設定為true可實作壓縮,
二、Reduce端的shuffle
Reduce端的shuffle主要包括三個階段,copy、sort(merge)和reduce,
首先要將Map端產生的輸出檔案拷貝到Reduce端,但每個Reducer如何知道自己應該處理哪些資料呢?因為Map端進行partition的時候,實際上就相當于指定了每個Reducer要處理的資料(partition就對應了Reducer),所以Reducer在拷貝資料的時候只需拷貝與自己對應的partition中的資料即可,每個Reducer會處理一個或者多個partition,但需要先將自己對應的partition中的資料從每個Map的輸出結果中拷貝過來,
接下來就是排序(sort)階段,也成為合并(merge)階段,因為這個階段的主要作業是執行了歸并排序,從Map端拷貝到Reduce端的資料都是有序的,所以很適合歸并排序,最終在Reduce端生成一個較大的檔案作為Reduce的輸入,MapReduce編程介面
二、MapReduce編程基礎
1、Hadoop資料型別
Hadoop資料包括:BooleanWritable、ByteWritable、DoubleWritable、FloatWritale、IntWritable、LongWritable、Text、NullWritable等,它們實作了WritableComparable介面,其中Text表示使用UTF8格式存盤的文本、NullWritable型別是當(key,value)中的key或value為空時使用,
表2 Hadoop Writable與Java資料型別參照表
| Java基本型別 | Writable封裝類 | 型別 | 序列化后的長度為 |
| boolean | BooleanWritable | 布爾型 | 1 |
| byte | ByteWritable | 位元組型 | 1 |
| double | DoubleWritable | 雙精度浮點型 | 8 |
| float | FloatWritable | 單精度浮點型 | 8 |
| int | IntWritable VIntWritable | 整型 | 4 1-5 |
| long | LongWritable | 長整型 | 8 |
| short | ShortWritable | 短整型 | 2 |
| null | NullWritable | 空值 | 0 |
| Text | 文本型別 |
除了上述Hadoop型別外,用戶還可以自定義新的資料型別,用戶自定義資料型別需要實作Writable介面,但如果需要作為主鍵key使用或需要比較大小時,則需要實作WritableComparable介面,
2、資料輸入格式InputFormat
抽象類InputFormat<K,V>有三個直接子類:
FileInputFormat<K,V>、DBInputFormat<T>、DelegatingInputFormat<K,V>
其中,檔案輸入格式類FileInputFormat<K,V>類有幾個子類:
TextInputFormat、KeyValueInputFormat、SequenceFileInputFormat<K,V>、NlineInputFormat、CombineFileInputFormat<K,V>
序列化檔案輸入類SequenceFileInputFormat<K,V>有幾個子類:
SequenceFileAsBinaryInputFormat、SequenceFileAsTextInputFormat、SequenceFileInputFilter<K,V>
資料庫輸入格式類DBInputFormat<T>的直接子類是:DataDriverDBInputFormat<T>,而這個子類又派生子類:OracleDataDriverDBInputFormat<T>
表3 常用資料輸入格式類
| InputFormat類 | 描述 | 鍵(Key) | 值(Value) |
| TextInputFormat | 默認輸入格式,讀取文本檔案的行 | 當前行的偏移量 | 當前行內容 |
| KeyValueTextInputFormat | 將行決議成鍵值對 | 行內首個制表符的內容 | 行內其余內容 |
| SequenceFileInputFormat | 專用于高性能的二進制格式 | 用戶定義 | 用戶定義 |
3、輸入資料分塊InputSplit和資料記錄讀入RecordReader
編程時由用戶選擇的資料輸入格式InputFormat型別來自動決定資料分塊InputSplit和資料記錄RecordReader型別,一個InputSplit將單獨作為一個Mapper的輸入,即作業的Mapper數量是由InputSplit個數決定的,
表4 資料輸出格式類對應的Reader型別
| InputFormat類 | RecordReader類 | 描述 |
| TextInputFormat | LineRecordReader | 讀取文本檔案的行 |
| KeyValueTextInputFormat | KeyValueLineRecordReader | 讀取行并將行決議為鍵值對 |
| SequenceFileInputFormat | SequenceFileRecordReader | 用戶定義的格式產生鍵與值 |
| DBInputFormat | DBRecordReader | 僅適合讀取少量資料記錄,不適合資料倉庫聯機資料分析大量資料的讀取處理 |
4、資料輸出格式OutputFormat
抽象類OutputFormat<K,V>有四個直接子類:
FileOutputFormat<K,V>、DBOutputFormat<K,V>、NullOutputFormat<K,V>、FilterOutputFormat<K,V>
FileOutputFormat<K,V>有兩個直接子類:
TextOutputFormat<K,V>、SequenceFileOutputFormat<K,V>
SequenceFileOutputFormat<K,V>有直接子類:SequenceFileAsBinaryOutputFormat
FilterOutputFormat<K,V>有直接子類:LazyOutputFormat<K,V>
5、資料記錄輸出類RecordWriter
資料記錄輸出類RecordWriter是一個抽象類,
表5 資料輸出格式類對應的資料記錄Writer型別
| OutputFormat類 | RecordWriter類 | 描述 |
| TextOutputFormat | LineRecordWriter | 將結果資料以“key + \t + value”形式輸出到文本檔案中 |
| SequenceFileOutputFormat | SequenceFileRecordWriter | 用戶定義的格式產生鍵與值 |
| DBOutputFormat | DBRecordWriter | 將結果寫入到一個資料庫表中 |
| FilterOutputFormat | FilterRecordWriter | 對應于過濾器輸出模式的資料記錄模式,只將過濾器的結果輸出到檔案中 |
6、Mapper類
Mapper類是一個抽象類,位于hadoop-mapreduce-client-core-2.x.x.jar中,其完整類名是:org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>,需派生子類使用,在子類中重寫map方法:map(KEYIN key,VALUEIN value,Mapper.Context context)對出入的資料分塊每個鍵值對呼叫一次,
7、Reduce類
Reduce類是一個抽象類,位于hadoop-mapreduce-client-core-2.x.x.jar中,其完整類名是:org.apache.hadoop.mapreduce.Reduce<KEYIN,VALUEIN,KEYOUT,VALUEOUT>,需派生子類使用,在子類中重寫reduce方法:reduce(KEYIN key,Inerable <VALUEIN> value,Reducer.Context context)對出入的資料分塊每個鍵值對呼叫一次,
三、MapReduce專案案例
1、經典案例——WordCount
代碼演示:
package hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyWordCount {
/*
* KEYIN:是map階段輸入的key(偏移量)
* VALUEIN:是map階段輸入的value(文本檔案的內容--行)
* KEYOUT:是map階段輸出的key(單詞)
* VALUEOUT:是map階段輸出的value(單詞的計數--1)
*
* Java基本資料型別:
* int、short、long、double、float、char、boolean、byte
* hadoop資料型別
* IntWritable、ShortWritable、LongWritable、DoubleWritable、FloatWritable
* ByteWritable、BooleanWritable、NullWritable、Text
* Text:使用utf8編碼的文本型別
*/
public static class WordCount_Mapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override //方法的重寫
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text,
Text, IntWritable>.Context context)
throws IOException, InterruptedException {
String[] line = value.toString().split(" "); //將獲取到的資料以空格進行切分成一個個單詞
for (String word : line) { //遍歷單詞的陣列
context.write(new Text(word), new IntWritable(1)); //單詞進行計數,將中間結果寫入context
}
}
}
/*
* KEYIN:reduce階段輸入的key(單詞)
* VALUEIN:reduce階段輸入的value(單詞的計數)
* KEYOUT:reduce階段輸出的key(單詞)
* VALUEOUT:reduce階段輸出的value(單詞計數的總和)
*
* reduce方法中做以下修改:
* 將Text arg0改為Text key
* 將Iterable<IntWritable> arg1改為Iterable<IntWritable> value
* 將Context arg2修改為Context context
*/
public static class WordCount_Reducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0; //創建一個變數,和
for (IntWritable intWritable : values) { //遍歷相同key單詞的計數
sum += intWritable.get(); //將相同key單詞的計數進行累加
}
context.write(key, new IntWritable(sum)); //將計算的結果寫入context
}
}
//提交作業
public static void main(String[] args) throws Exception {
String inPath= "hdfs://192.168.182.10:8020/input.txt";
String outPath = "hdfs://192.168.182.10:8020/output/";
Configuration conf = new Configuration();
Job job = Job.getInstance(); //創建Job物件job
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(outPath))) {
fs.delete(new Path(outPath), true);
}
job.setJarByClass(MyWordCount.class); //設定運行的主類MyWordCount
job.setMapperClass(WordCount_Mapper.class); //設定Mapper的主類
job.setReducerClass(WordCount_Reducer.class); //設定Reduce的主類
job.setOutputKeyClass(Text.class); //設定輸出key的型別
job.setOutputValueClass(IntWritable.class); //設定輸出value的型別
//設定檔案的輸入路徑(根據自己的IP和HDFS地址設定)
FileInputFormat.addInputPath(job, new Path(inPath));
//設定計算結果的輸出路徑(根據自己的IP和HDFS地址設定)
FileOutputFormat.setOutputPath(job, new Path(outPath));
System.exit((job.waitForCompletion(true)?0:1)); //提交任務并等待任務完成
}
}
打包上傳虛擬機:
步驟:
右鍵單擊專案名 --> 選擇 Export --> Java --> JAR file --> Browse...選擇存放路徑 --> 檔案名
命名為wordcount.jar,將打包好的jar包上傳到虛擬機中
運行代碼:
在本地創建一個檔案input.txt
vi input.txt
添加內容:
hello world
hello hadoop
bye world
bye hadoop
上傳到DHFS中:
hadoop fs -put input.txt /
使用jar命令執行專案:
hadoop jar wordcount.jar hadoop.mapreduce.MyWordCount
如下圖:

查看結果:

2、計算考試平均成績
代碼演示:
Mapper類
package hadoop.mapreduce;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
/*
* 撰寫CourseScoreAverageMapper繼承Mapper類
*/
public class CourseScoreAverageMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override //方法的重寫
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text,
Text, IntWritable>.Context context)
throws IOException, InterruptedException {
String line = new String(value.getBytes(),0,value.getLength(),"UTF8"); //轉換中文編碼
Counter countPrint = context.getCounter("CourseScoreAverageMapper.Map 輸出傳遞Value:", line); //通過計數器輸出變數值
countPrint.increment(1L); //將計數器加一
StringTokenizer tokenArticle = new StringTokenizer(line,"\n"); //將輸入的資料按行“\n”進行分割
while(tokenArticle.hasMoreElements()) {
StringTokenizer tokenLine = new StringTokenizer(tokenArticle.nextToken()); //每行按空格劃分
String strName = tokenLine.nextToken(); //按空格劃分出學生姓名
String strScore = tokenLine.nextToken(); //按空格劃分出學生成績
Text name = new Text(strName); //轉換為Text型別
int scoreInt = Integer.parseInt(strScore); //轉換為int型別
context.write(name, new IntWritable(scoreInt)); //將中間結果寫入context
countPrint = context.getCounter("CourseScoreAverageMapper.Map中回圈輸出資訊:", "<key,value>:<"+strName+","+strScore+">"); //輸出資訊
countPrint.increment(1L); //將計數器加一
}
}
}
Reducer類
package hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
/*
* 撰寫CourseScoreAverageReducer繼承Reduce類
*/
public class CourseScoreAverageReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override //重寫reduce方法
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0; //總分
int count = 0; //科目數
for (IntWritable val : values) { //遍歷相同key的分數
sum += val.get(); //將相同key的分數進行累加
count++; //計算科目數
}
int average = (int)sum/count; //計算平均分
context.write(key, new IntWritable(average)); //將計算的結果寫入context
Counter countPrint = context.getCounter("CourseScoreAverageReducer.Reducer中輸出資訊:", "<key,value>:<"+key.toString()+","+average+">"); //輸出資訊
countPrint.increment(1L); //計數器加1
}
}
Driver類
package hadoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class CourseScoreDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); //獲取組態檔
Job job = Job.getInstance(conf,"CourseScoreAverage"); //創建Job物件job
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); //獲取命令列引數
if(otherArgs.length<2) {
System.err.print("Usage:hadoop jar MyAverage.jar <in> <out> ");
System.err.print("hadoop jar MyAverage.jar hadoop.mapreduce.CourseScoreDriver <in> <out>");
System.exit(2);
}else {
for (int i = 0; i < otherArgs.length-1; i++) { //設定檔案輸入路徑
if(!("hadoop.mapreduce.CourseScoreDriver".equalsIgnoreCase(otherArgs[i]))) { //排除hadoop.mapreduce.CourseScoreDriver這個引數
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
System.out.println("引數IN:"+otherArgs[i]);
}
}
//設定檔案輸出路徑
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1])); //設定輸出路徑
System.out.println("引數OUT:"+otherArgs[otherArgs.length-1]);
}
FileSystem hdfs = FileSystem.get(conf); //創建檔案系統
if(hdfs.exists(new Path(otherArgs[otherArgs.length-1]))) { //如果已經存在該路徑,則洗掉該路徑
hdfs.delete(new Path(otherArgs[otherArgs.length-1]), true);
}
job.setJarByClass(CourseScoreDriver.class); //設定運行的主類CourseScoreDriver
job.setMapperClass(CourseScoreAverageMapper.class); //設定Mapper的主類
job.setCombinerClass(CourseScoreAverageReducer.class); //設定Combiner的主類
job.setReducerClass(CourseScoreAverageReducer.class); //設定Reduce的主類
job.setOutputKeyClass(Text.class); //設定輸出key的型別
job.setOutputValueClass(IntWritable.class); //設定輸出value的型別
job.setInputFormatClass(TextInputFormat.class); //設定輸入格式
job.setOutputFormatClass(TextOutputFormat.class); //設定輸出格式
System.exit((job.waitForCompletion(true)?0:1)); //提交任務并等待任務完成
System.out.println("Job Finished!");
}
}
打包上傳虛擬機:
步驟:
右鍵單擊專案名 --> 選擇 Export --> Java --> JAR file --> Browse...選擇存放路徑 --> 檔案名
命名為average.jar , 將打包好的average.jar上傳到虛擬機中
運行代碼:
首先準備三個檔案 Chinese.txt、Math.txt、English.txt,添加如下內容:
將檔案上傳到HDFS的data目錄下:
hadoop fs -mkdir /data
hadoop fs -put Chinese.txt /data/
hadoop fs -put Math.txt /data/
hadoop fs -put English.txt /data/
執行代碼:
hadoop jar average.jar hadoop.mapreduce.CourseScoreDriver /data /data/output
查看結果,如下圖:

3、網站日志分析
代碼演示:
打包上傳虛擬機:
運行代碼:
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/356065.html
標籤:其他

