想用mapreduce找出幾天中,每一天的氣溫最高的記錄;代碼思路很簡單,日期作為key,時間和溫度作為value。然后在reduce方法中對values先遍歷一遍找到最高溫度max,再遍歷一遍values,找到最高溫度的記錄,但是很可惜,程式能輸出,但是輸出檔案沒有內容,不知是何緣由。個人懷疑是不是不能使用兩次遍歷。
輸入檔案中的格式是: 日期 時間 溫度
例:
2017-06-23 08 12
2017-06-23 12 25
代碼如下:
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTemperature {
public static class MaxTemperatureMap extends Mapper<Object, Text, Text, Text>
{
@Override
public void map(Object key,Text value,Context context) throws IOException,InterruptedException
{
String[] str = value.toString().split(" ");
context.write(new Text(str[0]), new Text(str[1]+str[2]));
}
}
public static class MaxTemperatureReduce extends Reducer<Text, Text, Text, Text>
{
@Override
public void reduce(Text key ,Iterable<Text> values ,Context context) throws IOException,InterruptedException
{
//先找到最高溫度max
int max=Integer.MIN_VALUE;
for(Text value:values) {
max=Math.max(max, Integer.parseInt(value.toString().substring(2)));
}
// context.write(key, new Text(String.valueOf(max)));
//再找到取到最高溫度max時的記錄,輸出到檔案
for(Text value:values) {
if (max==Integer.parseInt(value.toString().substring(2)))
{
context.write(key, value);
}
}
// context.write(new Text(key.toString()+":"+value.toString().substring(0, 2)), new IntWritable(max));
}
}
public static void main(String[] args) throws Exception {
// 設定HDFS
String ipName = "127.0.0.1";
String hdfs = "hdfs://"+ipName+":9000";
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
String jobName = "MaxTemperature";
removeOutput(conf,hdfs);
job.setJarByClass(MaxTemperature.class);
job.setMapperClass(MaxTemperatureMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// job.setCombinerClass(MaxTemperatureCombine.class);
job.setReducerClass(MaxTemperatureReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//3.設定作業輸入和輸出路徑
String dataDir = "/workspace/flowStatistics/date_data"; //實驗資料目錄
String outputDir = "/workspace/flowStatistics/maxTemperature"; //實驗輸出目錄
Path inPath = new Path(hdfs + dataDir);
Path outPath = new Path(hdfs + outputDir);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
System.out.println("Job: " + jobName + " is running...");
if(job.waitForCompletion(true)) {
System.out.println("success!");
System.exit(0);
} else {
System.out.println("failed!");
System.exit(1);
}
}
// 這個方法是為了 避免因為output檔案已經存在而報錯。直接刪掉上一次運行的output檔案夾。
private static void removeOutput(Configuration conf, String ipPre)
throws IOException {
String outputPath = ipPre + "/workspace/flowStatistics/maxTemperature";
FileSystem fs = FileSystem.get(URI.create(outputPath), conf);
Path path = new Path(outputPath);
if (fs.exists(path)) {
fs.deleteOnExit(path);
}
fs.close();
}
}
uj5u.com熱心網友回復:
檢查一下counter中的redouce output record數量確實是零么?如果是,得檢查邏輯。uj5u.com熱心網友回復:
你 reduce 的時候, 獲取的 Iterable<Text> values ,在回圈迭代的時候, 獲取最大值的方式對么? value 是 時間 + 溫度 的字串吧.
uj5u.com熱心網友回復:
哦,我去看一下
uj5u.com熱心網友回復:
我后面有 subString(2);把溫度這個量取出來了
uj5u.com熱心網友回復:
請問你解決了嗎可以遍歷兩次嗎
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/239253.html
標籤:分布式計算/Hadoop
上一篇:關于ARToolkitX中制作NFT標記的genTexData生成特征的點的資料集這是屬于用哪種演算法提取出來的?
