我是 Hadoop 的初學者。我最近了解到,您可以使用 ToolRunner 和分布式快取,通過 Hadoop 命令列讀取檔案。我想知道如何實作它以讀取 CSV 檔案。我主要對輸入目錄應該是什么感到困惑,因為 ToolRunner 的 -files 選項應該會即時添加檔案,對嗎?有人可以幫忙嗎?
ToolRunner / 驅動程式
package stubs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the "Average Word Length" MapReduce job.
 *
 * <p>The driver is launched through {@link ToolRunner}, so it receives its
 * {@link Configuration} via {@link Configured#getConf()} and only sees the
 * two positional arguments it needs: the input path and the output path.
 */
public class AvgWordLength extends Configured implements Tool {

  /**
   * Command-line entry point. Delegates to {@link ToolRunner#run} and exits
   * the JVM with the status code returned by {@link #run(String[])}.
   *
   * @param args command-line arguments as passed by the launcher
   * @throws Exception if the job setup or execution fails
   */
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new AvgWordLength(), args);
    System.exit(exitCode);
  }

  /**
   * Configures and runs the job.
   *
   * @param args exactly two elements: {@code <input dir> <output dir>}
   * @return 0 if the job succeeded, 1 if it failed, -1 if the arguments
   *         were invalid
   * @throws Exception if the job cannot be created, submitted, or monitored
   */
  @Override
  public int run(String[] args) throws Exception {
    /*
     * Validate that two arguments were passed from the command line.
     * Return the error status instead of calling System.exit() here:
     * main() already turns the return value into the process exit code,
     * and a Tool that kills the JVM cannot be safely invoked by other code.
     */
    if (args.length != 2) {
      System.out.printf("Usage: AvgWordLength <input dir> <output dir>\n");
      return -1;
    }

    /*
     * Instantiate a Job object for your job's configuration.
     */
    Job job = Job.getInstance(getConf());

    /*
     * Specify the jar file that contains your driver, mapper, and reducer.
     * Hadoop will transfer this jar file to nodes in your cluster running
     * mapper and reducer tasks.
     */
    job.setJarByClass(AvgWordLength.class);

    /*
     * Specify an easily-decipherable name for the job. This job name will
     * appear in reports and logs.
     */
    job.setJobName("Average Word Length");

    /*
     * Specify the paths to the input and output data based on the
     * command-line arguments.
     */
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    /*
     * Specify the mapper and reducer classes.
     */
    job.setMapperClass(LetterMapper.class);
    job.setReducerClass(AverageReducer.class);

    /*
     * The input file and output files are text files, so there is no need
     * to call the setInputFormatClass and setOutputFormatClass methods.
     */

    /*
     * The mapper's output keys and values have different data types than
     * the reducer's output keys and values. Therefore, you must call the
     * setMapOutputKeyClass and setMapOutputValueClass methods.
     */
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    /*
     * Specify the job's output key and value classes.
     */
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

    /*
     * Start the MapReduce job and wait for it to finish. If it finishes
     * successfully, return 0. If not, return 1.
     */
    boolean success = job.waitForCompletion(true);
    return success ? 0 : 1;
  }
}
映射器
package stubs;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.conf.Configuration;
/**
* To define a map function for your MapReduce job, subclass the Mapper class
* and override the map method. The class definition requires four parameters:
*
* @param The
* data type of the input key - LongWritable
* @param The
* data type of the input value - Text
* @param The
* data type of the output key - Text
* @param The
* data type of the output value - IntWritable
*/
public class LetterMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* The map method runs once for each line of text in the input file. The
* method receives:
*
* @param A
* key of type LongWritable
* @param A
* value of type Text
* @param A
* Context object.
*/
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
/*
* Convert the line, which is received as a Text object, to a String
* object.
*/
String line = value.toString();
/*
* The line.split("\\W ") call uses regular expressions to split the
* line up by non-word characters. If you are not familiar with the use
* of regular expressions in Java code, search the web for
* "Java Regex Tutorial."
*/
if(line.contains("Colour,"){
String[] word = line.split(",");
String[] genre = word[9].split("\\|");
for(int i = 0; i < (genre.length) -1 ; i ){
context.write(new Text(genre[i]), new IntWritable(word.length()));
}
}
}
}
我使用的命令:
hadoop jar gc.jar stubs.AvgWordLength -files /home/training/training_materials/developer/data/movies_metadata.csv inputs gc_res
Inputs 是一個空目錄。如果我不放入輸入目錄,整個程式就會掛起。
uj5u.com熱心網友回復:
通用選項(例如 -files)位于命令(jar)和命令選項(類名和你的兩個路徑,它們會傳入 args 陣列)之前。
hadoop -files /home/training/training_materials/developer/data/movies_metadata.csv jar gc.jar stubs.AvgWordLength inputs gc_res
但是 -files 只是將檔案復制到集群中,而不是放入分布式快取。
根據我的經驗,快取主要用作跨 mapreduce 階段共享資料的地方,而不是用來在運行代碼之前省略 hdfs put 命令的替代方案。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/511160.html
