前言
序列化想必大家都很熟悉了,物件在進行網路傳輸程序中,需要序列化之后才能傳輸到客戶端,或者客戶端的資料序列化之后送達到服務端
序列化的標準解釋如下:
序列化就是把記憶體中的物件,轉換成位元組序列(或其他資料傳輸協議)以便于存盤到磁盤(持久化)和網路傳輸
對應的反序列化為序列化的逆向程序
反序列化就是將收到位元組序列(或其他資料傳輸協議)或者是磁盤的持久化資料,轉換成記憶體中的物件
為什么要序列化
一般來說,程式動態創建出來的“活的” 物件只生存在記憶體里,一旦服務停機或斷電就沒了,而且“活”物件只能存活于本地行程,不能發送到網路上其他的服務器或者行程中使用, 然而通過序列化之后,則可以存盤“活的”物件,從而進行網路傳輸,提供給其他行程或機器使用,
為什么不使用Java序列化
在Java中,創建一個物件如果希望這個物件是序列化的物件,只需要實作Serializable介面即可,但Java的序列化在Hadoop看來,是一個重量級序列化框架,一個物件被序列化后,會附帶很多額外的資訊(各種校驗資訊,Header,繼承體系等),從而不便于在網路中高效傳輸,所以,Hadoop自己開發了一套序列化機制,只需要物件實作Writable介面,重寫里面的兩個方法,
Hadoop序列化特點
- 緊湊 :高效使用存盤空間
- 快速:讀寫資料的額外開銷小
- 互操作:支持多語言的互動
Hadoop序列化業務場景
在真實的業務場景中,類似于wordcount那樣的單個字串的場景很少,而且無法應對各種復雜的大資料場景和海量資料的處理業務,因此在傳輸程序中,為了更加靈活的進行資料在Map、Reduce中的傳輸,將決議到的資料以序列化物件的方式傳輸,是非常便捷的
在Hadoop中,具體實作bean物件序列化步驟如下7步:
- 實作Writable介面
- 反序列化時,需要反射呼叫空參建構式,即類物件中必須有空參構造
- 重寫序列化write的方法
- 重寫反序列化的readFields方法
- 注意反序列化的順序和序列化的順序完全一致
- 若想把結果顯示在檔案中,需重寫toString(),可用"\t"分開,方便后續用
- 如果需將自定義的bean放在key中傳輸,還需要實作Comparable介面,因為MapReduce框中Shuffle程序要求對key必須能排序
案例業務描述
業務需求描述,如下資料為從某個地方匯出來的一批統計手機號峰值流量和低谷流量的文本檔案,現在的業務需求是,通程序式,最終輸出各個手機號對應的峰值流量、低谷流量以及總流量的統計分析檔案

那么最終的效果可按如下格式輸出

了解了上面的業務后,下面開始按照前面描述的幾個步驟進行編碼實作
編碼實作
1、定義一個封裝手機流量各個屬性的物件
從wordcount的案例中我們了解了使用mapreduce編碼的基本編碼套路,即map邏輯中讀取原始資料檔案,然后傳遞到reduce中
同樣,在這里的map邏輯中,需要讀取上面的原始的流量文本檔案,但是既然在reduce中要能實作最終的統計輸出,那么從map中出來的資料格式,必然是已經處理好的bean物件,key為手機號,而value值則為封裝了當前手機號對應的峰值流量、低谷流量以及計算的總流量資訊
了解了這一點,就大概知道這個bean物件該如何定義了
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class PhoneBean implements Writable {
//峰值流量
private long upFlow;
//低谷流量
private long downFlow;
//總流量
private long sumFlow;
//提供無參構造
public PhoneBean() {
}
//提供三個引數的getter和setter方法
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
//實作序列化和反序列化方法,注意順序一定要保持一致
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.upFlow = dataInput.readLong();
this.downFlow = dataInput.readLong();
this.sumFlow = dataInput.readLong();
}
//重寫ToString方法
@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}
}
2、自定義Mapper類
該類讀取和決議文本檔案,將各個手機號的屬性封裝到PhoneBean物件中,并輸出到Reduce使用
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PhoneMapper extends Mapper<LongWritable, Text, Text, PhoneBean> {
private Text outK = new Text();
private PhoneBean outV = new PhoneBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
//分割資料
String[] split = line.split("\t");
//抓取需要的資料:手機號,上行流量,下行流量
String phone = split[1];
String max = split[3];
String mine = split[4];
//封裝outK outV
outK.set(phone);
outV.setUpFlow(Long.parseLong(max));
outV.setDownFlow(Long.parseLong(mine));
outV.setSumFlow();
//寫出outK outV
context.write(outK, outV);
}
}
4、自定義Reduce類
關于Reduce中的入參型別和出參型別,到這里想必都已經了解,就不再過多解釋了
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.LinkedList;
public class PhoneMapper extends Mapper<LongWritable, Text, Text, PhoneBean> {
private Text outK = new Text();
private PhoneBean outV = new PhoneBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
//分割資料
String[] splits = line.split("\t");
LinkedList<String> linkedList = new LinkedList<>();
for(String str:splits){
if(StringUtils.isNotEmpty(str)){
linkedList.add(str.trim());
}
}
//抓取需要的資料:手機號,上行流量,下行流量
String phone = linkedList.get(1);
String max = linkedList.get(3);
String mine = linkedList.get(4);
//封裝outK outV
outK.set(phone);
outV.setUpFlow(Long.parseLong(max));
outV.setDownFlow(Long.parseLong(mine));
outV.setSumFlow();
//寫出outK outV
context.write(outK, outV);
}
}
5、job類
依照wordcount案例中的模板做即可
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PhoneJob {
public static void main(String[] args) throws Exception {
//1 獲取job物件
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2 關聯本Driver類
job.setJarByClass(PhoneJob.class);
//3 關聯Mapper和Reducer
job.setMapperClass(PhoneMapper.class);
job.setReducerClass(PhoneReducer.class);
//4 設定Map端輸出KV型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PhoneBean.class);
//5 設定程式最終輸出的KV型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(PhoneBean.class);
//6 設定程式的輸入輸出路徑
String inPath = "F:\\網盤\\csv\\phone_data.txt";
String outPath = "F:\\網盤\\csv\\out.txt";
FileInputFormat.setInputPaths(job, new Path(inPath));
FileOutputFormat.setOutputPath(job, new Path(outPath));
//7 提交Job
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
}
運行這段程式,觀察是否在輸出的目標路徑下,生成了統計結果

打開最后那個檔案,然后對比下原始的檔案,正好滿足預期的業務需求

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/401594.html
標籤:其他
