原始碼附上,請問為什麼會卡在 map 0% reduce 0% 的情況?
用的是偽分布式安裝。
package cn.itcast.mapreduce3;
import cn.itcast.mapreduce2.JobMain2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class JobMain3 extends Configured implements Tool {
public int run(String[] strings) throws Exception {
Job job3 = Job.getInstance(super.getConf(), "Job3");
System.setProperty("HADOOP_USER_NAME", "root");
job3.addCacheFile(new URI("hdfs://localhost:9000/TFIDF/part-r-00000"));
job3.addCacheFile(new URI("hdfs://localhost:9000/TFIDF/part-r-00003"));
job3.setMapperClass(TFIDF_mapper3.class);
job3.setMapOutputKeyClass(Text.class);
job3.setMapOutputValueClass(Text.class);
job3.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job3,new Path("file:///D:\\hdfsdata\\TFIDF_result1"));
job3.setReducerClass(TFIDF_reudce3.class);
job3.setOutputKeyClass(Text.class);
job3.setOutputValueClass(Text.class);
job3.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job3,new Path("file:///D:\\hdfsdata\\TFIDF_result3"));
boolean b = job3.waitForCompletion(true);
return b ? 0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
int run = ToolRunner.run(configuration, new JobMain3(), args);
System.exit(run);
}
}
package cn.itcast.mapreduce3;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;
public class TFIDF_mapper3 extends Mapper<LongWritable, Text,Text,Text> {
//存放微博總數D
private static Map<String, Integer> cmap = null;
//存放df 某一詞在文本中的數量
private static Map<String, Integer> df = null;
protected void setup(Context context) throws IOException, InterruptedException {
if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {
URI[] cacheFiles = context.getCacheFiles(); // part-r-00000,part-r-00003
if (cacheFiles != null) {
//有倆個檔案 所以就要用for回圈找對應檔案
for (int i = 0; i < cacheFiles.length; i++) {
URI uri = cacheFiles[i];
if (uri.getPath().endsWith("part-r-00003")) {
//Path path = new Path(uri.getPath()); //這個Path就是以part-r-00003為路徑的Path
FileSystem fileSystem = FileSystem.get(cacheFiles[i],context.getConfiguration());
FSDataInputStream dataInputStream = fileSystem.open(new Path(cacheFiles[i]));
//獲取對應Path內容,是一個位元組流--》字符緩沖流
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(dataInputStream));
String readLine = bufferedReader.readLine();
if (readLine.startsWith("count")) {
String[] split = readLine.split("\t"); //只有一組資料的時候就用if操作
cmap = new HashMap<String, Integer>();
cmap.put(split[0], Integer.parseInt(split[1].trim()));
}
bufferedReader.close();
fileSystem.close();
}
else if (uri.getPath().endsWith("part-r-00000")) {
FileSystem fileSystem = FileSystem.get(cacheFiles[i],context.getConfiguration());
FSDataInputStream dataInputStream = fileSystem.open(new Path(cacheFiles[i]));
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(dataInputStream));
String readLine = bufferedReader.readLine();
while (readLine != null) {
String[] split = readLine.split("\t");
df = new HashMap<String, Integer>();
df.put(split[0], Integer.parseInt(split[1].trim()));
}
bufferedReader.close();
fileSystem.close();
}
}
}
}
}
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Map階段讀取資料是: part-r-00000,00001,00002... 所以 我們要讀取這四個檔案的路徑 getInputSplit
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String name = fileSplit.getPath().getName(); //得到了 當前路徑的檔案名 part-r-00000或part-r-00001 and so on
if (!name.contains("part-r-00003")){
String[] split = value.toString().trim().split("\t");
if(split.length>=2){
String Tf = split[1];
int tf = Integer.parseInt(Tf.trim());
String[] split1 = split[0].split("_");
if(split1.length>=2){
String word = split1[0];
String ID = split1[1];
/*
TF:就是第一次Maperduce中part-r-00000到00002中的VALUE值 但是還未歸一化
cmap:就是第一次Mapreduce中的Value的count后面的數字 Int
df:就是第二次Mapreduce中的part-r-00000中的VALUE值
*/
double tfidf = tf * Math.log( cmap.get("count") / df.get(word) );
NumberFormat numberFormat = NumberFormat.getInstance(); //獲取格式化資料
numberFormat.setMaximumFractionDigits(5); //double --》 String 就是通過numberformat的最大化小數方法轉換String
context.write(new Text(ID),new Text(word+":"+numberFormat.format(tfidf)));
}
}
}
}
}
package cn.itcast.mapreduce3;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class TFIDF_reudce3 extends Reducer<Text,Text,Text,Text> {
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder stringBuffer = new StringBuilder();
for (Text value : values) {
stringBuffer.append(value).append("\t");
}
context.write(key,new Text(stringBuffer.toString()));
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/195450.html
標籤:分布式計算/Hadoop
