一.序列化簡介
什么是序列化呢?

序列化:物件———》位元組序列
反序列化:位元組序列——》物件
備注:物件在記憶體(RAM)當中
位元組序列:可以在磁盤(ROM)當中,也可以在網路當中進行傳輸
序列化的根本緣故:將物件從RAM里的資料 轉化成ROM里的資料
二.序列化案例
我們這里將要撰寫的序列化的程式的流程如下圖所示,是一個統計手機耗費總流量的case:
對于這個案例而言,為什么需要進行序列化呢?
因為在第三階段,我們將手機的上行流量和下行流量都分別封裝進了一個物件當中,一個手機號對應兩個流量,因此一個bean物件(就是一個普通的物件,擁有方法,屬性等)當中具有多個資料,因此需要進行序列化,
三.撰寫Bean類
現在我們開始封裝這個手機的資料,代碼如下所示,代碼主要是為了封裝value,也就是手機的上行流量以及下行流量,這里不處理手機號,不將手機號進行封裝,因為手機號在我們的mapper階段,我們將其視為Key同時注意要想把結果顯示在檔案中,需要重寫toString(),且用"\t"分開,方便后續用,
import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; //建立每一個手機號所對應的物件 public class FlowBean implements Writable { private long upFlow;//上行流量 private long downFlow;//下行流量 private long sumFlow;//總流量 //空參構造,為了后續能夠反射 public FlowBean() { super(); } public FlowBean(long upFlow,long downFlow) { super(); this.upFlow=upFlow; this.downFlow=downFlow; sumFlow=upFlow+upFlow; } //序列化方法,這樣這個物件就可以很方便地進行序列化和反序列化了! //序列化的方法必須和反序列化相同 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } //反序列方法 @Override public void readFields(DataInput dataInput) throws IOException { //必須要求和序列化要求順序一致,順序一致就可以進行接收 upFlow=dataInput.readLong(); downFlow= dataInput.readLong(); sumFlow=dataInput.readLong(); } @Override public String toString() { return upFlow + "\t" + downFlow + "\t" + sumFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public void set(long upFlow,long downFlow) { upFlow=upFlow; downFlow=downFlow; sumFlow=upFlow+downFlow; } }
在撰寫這個bean類當中,我們擁有了一個bean類所有的特征,比如get/set方法,以及需要擁有的序列化以及反序列化方法,可以在后續呼叫這個bean物件的時候,更加方便地進行序列化和反序列化,
四.撰寫Mapper類
將獲得的手機號碼設定為key,手機號的上行流量和下行流量分別設定為value,value使用bean類FlowBean來表示,(因為有兩個value,而在mapper里面又只能夠有一個輸出,因此只能使用一個類來代表mapper的輸出(valueout)了)
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { Text k=new Text(); FlowBean v=new FlowBean(); //1.獲取一行,tostring方法已經被改寫,因此 String line=value.toString(); //2.切割 String[] fields=line.split("\t"); //3.封裝物件 k.set(fields[1]);//封裝手機號 //封裝 long upFlow= Long.parseLong(fields[fields.length-3]); long downFlow=Long.parseLong(fields[fields.length-2]); v.setUpFlow(upFlow); v.setUpFlow(downFlow); //v.set(); //4.寫出 context.write(k,v); } }
五.撰寫Reducer類
import org.apache.hadoop.mapreduce.Reducer; import javax.xml.soap.Text; import java.io.IOException; public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> { FlowBean v=new FlowBean(); @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long sum_upFlow=0; long sum_downflow=0; //1.累加求和 for (FlowBean flowBean:values) { sum_upFlow+=flowBean.getUpFlow(); sum_downflow+=flowBean.getDownFlow(); } v.set(sum_upFlow,sum_downflow); //2.寫出 context.write(key,v); } }
六.撰寫Driver類
撰寫driver類是一個固定的步驟,可以直接根據注釋當中的步驟進行撰寫即可,代碼如下所示:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import javax.xml.soap.Text; import java.io.IOException; public class FlowsumDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration configuration=new Configuration(); //1.獲取job物件 Job job=Job.getInstance(configuration); //2.設定jar路徑 job.setJarByClass(FlowsumDriver.class); //3.關聯mapper和reducer job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class); //4.設定mapper輸出的key和value型別 job.setMapOutputKeyClass(Text.class); job.setMapOutputKeyClass(FlowBean.class); //5.設定最終輸出的key和value型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //6.設定輸出路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); //7.提交job boolean result=job.waitForCompletion(true); System.out.println(result); } }
這樣我們就可以完成這個任務了!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/251506.html
標籤:其他
