實驗要求
Java API實作HDFS以下功能,
1. 在HDFS中創建一個新目錄;
2. 從本地拷貝檔案到新創建的目錄中;
3. 將HDFS中指定檔案的內容輸出到終端中;
4. 顯示一個指定目錄下所有檔案;
5. 完成指定目錄下指定型別檔案的合并
6. 在HDFS中,將檔案從源路徑移動到目的路徑,
7. 洗掉HDFS中指定的檔案;
實驗環境
IDEA(已配置好Hadoop環境)
專案檔案結構

代碼
1. 在HDFS中創建一個新目錄
新建Java class命名為 CreateDir
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.util.Scanner;
import java.net.URI;
public class CreateDir {
public static void main(String[] args) {
try {
Scanner sc = new Scanner(System.in);
String dirPath = '/'+sc.next();
FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
Path hdfsPath = new Path(dirPath);
if(fs.mkdirs(hdfsPath)){
System.out.println("Directory "+ dirPath +" has been created successfully!");
}
}catch(Exception e) {
e.printStackTrace();
}
}
}
運行截圖:
終端上進行Hadoop檢測

2. 從本地拷貝檔案到新創建的目錄中
import java.net.URI;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class CopyFile {
public static void main(String args[]) throws Exception {
System.out.println("Input the filepath:");
Scanner sc = new Scanner(System.in);
Path src=new Path(sc.next());
FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
Path dst =new Path("/mydir");
fs.copyFromLocalFile(src, dst);
}
}
運行結果

終端上進行Hadoop檢測

3. 將HDFS中指定檔案的內容輸出到終端中
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;
import java.util.Scanner;
public class ReadFile {
public static void main(String[] args) {
try {
Scanner sc = new Scanner(System.in);
String filePath = '/'+sc.next();
FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
Path srcPath = new Path(filePath);
FSDataInputStream is = fs.open(srcPath);
while(true) {
String line = is.readLine();
if(line == null) {
break;
}
System.out.println(line);
}
is.close();
}catch(Exception e) {
e.printStackTrace();
}
}
}
運行結果
終端上進行Hadoop檢測

這里為了第4,5問達到更好看的效果,Hadoop下面的mydir目錄中,又上傳了有人file.txt檔案,內容為“goodbye,big data!",
4. 顯示一個指定目錄下所有檔案
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import java.net.URI;
import java.util.Scanner;
public class ListFiles {
public static void main(String[] args) {
try {
Scanner sc = new Scanner(System.in);
String filePath = sc.next();
FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
Path srcPath = new Path(filePath);
FileStatus[] stats = fs.listStatus(srcPath);
Path[] paths = FileUtil.stat2Paths(stats);
for(Path p : paths)
System.out.println(p.getName());
}catch(Exception e) {
e.printStackTrace();
}
}
}
運行結果

終端上進行Hadoop檢測

5. 完成指定目錄下指定型別檔案的合并
(本人對此處的“合并”,理解為“內容合并”,即將指定目錄下的同型別檔案內容寫進一個檔案,比如mydir目錄下有兩個文本檔案myfile.txt、yourfile.txt,需將兩個檔案內容寫入一個檔案mergeNew.txt,進行“合并”,)
import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
public class MergeFiles{
Path inputPath = null;
Path outputPath = null;
public MergeFiles(String input, String output) {
this.inputPath = new Path(input);
this.outputPath = new Path(output);
}
public void doMerge() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://172.18.0.2:9000");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()), conf);
FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);
FileStatus[] sourceStatus = fsSource.listStatus(inputPath);
FSDataOutputStream fsdos = fsDst.create(outputPath);
PrintStream ps = new PrintStream(System.out);
for (FileStatus sta : sourceStatus) {
System.out.println("path:" + sta.getPath() + " file size:" + sta.getLen() +
" auth:" + sta.getPermission() + " content:");
FSDataInputStream fsdis = fsSource.open(sta.getPath());
byte[] data = new byte[1024];
int read = -1;
while ((read = fsdis.read(data)) > 0) {
ps.write(data, 0, read);
fsdos.write(data, 0, read);
}
fsdis.close();
}//end of for
ps.close();
fsdos.close();
}//end of doMerge()
public static void main(String[] args) throws IOException{
MergeFiles MergeFiles = new MergeFiles(
"hdfs://172.18.0.2:9000/mydir",
"hdfs://172.18.0.2:9000/mergeNew.txt"
);
MergeFiles.doMerge();
}
}
運行結果

終端上進行Hadoop檢測

6. 在HDFS中,將檔案從源路徑移動到目的路徑
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;
import java.util.Scanner;
public class MvFile{
public static void main(String[] args) {
try {
Scanner sc = new Scanner(System.in);
String srcStrPath = '/'+sc.next();
String dstStrPath = '/'+sc.next();
FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
Path srcPath = new Path(srcStrPath);
Path dstPath = new Path(dstStrPath);
if(fs.rename(srcPath,dstPath)) {
System.out.println("movefile from " + srcStrPath + " to " + dstStrPath + "successfully!");
}
}catch(Exception e) {
e.printStackTrace();
}
}
}
運行結果

終端上進行Hadoop檢測

7. 洗掉HDFS中指定的檔案
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.net.URI;
import java.util.Scanner;
public class DeleteFile {
public static void main(String[] args) {
try {
Scanner sc = new Scanner(System.in);
String filePath = '/'+sc.next();
FileSystem fs = FileSystem.get(new URI("hdfs://172.18.0.2:9000"), new Configuration());
Path hdfsPath = new Path(filePath);
if(fs.delete(hdfsPath,false)){
System.out.println("File "+ filePath +" has been deleted successfully!");
}
}catch(Exception e) {
e.printStackTrace();
}
}
}
運行結果

終端上進行Hadoop檢測

備注
問題1

用java操作hdfs的時候會報這個錯誤,大概是這個原因:因為在配置Hadoop時

fs.defaultFS值使用的是節點名稱,所以ip要與master節點對應,"hdfs://100.65.0.2:9000"
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/336195.html
標籤:其他
上一篇:waterdrop1.5.1
