背景
需要讀取HDFS上持續變化的日誌檔案,並對每一行進行處理,也就是類似於Linux中tail -f所實現的功能。
看了看,Spark和Flink好像都沒有類似的支援,於是就用Flink自定義了一個Source來實作這個功能。
實作思路
維持一個當前讀取位置的偏移量,然後每隔幾秒檢查一次檔案的大小是否大於當前偏移量:如果最新的檔案大小大於當前偏移量,就讀取新增的資料,並將當前偏移量設定為最新的檔案大小;反之,不做任何操作。
以下的程式碼還沒有把當前讀取位置儲存到狀態(state)中,如果作業重啟,會從頭開始讀。
實作代碼
自定義Source
package com.upupfeng.source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
/**
 * Custom Flink source that follows a single file in the manner of {@code tail -f}.
 *
 * <p>The source remembers the byte offset it has read up to and polls the file's length at a
 * fixed interval. Whenever the reported length exceeds that offset, the bytes after the offset
 * are read and emitted one line at a time.
 *
 * <p>The offset is held in memory only and is not stored in Flink state, so a restarted job
 * reads the file again from the beginning.
 *
 * @author mawf
 */
public class TailHdfsFileSource extends RichSourceFunction<String> {

    // Byte offset up to which the file has already been read and emitted.
    private volatile long currentPos = 0L;

    // Loop flag; cleared by cancel() to make run() return.
    private volatile boolean running = true;

    // Configuration handed to Flink's FileSystem.initialize().
    private final Configuration configuration;

    // Path of the file to follow.
    private final String path;

    // Polling interval, in seconds.
    private final Integer duration;

    /**
     * Creates a source that follows one file.
     *
     * @param configuration configuration passed to {@code FileSystem.initialize}
     * @param path path of the file to follow
     * @param duration polling interval in seconds
     */
    public TailHdfsFileSource(Configuration configuration, String path, Integer duration) {
        this.configuration = configuration;
        this.path = path;
        this.duration = duration;
    }

    /**
     * Polls the file until {@link #cancel()} is called, emitting newly appended lines.
     *
     * @param ctx context used to emit each line
     * @throws Exception if the file system cannot be accessed or the polling sleep is interrupted
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        FileSystem.initialize(configuration, null);
        FileSystem fileSystem = FileSystem.get(FileSystem.getDefaultFsUri());
        while (running) {
            long latestLength = getLatestLength(fileSystem);
            if (latestLength > currentPos) {
                collectRecords(ctx, fileSystem, latestLength);
            }
            // Multiply as long so a large interval cannot overflow int arithmetic.
            Thread.sleep(duration * 1000L);
        }
    }

    /**
     * Reads from the current offset to end-of-file, emits every line, and advances the offset.
     *
     * <p>Bytes are decoded as UTF-8. A trailing line that has no terminator yet is emitted as-is.
     *
     * @param ctx context used to emit each line
     * @param fs file system used to open the file
     * @param latestLength file length observed by the caller; retained for interface
     *     compatibility, the new offset is taken from the stream position instead
     * @throws IOException if the file cannot be opened, positioned, or read
     */
    public void collectRecords(SourceContext<String> ctx, FileSystem fs, Long latestLength) throws IOException {
        // try-with-resources: the stream was previously never closed, leaking one handle per poll.
        try (FSDataInputStream dataInputStream = fs.open(new Path(path))) {
            // Skip everything that was emitted by earlier polls.
            dataInputStream.seek(currentPos);
            BufferedReader bufferedReader =
                    new BufferedReader(new InputStreamReader(dataInputStream, StandardCharsets.UTF_8));
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                ctx.collect(line);
            }
            // The loop reads to end-of-file, which may lie beyond latestLength if the file grew
            // after its length was sampled. Recording the real stream position prevents those
            // extra bytes from being emitted a second time on the next poll.
            currentPos = dataInputStream.getPos();
        }
    }

    /**
     * Returns the current length of the followed file as reported by the file system.
     *
     * @param fs file system used to look up the file status
     * @return file length in bytes
     * @throws IOException if the file status cannot be obtained
     */
    public Long getLatestLength(FileSystem fs) throws IOException {
        FileStatus fileStatus = fs.getFileStatus(new Path(path));
        return fileStatus.getLen();
    }

    /** Requests termination of the polling loop in {@link #run}. */
    @Override
    public void cancel() {
        running = false;
    }
}
使用自定義的Source
package com.upupfeng.demo;
import com.upupfeng.source.TailHdfsFileSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Demo job that wires {@link TailHdfsFileSource} into a Flink pipeline and prints every line
 * the source emits.
 *
 * @author mawf
 */
public class TailHdfsFileDemo {

    // Value stored under the "fs.default-scheme" configuration key.
    private static final String DEFAULT_SCHEME = "hdfs://hadoop1:8020";

    // File the source follows.
    private static final String LOG_FILE = "/user/mwf/a.log";

    // Polling interval handed to the source, in seconds.
    private static final int POLL_SECONDS = 5;

    /**
     * Builds the pipeline (source with parallelism 1, then print) and executes it.
     *
     * @param args command-line arguments; not read
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        System.setProperty("user.name", "root");

        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.addSource(newTailSource()).setParallelism(1).print();
        environment.execute();
    }

    // Creates the tailing source together with the file-system configuration it needs.
    private static TailHdfsFileSource newTailSource() {
        Configuration fsConfig = new Configuration();
        fsConfig.setString("fs.default-scheme", DEFAULT_SCHEME);
        return new TailHdfsFileSource(fsConfig, LOG_FILE, POLL_SECONDS);
    }
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/259194.html
標籤:其他
