今天沒有多少的心思學習新的內容,但是對手機的依賴也并不是很大,就干脆來發布一條心得,來談談Hadoop的實戰吧
hadoop
介紹
學過大資料的我們都知道,Mapreduce是一種模式, Hadoop是一種框架, Hadoop是一個實作了mapreduce模式的開源的分布式并行編程框架,MapReduce是一種簡化的分布式的編程模式,讓程式自動分配到一個由普普通通的機器組成的超大集群上并發執行的,mapreduce的run-time系統會解決輸入資料的分布細節,跨越機器集群的執行調度,處理機器的實效,并且管理機器之間的通訊請求,這樣的模式允許程式員可以不需要有什么并發處理或者分布式系統的經驗,就可以處理超大的分布式系統得資源,
概論
如果想做一名Hadoop的程式員,那么要做的事情就是
1. 定義Mapper,處理輸入的Key-Value對,然后輸出中間的結果,
2. 定義Reducer,(這個可選),對中間的結果進行規約,輸出結果
3. 定義InputFormat和OutputFormat,這個步驟也是可選的
接觸過Hadoop的肯定對這些有很大的了解,InputFormat將每行輸入檔案的內容轉換成Java類供Mapper函式使用,如果這個時候沒有定義的話,默認的就是String,
4. 定義main函式,在main函式中定義一個Job,并且執行它,
5. 將寫好的Java檔案用maven打包,(當然,可以使用其他的方法打包),然后上傳到Hadoop的madreduce檔案夾中,
6. 應用這個.jar的包去處理hdfs中已經上傳的檔案
7. 然后剩下的事情就交給了系統,
我們來說說,檔案系統和一些方法的基本概念和用法
我們肯定知道,Hadoop的hdfs實作了GooGle的GFS的檔案系統,
Hadoop安裝完成以后,我們通過jps可以查看他有很多的行程,當然
我們肯定也都知道這些行程是什么意思:
1.Hadoop的HDFS實作了google的GFS檔案系統,NameNode作為檔案
系統的負責調度運行在master,DataNode運行在每個機器上,同時
Hadoop實作了Google的MapReduce,JobTracker作為MapReduce
的總調度運行在master,TaskTracker則運行在每個機器上執行Task,
2.main()函式,主要就是創建JobConf,定義一下Mapper,
Reducer,Input/OutputFormat來輸入輸出檔案目錄,最后
把Job提交給JobTracker,
3. JobTracker,創建一個InputFormat的實體,呼叫getSplit()方法,
把輸入目錄的檔案拆分成FileSplit作為task的輸入,然后加入Queue,
4. TaskTrack向JobTracker索求下一個Map/Reduce,
Mapper Task先從InputFormat創建RecordReader,回圈讀入FileSplits的內容生成Key與Value,傳給Mapper函式,處理完后中間結果寫成SequenceFile.
Reducer Task 從運行Mapper的TaskTracker的Jetty上使用http協議獲取所需的中間內容(33%),Sort/Merge后(66%),執行Reducer函式,最后按照OutputFormat寫入結果目錄,
TaskTracker 每10秒向JobTracker報告一次運行情況,每完成一個Task10秒后,就會向JobTracker索求下一個Task,
Nutch專案的全部資料處理都構建在Hadoop之上,詳情可以看我的博客Scalable Computing with Hadoop,
這里寫一個簡單的分布式Grep,簡單對輸入檔案進行逐行的正則匹配,如果符合就將該行列印到輸出檔案,因為是簡單的全部輸出,所以我們只要寫Mapper函式,不用寫Reducer函式,也不用定義Input/Output Format,
import java.io.IOException;
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class testGrep {
public static class GrepMapper extends
Mapper<Object, Text, Text, IntWritable> {
@Override
public void map(Object obj, Text text, Context context)
throws IOException, InterruptedException {
String pattern = context.getConfiguration().get("grep");
// System.out.println(split.getPath().toString());
String str = text.toString();
Pattern r = Pattern.compile(pattern);
Matcher m = r.matcher(str);
if (m.find()) {
FileSplit split = (FileSplit) context.getInputSplit();
String filename = split.getPath().getName();
context.write(new Text(filename), new IntWritable(1));
}
}
}
public static class GrepReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text text, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable t : values) {
sum += t.get();
}
context.write(text, new IntWritable(sum));
}
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
String pattern = ".22*.";//匹配含有w字符,這里修改我們需要匹配的模式
conf.set("grep", pattern);// 在這里設定需要匹配的正則運算式
Job job = Job.getInstance(conf, "grep");
job.setJarByClass(testGrep.class);
job.setMapperClass(GrepMapper.class);
job.setReducerClass(GrepReducer.class);
job.setCombinerClass(GrepReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//下面根據具體情況進行修改
String args1 = "hdfs://master:9000/user/grep_input";
String args2 = "hdfs://master:9000/user/grep_output";
FileSystem fs = FileSystem.newInstance(URI.create(args1), conf);
fs.delete(new Path(args2), true);
FileInputFormat.addInputPath(job, new Path(args1));
FileOutputFormat.setOutputPath(job, new Path(args2));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Hadoop的運行
Hadoop的運行分為兩種,一種是本地模式,一種是集群模式,這里集群模式參考我之前的文章,Hadoop的安裝教程
1 )執行bin/hadoop dfs 可以看到它所支持的檔案操作指令,
2) 創建目錄輸入input:
$ bin/hadoop dfs -mkdir input
3)上傳檔案xx.log到指定目錄 input :
$ bin/hadoop dfs -put xx.log input
4 ) 執行 bin/hadoop demo.hadoop.HadoopGrep input output
(jar包運行:bin/hadoop jar HadoopGrep.jar HadoopGrep input /tmp/output "[a-b]" )
5 ) 查看輸出檔案:
將輸出檔案從分布式檔案系統拷貝到本地檔案系統查看:
$ bin/hadoop fs -get output output
$ cat output/*
或者
在分布式檔案系統上查看輸出檔案:
$ bin/hadoop fs -cat output/*
重新執行前,運行hadoop/bin/hadoop dfs -rm output洗掉output目錄
7.運行hadoop/bin/stop-all.sh 結束,
總結
我們可以發現,其實Hadoop并不適用于檔案特別小的情況,
我們知道hdfs的一個block塊的大小是128M,整體有一個
10%的溢位情況,即便這樣,你的資料如果只有幾M或者只
有幾十、幾百k的情況,需要對你的海量的資料進行整合處
理,要不然資源進行了浪費,就連帶寬,如果檔案小、數量
小、集群數量小,處理的復雜度小的時候,也不是很有優勢,
例如:
如果我們不用Hadoop用java寫簡單的grep函式處理100M的log文
件主需要4s,用了hadoop local的方式運行是14秒,用了hadoop
單機集群的方式是30秒,用雙機集群10M網口的話則更慢,
這里的內容是我學習Hadoop以后的心得體會,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/386520.html
標籤:其他
