目錄
一、題目要求
第一題:月平均氣溫統計
第二題:每日空氣質量統計
第三題:各空氣質量分類天數統計
二、問題思路
(一)、月平均氣溫統計
1、思路
2、代碼
(二)、每日空氣質量統計
1、思路
2、代碼
(三)、各空氣質量分類天數統計
1、思路
2、代碼
一、題目要求

原資料如下:

第一題:月平均氣溫統計
得到示例結果如下:

第二題:每日空氣質量統計
示例結果如下:

第三題:各空氣質量分類天數統計
結果示例如下:

下面我將對這三個問題進行詳細解答,
二、問題思路
(一)、月平均氣溫統計
1、思路
這道題要求的是每月的平均氣溫,但資料給的是每天的氣溫,所以需要先截取每日編號(例如20160101)的前六位,得到每月編號(201601),再以每月編號為鍵,對該月的所有氣溫取平均值即可。
寫這道題之前,可以先把mapper和reducer裡的k,v都想好,然後再寫:mapper裡是k1,v1,k2,v2,reducer裡是k3,v3,k4,v4,如下:
k1 0行,1行 偏移量 LongWritable
v1 每一行的資料 Text
k2 201601 Text
v2 201601 對應的所有氣溫 -2,-3,,,, IntWritable
k3 201601 Text
v3 {-2,-3,-3,,,} IntWritable
k4 201601,201602,,, Text
v4 avg(每月氣溫平均值) FloatWritable
avg = total/count
2、代碼
組態檔:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven project descriptor (POM model 4.0.0) for the ST1-weather artifact. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Project coordinates: groupId / artifactId / version. -->
<groupId>org.example</groupId>
<artifactId>ST1-weather</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- The only declared dependency: Hadoop client libraries, version 2.7.3. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
</dependencies>
</project>
mapper:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for the monthly average temperature job.
 *
 * <p>For every comma-separated input row it emits the first six characters of
 * field 0 (a day id such as 20160101 becomes the month id 201601) as the key and
 * the integer parsed from field 5 (the value this job averages as the
 * temperature) as the value.
 */
public class MavgtmpMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Processes one input line.
     *
     * @param key     offset of the line; the line at offset 0 is treated as the header
     * @param value   the raw comma-separated line
     * @param context sink for the (month id, reading) pair
     * @throws IOException          propagated from {@code context.write}
     * @throws InterruptedException propagated from {@code context.write}
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The header row is the one whose key is 0; it carries no data.
        if (key.get() == 0) {
            return;
        }

        String[] fields = value.toString().split(",");

        // Rows whose reading is the literal "N/A" are dropped.
        String reading = fields[5];
        if ("N/A".equals(reading)) {
            return;
        }

        // Truncate the day id to its month prefix so that all days of a month
        // end up under the same key.
        Text monthId = new Text(fields[0].substring(0, 6));
        IntWritable temperature = new IntWritable(Integer.parseInt(reading));
        context.write(monthId, temperature);
    }
}
reducer
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer for the monthly average temperature job.
 *
 * <p>Receives every integer reading emitted for one month key and writes the
 * arithmetic mean of those readings as a {@link FloatWritable}.
 */
public class MavgtmpReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {

    /**
     * Averages all readings of one key.
     *
     * @param key     the month id shared by all {@code values}
     * @param values  the integer readings grouped under {@code key}
     * @param context sink for the (month id, average) pair
     * @throws IOException          propagated from {@code context.write}
     * @throws InterruptedException propagated from {@code context.write}
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // A long accumulator keeps the sum from wrapping around on large groups.
        long total = 0;
        int count = 0;
        for (IntWritable v : values) {
            total += v.get();
            count++;
        }

        // Nothing to average; avoids emitting NaN for an empty group.
        if (count == 0) {
            return;
        }

        // Divide in floating point. The previous int/int division truncated the
        // quotient before it was widened to float, so the fractional part of the
        // average was always lost (e.g. -7 / 3 produced -2.0 instead of -2.33).
        float avg = (float) ((double) total / count);
        context.write(key, new FloatWritable(avg));
    }
}
main
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the monthly average temperature job.
 *
 * <p>Wires {@code MavgtmpMapper} and {@code MavgtmpReducer} into a single job,
 * runs it, and reports the outcome through the process exit status.
 */
public class MavgtmpMain {

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be created, submitted or monitored
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: MavgtmpMain <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        // Class used to locate the jar that contains the job.
        job.setJarByClass(MavgtmpMain.class);

