- 多表連接

思路:在map階段將資料轉換為<key,value>輸出,key為id,shuffle階段會自動把相同id的記錄組合在一起;同時在value前為兩個表的內容加上不同標記(表一為“#”,表二為“$”),這樣在reduce階段進行笛卡爾積時就能區分每筆資料來自哪個表。
代碼如下:
package org.apache.hadoop.examples;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
/**
 * Reduce-side join of two whitespace-separated tables on their first column (the id).
 *
 * <p>The mapper tags every record with the table it came from ("#" for table one, "$" for table
 * two). The reducer receives all records that share an id, separates them again by tag and writes
 * their Cartesian product.
 */
public class table_lianjie {

    /** Tags each input line with its source table and keys it by the first column (id). */
    public static class mapper extends Mapper<LongWritable, Text, Text, Text> {
        /**
         * Emits {@code (id, "#" + city)} for table-one lines and
         * {@code (id, "$" + num1 + "\t" + num2)} for table-two lines.
         * Blank lines and lines with too few columns are discarded.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // The source table is identified by the name of the file this split belongs to.
            // NOTE(review): this is a plain substring test; a table-two file whose name also contains
            // "a" would be treated as table one. Confirm against the real input file names.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String name = fileSplit.getPath().getName();

            // Trim first: leading whitespace would otherwise yield an empty first field, and a
            // whitespace-only line would split into an empty array and crash on split[0].
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }
            String[] split = line.split("\\s+");

            if (name.contains("a")) {
                // Table one: <id> <city>. Discard lines that are missing the city column.
                if (split.length < 2) {
                    return;
                }
                context.write(new Text(split[0]), new Text("#" + split[1]));
            } else if (name.contains("b")) {
                // Table two: <id> <num1> <num2>. Discard lines that are missing a column.
                if (split.length < 3) {
                    return;
                }
                context.write(new Text(split[0]), new Text("$" + split[1] + "\t" + split[2]));
            }
        }
    }

    /** Joins the tagged records of one id: emits every (table-one, table-two) pair. */
    public static class reducer extends Reducer<Text, Text, Text, Text> {
        /**
         * @param key    the join id
         * @param values tagged values for this id from both tables
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Untagged payloads from table one ("#") and table two ("$").
            List<String> list1 = new ArrayList<>();
            List<String> list2 = new ArrayList<>();
            for (Text text : values) {
                String value = text.toString();
                if (value.startsWith("#")) {
                    list1.add(value.substring(1));
                } else if (value.startsWith("$")) {
                    list2.add(value.substring(1));
                }
            }
            // Cartesian product; an id present in only one table produces no output (inner join).
            for (String a : list1) {
                for (String b : list2) {
                    context.write(key, new Text(a + "\t" + b));
                }
            }
        }
    }

    /** Configures and runs the join job; input and output locations are fixed HDFS paths. */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(table_lianjie.class);
        job.setMapperClass(mapper.class);
        job.setReducerClass(reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("hdfs://lsn-linux:9000/input2"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://lsn-linux:9000/onput_lianjie"));
        System.exit(job.waitForCompletion(true) ? 0 : -1);
    }
}
- 對單詞進行排序:
在MapReduce中,map輸出的資料在shuffle階段會自動按key排序,所以只需在map階段處理資料、把單詞作為key輸出,reduce階段直接將key寫出即可。
代碼如下:
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Extracts words from a text file and writes each distinct word once, in sorted order.
 *
 * <p>The mapper emits every word as a key. The shuffle phase sorts and groups the keys, so the
 * reducer only has to write each key once.
 */
public class WordSort {

    /** The first run of ASCII letters in a token is taken as the word; compiled once. */
    private static final Pattern WORD = Pattern.compile("[A-Za-z]+");

    /** Emits {@code (word, null)} for the first ASCII-letter run of every token in the line. */
    public static class WordSortMapper
            extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Commas are treated as separators in addition to whitespace.
            String line = value.toString().replace(',', ' ');
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                Matcher m = WORD.matcher(token.nextToken());
                if (m.find()) {
                    context.write(new Text(m.group(0)), NullWritable.get());
                }
            }
        }
    }

    /**
     * Writes each (already sorted) key exactly once, which drops duplicate words.
     * The value type must be {@code Iterable<NullWritable>}; any other element type creates an
     * overload that Hadoop never calls, and the identity reducer would run instead.
     */
    public static class IntSumReducer2 extends
            Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<NullWritable> values,
                Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    /** Configures and runs the job on fixed HDFS input/output paths. */
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(WordSort.class);
        job.setJobName("Word Sort");
        FileInputFormat.addInputPath(job, new Path("hdfs://lsn-linux:9000/wordcount/zz.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://lsn-linux:9000/ust9"));
        job.setMapperClass(WordSortMapper.class);
        job.setReducerClass(IntSumReducer2.class);
        // Map output types default to these, so they must match what the mapper emits.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
- 對資料進行過濾
對資料進行過濾時,如果只是對字串做簡單過濾,那麼直接寫在map或reduce中都可以;如果過濾邏輯較為複雜,可以單獨寫一個函式進行過濾。另外,過濾處理既可能出現在map階段,也可能出現在reduce階段,需要根據需求制定代碼策略。
代碼如下
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class wordcount_guolv {
public static class TokenizerMapper extends
Mapper<Object, Text, Text, Text>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] words = value.toString().split(",");
words[0]="2020年"+words[0];
if(words.length>=5)
if(!words[1].contains("湖北")) {
for(int i=1;i<5;i++) {
words[0]=words[0]+","+words[i];
}
context.write(new Text(words[0]), new Text());
}
}
}
public static class IntSumReducer2 extends
Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Text values,
Context context) throws IOException, InterruptedException {
context.write(key,new Text());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Path mypath = new Path("hdfs://lsn-linux:9000/usr/root");
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}
Job job = Job.getInstance();
job.setJarByClass(wordcount_guolv.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer2.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(
"hdfs://lsn-linux:9000/input/yq.csv"));
FileOutputFormat.setOutputPath(job, new Path(
"hdfs://lsn-linux:9000/ouptput5"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
- 統計單詞數量並排序:
思路:用MapReduce統計單詞數量很簡單:先對資料進行分詞,每個詞語作為key、1作為value輸出;reduce階段將同一個key的value求和即可。由於shuffle階段會按key排序,輸出結果自然按單詞排好序。
代碼如下:
package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper extends
Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line= value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Path mypath = new Path("hdfs://lsn-linux:9000/usr/root");
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}
Job job = Job.getInstance();
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(
"hdfs://lsn-linux:9000/wordcount/zz.txt"));
FileOutputFormat.setOutputPath(job, new Path(
"hdfs://lsn-linux:9000/ust"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
- 求三個品牌一年一共銷售多少部手機
思路:該資料為三個手機品牌的銷售記錄,因此很簡單:在map階段把所有品牌的銷售數量都以同一個key作為value輸出,之後在reduce階段求和即可。
package com.sheng.hdfs;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Mapper for the "total phones sold in a year" job.
 *
 * <p>Splits each line on commas and reads the second field as an integer sales quantity
 * (presumably lines look like brand,quantity,date; confirm against the data file). Every record
 * is emitted under the same key, so a single reduce call sums all quantities into one total.
 * A missing or non-numeric second field throws and fails the task.
 */
class Mapper1 extends Mapper<LongWritable, Text, Text, IntWritable> {
    /** The single output key shared by all records. */
    private final Text totalKey = new Text("總銷售量");

    @Override
    protected void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
        String[] s = values.toString().split(",");
        context.write(totalKey, new IntWritable(Integer.parseInt(s[1])));
    }
}
/**
 * Reducer for the "total phones sold" job: its input is the mapper output. It adds up every
 * quantity received for a key and writes {@code (key, total)}.
 */
class WcReduce1 extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable quantity : values) {
            total = total + quantity.get();
        }
        IntWritable out = new IntWritable(total);
        context.write(key, out);
    }
}
- 計算這三個品牌手機這一年分別銷售多少部手機
思路:對於該問題,我們只需要在map階段以手機品牌作為key輸出,reduce階段就會按品牌分組,對每個品牌分別求和(共三次統計),即可得出結果。
package com.sheng.hdfs;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Mapper for per-brand yearly totals.
 *
 * <p>Splits each line on commas and emits {@code (field 0, field 1 as int)}: the brand name and
 * its sales quantity.
 */
class Mapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        Text brand = new Text(fields[0]);
        IntWritable quantity = new IntWritable(Integer.parseInt(fields[1]));
        context.write(brand, quantity);
    }
}
/** Reducer for per-brand yearly totals: sums all quantities emitted for one brand key. */
class Reduce1 extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException {
        int brandTotal = 0;
        for (IntWritable sold : value) {
            brandTotal += sold.get();
        }
        IntWritable result = new IntWritable(brandTotal);
        context.write(key, result);
    }
}
/**
 * Custom partitioner that sends each brand to its own reducer.
 *
 * <p>Preferred partitions: "xiaomi" → 0, "華為" → 1, "IP" → 2, any other key → 3. The type
 * parameters must match the map output key/value classes.
 *
 * <p>The preferred index is reduced modulo {@code numPrtitons}, so the result is always a legal
 * partition. Before this change an unrecognized brand returned 3, which is out of range when the
 * job is configured with 3 reduce tasks (as in Home2) and makes the job fail. With 4 or more
 * reduce tasks the routing is unchanged.
 */
class Mypartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPrtitons) {
        int preferred;
        switch (key.toString()) {
            case "xiaomi":
                preferred = 0;
                break;
            case "華為":
                preferred = 1;
                break;
            case "IP":
                preferred = 2;
                break;
            default:
                preferred = 3;
                break;
        }
        return preferred % numPrtitons;
    }
}
/**
 * Driver for the per-brand yearly sales job.
 *
 * <p>Reads /user/test/data.csv, emits (brand, quantity) with Mapper2 and sums per brand with
 * Reduce1. It runs three reduce tasks routed by Mypartitioner, so each brand is intended to land
 * in its own output file. The output path is a directory despite its ".csv" name. To take the
 * paths from the command line instead, use args[0] and args[1].
 */
