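Implementation Flow of Flink's HiveStreamingSink
Preface
To make our data fresher, we recently added a pipeline in which Flink writes into Hive in real time. We had essentially no prior experience with writing to Hive from Flink. After reading the official documentation we hit a problem: we want data to be visible within 1-10 minutes, but our data does not reach 128MB within a minute, so the sink produces lots of small files of a dozen or so MB each. We therefore needed to understand the write path well enough to modify it so that small files are merged automatically. Along the way, this post records how Flink streaming into Hive works.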
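Table of Contents
- Implementation Flow of Flink's HiveStreamingSink
- Preface
- 1. HiveTableSink Initialization and Validation
- 1.1 Creating the TableSink Object
- 1.2 Returning the SinkRuntimeProvider
- 2. Creating the HiveTableStreamSink
- 2.1 Creating the StreamSink
- 3. HiveTableStreamSink Compaction Flow
- 3.1 CompactFileWriter
- 3.2 CompactCoordinator
- 3.3 CompactOperator
- Summary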
1. HiveTableSink Initialization and Validation
1.1 Creating the TableSink Object
public HiveTableSink(
ReadableConfig flinkConf,
JobConf jobConf,
ObjectIdentifier identifier,
CatalogTable table,
@Nullable Integer configuredParallelism) {
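// Arguments passed in through the constructor
this.flinkConf = flinkConf;
this.jobConf = jobConf;
// The identifier combines the catalog, database, and table (object) name
this.identifier = identifier;
// Table information from the catalog
this.catalogTable = table;
// hiveVersion is important: it selects the implementation that matches each Hive version. If you do not set it, a default version is filled in automatically
hiveVersion =
Preconditions.checkNotNull(
jobConf.get(HiveCatalogFactoryOptions.HIVE_VERSION.key()),
"Hive version is not defined");
// hiveShim is the utility class that adapts Flink to the different Hive versions
hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
// Get the physical table schema
tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
// The configured parallelism, obtained in the factory class and passed in here
this.configuredParallelism = configuredParallelism;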
}
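The hiveVersion string decides which HiveShim is used. The idea is one interface with a per-version implementation, chosen and cached by a loader. Below is a minimal, hypothetical sketch of that pattern: `ShimLoaderSketch`, `VersionShim`, and the major-version prefix rule are invented for illustration and are not Flink's actual HiveShimLoader, which covers many more versions and shim methods.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical miniature of the "one shim per Hive version" idea; not Flink's HiveShimLoader. */
final class ShimLoaderSketch {

    /** Behaviour that differs between Hive versions (illustrative method only). */
    interface VersionShim {
        String describe();
    }

    static final class ShimV1 implements VersionShim {
        public String describe() { return "shim for Hive 1.x"; }
    }

    static final class ShimV2 implements VersionShim {
        public String describe() { return "shim for Hive 2.x"; }
    }

    static final class ShimV3 implements VersionShim {
        public String describe() { return "shim for Hive 3.x"; }
    }

    // One shim instance per version string, created lazily and then reused.
    private static final Map<String, VersionShim> CACHE = new ConcurrentHashMap<>();

    static VersionShim load(String hiveVersion) {
        // Same guard as the Preconditions.checkNotNull call in the constructor above.
        Objects.requireNonNull(hiveVersion, "Hive version is not defined");
        return CACHE.computeIfAbsent(hiveVersion, ShimLoaderSketch::create);
    }

    // Hypothetical selection rule: pick the shim by major version prefix.
    private static VersionShim create(String version) {
        if (version.startsWith("1.")) {
            return new ShimV1();
        } else if (version.startsWith("2.")) {
            return new ShimV2();
        } else if (version.startsWith("3.")) {
            return new ShimV3();
        }
        throw new IllegalArgumentException("Unsupported Hive version " + version);
    }
}
```

Calling `ShimLoaderSketch.load("2.3.6")` returns the 2.x shim, repeated calls with the same version return the same cached instance, and a null version fails with a NullPointerException, just like `checkNotNull`.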
1.2 Returning the SinkRuntimeProvider
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
// Data structure converter, e.g. for mapping POJOs to RowData
DataStructureConverter converter =
context.createDataStructureConverter(tableSchema.toRowDataType());
// Return a DataStreamSinkProvider, which supplies the sink when the job is launched
return (DataStreamSinkProvider)
dataStream -> consume(dataStream, context.isBounded(), converter);
}
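Returning a lambda here means `consume` does not run when the provider is created; it runs only when the planner later calls the provider with the upstream `DataStream`, using the `isBounded` flag and converter captured at that point. The plain-Java model below illustrates only that deferral. `SinkProviderSketch`, its `Provider` interface, and the string stand-ins for streams are hypothetical; they are not Flink's `DataStreamSinkProvider` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Plain-Java model of returning a lambda as a provider: nothing is built when the provider
 * is created; the captured arguments are used only when the caller invokes it.
 * All names are hypothetical stand-ins for DataStreamSinkProvider / consume(...).
 */
final class SinkProviderSketch {

    /** Stand-in for a single-method provider interface such as DataStreamSinkProvider. */
    interface Provider extends Function<String, String> {}

    // Records every stream that consume(...) was actually called with.
    static final List<String> CALLS = new ArrayList<>();

    // Stand-in for HiveTableSink#consume(dataStream, isBounded, converter).
    static String consume(String stream, boolean isBounded) {
        CALLS.add(stream);
        return (isBounded ? "batch sink on " : "streaming sink on ") + stream;
    }

    // Stand-in for getSinkRuntimeProvider(context): captures isBounded and defers the work.
    static Provider getSinkRuntimeProvider(boolean isBounded) {
        return stream -> consume(stream, isBounded);
    }
}
```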
private DataStreamSink<?> consume(DataStream<RowData> dataStream, boolean isBounded, DataStructureConverter converter) {
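// Check whether this is a Hive ACID table. ACID tables exist in newer Hive versions (with better support in 2.x and 3.x), but Flink currently cannot write to them
checkAcidTable(catalogTable, identifier.toObjectPath());
// try-with-resources closes the client whether or not an exception is thrown
// Flink wraps the metastore client so that it is created according to the Hive version; the wrapper implements AutoCloseable, so it is released automatically
try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
// Look up the Table object via the identifier
Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
// Get the table's storage descriptor, which holds metadata about how the table is stored
StorageDescriptor sd = table.getSd();
// Get the Hive output format class
Class hiveOutputFormatClz = hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
// Whether output compression is enabled (hive.exec.compress.output, false by default); this is a job setting, not something implied by storing the table as ORC or Parquet
boolean isCompressed = jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
// Factory for Hive operations that creates the writers used to write records into Hive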
HiveWriterFactory writerFactory =
new HiveWriterFactory(
jobConf,
hiveOutputFormatClz,
sd.getSerdeInfo(),
tableSchema,
getPartitionKeyArray(),
HiveReflectionUtils.getTableMetadata(hiveShim, table),
hiveShim,
isCompressed);
// Get the file extension; tables stored as ORC or Parquet have no extension
String extension =
Utilities.getFileExtension(
jobConf,
isCompressed,
(HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
// A small Flink output config that mainly holds the file prefix and suffix
// The suffix is the extension obtained above; the prefix is "part-" plus a random UUID
// e.g. part-e9ebbc0c-ae29-4ac7-8c84-f80daf385915-0-413
// This prefix is not the final file name: when auto-compaction is enabled, more is prepended to it
OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder =
OutputFileConfig.builder()
.withPartPrefix("part-" + UUID.randomUUID().toString())
.withPartSuffix(extension == null ? "" : extension);
// Parallelism, used further down
final int parallelism =
Optional.ofNullable(configuredParallelism).orElse(dataStream.getParallelism());
// Sink initialization is essentially done; next, decide whether this run is streaming or batch
if (isBounded) {
// In batch mode the output file naming is different; keep this in mind
OutputFileConfig fileNaming = fileNamingBuilder.build();
return createBatchSink(
dataStream, converter, sd, writerFactory, fileNaming, parallelism);
} else {
// Streaming mode does not support overwrite; if overwrite is requested, throw an exception
if (overwrite) {
throw new IllegalStateException("Streaming mode not support overwrite.");
}
// Get the Hive table properties
Properties tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
// Create the Hive streaming sink
return createStreamSink(
dataStream, sd, tableProps, writerFactory, fileNamingBuilder, parallelism);
}
} catch (TException e) {
// Note these exception types as well; they make later troubleshooting easier
throw new CatalogException("Failed to query Hive metaStore", e);
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to create staging dir", e);
} catch (ClassNotFoundException e) {
throw new FlinkHiveException("Failed to get output format class", e);
} catch (IllegalAccessException | InstantiationException e) {
throw new FlinkHiveException("Failed to instantiate output format instance", e);
}
}
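With that, we have a basic picture of how Flink's HiveTableSink is initialized. Honestly, the coding conventions and exception handling in a top-tier framework are excellent. Studying a framework is first about understanding its ideas and second about seeing how other people write code; there is a lot to learn from it.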
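To make the naming comments in `consume` concrete, here is a sketch of how a part-file name might be assembled. It assumes the bucket appends `-<subtaskIndex>-<partCounter>` and then the suffix to the configured prefix, which matches the example `part-e9ebbc0c-ae29-4ac7-8c84-f80daf385915-0-413`, and it uses the `.uncompacted-` and `compacted-` prefixes defined in CompactOperator (section 3.3). `PartFileNaming` and its method names are hypothetical.

```java
import java.util.UUID;

/**
 * Sketch of part-file naming, assuming the layout
 * prefix + "-" + subtaskIndex + "-" + partCounter + suffix,
 * plus the compaction prefixes declared in CompactOperator.
 */
final class PartFileNaming {
    static final String UNCOMPACTED_PREFIX = ".uncompacted-";
    static final String COMPACTED_PREFIX = "compacted-";

    // Same shape as withPartPrefix("part-" + UUID.randomUUID().toString()).
    static String newPartPrefix() {
        return "part-" + UUID.randomUUID();
    }

    static String partFileName(String prefix, int subtaskIndex, long partCounter, String suffix) {
        return prefix + "-" + subtaskIndex + "-" + partCounter + (suffix == null ? "" : suffix);
    }

    // With auto-compaction enabled, the prefix is put behind ".uncompacted-".
    static String toUncompacted(String prefix) {
        return UNCOMPACTED_PREFIX + prefix;
    }

    // The compacted target swaps ".uncompacted-" for "compacted-".
    static String toCompacted(String uncompactedName) {
        if (!uncompactedName.startsWith(UNCOMPACTED_PREFIX)) {
            throw new IllegalArgumentException("Not an uncompacted file: " + uncompactedName);
        }
        return COMPACTED_PREFIX + uncompactedName.substring(UNCOMPACTED_PREFIX.length());
    }
}
```

Under these assumptions, a file written with auto-compaction enabled starts as `.uncompacted-part-<uuid>-0-413`, and its compacted counterpart is `compacted-part-<uuid>-0-413`. The leading dot presumably keeps uncompacted files hidden from readers that skip dot-files, as Hadoop-style input formats commonly do.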
2. Creating the HiveTableStreamSink
2.1 Creating the StreamSink
private DataStreamSink<?> createStreamSink( // I'm a bit obsessive about parameter lists, so each one is annotated
DataStream<RowData> dataStream, // the data stream, self-explanatory
StorageDescriptor sd, // table storage descriptor with the storage metadata
Properties tableProps, // table properties, i.e. the properties you set when creating the table
HiveWriterFactory recordWriterFactory, // record writer factory
OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder, // output file naming config
final int parallelism // number of subtasks
) {
// Create a Flink Configuration object
org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
// Copy the table options into it
catalogTable.getOptions().forEach(conf::setString);
// Partition computer: computes the Hive storage partition for each row
HiveRowDataPartitionComputer partComputer =
new HiveRowDataPartitionComputer(
hiveShim,
defaultPartName(),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
getPartitionKeyArray());
// Bucket assigner for the table; buckets are derived from partComputer
TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
// Rolling policy for part files. This is the original file-handling mechanism; because many partitions lead to small files, the compaction mechanism was added on top of it
HiveRollingPolicy rollingPolicy =
new HiveRollingPolicy(
conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
// Whether automatic compaction of partition files is enabled. Why compaction on top of the rolling policy?
// With parallelism 5, for example, each partition can receive up to 5 small files every time files roll
boolean autoCompaction = conf.getBoolean(FileSystemOptions.AUTO_COMPACTION);
// When enabled, every file written (before compaction) carries the uncompacted prefix
if (autoCompaction) {
fileNamingBuilder.withPartPrefix(
convertToUncompacted(fileNamingBuilder.build().getPartPrefix()));
}
// Then build the output file config
OutputFileConfig outputFileConfig = fileNamingBuilder.build();
// This path is the table's storage location on HDFS, not the absolute path of any individual file
org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());
// BucketsBuilder
BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
// Choose between the MapReduce RecordWriter and Flink's native writer
if (flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER)) {
builder =
bucketsBuilderForMRWriter(
recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
} else {
Optional<BulkWriter.Factory<RowData>> bulkFactory =
createBulkWriterFactory(getPartitionKeyArray(), sd);
// Create a bulk writer factory for the storage format; if none is available, fall back to the Hadoop MR writer
if (bulkFactory.isPresent()) {
builder =
StreamingFileSink.forBulkFormat(
path,
new FileSystemTableSink.ProjectionBulkFactory(
bulkFactory.get(), partComputer))
.withBucketAssigner(assigner)
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(outputFileConfig);
LOG.info("Hive streaming sink: Use native parquet&orc writer.");
} else {
builder =
bucketsBuilderForMRWriter(
recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
LOG.info(
"Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
}
}
// Bucket check interval, set as a table property; see the FileSystem SQL connector page in the Flink documentation
long bucketCheckInterval = conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();
// Writer output stream: this step only writes records to files and is not the final operator
DataStream<PartitionCommitInfo> writerStream;
// Whether auto-compaction is enabled (also a table property)
if (autoCompaction) {
// Target compaction file size; note that if it is not configured, it defaults to SINK_ROLLING_POLICY_FILE_SIZE
long compactionSize =
conf.getOptional(FileSystemOptions.COMPACTION_FILE_SIZE)
.orElse(conf.get(SINK_ROLLING_POLICY_FILE_SIZE))
.getBytes();
// Create the writer stream via StreamingSink
writerStream =
StreamingSink.compactionWriter(
dataStream,
bucketCheckInterval,
builder,
fsFactory(),
path,
createCompactReaderFactory(sd, tableProps),
compactionSize,
parallelism);
} else {
writerStream =
StreamingSink.writer(dataStream, bucketCheckInterval, builder, parallelism);
}
// Attach the final sink (partition commit); no need to dig into it yet
return StreamingSink.sink(
writerStream, path, identifier, getPartitionKeys(), msFactory(), fsFactory(), conf);
}
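The comment about parallelism 5 can be turned into a rough estimate. Assuming every in-progress file is rolled at each checkpoint, each writer subtask can produce up to one file per partition it received rows for, so one checkpoint yields at most parallelism × active partitions files. The sketch below does that arithmetic and also mirrors the fallback in `createStreamSink`, where the compaction target (`compaction.file-size`) defaults to the rolling size (`sink.rolling-policy.file-size`, 128MB by default). `SmallFileEstimate` and all numbers used with it are hypothetical.

```java
import java.util.Optional;

/**
 * Back-of-the-envelope model of why small files appear, assuming every in-progress file
 * is rolled at each checkpoint. Inputs are hypothetical.
 */
final class SmallFileEstimate {

    // Upper bound: every writer subtask may hold one open file per active partition.
    static long filesPerCheckpoint(int parallelism, int activePartitions) {
        return (long) parallelism * activePartitions;
    }

    static long averageFileBytes(long bytesPerCheckpoint, long files) {
        return files == 0 ? 0 : bytesPerCheckpoint / files;
    }

    // Mirrors conf.getOptional(COMPACTION_FILE_SIZE).orElse(SINK_ROLLING_POLICY_FILE_SIZE).
    static long compactionTargetBytes(Long configuredCompactionBytes, long rollingFileBytes) {
        return Optional.ofNullable(configuredCompactionBytes).orElse(rollingFileBytes);
    }
}
```

For example, with parallelism 5, 4 active partitions, and 100MB written per checkpoint, `filesPerCheckpoint(5, 4)` gives 20 files averaging `averageFileBytes(100L * 1024 * 1024, 20)`, i.e. 5MB each, far below a 128MB target. That is exactly the small-file situation described in the preface.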
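At this point the StreamSink is attached, but we have not yet seen where the real work happens. It actually happens in compactionWriter, so let's take a look:
public static <T> DataStream<PartitionCommitInfo> compactionWriter( // again, one comment per parameter
DataStream<T> inputStream, // input stream, e.g. RowData, String, Struct
long bucketCheckInterval, // bucket check interval
StreamingFileSink.BucketsBuilder<
T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
bucketsBuilder, // buckets builder
FileSystemFactory fsFactory, // file system factory
Path path, // table path
CompactReader.Factory<T> readFactory, // reader factory for compaction, self-explanatory
long targetFileSize, // target file size
int parallelism // parallelism
) {
/**
* This method sets up three operators; here is what each one does:
* writer writes data into buckets and sends "file opened" and "checkpoint complete" messages downstream
*
* coordinator coordinates the written files, i.e. decides which files can be compacted together
*
* compacter is the operator that performs the compaction
*/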
CompactFileWriter<T> writer = new CompactFileWriter<>(bucketCheckInterval, bucketsBuilder);
..........
CompactCoordinator coordinator = new CompactCoordinator(fsSupplier, targetFileSize);
..........
CompactOperator<T> compacter = new CompactOperator<>(fsSupplier, readFactory, writerFactory);
}
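3. HiveTableStreamSink Compaction Flow
3.1 CompactFileWriter
This operator writes data into files. It does not implement the writing itself; that happens in its parent class. What it adds is messaging: it announces every newly opened file downstream, and after the parent class has committed a checkpoint, it sends a checkpoint-complete record downstream.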
package org.apache.flink.table.filesystem.stream.compact;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.filesystem.stream.AbstractStreamingWriter;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages.EndCheckpoint;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages.InputFile;
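/**
* This operator extends an abstract class and overrides some of its methods.
* AbstractStreamingWriter also contains the data-writing logic; by the time these hooks run, the data has already been written to a bucket.
*/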
public class CompactFileWriter<T>
extends AbstractStreamingWriter<T, CompactMessages.CoordinatorInput> {
private static final long serialVersionUID = 1L;
public CompactFileWriter(
long bucketCheckInterval,
StreamingFileSink.BucketsBuilder<
T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
bucketsBuilder) {
super(bucketCheckInterval, bucketsBuilder);
}
@Override
protected void partitionCreated(String partition) {}
@Override
protected void partitionInactive(String partition) {}
@Override
protected void onPartFileOpened(String partition, Path newPath) {
// Notify downstream that a new file has been opened
output.collect(new StreamRecord<>(new InputFile(partition, newPath)));
}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);
// When the checkpoint completes, send a checkpoint-complete message downstream
output.collect(
new StreamRecord<>(
new EndCheckpoint(
checkpointId,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks())));
}
}
/**
* Operator for file system sink. It is an operator version of {@link StreamingFileSink}. It can send
* file and bucket information to downstream.
*/
public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
......... (code omitted)
/** Notifies that a partition has been created. */
protected abstract void partitionCreated(String partition);
/**
* Notifies a partition become inactive. A partition becomes inactive after all the records
* received so far have been committed.
*/
protected abstract void partitionInactive(String partition);
/**
* Notifies a new file has been opened.
*
* <p>Note that this does not mean that the file has been created in the file system. It is only
* created logically and the actual file will be generated after it is committed.
*/
protected abstract void onPartFileOpened(String partition, Path newPath);
/** Commit up to this checkpoint id. */
protected void commitUpToCheckpoint(long checkpointId) throws Exception {
helper.commitUpToCheckpoint(checkpointId);
}
......... (code omitted)
@Override // write the record into its bucket
public void processElement(StreamRecord<IN> element) throws Exception {
helper.onElement(
element.getValue(),
getProcessingTimeService().getCurrentProcessingTime(),
element.hasTimestamp() ? element.getTimestamp() : null,
currentWatermark);
}
......... (code omitted)
}
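What CompactFileWriter adds on top of its parent is two kinds of message: an `InputFile(partition, path)` when a part file is opened and an `EndCheckpoint(checkpointId, taskId, numberOfTasks)` after a checkpoint has completed. The plain-Java model below records those messages in a list that stands in for `output.collect`. The classes are simplified, hypothetical stand-ins for `CompactMessages.InputFile` and `CompactMessages.EndCheckpoint`, with `String` in place of Flink's `Path`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model of what CompactFileWriter sends downstream. The message classes are
 * simplified stand-ins for CompactMessages.InputFile / EndCheckpoint.
 */
final class WriterMessagesSketch {

    interface CoordinatorInput {}

    static final class InputFile implements CoordinatorInput {
        final String partition;
        final String file;

        InputFile(String partition, String file) {
            this.partition = partition;
            this.file = file;
        }
    }

    static final class EndCheckpoint implements CoordinatorInput {
        final long checkpointId;
        final int taskId;
        final int numberOfTasks;

        EndCheckpoint(long checkpointId, int taskId, int numberOfTasks) {
            this.checkpointId = checkpointId;
            this.taskId = taskId;
            this.numberOfTasks = numberOfTasks;
        }
    }

    /** One writer subtask; "output" plays the role of Output#collect. */
    static final class Writer {
        final int taskId;
        final int numberOfTasks;
        final List<CoordinatorInput> output = new ArrayList<>();

        Writer(int taskId, int numberOfTasks) {
            this.taskId = taskId;
            this.numberOfTasks = numberOfTasks;
        }

        // Like onPartFileOpened: announce the file as soon as it is (logically) opened.
        void onPartFileOpened(String partition, String newPath) {
            output.add(new InputFile(partition, newPath));
        }

        // Like notifyCheckpointComplete: after committing, report this subtask as done.
        void notifyCheckpointComplete(long checkpointId) {
            output.add(new EndCheckpoint(checkpointId, taskId, numberOfTasks));
        }
    }
}
```

Carrying `numberOfTasks` in every `EndCheckpoint` is what lets the downstream coordinator know how many reports to wait for without any extra configuration.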
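3.2 CompactCoordinator
This operator receives two kinds of input: the files being opened and checkpoint-end messages. It keeps the files opened in each checkpoint in state. When the checkpoint-end markers arrive, it takes out all files of that checkpoint, coordinates them, and sends the result downstream. Because those files belong to a completed checkpoint, the downstream compaction can start at any time without worrying about files that might still change or be rolled back.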
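The bookkeeping just described can be modelled in plain Java. The sketch assumes, since the snapshot method is not shown in this article, that on each checkpoint snapshot the files collected so far are filed under that checkpoint id. It counts `EndCheckpoint` messages per checkpoint the way the `TaskTracker` is used in the code that follows and, once every upstream subtask has reported, takes out all checkpoints up to and including that id via `headMap(checkpointId, true)`. `CoordinatorBookkeepingSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

/**
 * Simplified model of CompactCoordinator's bookkeeping. Assumption: on each checkpoint
 * snapshot, the files seen so far are filed under that checkpoint id. Paths are Strings.
 */
final class CoordinatorBookkeepingSketch {
    private final int numberOfTasks;
    private final Map<String, List<String>> currentInputFiles = new HashMap<>();
    private final TreeMap<Long, Map<String, List<String>>> inputFiles = new TreeMap<>();
    private final TreeMap<Long, Set<Integer>> endedTasks = new TreeMap<>();

    CoordinatorBookkeepingSketch(int numberOfTasks) {
        this.numberOfTasks = numberOfTasks;
    }

    // InputFile message: remember the file under its partition.
    void onInputFile(String partition, String file) {
        currentInputFiles.computeIfAbsent(partition, k -> new ArrayList<>()).add(file);
    }

    // Assumed snapshot behaviour: freeze the current files under this checkpoint id.
    void snapshot(long checkpointId) {
        inputFiles.put(checkpointId, new HashMap<>(currentInputFiles));
        currentInputFiles.clear();
    }

    /**
     * EndCheckpoint message. Returns the files of every checkpoint <= checkpointId once all
     * upstream subtasks have reported; otherwise returns null (TaskTracker-like behaviour).
     */
    Map<Long, Map<String, List<String>>> onEndCheckpoint(long checkpointId, int taskId) {
        Set<Integer> tasks = endedTasks.computeIfAbsent(checkpointId, k -> new HashSet<>());
        tasks.add(taskId);
        if (tasks.size() < numberOfTasks) {
            return null;
        }
        endedTasks.headMap(checkpointId, true).clear();
        // Same idea as commitUpToCheckpoint: take everything up to and including this id.
        NavigableMap<Long, Map<String, List<String>>> head = inputFiles.headMap(checkpointId, true);
        Map<Long, Map<String, List<String>>> ready = new TreeMap<>(head);
        head.clear();
        return ready;
    }
}
```

Using a `TreeMap` keyed by checkpoint id is what makes the "everything up to this checkpoint" step a single `headMap` call, and clearing that view also removes the entries from the backing map.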
public class CompactCoordinator extends AbstractStreamOperator<CoordinatorOutput>
implements OneInputStreamOperator<CoordinatorInput, CoordinatorOutput> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(CompactCoordinator.class);
// A functional interface that returns a FileSystem, or throws an IOException
private final SupplierWithException<FileSystem, IOException> fsFactory;
// Target file size
private final long targetFileSize;
// File system object (Flink's FileSystem wrapper, not the native one)
private transient FileSystem fileSystem;
private transient ListState<Map<Long, Map<String, List<Path>>>> inputFilesState; // input file state
private transient TreeMap<Long, Map<String, List<Path>>> inputFiles; // input files per checkpoint
private transient Map<String, List<Path>> currentInputFiles; // input files of the current checkpoint
// Used to determine whether every upstream subtask has finished the current checkpoint
private transient TaskTracker inputTaskTracker;
public CompactCoordinator(
SupplierWithException<FileSystem, IOException> fsFactory, long targetFileSize) {
this.fsFactory = fsFactory;
this.targetFileSize = targetFileSize;
}
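// Initialize state
// This method belongs to Flink's state handling: an operator with recoverable state needs to override it
// Whenever I read a transformation I check this method to see what gets restored on recovery and whether it matches what I expect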
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
// Get the file system
fileSystem = fsFactory.get();
// Create a ListStateDescriptor from the given name and element serializer
// The state maps checkpoint id -> partition -> files that have not been coordinated yet
ListStateDescriptor<Map<Long, Map<String, List<Path>>>> filesDescriptor =
new ListStateDescriptor<>(
"files-state",
new MapSerializer<>(
LongSerializer.INSTANCE,
new MapSerializer<>(
StringSerializer.INSTANCE,
new ListSerializer<>(
new KryoSerializer<>(
Path.class, getExecutionConfig())))));
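// Register the state holding the input files
inputFilesState = context.getOperatorStateStore().getListState(filesDescriptor);
// Create the in-memory maps
inputFiles = new TreeMap<>();
currentInputFiles = new HashMap<>();
// If the operator is restored from a checkpoint, reload the saved files; otherwise start empty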
if (context.isRestored()) {
inputFiles.putAll(inputFilesState.get().iterator().next());
}
}
@Override
public void processElement(StreamRecord<CoordinatorInput> element) throws Exception {
CoordinatorInput value = element.getValue();
// Check the message from upstream: an InputFile is recorded under its partition
if (value instanceof InputFile) {
InputFile file = (InputFile) value;
currentInputFiles
.computeIfAbsent(file.getPartition(), k -> new ArrayList<>())
.add(file.getFile());
} else if (value instanceof EndCheckpoint) { // an EndCheckpoint may trigger compaction coordination
EndCheckpoint endCheckpoint = (EndCheckpoint) value;
if (inputTaskTracker == null) {
// Create a TaskTracker to track whether all upstream subtasks have finished the current checkpoint
inputTaskTracker = new TaskTracker(endCheckpoint.getNumberOfTasks());
}
// Returns true only once every subtask has reported
boolean triggerCommit =
inputTaskTracker.add(
endCheckpoint.getCheckpointId(), endCheckpoint.getTaskId());
// Once every upstream subtask has sent its EndCheckpoint, commit up to this checkpoint and coordinate compaction
if (triggerCommit) {
commitUpToCheckpoint(endCheckpoint.getCheckpointId());
}
} else {
throw new UnsupportedOperationException("Unsupported input message: " + value);
}
}
private void commitUpToCheckpoint(long checkpointId) {
// Input files of every checkpoint up to and including this one
Map<Long, Map<String, List<Path>>> headMap = inputFiles.headMap(checkpointId, true);
// Coordinate compaction
for (Map.Entry<Long, Map<String, List<Path>>> entry : headMap.entrySet()) {
coordinate(entry.getKey(), entry.getValue());
}
headMap.clear();
}
/** Do stable compaction coordination. */
private void coordinate(long checkpointId, Map<String, List<Path>> partFiles) {
// Function that returns the size of a file
Function<Path, Long> sizeFunc =
path -> {
try {
return fileSystem.getFileStatus(path).getLen();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
// We need a stable compaction algorithm.
Map<String, List<List<Path>>> compactUnits = new HashMap<>();
partFiles.forEach(
(p, files) -> {
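// Sort the files by path so the grouping is stable
files.sort(Comparator.comparing(Path::getPath));
// Packing rule: the total size of each group should not exceed targetFileSize
// Files in the same partition that can be merged go into the same list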
compactUnits.put(p, BinPacking.pack(files, sizeFunc, targetFileSize));
});
int unitId = 0;
for (Map.Entry<String, List<List<Path>>> unitsEntry : compactUnits.entrySet()) {
String partition = unitsEntry.getKey();
for (List<Path> unit : unitsEntry.getValue()) {
// Emit unitId, partition (the partition path), and unit (the files that can be merged)
output.collect(new StreamRecord<>(new CompactionUnit(unitId, partition, unit)));
unitId++;
}
}
LOG.debug("Coordinate checkpoint-{}, compaction units are: {}", checkpointId, compactUnits);
// Emit the end-of-compaction marker for this checkpoint
output.collect(new StreamRecord<>(new EndCompaction(checkpointId)));
}
}
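`coordinate` sorts each partition's files by path and then groups them so that each group stays within `targetFileSize`. Sorting first is presumably what the "stable compaction algorithm" comment refers to: the same set of files always yields the same units. Below is a simplified next-fit greedy packer in that spirit. It is a stand-in under that assumption, not Flink's `BinPacking.pack`, whose implementation may differ in details; `NextFitPacking` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Greedy "next-fit" packing: walk the (already sorted) files and start a new unit when the
 * next file would push the current unit past targetSize. A simplified stand-in for
 * Flink's BinPacking.pack.
 */
final class NextFitPacking {

    static <T> List<List<T>> pack(List<T> items, Function<T, Long> sizeFunc, long targetSize) {
        List<List<T>> units = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long currentSize = 0;
        for (T item : items) {
            long size = sizeFunc.apply(item);
            // Close the open unit if this file does not fit; an oversized file ends up alone.
            if (!current.isEmpty() && currentSize + size > targetSize) {
                units.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(item);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            units.add(current);
        }
        return units;
    }
}
```

With sizes 30, 50, 40, 90 and 10 (in MB) and a 100MB target, the units are [30, 50], [40] and [90, 10]; a file larger than the target simply forms a unit of its own.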
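3.3 CompactOperator
This class is the final operator of the HiveStreamSink. What it receives from upstream are units of files under the same partition path that can be compacted together, each tagged with a unitId. Compaction itself simply means reading the small files, writing their contents into a new file, and then deleting the old files.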
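Each CompactOperator subtask decides which units are its own through `unit.isTaskMessage(numberOfSubtasks, subtaskIndex)`. The sketch below assumes that check is `unitId % numberOfTasks == taskId`, which implies that every subtask sees every unit and keeps only its share; the exact implementation is not shown in this article. `UnitAssignmentSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of spreading compaction units over compactor subtasks, assuming
 * isTaskMessage(numTasks, taskId) means "unitId % numTasks == taskId".
 */
final class UnitAssignmentSketch {

    static boolean isTaskMessage(int unitId, int numberOfTasks, int taskId) {
        return unitId % numberOfTasks == taskId;
    }

    // Which unit ids a given subtask would compact out of unitCount units.
    static List<Integer> unitsFor(int taskId, int numberOfTasks, int unitCount) {
        List<Integer> mine = new ArrayList<>();
        for (int unitId = 0; unitId < unitCount; unitId++) {
            if (isTaskMessage(unitId, numberOfTasks, taskId)) {
                mine.add(unitId);
            }
        }
        return mine;
    }
}
```

With 3 subtasks and 7 units, subtask 1 would compact units 1 and 4. Because `coordinate` restarts `unitId` at 0 for every checkpoint, low-index subtasks are always served first, so when a checkpoint produces only a few units the work may concentrate on them.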
/**
* This transformation is the final compaction operator. Once we reach this class, Flink's streaming write into Hive is essentially complete. Let's keep reading.
*/
public class CompactOperator<T> extends AbstractStreamOperator<PartitionCommitInfo>
implements OneInputStreamOperator<CoordinatorOutput, PartitionCommitInfo>, BoundedOneInput {
private static final long serialVersionUID = 1L;
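// Prefix of uncompacted files
public static final String UNCOMPACTED_PREFIX = ".uncompacted-";
// Prefix of compacted files
public static final String COMPACTED_PREFIX = "compacted-";
// File system factory, used to create the file system
private final SupplierWithException<FileSystem, IOException> fsFactory;
// Reader factory. Both reading and writing are needed, because compaction reads the old files, writes a new one, and then deletes the old ones
private final CompactReader.Factory<T> readerFactory;
// Writer factory
private final CompactWriter.Factory<T> writerFactory;
// File system. Note: not the native file system but a thin wrapper with essentially the same API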
private transient FileSystem fileSystem;
private transient ListState<Map<Long, List<Path>>> expiredFilesState;
private transient TreeMap<Long, List<Path>> expiredFiles;
private transient List<Path> currentExpiredFiles;
private transient Set<String> partitions;
private transient Path path;
public CompactOperator(
SupplierWithException<FileSystem, IOException> fsFactory,
CompactReader.Factory<T> readerFactory,
Path path,
CompactWriter.Factory<T> writerFactory) {
this.fsFactory = fsFactory;
this.path = path;
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
this.partitions = new HashSet<>();
this.fileSystem = fsFactory.get();
ListStateDescriptor<Map<Long, List<Path>>> metaDescriptor =
new ListStateDescriptor<>(
"expired-files",
new MapSerializer<>(
LongSerializer.INSTANCE,
new ListSerializer<>(
new KryoSerializer<>(Path.class, getExecutionConfig()))));
this.expiredFilesState = context.getOperatorStateStore().getListState(metaDescriptor);
this.expiredFiles = new TreeMap<>();
this.currentExpiredFiles = new ArrayList<>();
if (context.isRestored()) {
this.expiredFiles.putAll(this.expiredFilesState.get().iterator().next());
}
}
@Override
public void processElement(StreamRecord<CoordinatorOutput> element) throws Exception {
CoordinatorOutput value = element.getValue();
// When a message arrives from upstream, check whether it is a CompactionUnit or an EndCompaction
if (value instanceof CompactionUnit) {
// A compaction unit
CompactionUnit unit = (CompactionUnit) value;
// Each subtask only compacts the units assigned to it
if (unit.isTaskMessage(getRuntimeContext().getNumberOfParallelSubtasks(),getRuntimeContext().getIndexOfThisSubtask())) {
String partition = unit.getPartition();
List<Path> paths = unit.getPaths();
// Merge the files
doCompact(partition, paths);
// Remember the partition that was touched
this.partitions.add(partition);
// The expired files may only be deleted after the current checkpoint completes
// successfully, which guarantees that the files still exist until then.
// Files already processed; they are deleted later
this.currentExpiredFiles.addAll(paths);
}
} else if (value instanceof EndCompaction) {
LOG.info("Current checkpoint id: {}", ((EndCompaction) value).getCheckpointId());
endCompaction(((EndCompaction) value).getCheckpointId());
}
}
private void endCompaction(long checkpoint) {
this.output.collect(
new StreamRecord<>(
new PartitionCommitInfo(
checkpoint,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks(),
new ArrayList<>(this.partitions))));
this.partitions.clear();
}
...........
private void clearExpiredFiles(long checkpointId) throws IOException {
// Don't need these metas anymore.
NavigableMap<Long, List<Path>> outOfDateMetas = expiredFiles.headMap(checkpointId, true);
for (List<Path> paths : outOfDateMetas.values()) {
for (Path meta : paths) {
fileSystem.delete(meta, true);
}
}
outOfDateMetas.clear();
}
/**
* Do Compaction: - Target file exists, do nothing. - Can do compaction: - Single file, do
* atomic renaming, there are optimizations for FileSystem. - Multiple file, do reading and
* writing.
*/
private void doCompact(String partition, List<Path> paths) throws IOException {
if (paths.size() == 0) {
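// Nothing to compact
return;
}
Path target = createCompactedFile(paths);
if (fileSystem.exists(target)) {
// Target already exists (e.g. it was compacted before a failover); skip
return;
}
checkExist(paths);
// Record the compaction start time
long startMillis = System.currentTimeMillis();
boolean success = false;
// Only one input file
if (paths.size() == 1) {
// optimization for a single file
success = doSingleFileMove(paths.get(0), target);
}
// Multiple files, or the single-file shortcut did not succeed
if (!success) {
doMultiFilesCompact(partition, paths, target);
}
// Compaction finished; log how long it took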
double costSeconds = ((double) (System.currentTimeMillis() - startMillis)) / 1000;
LOG.info(
"Compaction time cost is '{}S', target file is '{}', input files are '{}'",
costSeconds,
target,
paths);
}
...... (the rest is the actual compaction code; nothing special, it simply reads the input files and writes them out)
}
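The doCompact flow (skip when the target already exists, merge the inputs, and delete the inputs only after the checkpoint has completed, as the comment in processElement says) can be sketched on a local filesystem with java.nio. Assumptions, stated loudly: the inputs are line-based text files, so byte concatenation is a valid merge (ORC and Parquet are not; they need the CompactReader/CompactWriter path); the `.inprogress` temp-file name is hypothetical; and because the originals must survive until the checkpoint succeeds, the single-file case here copies instead of renaming. `LocalCompactionSketch` is not Flink code.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Local-filesystem sketch of the compaction step. Assumes line-based text inputs (byte
 * concatenation is a valid merge) and deletes inputs only after a checkpoint completes.
 */
final class LocalCompactionSketch {
    private final List<Path> currentExpiredFiles = new ArrayList<>();
    private final TreeMap<Long, List<Path>> expiredFiles = new TreeMap<>();

    void doCompact(Path target, List<Path> inputs) throws IOException {
        if (inputs.isEmpty() || Files.exists(target)) {
            return; // nothing to do, or already compacted before a failover
        }
        // Write to a hidden temp file first so the target appears in one atomic step.
        Path tmp = target.resolveSibling("." + target.getFileName() + ".inprogress");
        try (OutputStream out = Files.newOutputStream(tmp)) {
            for (Path in : inputs) {
                Files.copy(in, out); // one file = plain copy; several = concatenation
            }
        }
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        currentExpiredFiles.addAll(inputs); // keep originals until the checkpoint succeeds
    }

    // Assumed snapshot behaviour: remember which inputs belong to this checkpoint.
    void snapshot(long checkpointId) {
        expiredFiles.put(checkpointId, new ArrayList<>(currentExpiredFiles));
        currentExpiredFiles.clear();
    }

    // Like clearExpiredFiles: delete the inputs of every checkpoint <= checkpointId.
    void notifyCheckpointComplete(long checkpointId) throws IOException {
        NavigableMap<Long, List<Path>> outOfDate = expiredFiles.headMap(checkpointId, true);
        for (List<Path> paths : outOfDate.values()) {
            for (Path p : paths) {
                Files.deleteIfExists(p);
            }
        }
        outOfDate.clear();
    }
}
```

Calling `doCompact` leaves the inputs in place; only `snapshot(5)` followed by `notifyCheckpointComplete(5)` deletes them, mirroring `clearExpiredFiles`. If the job fails in between, a rerun finds the target already present and skips it, while the inputs are still available.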
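Summary
That wraps up the Hive connector. Building on it, small-file merging could be done in several ways: merge the smallest N+1 files after every N checkpoints, merge files on a time basis, or merge them with a separately deployed offline job. There are many options; the main things to weigh are performance and whether the approach fits your scenario.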
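As one illustration of these ideas, a hypothetical trigger could let files accumulate and compact only every N checkpoints or once a time interval has elapsed, whichever comes first. `CompactionTrigger` is not part of Flink; wiring it into CompactCoordinator would mean holding files across several checkpoints before calling `coordinate`, which is an assumption about how such a change could be made rather than something implemented here.

```java
/**
 * Hypothetical trigger: compact once every N checkpoints, or once maxIntervalMillis has
 * passed since the last compaction, whichever comes first. Not part of Flink.
 */
final class CompactionTrigger {
    private final int everyNCheckpoints;
    private final long maxIntervalMillis;
    private int checkpointsSinceLast = 0;
    private long lastCompactionMillis;

    CompactionTrigger(int everyNCheckpoints, long maxIntervalMillis, long nowMillis) {
        this.everyNCheckpoints = everyNCheckpoints;
        this.maxIntervalMillis = maxIntervalMillis;
        this.lastCompactionMillis = nowMillis;
    }

    /** Call on every completed checkpoint; returns true when this checkpoint should compact. */
    boolean onCheckpoint(long nowMillis) {
        checkpointsSinceLast++;
        boolean fire = checkpointsSinceLast >= everyNCheckpoints
                || nowMillis - lastCompactionMillis >= maxIntervalMillis;
        if (fire) {
            // Start counting again from this compaction.
            checkpointsSinceLast = 0;
            lastCompactionMillis = nowMillis;
        }
        return fire;
    }
}
```

With N = 3 and a 10-minute interval, one-minute checkpoints fire on every third checkpoint, while a job with very infrequent checkpoints fires on the interval instead. The trade-off the summary mentions shows up directly: a larger N yields bigger files but delays compaction.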
Please credit the source when reposting. Original link: https://www.uj5u.com/qita/292895.html