        // Map side: Text key, IntWritable value.
        job.setMapperClass(MavgtmpMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reduce side / final output: Text key, FloatWritable value.
        job.setReducerClass(MavgtmpReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit, wait, and surface failure to the caller: previously the result
        // was discarded and the process exited with status 0 even on failure.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
(二)、每日空氣質量統計
1、思路
求每日空氣質量統計,由題目可以確定k4是每日編號,v4是每日空氣質量AQI。由于每日每時有多個AQI,所以應取平均值,由此可以推出k3是每日編號,v3是每日AQI的集合,那麼k2也能確定是每日編號,v2是每日編號對應的AQI。
k1 0行,1行 LongWritable
v1 每一行的資料 Text
k2 20160101 Text
v2 20160101對應的AQI值,即msgs[6],例如212 IntWritable
k3 20160101 Text
v3 {212,209,204,194,,,} IntWritable
k4 20160101,20160102,20160103.,, Text
v4 avg(每天AQI平均值) IntWritable
avg = total/count
2、代碼
mapper
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for the daily AQI job.
 *
 * <p>For every comma-separated input row it emits field 0 (the day id, e.g.
 * 20160101) as the key and the integer parsed from field 6 (the value this job
 * averages as the AQI) as the value.
 */
public class MavgaqiMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Processes one input line.
     *
     * @param key     offset of the line; the line at offset 0 is treated as the header
     * @param value   the raw comma-separated line
     * @param context sink for the (day id, reading) pair
     * @throws IOException          propagated from {@code context.write}
     * @throws InterruptedException propagated from {@code context.write}
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The header row is the one whose key is 0; it carries no data.
        if (key.get() == 0) {
            return;
        }

        String[] fields = value.toString().split(",");

        // Rows whose reading is the literal "N/A" are dropped.
        String reading = fields[6];
        if ("N/A".equals(reading)) {
            return;
        }

        Text dayId = new Text(fields[0]);
        IntWritable aqi = new IntWritable(Integer.parseInt(reading));
        context.write(dayId, aqi);
    }
}
reducer
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer for the daily AQI job.
 *
 * <p>Receives every integer reading emitted for one day key and writes their
 * integer average.
 */
public class MavgaqiReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Averages all readings of one key using integer arithmetic.
     *
     * @param key     the day id shared by all {@code values}
     * @param values  the integer readings grouped under {@code key}
     * @param context sink for the (day id, average) pair
     * @throws IOException          propagated from {@code context.write}
     * @throws InterruptedException propagated from {@code context.write}
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int samples = 0;
        int sum = 0;
        for (IntWritable reading : values) {
            sum += reading.get();
            samples++;
        }

        // int / int: the quotient is truncated toward zero, which matches the
        // IntWritable output type of this job.
        IntWritable dailyAverage = new IntWritable(sum / samples);
        context.write(key, dailyAverage);
    }
}
main
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the daily AQI job.
 *
 * <p>Wires {@code MavgaqiMapper} and {@code MavgaqiReducer} into a single job,
 * runs it, and reports the outcome through the process exit status.
 */
public class MavgaqiMain {

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be created, submitted or monitored
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: MavgaqiMain <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        // Class used to locate the jar that contains the job.
        job.setJarByClass(MavgaqiMain.class);

