寫在前面: 博主是一名大資料的初學者,昵稱來源于《愛麗絲夢游仙境》中的Alice和自己的昵稱,作為一名互聯網小白,
寫博客一方面是為了記錄自己的學習歷程,一方面是希望能夠幫助到很多和自己一樣處于起步階段的萌新,由于水平有限,博客中難免會有一些錯誤,有紕漏之處懇請各位大佬不吝賜教!個人小站:http://alices.ibilibili.xyz/ , 博客主頁:https://alice.blog.csdn.net/
盡管當前水平可能不及各位大佬,但我還是希望自己能夠做得更好,因為一天的生活就是一生的縮影,我希望在最美的年華,做最好的自己!
在正式開始之前,我們先來看看一個倒排索引的例子,

而具體什么是倒排索引?這里參考一下維基百科上的定義:
倒排索引(英語:Inverted index),也常被稱為反向索引、置入檔案或反向檔案,是一種索引方法,被用來存盤在全文搜索下某個單詞在一個檔案或者一組檔案中的存盤位置的映射,它是檔案檢索系統中最常用的資料結構,
有兩種不同的反向索引形式:
- 一條記錄的水平反向索引(或者反向檔案索引)包含每個參考單詞的檔案的串列,
- 一個單詞的水平反向索引(或者完全反向索引)又包含每個單詞在一個檔案中的位置,
后者的形式提供了更多的兼容性(比如短語搜索),但是需要更多的時間和空間來創建,
倒排索引在搜索引擎中比較常見,百度,谷歌等大型互聯網搜索引擎提供商均在搜索引擎業務中構建了倒序索引,本篇文章,就用一個簡單的demo教大家如何使用Hadoop實作倒序索引,
需求
現在有3個檔案,分別為 log_a.txt ,log_b.txt 和 log_c.txt,每個檔案的內容如下所示:
log_a.txt
hello java
hello hadoop
hello java
log_b.txt
hello hadoop
hello hadoop
java hadoop
log_c.txt
hello hadoop
hello java
要求經過 Hadoop 的處理后,輸出如下資訊:
hadoop log_c.txt-->1 log_b.txt-->3 log_a.txt-->1
hello log_c.txt-->2 log_b.txt-->2 log_a.txt-->3
java log_c.txt-->1 log_b.txt-->1 log_a.txt-->2
需求分析
為了實作這種效果,我們可以很自然想到用MapReduce去處理,但是考慮到只用一個MapReduce處理,代碼會寫的比較冗長,可讀性不強,對于新手小白不是很友好,于是本篇文章,作者介紹的就是如何通過兩個MapReduce來實作“倒排索引”的功能!
主要思路如下:
倒排索引第一步的Mapper類
我們輸出如下結果:
context.wirte(“hadoop->log_a.txt”, “1”)
context.wirte(“hadoop->log_b.txt”, “1”)
context.wirte(“hadoop->log_c.txt”, “1”)
…
倒排索引第一步的Reducer
最終輸出結果為:
hello --> log_a.txt 3
hello --> log_b.txt 2
hello --> log_c.txt 2
…
倒排索引第二步的mapper
hello --> log_a.txt 3
hello–>log_b.txt 2
hello–>log_c.txt 2
…
倒排索引第二步的Reducer
hello log_c.txt–>2 log_b.txt–>2 log_a.txt–>3
hadoop log_c.txt–>1 log_b.txt–>3 log_a.txt–>1
java log_c.txt–>1 log_b.txt–>1 log_a.txt–>2
好了,現在需求明確了,現在我們可以寫代碼了,
這是倒排索引第一步的Mapper:InverseIndexStepOneMapper
package io.alice;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
* @Author: Alice菌
* @Date: 2020/10/4 20:38
* @Description:
* 讀取檔案的格式:
* log_a.txt
* hello java
* hello hadoop
* hello java
*
* 倒排索引第一步的Mapper類,
* 輸出結果如下:
* context.wirte("hadoop->log_a.txt", "1")
* context.wirte("hadoop->log_b.txt", "1")
* context.wirte("hadoop->log_c.txt", "1")
*/
public class InverseIndexStepOneMapper extends Mapper<LongWritable, Text,Text,LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
if (value != null){
// 獲取一行的資料
String line = value.toString();
// 按照空格拆分每個單詞
String[] words = line.split(" ");
if (words.length > 0){
// 獲取資料的切片資訊,并根據切片資訊獲取到檔案的名稱
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String fileName = fileSplit.getPath().getName();
for (String word : words) {
context.write(new Text(word + "-->" + fileName),new LongWritable(1));
}
}
}
}
}
倒排索引第一步的Reducer,InverseIndexStepOneReducer
package io.alice;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @Author: Alice菌
* @Date: 2020/10/4 20:47
* @description: 完成倒排索引第一步的Reducer程式
* 最終輸出結果為:
* hello-->log_a.txt 3
* hello-->log_b.txt 2
* hello-->log_c.txt 2
* hadoop-->log_a.txt 1
* hadoop-->log_b.txt 3
* hadoop-->log_c.txt 1
* java-->log_a.txt 2
* java-->log_b.txt 1
* java-->log_c.txt 1
*/
public class InverseIndexStepOneReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
if (values != null){
// 初始化一個變數 sum ,保存每個單詞在每個檔案中出現的次數
long sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
context.write(key,new LongWritable(sum));
}
}
}
這是倒排索引第二步的Mapper:InverseIndexStepTwoMapper
package io.alice;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @Author: Alice菌
* @Date: 2020/10/4 21:03
* @Description: 完成倒排索引第二步的mapper程式
*
* * hello-->log_a.txt 3
* hello-->log_b.txt 2
* hello-->log_c.txt 2
* hadoop-->log_a.txt 1
* hadoop-->log_b.txt 3
* hadoop-->log_c.txt 1
* java-->log_a.txt 2
* java-->log_b.txt 1
* java-->log_c.txt 1
*
* 輸出的資訊為:
* context.write("hadoop", "log_a.txt->1")
* context.write("hadoop", "log_b.txt->3")
* context.write("hadoop", "log_c.txt->1")
*
* context.write("hello", "log_a.txt->3")
* context.write("hello", "log_b.txt->2")
* context.write("hello", "log_c.txt->2")
*
* context.write("java", "log_a.txt->2")
* context.write("java", "log_b.txt->1")
* context.write("java", "log_c.txt->1")
*/
public class InverseIndexStepTwoMapper extends Mapper<LongWritable, Text,Text,Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
if (value != null){
String line = value.toString();
// 將第一步的Reduce輸出結果按照 \t 拆分
String[] fields = line.split("\t");
// 將拆分后的結果陣列的第一個元素再按照 --> 分隔
String[] wordAndFileName = fields[0].split("-->");
// 獲取到單詞
String word = wordAndFileName[0];
// 獲取到檔案名
String fileName = wordAndFileName[1];
// 獲取到單詞數量
long count = Long.parseLong(fields[1]);
context.write(new Text(word),new Text(fileName + "-->" + count));
}
}
}
倒排索引第二步的Reducer,InverseIndexStepTwoReducer
package io.alice;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @Author: Alice菌
* @Date: 2020/10/4 21:21
* @Description: 完成倒排索引第二步的Reducer程式
* 得到的輸入資訊格式為:
* <"hello", {"log_a.txt->3", "log_b.txt->2", "log_c.txt->2"}>
* * 最終輸出結果如下:
* * hello log_c.txt-->2 log_b.txt-->2 log_a.txt-->3
* hadoop log_c.txt-->1 log_b.txt-->3 log_a.txt-->1
* java log_c.txt-->1 log_b.txt-->1 log_a.txt-->2
*/
public class InverseIndexStepTwoReducer extends Reducer<Text,Text,Text,Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
if (values != null){
String result = "";
for (Text value : values) {
result = result.concat(value.toString()).concat(" ");
}
context.write(key,new Text(result));
}
}
}
倒排索引的執行類:InverseIndexRunner
這里需要格外的注意,因為我們平時接觸到的MapReduce程度大多都是由一個Job完成的,本次案例在執行類中如何實作多個Job依次執行,大家可以借鑒學習!
package io.alice;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**io.alice.InverseIndexRunner
* @Author: Alice菌
* @Date: 2020/10/4 21:39
* @Description: 倒排索引的執行類
*/
public class InverseIndexRunner extends Configured implements Tool {
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(),new InverseIndexRunner(),args);
}
public int run(String[] args) throws Exception {
if (!runStepOneMapReduce(args)) {
return 1;
}
return runStepTwoMapReduce(args) ? 0:1;
}
private static boolean runStepOneMapReduce(String[] args) throws Exception {
Job job = getJob();
job.setJarByClass(InverseIndexRunner.class);
job.setMapperClass(InverseIndexStepOneMapper.class);
job.setReducerClass(InverseIndexStepOneReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true);
}
private static boolean runStepTwoMapReduce(String []args) throws Exception {
Job job = getJob();
job.setJarByClass(InverseIndexRunner.class);
job.setMapperClass(InverseIndexStepTwoMapper.class);
job.setReducerClass(InverseIndexStepTwoReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job,new Path(args[1] + "/part-r-00000"));
FileOutputFormat.setOutputPath(job,new Path(args[2]));
return job.waitForCompletion(true);
}
private static Job getJob() throws IOException {
Configuration conf = new Configuration();
return Job.getInstance(conf);
}
}
測驗執行
我們將專案打成jar包上傳至linux

然后將資料源所需要的檔案上傳至HDFS

然后執行命令:
hadoop jar /home/hadoop/alice_data-1.0-SNAPSHOT.jar io.alice.InverseIndexRunner /data/input /data/oneoutput /data/twooutput
程式就開始 奔跑 起來~

待到程式運行完畢,我們可以查看程式正確運行后的結果

看到最后的效果跟我們題目需求所想要的完全一致時,就說明我們的思路是沒錯滴~

小結
我們每向他人學習到一項新的技能,一定要主動去思考別人解決問題的出發點,只有學會思考,才能舉一反三,融會貫通!
本篇文章就到這里,更多精彩文章及福利,敬請關注博主原創公眾號【猿人菌】!

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/159657.html
標籤:其他
上一篇:如何快速建站,有沒有好的辦法
