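Background:
Using the historical US civil aviation flight records (1987 to 2008), develop MapReduce, Pig and Hive programs that compute, for one chosen year, two statistics: the number of flights for each day of the week and the total distance flown by each flight (carrier code + flight number);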
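All three programs pick columns out of each CSV line by position. As a quick reference, here is a minimal sketch with hypothetical names (not part of the original project) that extracts the five columns this post relies on. It uses the same zero-based positions as the mapper code and the column order of the Pig schema, and it treats a non-numeric Year as the header row.

```java
/**
 * Hypothetical helper (not part of the original project): pulls the columns
 * this post uses out of one CSV line of the airline data. Zero-based indices
 * follow the Pig schema: 0 = Year, 3 = DayOfWeek, 8 = UniqueCarrier,
 * 9 = FlightNum, 18 = Distance.
 */
final class FlightRecord {
    final int year;
    final int dayOfWeek;    // 1 = Monday ... 7 = Sunday
    final String flightKey; // UniqueCarrier followed by FlightNum
    final int distance;     // miles; "NA" becomes 0, as in FlightMilesMapper

    private FlightRecord(int year, int dayOfWeek, String flightKey, int distance) {
        this.year = year;
        this.dayOfWeek = dayOfWeek;
        this.flightKey = flightKey;
        this.distance = distance;
    }

    /** Returns null for the header row or any line too short to hold Distance. */
    static FlightRecord parse(String line) {
        String[] f = line.split(",", -1); // -1 keeps trailing empty columns
        if (f.length < 19) {
            return null;
        }
        int year;
        try {
            year = Integer.parseInt(f[0].trim());
        } catch (NumberFormatException e) {
            return null; // header row: the first column reads "Year"
        }
        int distance;
        try {
            distance = Integer.parseInt(f[18].trim());
        } catch (NumberFormatException e) {
            distance = 0; // missing value ("NA")
        }
        int dayOfWeek = Integer.parseInt(f[3].trim());
        return new FlightRecord(year, dayOfWeek, f[8] + f[9], distance);
    }
}
```

Because `parse` returns null for the header, a caller can simply skip null results.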
MapReduce project:
1. Write the MapReduce project;

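2. Upload the data file to Hadoop (HDFS);
3. Check that the upload succeeded; you can also check it in Eclipse;
4. Run the MapReduce project and configure its arguments (the input file and two output directories);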

5. Export the project as flightweekDist.jar and run it on Hadoop;

6. Now go to your output paths and look at the results.
Easy, right? [tongue-in-cheek doge]
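Both jobs share one reducer, FlightSumReducer, which is also registered as the combiner. That is safe because addition is associative and commutative: summing partial sums per map task and then summing again gives the same totals as summing everything at once. The sketch below is an in-memory simulation with hypothetical names (plain Java, no Hadoop). It mimics job 1 (flights per DayOfWeek) over several map tasks, optionally combines each task's output locally, and then reduces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * In-memory simulation (hypothetical, no Hadoop) of job 1: map each CSV line
 * to (DayOfWeek, 1), optionally combine per map task, then reduce by summing.
 */
final class FlightCountSimulation {

    /** Map phase for one split: skip the header, emit (DayOfWeek, 1). */
    static List<Map.Entry<String, Integer>> map(List<String> split) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String line : split) {
            String[] fields = line.split(",");
            try {
                Integer.parseInt(fields[0]); // the header row has "Year" here
            } catch (NumberFormatException e) {
                continue;
            }
            out.add(Map.entry(fields[3], 1));
        }
        return out;
    }

    /** Sums values per key; plays the role of both combiner and reducer. */
    static List<Map.Entry<String, Integer>> sumByKey(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> sums = new TreeMap<>(); // sorted keys, like the shuffle
        for (Map.Entry<String, Integer> p : pairs) {
            sums.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        return new ArrayList<>(sums.entrySet());
    }

    /** Runs map (plus an optional combine) on every split, then one reduce over all output. */
    static Map<String, Integer> run(List<List<String>> splits, boolean useCombiner) {
        List<Map.Entry<String, Integer>> shuffled = new ArrayList<>();
        for (List<String> split : splits) {
            List<Map.Entry<String, Integer>> mapped = map(split);
            shuffled.addAll(useCombiner ? sumByKey(mapped) : mapped);
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, Integer> e : sumByKey(shuffled)) {
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }
}
```

Running `run` with and without the combiner yields identical totals. The combiner only reduces how much data is shuffled to the reducer.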
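Pig project:
1. First, of course, write the Pig script.
Its statements read a lot like SQL, so anyone who has taken a database course should have no trouble with it.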
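Let me explain why there is a STREAM right after the LOAD.
STREAM pipes every tuple through an external command, so it can do whatever that command does: filtering, deduplication, sorting, grouping and plenty more (read up on it if you're interested). Here the command is `tail -n +2`, and its only job is to strip the header row from the raw data. One caveat: Pig runs the streamed command separately in every map task, so when the input is split across several tasks, `tail -n +2` can also throw away the first real record of each split. Since the header's Year value cannot be cast to int and loads as null, `FILTER records BY Year IS NOT NULL` is a more robust way to skip it.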
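The remaining statements are plain GROUP ... COUNT (flights per DayOfWeek) and GROUP ... SUM (total Distance per UniqueCarrier + FlightNum), so I won't explain them; if anything is unclear, look it up in a book.
2. Then we can happily run the script;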


Once you see "Success", you're done.
3. View the results directly with hadoop fs -cat.
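Pig runs a streamed command once per task, so `tail -n +2` drops the first line of each task's input rather than only the file header. The in-memory sketch below (plain Java, hypothetical names, not Pig itself) makes that concrete. It compares running `tail -n +2` on each split with dropping rows whose Year is not a number, which is the check the MapReduce mappers in this post use.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical illustration (not Pig): compares removing the first line of
 * every input split, as a per-task `tail -n +2` would, with filtering out
 * rows whose Year column is not a number.
 */
final class HeaderFilterSketch {

    /** Mimics `tail -n +2` run independently on each split. */
    static List<String> tailPerSplit(List<List<String>> splits) {
        List<String> out = new ArrayList<>();
        for (List<String> split : splits) {
            // skip only the first line of this split
            out.addAll(split.subList(Math.min(1, split.size()), split.size()));
        }
        return out;
    }

    /** Keeps only rows whose first field parses as an int (the header's "Year" does not). */
    static List<String> filterByNumericYear(List<List<String>> splits) {
        List<String> out = new ArrayList<>();
        for (List<String> split : splits) {
            for (String line : split) {
                String year = line.split(",", -1)[0];
                try {
                    Integer.parseInt(year.trim());
                    out.add(line);
                } catch (NumberFormatException e) {
                    // header row or malformed year: drop it
                }
            }
        }
        return out;
    }
}
```

With the header in the first split and real records in a second split, the per-split tail loses one real record while the Year filter keeps all of them.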
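Hive project:
Hive is harder, because at the end the results also have to be written into MySQL.
1. Jump straight into the code;
It's simple, just plain JDBC, which everyone has done in Java SE.
Note: remember to start hiveserver first; the client code at the end of this post connects to it at 192.168.1.100:10021.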

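2. Then run it. The shell prints log output while it runs, which takes a while; keep an eye on the log so you know the program is still working and not stuck, haha.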

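Tip:
If you still need the terminal after starting hiveserver, press Ctrl+Z to suspend it and then type bg so it keeps running in the background (a pro taught me this, hahaha);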
Finally, here comes the most troublesome part of all:
using a UDF to write the Hive results into MySQL (I'd surely get yelled at if I didn't share the function);
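1. Add the jars (hive-contrib-0.9.0-cdh4.1.2.jar and mysql-connector-java-5.1.38.jar). hive-contrib already ships a version of this UDF, org.apache.hadoop.hive.contrib.genericudf.example.GenericUDFDBOutput, which the code at the end of this post closely follows;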

2. Write the UDF and export it as a jar (the full GenericUDFDBOutput source is at the end of this post);

3. After exporting the jar, create the function;

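Note: the function has to be created again every time you enter Hive. If you use it often, you can put these statements in a configuration file; I haven't worked out the details myself, but the Hive CLI's .hiverc initialization file is the place to look.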
hive>select dboutput('jdbc:mysql://192.168.1.100/hive','hive','mysql','INSERT INTO flightinfodistance(flightnumber,distance) VALUES (?,?)',flightnumber,distance) from flightinfodistance;
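This statement is the important one, so let's take it apart (left to right):
'dboutput' is the function name;
'192.168.1.100' is the IP of the MySQL host;
'hive' (at the end of the URL) is the MySQL database name;
'hive' (the second argument) is the MySQL user name;
'mysql' is the MySQL password;
'flightinfodistance' (inside the INSERT) is the MySQL table name;
(flightnumber,distance) are the MySQL table columns;
flightnumber,distance (after the SQL string) are the Hive table columns;
flightinfodistance (after FROM) is the Hive table name;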
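Just run this statement, then check your MySQL database. The MySQL table flightinfodistance(flightnumber, distance) must exist beforehand, and each row the query returns is the UDF's status code: 0 on success, 1 on an SQL error, 2 if the connection could not be opened.
Now, with tears in my eyes, here's the code:
MapReduce: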
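The UDF treats its first four arguments as the JDBC URL, user name, password and SQL text, and binds every remaining argument, in order, to the `?` placeholders: argument i goes to parameter i - 3 (the `ps.setObject(i - 3, ...)` call in the UDF source). The hypothetical helper below reproduces just that mapping in plain Java, without opening a connection, and adds a count check the UDF itself does not perform. In the UDF, a mismatch only shows up as an SQL error (status 1) on every row.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical check (no JDBC connection): maps dboutput's trailing arguments
 * to PreparedStatement parameter indices the way GenericUDFDBOutput does.
 */
final class DbOutputArgs {

    /** Counts '?' placeholders; naive, it does not skip '?' inside quoted SQL literals. */
    static int countPlaceholders(String sql) {
        int n = 0;
        for (char c : sql.toCharArray()) {
            if (c == '?') {
                n++;
            }
        }
        return n;
    }

    /**
     * args[0..3] = url, user, password, sql; args[4..] are bound in order.
     * Returns 1-based parameter index -> value.
     */
    static Map<Integer, Object> bindings(Object... args) {
        if (args.length < 4) {
            throw new IllegalArgumentException("need url, user, password and sql");
        }
        String sql = (String) args[3];
        int expected = countPlaceholders(sql);
        if (expected != args.length - 4) {
            throw new IllegalArgumentException("SQL has " + expected
                    + " placeholders but " + (args.length - 4) + " values were given");
        }
        Map<Integer, Object> params = new LinkedHashMap<>();
        for (int i = 4; i < args.length; i++) {
            params.put(i - 3, args[i]); // same offset as ps.setObject(i - 3, ...)
        }
        return params;
    }
}
```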
package com.ssh.flight;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class FlightWeekDist {
// Job 1 mapper: emits (DayOfWeek, 1) for every data row
public static class FlightNumMapper extends
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text dateofWeek = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
try {
Integer.parseInt(fields[0]); // skip the header row: its Year column is not a number
} catch (NumberFormatException e) {
return;
}
dateofWeek.set(fields[3]); // column 3 is DayOfWeek (1 = Monday ... 7 = Sunday)
context.write(dateofWeek, one);
}
}
// Job 2 mapper: emits (UniqueCarrier + FlightNum, Distance) for every data row
public static class FlightMilesMapper extends
Mapper<Object, Text, Text, IntWritable> {
private IntWritable Miles = new IntWritable();
private Text FlightNum = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
try {
Integer.parseInt(fields[0]); // skip the header row: its Year column is not a number
} catch (NumberFormatException e) {
return;
}
String flight = fields[8] + fields[9]; // UniqueCarrier (col 8) + FlightNum (col 9)
FlightNum.set(flight);
int miles = 0;
try {
miles = Integer.parseInt(fields[18]); // column 18 is Distance
} catch (NumberFormatException e) {
// missing distance ("NA"): count it as 0 miles
}
Miles.set(miles);
context.write(FlightNum, Miles);
}
}
// Reducer (also used as the combiner): sums all values for a key
public static class FlightSumReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
// Deletes old output directories so the jobs can be rerun
private static void removeOutputPath(Configuration conf, String output1,
String output2) throws IOException {
FileSystem hdfs = FileSystem.get(conf);
Path path = new Path(output1);
hdfs.delete(path, true);
path = new Path(output2);
hdfs.delete(path, true);
}
// Job 1: number of flights per day of week
private static Job createFlightNumJob(Configuration conf, String input,
String output) throws IOException {
Job job = new Job(conf, "Flight Numbers");
job.setJarByClass(FlightWeekDist.class);
job.setMapperClass(FlightNumMapper.class);
job.setCombinerClass(FlightSumReducer.class);
job.setReducerClass(FlightSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
return job;
}
// Job 2: total distance flown per flight (carrier + flight number)
private static Job createFlightMilesJob(Configuration conf, String input,
String output) throws IOException {
Job job = new Job(conf, "Flight Miles");
job.setJarByClass(FlightWeekDist.class);
job.setMapperClass(FlightMilesMapper.class);
job.setCombinerClass(FlightSumReducer.class);
job.setReducerClass(FlightSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
return job;
}
// Everything below copies the results to the local disk and writes them into an HTML file
public static void transHDFSfile2local(Configuration conf, String src,
String dst) {
FileSystem hdfs;
try {
hdfs = FileSystem.get(conf);
Path path_src = new Path(src + "/part-r-00000");
Path path_dst = new Path(dst);
hdfs.copyToLocalFile(path_src, path_dst);
hdfs.close();
System.out.println("File copy success");
} catch (IOException e) {
e.printStackTrace();
System.out.println("copy failed:" + e.getMessage());
}
}
public static ArrayList<String> readFile(String path) {
File file = new File(path);
BufferedReader reader = null;
ArrayList<String> lines = new ArrayList<String>();
try {
reader = new BufferedReader(new FileReader(file));
String tempString = null;
while ((tempString = reader.readLine()) != null) {
// key and value are separated by a tab (TextOutputFormat's default separator)
int sep = tempString.lastIndexOf('\t');
if (sep < 0) {
continue; // not a key/value line
}
lines.add(tempString.substring(0, sep)); // key
lines.add(tempString.substring(sep + 1)); // value
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e1) {
// ignore errors while closing
}
}
}
return lines;
}
public static void writeFile(String path, ArrayList<String> strs) {
try {
FileWriter writer = new FileWriter(path, true);
int m = 0;
for (int i = 0; i < strs.size(); i++) {
if (m == 0) {
writer.write("<tr>" + '\n');
}
writer.write("<td>" + strs.get(i) + "</td>" + '\n');
m++;
if (m == 2) {
writer.write("</tr>" + '\n');
m = 0;
}
}
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void writeFileFromOtherFile(String from, String dst) {
try {
File from_file = new File(from);
FileWriter writer = new FileWriter(dst, true);
BufferedReader reader = null;
reader = new BufferedReader(new FileReader(from_file));
String tempString = null;
while ((tempString = reader.readLine()) != null) {
writer.write(tempString+'\n');
}
reader.close();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void transTxT2HTML(String txt_src, String html_src) {
writeFile(html_src, readFile(txt_src));
}
// Entry point: runs both jobs, then builds an HTML report from their output
public static void main(String[] args) throws Exception {
// hadoop jar flightweekDist.jar
// com.ssh.flight.FlightWeekDist ./flightcount/1987-all.csv
// ./flightcount/output1 ./flightcount/output2
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: FlightWeekDist <in> <out1> <out2>");
System.exit(2);
}
removeOutputPath(conf, otherArgs[1], otherArgs[2]);
Job job = createFlightNumJob(conf, otherArgs[0], otherArgs[1]);
if (!job.waitForCompletion(true)) {
System.exit(1); // stop if job 1 failed
}
job = createFlightMilesJob(conf, otherArgs[0], otherArgs[2]);
if (!job.waitForCompletion(true)) {
System.exit(1); // stop if job 2 failed
}
// copy the reducer output of both jobs from HDFS to the local disk
transHDFSfile2local(conf, otherArgs[1],
"/home/hoodoop/flight_data/week_flight.dat");
transHDFSfile2local(conf, otherArgs[2],
"/home/hoodoop/flight_data/distance_flight.dat");
// the writers below append, so start from an empty report on every run
new File("/home/hoodoop/test.html").delete();
writeFileFromOtherFile("/home/hoodoop/html_format/before.dat", "/home/hoodoop/test.html");
transTxT2HTML("/home/hoodoop/flight_data/week_flight.dat",
"/home/hoodoop/test.html");
writeFileFromOtherFile("/home/hoodoop/html_format/middle.dat", "/home/hoodoop/test.html");
transTxT2HTML("/home/hoodoop/flight_data/distance_flight.dat",
"/home/hoodoop/test.html");
writeFileFromOtherFile("/home/hoodoop/html_format/last.dat", "/home/hoodoop/test.html");
}
}
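The report step depends on how the reducer output is laid out: Hadoop's TextOutputFormat writes each record as the key, a tab character (the default separator), then the value. The sketch below is a standalone, hypothetical version of the text-to-HTML step under that assumption. It splits each line at the tab and emits one `<tr>` with two `<td>` cells per record, in the same line-by-line layout as `writeFile`, and also escapes the characters HTML treats specially.

```java
import java.util.List;

/**
 * Hypothetical standalone version of the "reducer output -> HTML table rows"
 * step. Assumes the default TextOutputFormat layout: key TAB value per line.
 */
final class ReportRows {

    /** Escapes the characters that would otherwise break the HTML markup. */
    static String escape(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    /** Converts each "key TAB value" line into one table row with two cells. */
    static String toRows(List<String> lines) {
        StringBuilder html = new StringBuilder();
        for (String line : lines) {
            int tab = line.lastIndexOf('\t');
            if (tab < 0) {
                continue; // not a key/value line: skip it instead of throwing
            }
            html.append("<tr>\n")
                .append("<td>").append(escape(line.substring(0, tab))).append("</td>\n")
                .append("<td>").append(escape(line.substring(tab + 1))).append("</td>\n")
                .append("</tr>\n");
        }
        return html.toString();
    }
}
```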
Pig:
records = LOAD '1987-all.csv' USING PigStorage(',') AS
(Year:int,Month:int,DayofMonth:int,DayOfWeek:int,DepTime:int,CRSDepTime:int,ArrTime:int,CRSArrTime:int,UniqueCarrier:chararray,FlightNum:chararray,TailNum:int,ActualElapsedTime:int,CRSElapsedTime:int,AirTime:int,ArrDelay:int,DepDelay:int,Origin:chararray,Dest:chararray,Distance:int,TaxiIn:chararray,TaxiOut:chararray,Cancelled:chararray,CancellationCode:chararray,Diverted:chararray,CarrierDelay:chararray,WeatherDelay:chararray,NASDelay:chararray,SecurityDelay:chararray,LateAircraftDelay:chararray);
flight_without_first_row = STREAM records THROUGH `tail -n +2` AS (Year:int,Month:int,DayofMonth:int,DayOfWeek:int,DepTime:int,CRSDepTime:int,ArrTime:int,CRSArrTime:int,UniqueCarrier:chararray,FlightNum:chararray,TailNum:int,ActualElapsedTime:int,CRSElapsedTime:int,AirTime:int,ArrDelay:int,DepDelay:int,Origin:chararray,Dest:chararray,Distance:int,TaxiIn:chararray,TaxiOut:chararray,Cancelled:chararray,CancellationCode:chararray,Diverted:chararray,CarrierDelay:chararray,WeatherDelay:chararray,NASDelay:chararray,SecurityDelay:chararray,LateAircraftDelay:chararray);
rmf week_flight_sort;
week_sorts = Group flight_without_first_row BY DayOfWeek;
week_sort = FOREACH week_sorts GENERATE group,COUNT($1);
STORE week_sort INTO 'week_flight_sort';
rmf flight_distances_statistices;
flight_distances= Group flight_without_first_row BY CONCAT(UniqueCarrier,FlightNum);
flight_distance = FOREACH flight_distances GENERATE group,SUM($1.Distance);
STORE flight_distance INTO 'flight_distances_statistices';
Hive:
package hive;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
public class HiveJdbcTest {
private static final String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";
private static final String HOST = "192.168.1.100:10021";
private static final String URL = "jdbc:hive://" + HOST + "/default";
public static void main(String[] args) throws Exception {
Class.forName(driverName);
Connection conn = DriverManager.getConnection(URL, "", "");
Statement stmt = conn.createStatement();
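// assumes the Hive tables FlightInfo1987, flightinfosort and flightinfodistance already exist
String hql;
// number of flights per day of week
hql = "insert overwrite table flightinfosort select DayOfWeek,count(*) from FlightInfo1987 group by DayOfWeek ";
stmt.execute(hql);
// total distance per flight (carrier code + flight number)
hql = "insert overwrite table flightinfodistance select concat(UniqueCarrier,FlightNum),sum(Distance) from FlightInfo1987 group by concat(UniqueCarrier,FlightNum)";
stmt.execute(hql);
// no ResultSet is opened here, so only the statement and connection need closing
stmt.close();
conn.close();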
}
}
UDF GenericUDFDBOutput:
package org.apache.Hadoop.hive;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.IntWritable;
/**
* GenericUDFDBOutput is designed to output data directly from Hive to a JDBC
* datastore. This UDF is useful for exporting small to medium summaries that
* have a unique key.
*
* Due to the nature of hadoop, individual mappers, reducers or entire jobs can
* fail. If a failure occurs a mapper or reducer may be retried. This UDF has no
* way of detecting failures or rolling back a transaction. Consequently, you
* should only use this to export to a table with a unique key. The unique
* key should safeguard against duplicate data.
*
* Use hive's ADD JAR feature to add your JDBC Driver to the distributed cache,
* otherwise GenericUDFDBOutput will fail.
*/
@Description(name = "dboutput",
value = "_FUNC_(jdbcstring,username,password,preparedstatement,[arguments])"
+ " - sends data to a jdbc driver",
extended = "argument 0 is the JDBC connection string\n"
+ "argument 1 is the user name\n"
+ "argument 2 is the password\n"
+ "argument 3 is an SQL query to be used in the PreparedStatement\n"
+ "argument (4-n) The remaining arguments must be primitive and are "
+ "passed to the PreparedStatement object\n")
@UDFType(deterministic = false)
public class GenericUDFDBOutput extends GenericUDF {
private static final Log LOG = LogFactory
.getLog(GenericUDFDBOutput.class.getName());
private transient ObjectInspector[] argumentOI;
private transient Connection connection = null;
private String url;
private String user;
private String pass;
private final IntWritable result = new IntWritable(-1);
/**
* @param arguments
* argument 0 is the JDBC connection string argument 1 is the user
* name argument 2 is the password argument 3 is an SQL query to be
* used in the PreparedStatement argument (4-n) The remaining
* arguments must be primitive and are passed to the
* PreparedStatement object
*/
@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentTypeException {
argumentOI = arguments;
// this should be connection url,username,password,query,column1[,columnn]*
for (int i = 0; i < 4; i++) {
if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]);
if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
throw new UDFArgumentTypeException(i,
"The argument of function should be \""
+ Constants.STRING_TYPE_NAME + "\", but \""
+ arguments[i].getTypeName() + "\" is found");
}
}
}
for (int i = 4; i < arguments.length; i++) {
if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(i,
"The argument of function should be primitive, but \""
+ arguments[i].getTypeName() + "\" is found");
}
}
return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
}
/**
* @return 0 on success, 1 on an SQL error, 2 if no connection could be opened
*/
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
url = ((StringObjectInspector) argumentOI[0])
.getPrimitiveJavaObject(arguments[0].get());
user = ((StringObjectInspector) argumentOI[1])
.getPrimitiveJavaObject(arguments[1].get());
pass = ((StringObjectInspector) argumentOI[2])
.getPrimitiveJavaObject(arguments[2].get());
connection = null; // never reuse a connection closed by a previous call
try {
connection = DriverManager.getConnection(url, user, pass);
} catch (SQLException ex) {
LOG.error("Driver loading or connection issue", ex);
result.set(2);
}
if (connection != null) {
try {
PreparedStatement ps = connection
.prepareStatement(((StringObjectInspector) argumentOI[3])
.getPrimitiveJavaObject(arguments[3].get()));
for (int i = 4; i < arguments.length; ++i) {
PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) argumentOI[i]);
ps.setObject(i - 3, poi.getPrimitiveJavaObject(arguments[i].get()));
}
ps.execute();
ps.close();
result.set(0);
} catch (SQLException e) {
LOG.error("Underlying SQL exception", e);
result.set(1);
} finally {
try {
connection.close();
} catch (Exception ex) {
LOG.error("Underlying SQL exception during close", ex);
}
}
}
return result;
}
@Override
public String getDisplayString(String[] children) {
StringBuilder sb = new StringBuilder();
sb.append("dboutput(");
if (children.length > 0) {
sb.append(children[0]);
for (int i = 1; i < children.length; i++) {
sb.append(",");
sb.append(children[i]);
}
}
sb.append(")");
return sb.toString();
}
}
Please credit the source when reposting. Original link: https://www.uj5u.com/qita/336200.html