        // Map side: Text key, IntWritable value.
        job.setMapperClass(MavgaqiMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reduce side / final output: Text key, IntWritable value.
        job.setReducerClass(MavgaqiReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit, wait, and surface failure to the caller: previously the result
        // was discarded and the process exited with status 0 even on failure.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
(三)、各空氣質量分類天數統計
1、思路
問題是求解各空氣質量分類的天數,可知k4是空氣質量等級,v4是天數。可以借助第二題求得的資料:第二題輸出的第一列是日期編號,第二列是該天的AQI,可以根據AQI的大小評估出該天的空氣質量等級(如下圖),并將評估出的等級放入字串s,即k2為空氣質量等級,v2賦值為1。

k1 問題二的 0、1行 LongWritable
v1 每一行的資料 Text
k2 優、良、輕度污染 Text
v2 1 IntWritable
k3 優、良 Text
v3 {1,1,1...} IntWritable
k4 優,良 Text
v4 count求和 IntWritable
讀取第二題輸出的檔案時,把Text型別的AQI轉換為int型別,依數值區分不同等級,再把資料改寫成對應的分類字串輸出。
2、代碼
mapper
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper for the air-quality category count job.
 *
 * <p>Each input line is split on a tab; the second field is parsed as an integer
 * AQI, mapped to a category label, and emitted as (label, 1). No header line is
 * skipped. NOTE(review): the input is presumably the output of the daily AQI
 * job (day id, tab, AQI) -- confirm against how the job is launched.
 */
public class McountaqiMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * Processes one input line.
     *
     * @param key     offset of the line; not used
     * @param value   the raw tab-separated line
     * @param context sink for the (category label, 1) pair
     * @throws IOException          propagated from {@code context.write}
     * @throws InterruptedException propagated from {@code context.write}
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] columns = value.toString().split("\t");
        int aqi = Integer.parseInt(columns[1]);
        context.write(new Text(categoryOf(aqi)), new IntWritable(1));
    }

    /** Returns the category label for an AQI value; each upper bound is inclusive. */
    private static String categoryOf(int aqi) {
        if (aqi <= 50) {
            return "優";
        }
        if (aqi <= 100) {
            return "良";
        }
        if (aqi <= 150) {
            return "輕度污染";
        }
        if (aqi <= 200) {
            return "中度污染";
        }
        if (aqi <= 300) {
            return "重度污染";
        }
        return "嚴重污染";
    }
}
reducer
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer for the air-quality category count job.
 *
 * <p>Sums the integer values grouped under one category label and writes the
 * total for that label.
 */
public class McountaqiReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Adds up all values of one key.
     *
     * @param s       the category label shared by all {@code values}
     * @param values  the per-record counts grouped under {@code s}
     * @param context sink for the (category label, total) pair
     * @throws IOException          propagated from {@code context.write}
     * @throws InterruptedException propagated from {@code context.write}
     */
    @Override
    protected void reduce(Text s, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int days = 0;
        for (IntWritable one : values) {
            days = days + one.get();
        }
        IntWritable total = new IntWritable(days);
        context.write(s, total);
    }
}
main
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the air-quality category count job.
 *
 * <p>Wires {@code McountaqiMapper} and {@code McountaqiReducer} into a single
 * job, runs it, and reports the outcome through the process exit status.
 */
public class McountaqiMain {

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be created, submitted or monitored
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: McountaqiMain <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        // Class used to locate the jar that contains the job.
        job.setJarByClass(McountaqiMain.class);

        // Map side: Text key, IntWritable value.
        job.setMapperClass(McountaqiMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reduce side / final output: Text key, IntWritable value.
        job.setReducerClass(McountaqiReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit, wait, and surface failure to the caller: previously the result
        // was discarded and the process exited with status 0 even on failure.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
題都不難,好好思考就能解出來,加油!!!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/287897.html
標籤:其他