public class Home2 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Home2.class);

        // Processing classes and the partitioning scheme (three reduce tasks, one per brand).
        job.setMapperClass(Mapper2.class);
        job.setReducerClass(Reduce1.class);
        job.setPartitionerClass(Mypartitioner.class);
        job.setNumReduceTasks(3);

        // Intermediate (map output) and final (reduce output) key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // New-API (org.apache.hadoop.mapreduce.lib) input/output formats.
        FileInputFormat.setInputPaths(job, new Path("/user/test/data.csv"));
        FileOutputFormat.setOutputPath(job, new Path("/user/test/data5.csv"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
- 計算每個品牌手機的每個月的銷售總量
這時,我們不僅需要將每個品牌區分出來,還需要按月份進行劃分(map輸出的key為“月份+品牌”),之後在reduce階段按每個品牌、每個月對銷售量求和即可。
package com.sheng.hdfs;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Mapper for monthly per-brand sales.
 *
 * <p>Each input line is split on commas. Field 0 is the brand, field 1 the integer quantity, and
 * field 2 is presumably a date such as "2017年3月5日". The month is the text before the first
 * "月" with its first 5 characters (assumed to be a "YYYY年" prefix) removed.
 * NOTE(review): confirm the date format; other layouts give a wrong month or throw.
 *
 * <p>Emits {@code ("<month>月<brand>", quantity)}.
 */
class WcMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] s = value.toString().split(",");
        // "2017年3月5日" -> "2017年3" -> "3" (under the assumed date format).
        String month = s[2].split("月")[0].substring(5);
        context.write(new Text(month + "月" + s[0]), new IntWritable(Integer.parseInt(s[1])));
    }
}
/** Reducer for monthly per-brand sales: sums the quantities of one "month+brand" key. */
class WcReduce2 extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int monthlyTotal = 0;
        for (IntWritable amount : values) {
            monthlyTotal += amount.get();
        }
        IntWritable total = new IntWritable(monthlyTotal);
        context.write(key, total);
    }
}
/**
 * Routes monthly sales records to reducers by brand.
 *
 * <p>Preferred partitions: "xiaomi" → 0, "huawei" → 1, "iphone7" → 2, anything else → 3. This is
 * intended for four reducers; the index is reduced modulo {@code numPartitons}, so it is always
 * legal for any reducer count.
 *
 * <p>WcMapper2 produces keys of the form "<month>月<brand>", so the brand is the text after the
 * first "月" (the whole key when there is no "月"). Comparing the whole key, as before, could
 * never match, and every record went to partition 3.
 */
class MyPartitioner1 extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitons) {
        String raw = key.toString();
        // indexOf returns -1 when there is no separator, so substring(0) keeps the whole key.
        String brand = raw.substring(raw.indexOf('月') + 1);
        int preferred;
        switch (brand) {
            case "xiaomi":
                preferred = 0;
                break;
            case "huawei":
                preferred = 1;
                break;
            case "iphone7":
                preferred = 2;
                break;
            default:
                preferred = 3;
                break;
        }
        return preferred % numPartitons;
    }
}
/**
 * Driver for the monthly per-brand sales job.
 *
 * <p>Reads /user/test/data.csv. WcMapper2 emits ("<month>月<brand>", quantity), and WcReduce2 sums
 * per key. The output path is a directory despite its ".csv" name. To take the paths from the
 * command line instead, use args[0] and args[1].
 */
public class Home3 {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Home3.class);

        // Processing classes.
        job.setMapperClass(WcMapper2.class);
        job.setReducerClass(WcReduce2.class);

        // Intermediate (map output) and final (reduce output) key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // New-API (org.apache.hadoop.mapreduce.lib) input/output formats.
        FileInputFormat.setInputPaths(job, new Path("/user/test/data.csv"));
        FileOutputFormat.setOutputPath(job, new Path("/user/test/data6.csv"));

        // NOTE(review): no reduce-task count is set here, so the configured default is used.
        // MyPartitioner1 can return partitions up to 3; confirm whether setNumReduceTasks(4) was intended.
        job.setPartitionerClass(MyPartitioner1.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/235453.html
標籤:其他
上一篇:【重寫爬蟲案例】百度圖片、今日頭條今日街拍爬取圖片中遇到的問題
下一篇:【MATLAB統計分析與應用100例】案例011:matlab讀取Excel資料,呼叫regress函式作一元線性回歸分析
