主頁 >  其他 > Flink原始碼篇-FLINK的StreamingHive實作流程以及小檔案壓縮流程

Flink原始碼篇-FLINK的StreamingHive實作流程以及小檔案壓縮流程

2021-08-11 06:55:16 其他

Flink的HiveStreamingSink實作流程

前言

目前我們為了增強資料的時效性,增加了Flink實時寫入Hive的流程,基于Flink寫入Hive這里之前基本上是沒有接觸過的,看了官網的文章之后,由于我們的追求資料為1-10分鐘內可見性,但是資料也不足1分鐘就能達到128MB的情況,于是也會產生各種各樣的十幾MB的小檔案,于是需要了解一下這個寫入流程基于上面進行改造,使小檔案能夠達到自動合并的效果,順便記錄一下FlinkStreamingHive的流程


文章目錄

  • Flink的HiveStreamingSink實作流程
    • 前言
    • 1,HiveTableSink初始化校驗流程
      • 1.1創建TableSink物件
      • 1.2回傳SinkRunTimeProvider
    • 2,HiveTableStreamSink創建
      • 2.1 StreamSink的創建
    • 3,HiveTableStreamSink壓縮流程
      • 3.1 CompactFileWriter
      • 3.2CompactCoordinator
      • 3,3 CompactOperator
  • 總結


1,HiveTableSink初始化校驗流程

1.1創建TableSink物件

public HiveTableSink(
        ReadableConfig flinkConf,
        JobConf jobConf,
        ObjectIdentifier identifier,
        CatalogTable table,
        @Nullable Integer configuredParallelism) {
	//構造方法傳入引數
    this.flinkConf = flinkConf;
    this.jobConf = jobConf;
	//這個識別符號是一個catalog,db和tbObjName的標識
    this.identifier = identifier;
	//CataLog中表的資訊
    this.catalogTable = table;
	//HiveVersion這里很重要主要是為了根據不同的版本適配不同的實作方法,沒有寫入會獲取你metastore默認的version
    hiveVersion =
            Preconditions.checkNotNull(
                    jobConf.get(HiveCatalogFactoryOptions.HIVE_VERSION.key()),
                    "Hive version is not defined");
	//hiveShim就是適配不同版本的工具類
    hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
	//獲取表結構
    tableSchema = TableSchemaUtils.getPhysicalSchema(table.getSchema());
	//獲取配置的Parallelism,這里是在工廠類里獲取傳入的
    this.configuredParallelism = configuredParallelism;
}

1.2回傳SinkRunTimeProvider

 @Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    //資料結構映射器,映射Pojo為DataRow等操作...
    DataStructureConverter converter =
            context.createDataStructureConverter(tableSchema.toRowDataType());
    //這里就是回傳一個DataStreamSinkProvider,用于在啟動執行時提供Sink
    return (DataStreamSinkProvider)
            dataStream -> consume(dataStream, context.isBounded(), converter);
}

private DataStreamSink<?> consume(DataStream<RowData> dataStream, boolean isBounded, DataStructureConverter converter) {
    //檢查是否是Hive的ACID表,在Hive高版本以上提供了ACID的概念,2.x以及3.x對acid支持的更好,Flink目前是不支持寫入Hive的ACID表的
    checkAcidTable(catalogTable, identifier.toObjectPath()); 
	//try with resource  好處就是無論是否exception都會幫你close connector
    //flink 包裝了一層主要就是根據版本來創建metastore client,繼承了AutoCloseable幫助自動釋放
    try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) {
        //通過identifier獲取Table物件;
        Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
        //這里是獲取表存盤描述,里面包含表的一些元資料描述
        StorageDescriptor sd = table.getSd();
		//獲取Hive輸出結構Class
        Class hiveOutputFormatClz =hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat()));
        //檔案是否進行存盤壓縮,text不壓縮,如果store以orc or parquet即為true
        boolean isCompressed = jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false);
        //對Hive操作的一個工廠,可以用來創建記錄寫入到Hive的寫入器
        HiveWriterFactory writerFactory =
                new HiveWriterFactory(
                        jobConf,
                        hiveOutputFormatClz,
                        sd.getSerdeInfo(),
                        tableSchema,
                        getPartitionKeyArray(),
                        HiveReflectionUtils.getTableMetadata(hiveShim, table),
                        hiveShim,
                        isCompressed);
        //獲取檔案的擴展名,如果Table的Store是orc or parquet的話則是沒有擴展名的
        String extension =
                Utilities.getFileExtension(
                        jobConf,
                        isCompressed,
                        (HiveOutputFormat<?, ?>) hiveOutputFormatClz.newInstance());
		//Flink輸出的一個小配置,其中主要包含檔案的前綴和后綴
        //后綴是基于上面獲取的,而前綴則是part-隨機字符創
        //比如說:part-e9ebbc0c-ae29-4ac7-8c84-f80daf385915-0-413
        //這里的前綴并不是最終的檔案名稱,當你開啟了壓縮之后還會在前面添加內容的
        OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder =
                OutputFileConfig.builder()
                        .withPartPrefix("part-" + UUID.randomUUID().toString())
                        .withPartSuffix(extension == null ? "" : extension);
		//獲取parallelism,主要是為了后面使用.
        final int parallelism =
                Optional.ofNullable(configuredParallelism).orElse(dataStream.getParallelism());
        //到這里針對于Sink的初始化基本已經結束了,接下來我們需要判斷本次執行是Stream還是batch
        if (isBounded) {
            //如果是Batch的話輸出檔案名是不同的,需要注意一下這個
            OutputFileConfig fileNaming = fileNamingBuilder.build();
            return createBatchSink(
                    dataStream, converter, sd, writerFactory, fileNaming, parallelism);
        } else {
            //如果是Stream的話,首先肯定是不支持overwrite的,如果是overwrite的話,直接exception
            if (overwrite) {
                throw new IllegalStateException("Streaming mode not support overwrite.");
            }
			//獲取一下Hive表的配置
            Properties tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, table);
            //完成Hive的Sink創建
            return createStreamSink(
                    dataStream, sd, tableProps, writerFactory, fileNamingBuilder, parallelism);
        }
    } catch (TException e) {
        //例外這里也注意一下啦,后續排錯方便~
        throw new CatalogException("Failed to query Hive metaStore", e);
    } catch (IOException e) {
        throw new FlinkRuntimeException("Failed to create staging dir", e);
    } catch (ClassNotFoundException e) {
        throw new FlinkHiveException("Failed to get output format class", e);
    } catch (IllegalAccessException | InstantiationException e) {
        throw new FlinkHiveException("Failed to instantiate output format instance", e);
    }
}

基于這里,我們針對于Flink的HiveTableSink初始化就基本了解的差不多完成了,說實話一個頂級的框架代碼規范以及例外處理都是非常吊的,學框架的基本就是了解思想,其次要去看看別人怎么寫代碼,可以識訓特別多,非常值得我們學習;

2,HiveTableStreamSink創建

2.1 StreamSink的創建

 private DataStreamSink<?> createStreamSink(  //說實話我有強迫癥,這樣看引數我好難受
         DataStream<RowData> dataStream,	//資料流,不用多說
         StorageDescriptor sd,//table storage description  里面包含一些描述
         Properties tableProps,	//table properties  表的配置就是你創建表的pros
         HiveWriterFactory recordWriterFactory, //記錄寫出工廠
         OutputFileConfig.OutputFileConfigBuilder fileNamingBuilder,//寫出檔案配置
         final int parallelism   //這里是subtask數量
         ) {
     //創建一個Flink Configuration物件
     org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration();
     //然后將表的配置資訊寫入
     catalogTable.getOptions().forEach(conf::setString);
	 //資料磁區計算器~也就是hive的資料存盤磁區啦
     HiveRowDataPartitionComputer partComputer =
             new HiveRowDataPartitionComputer(
                     hiveShim,
                     defaultPartName(),
                     tableSchema.getFieldNames(),
                     tableSchema.getFieldDataTypes(),
                     getPartitionKeyArray());
     //資料表bucket,根據partComputer來區分bucket
     TableBucketAssigner assigner = new TableBucketAssigner(partComputer);
     //資料檔案滾動策略,這個是最早的檔案處理機制,考慮到多磁區的情況會產生小檔案從而有了compress機制
     HiveRollingPolicy rollingPolicy =
             new HiveRollingPolicy(
                     conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(),
                     conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis());
	 //是否開啟磁區檔案壓縮,磁區檔案壓縮這里說明一下
     //為什么有了滾動策略還有這個壓縮,比如你parallelism是5 那就是會創建5個小檔案...自己想想吧
     boolean autoCompaction = conf.getBoolean(FileSystemOptions.AUTO_COMPACTION);
     //如果開啟了之后所有寫入的檔案(沒合并之前)都是uncompaction前綴的標識~
     if (autoCompaction) {
         fileNamingBuilder.withPartPrefix(
                 convertToUncompacted(fileNamingBuilder.build().getPartPrefix()));
     }
     //然后獲取檔案名配置
     OutputFileConfig outputFileConfig = fileNamingBuilder.build();
	 //獲取path物件,這里的path物件是指表的存盤路徑,并不是某個檔案的絕對路徑,是表在HDFS的絕對路徑
     org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(sd.getLocation());
	 //BucketBuilder,
     BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
     //判斷是否是MR的還是FLINK本身的
     if (flinkConf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER)) {
         builder =
                 bucketsBuilderForMRWriter(
                         recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
         LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
     } else {
         Optional<BulkWriter.Factory<RowData>> bulkFactory =
                 createBulkWriterFactory(getPartitionKeyArray(), sd);
         //根據不同的格式創建的bulkfactory,如果不存在則默認創建hadoop mr的
         if (bulkFactory.isPresent()) {
             builder =
                     StreamingFileSink.forBulkFormat(
                                     path,
                                     new FileSystemTableSink.ProjectionBulkFactory(
                                             bulkFactory.get(), partComputer))
                             .withBucketAssigner(assigner)
                             .withRollingPolicy(rollingPolicy)
                             .withOutputFileConfig(outputFileConfig);
             LOG.info("Hive streaming sink: Use native parquet&orc writer.");
         } else {
             builder =
                     bucketsBuilderForMRWriter(
                             recordWriterFactory, sd, assigner, rollingPolicy, outputFileConfig);
             LOG.info(
                     "Hive streaming sink: Use MapReduce RecordWriter writer because BulkWriter Factory not available.");
         }
     }
	 //bucket檢查間隔,是建表是寫在tblPro的引數值,詳情見官網FileSystemSink
     long bucketCheckInterval = conf.get(SINK_ROLLING_POLICY_CHECK_INTERVAL).toMillis();
	 //輸出流,這個sink只是將record寫出到檔案,并不是最終的operator
     DataStream<PartitionCommitInfo> writerStream;
     //判斷是否開啟壓縮,是建表是寫在tblPro的引數值
     if (autoCompaction) {
         //檔案壓縮的大小,這里我們要注意一下如果你不配置的話默認值就是你的SINK_ROLLING_POLICY_FILE_SIZE~
         long compactionSize =
                 conf.getOptional(FileSystemOptions.COMPACTION_FILE_SIZE)
                         .orElse(conf.get(SINK_ROLLING_POLICY_FILE_SIZE))
                         .getBytes();
		 //創建輸出流,通過StreamingSink物件創建
         writerStream =
                 StreamingSink.compactionWriter(
                         dataStream,
                         bucketCheckInterval,
                         builder,
                         fsFactory(),
                         path,
                         createCompactReaderFactory(sd, tableProps),
                         compactionSize,
                         parallelism);
     } else {
         writerStream =
                 StreamingSink.writer(dataStream, bucketCheckInterval, builder, parallelism);
     }
	 //Sink就是掛載了Sink了這里先不急
     return StreamingSink.sink(
             writerStream, path, identifier, getPartitionKeys(), msFactory(), fsFactory(), conf);
 }

到這里StreamSink就掛載結束了,但是其真正的實作我們目前并沒有看到,真正實作,其實實作是在compactionWriter中實作的,我們可以看一下這個內容

public static <T> DataStream<PartitionCommitInfo> compactionWriter(//暈~強迫癥已經犯了
        DataStream<T> inputStream, //輸入流,比如 rowData,String,Struct等
        long bucketCheckInterval,//檢查間隔
        StreamingFileSink.BucketsBuilder<
                       T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
                bucketsBuilder,//bucker建造者
        FileSystemFactory fsFactory,//檔案系統工廠
        Path path,//路徑
        CompactReader.Factory<T> readFactory,//合并讀取工廠,這里應該很好理解吧~
        long targetFileSize,//目標檔案大小
        int parallelism  //磁區數
        ) {
    /**
     * 這個類里實作了三個算子~我們來看一下這是哪個算子的用戶
     * writer是用來寫入資料到buckt,并且像下游發送openfile or checkpoint success message操作的
     *
     * coordinator是協調檔案寫入的operator,就是負責計算哪些檔案可以合并	      的
     *
     * compacter是壓縮的operator
     */
    CompactFileWriter<T> writer = new CompactFileWriter<>(bucketCheckInterval, bucketsBuilder);
    ..........
    CompactCoordinator coordinator = new CompactCoordinator(fsSupplier, targetFileSize);
	..........
    CompactOperator<T> compacter = new CompactOperator<>(fsSupplier, readFactory, writerFactory);

}

3,HiveTableStreamSink壓縮流程

3.1 CompactFileWriter

這個類就是將資料寫入檔案中~其本身沒有實作如何寫入,真正寫入資料是在其父類中,但是當其父類提交了檢查點之后,他會向下游發送一條寫入結束的記錄;

package org.apache.flink.table.filesystem.stream.compact;

import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.filesystem.stream.AbstractStreamingWriter;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages.EndCheckpoint;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages.InputFile;

/**
 * 該operator主要是繼承了一個抽象類,重寫了某些方法,
 * 而abstractStreamingWriter里面同時還包括寫入資料的方法,在這里資料已經被寫入bucket
 */
public class CompactFileWriter<T>
        extends AbstractStreamingWriter<T, CompactMessages.CoordinatorInput> {
	
    private static final long serialVersionUID = 1L;

    public CompactFileWriter(
            long bucketCheckInterval,
            StreamingFileSink.BucketsBuilder<
                            T, String, ? extends StreamingFileSink.BucketsBuilder<T, String, ?>>
                    bucketsBuilder) 
        super(bucketCheckInterval, bucketsBuilder);
    }

    @Override
    protected void partitionCreated(String partition) {}

    @Override
    protected void partitionInactive(String partition) {}

    @Override
    protected void onPartFileOpened(String partition, Path newPath) {
        //像下游發送通知,通知新的檔案已經開始創建
        output.collect(new StreamRecord<>(new InputFile(partition, newPath)));
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        //當檢查點結束時,像下游發送檢查點完成的訊息
        output.collect(
                new StreamRecord<>(
                        new EndCheckpoint(
                                checkpointId,
                                getRuntimeContext().getIndexOfThisSubtask(),
                                getRuntimeContext().getNumberOfParallelSubtasks())));
    }
}

/**
 * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send
 * file and bucket information to downstream.
 * 注釋的大概意思是該類是StreamingFileSink的一個operator version,能夠像下游發送file和bucket information
 */
public abstract class AbstractStreamingWriter<IN, OUT> extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {

    .........(略過代碼標識)

    /** 磁區創建通知 */
    protected abstract void partitionCreated(String partition);

    /**
     * Notifies a partition become inactive. A partition becomes inactive after all the records
     * received so far have been committed.
     */
    protected abstract void partitionInactive(String partition);

    /**
     * Notifies a new file has been opened.
     *
     * <p>Note that this does not mean that the file has been created in the file system. It is only
     * created logically and the actual file will be generated after it is committed.
     */
    protected abstract void onPartFileOpened(String partition, Path newPath);

    /** Commit up to this checkpoint id. */
    protected void commitUpToCheckpoint(long checkpointId) throws Exception {
        helper.commitUpToCheckpoint(checkpointId);
    }

    .........(略過代碼標識)

    @Override//寫出資料到bucket
    public void processElement(StreamRecord<IN> element) throws Exception {
        helper.onElement(
                element.getValue(),
                getProcessingTimeService().getCurrentProcessingTime(),
                element.hasTimestamp() ? element.getTimestamp() : null,
                currentWatermark);
    }
 .........(略過代碼標識)
}

3.2CompactCoordinator

該operator的receiver為當前打開的檔案和檢查點結束訊息,同時會將本次檢查點中打開的檔案存盤到state中,當接收到檢查點結束的標識時,將本次檢查點內的檔案全部取出協調,然后將其發送到下游,下游壓縮時可以隨時開始,而無需去關注可能發生的不好情況

public class CompactCoordinator extends AbstractStreamOperator<CoordinatorOutput>
        implements OneInputStreamOperator<CoordinatorInput, CoordinatorOutput> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(CompactCoordinator.class);
	//一個函式介面,如果正常情況下會回傳FileSystem的物件,否則拋出IoException
    private final SupplierWithException<FileSystem, IOException> fsFactory;
    //目標檔案大小.
    private final long targetFileSize;
	//檔案系統物件,并非原生,這里F
    private transient FileSystem fileSystem; 
    private transient ListState<Map<Long, Map<String, List<Path>>>> inputFilesState; //輸入檔案狀態
    private transient TreeMap<Long, Map<String, List<Path>>> inputFiles; //輸入檔案
    private transient Map<String, List<Path>> currentInputFiles;//當前輸入檔案
	//這個物件用來判斷上游是否已經收到了當前檢查點的所有資料
    private transient TaskTracker inputTaskTracker;
	
    public CompactCoordinator(
            SupplierWithException<FileSystem, IOException> fsFactory, long targetFileSize) {
        this.fsFactory = fsFactory;
        this.targetFileSize = targetFileSize;
    }
	//初始化狀態
    //這個方法涉及到Flink state 當一個operator具有可恢復的state是需要重寫該方法
    //每次閱讀一個transformation都會去看一下這個方法,看看恢復時會恢復哪些,是否是自己想的
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
		//獲取檔案系統
        fileSystem = fsFactory.get();
		//這里就是根據給定的名稱和串列元素新建一個listStateDescriptor物件
        //這個物件的作用是一個串列的狀態描述這個狀態很好描述啦,上一次處理到那個檔案的那個位置了
        ListStateDescriptor<Map<Long, Map<String, List<Path>>>> filesDescriptor =
                new ListStateDescriptor<>(
                        "files-state",
                        new MapSerializer<>(
                                LongSerializer.INSTANCE,
                                new MapSerializer<>(
                                        StringSerializer.INSTANCE,
                                        new ListSerializer<>(
                                                new KryoSerializer<>(
                                                        Path.class, getExecutionConfig())))));
        //給定當前輸入檔案的state
        inputFilesState = context.getOperatorStateStore().getListState(filesDescriptor);
        //創建一個Map
        inputFiles = new TreeMap<>();
        currentInputFiles = new HashMap<>();
        //判斷是否是從狀態恢復的,如果不是則gg~簡單來說就是來恢復state的
        if (context.isRestored()) {
            inputFiles.putAll(inputFilesState.get().iterator().next());
        }
    }
	
    @Override
    public void processElement(StreamRecord<CoordinatorInput> element) throws Exception {
        CoordinatorInput value = element.getValue();
        //判斷上游過來的訊息,該訊息如果是InputFile,則將其寫入state
        if (value instanceof InputFile) {
            InputFile file = (InputFile) value;
            currentInputFiles
                    .computeIfAbsent(file.getPartition(), k -> new ArrayList<>())
                    .add(file.getFile());
        } else if (value instanceof EndCheckpoint) { //如果輸入的是結束檢查點,則開始壓縮資料
            EndCheckpoint endCheckpoint = (EndCheckpoint) value;
            if (inputTaskTracker == null) {
            	//創建TaskTracker物件用來追蹤上游是否已處理完當前檢查點資料
                inputTaskTracker = new TaskTracker(endCheckpoint.getNumberOfTasks());
            }

            // 判斷所有task是否已經全部結束,只有全部結束才會回傳true
            boolean triggerCommit =
                    inputTaskTracker.add(
                            endCheckpoint.getCheckpointId(), endCheckpoint.getTaskId());
            // 當所有上游處理完發送完end chk之后,開始提交chk并進行compact 協調
            if (triggerCommit) {
                commitUpToCheckpoint(endCheckpoint.getCheckpointId());
            }
        } else {
            throw new UnsupportedOperationException("Unsupported input message: " + value);
        }
    }

    private void commitUpToCheckpoint(long checkpointId) {
    	//獲取當前的輸入檔案
        Map<Long, Map<String, List<Path>>> headMap = inputFiles.headMap(checkpointId, true);
        //進行壓縮協調
        for (Map.Entry<Long, Map<String, List<Path>>> entry : headMap.entrySet()) {
            coordinate(entry.getKey(), entry.getValue());
        }
        headMap.clear();
    }

    /** Do stable compaction coordination. */
    private void coordinate(long checkpointId, Map<String, List<Path>> partFiles) {
    	//定義一個獲取檔案大小的方法
        Function<Path, Long> sizeFunc =
                path -> {
                    try {
                        return fileSystem.getFileStatus(path).getLen();
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                };

        // We need a stable compaction algorithm.
        Map<String, List<List<Path>>> compactUnits = new HashMap<>();
        
        partFiles.forEach(
                (p, files) -> {
                    // 對檔案進行排序
                    files.sort(Comparator.comparing(Path::getPath));
                    //這里采用的合并演算法是,targetFileSize>=sum(files.len)
                    //同一個partition下將能合并的檔案放到同一個list中
                    compactUnits.put(p, BinPacking.pack(files, sizeFunc, targetFileSize));
                });

        int unitId = 0;
        for (Map.Entry<String, List<List<Path>>> unitsEntry : compactUnits.entrySet()) {
            String partition = unitsEntry.getKey();
            for (List<Path> unit : unitsEntry.getValue()) {
            	//發送unitId,partiton(path or partition),unit(可以合并的檔案)
                output.collect(new StreamRecord<>(new CompactionUnit(unitId, partition, unit)));
                unitId++;
            }
        }

        LOG.debug("Coordinate checkpoint-{}, compaction units are: {}", checkpointId, compactUnits);

        // 發送檢查點
        output.collect(new StreamRecord<>(new EndCompaction(checkpointId)));
    }
}

3,3 CompactOperator

這個類就是HiveStreamSink的最終operator,經過上游發送的檔案就是同路徑下可以壓縮的檔案和unitid,完成壓縮流程其實就是所謂的將小檔案讀取出來,然后寫入到一個新檔案中,然后將原來的舊檔案進行洗掉

/**
 * 這個transformation就是最后一步壓縮算子了,到了這個類,Flink流式寫入Hive基本可以結束了,我們繼續往下看
 */
public class CompactOperator<T> extends AbstractStreamOperator<PartitionCommitInfo>
        implements OneInputStreamOperator<CoordinatorOutput, PartitionCommitInfo>, BoundedOneInput {

    private static final long serialVersionUID = 1L;
	//未壓縮的檔案前綴
    public static final String UNCOMPACTED_PREFIX = ".uncompacted-";
	//壓縮后的檔案前綴
    public static final String COMPACTED_PREFIX = "compacted-";
	//檔案系統工廠物件,用于生成檔案系統
    private final SupplierWithException<FileSystem, IOException> fsFactory;
    //讀取工廠   這里同時需要有讀取和寫出,因為壓縮的流程就是先讀取寫入新檔案,然后洗掉舊檔案
    private final CompactReader.Factory<T> readerFactory;
    //寫出工廠
    private final CompactWriter.Factory<T> writerFactory;
	//檔案系統.注意:非原生檔案系統,不過API基本無差異,就是套了一層
    private transient FileSystem fileSystem;
	
    private transient ListState<Map<Long, List<Path>>> expiredFilesState;
    private transient TreeMap<Long, List<Path>> expiredFiles;
    private transient List<Path> currentExpiredFiles;

    private transient Set<String> partitions;

    private transient Path path;

    public CompactOperator(
            SupplierWithException<FileSystem, IOException> fsFactory,
            CompactReader.Factory<T> readerFactory,
            Path path,
            CompactWriter.Factory<T> writerFactory) {
        this.fsFactory = fsFactory;
        this.path = path;
        this.readerFactory = readerFactory;
        this.writerFactory = writerFactory;
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.partitions = new HashSet<>();
        this.fileSystem = fsFactory.get();

        ListStateDescriptor<Map<Long, List<Path>>> metaDescriptor =
                new ListStateDescriptor<>(
                        "expired-files",
                        new MapSerializer<>(
                                LongSerializer.INSTANCE,
                                new ListSerializer<>(
                                        new KryoSerializer<>(Path.class, getExecutionConfig()))));
        this.expiredFilesState = context.getOperatorStateStore().getListState(metaDescriptor);
        this.expiredFiles = new TreeMap<>();
        this.currentExpiredFiles = new ArrayList<>();

        if (context.isRestored()) {
            this.expiredFiles.putAll(this.expiredFilesState.get().iterator().next());
        }
    }

    @Override
    public void processElement(StreamRecord<CoordinatorOutput> element) throws Exception {
        CoordinatorOutput value = element.getValue();
        //當我們收到上游的訊息之后,我們開始判斷本次收取到的訊息是unit還是EndCompaction
        if (value instanceof CompactionUnit) {
        	//如果是unit的情況下
            CompactionUnit unit = (CompactionUnit) value;
            //operator每個subTask選擇合并的實體
            if (unit.isTaskMessage(getRuntimeContext().getNumberOfParallelSubtasks(),getRuntimeContext().getIndexOfThisSubtask())) {
                String partition = unit.getPartition();
                List<Path> paths = unit.getPaths();
                //檔案合并
                doCompact(partition, paths);
				//操作的磁區
                this.partitions.add(partition);

                // Only after the current checkpoint is successfully executed can delete
                // the expired files, so as to ensure the existence of the files.
                // 已經處理過的檔案.
                this.currentExpiredFiles.addAll(paths);
            }
        } else if (value instanceof EndCompaction) {
            LOG.info("當前檢查點位是:" + ((EndCompaction) value).getCheckpointId());
            endCompaction(((EndCompaction) value).getCheckpointId());
        }
    }


    private void endCompaction(long checkpoint) {
        this.output.collect(
                new StreamRecord<>(
                        new PartitionCommitInfo(
                                checkpoint,
                                getRuntimeContext().getIndexOfThisSubtask(),
                                getRuntimeContext().getNumberOfParallelSubtasks(),
                                new ArrayList<>(this.partitions))));
        this.partitions.clear();
    }

	...........


    private void clearExpiredFiles(long checkpointId) throws IOException {
        // Don't need these metas anymore.
        NavigableMap<Long, List<Path>> outOfDateMetas = expiredFiles.headMap(checkpointId, true);
        for (List<Path> paths : outOfDateMetas.values()) {
            for (Path meta : paths) {
                fileSystem.delete(meta, true);
            }
        }
        outOfDateMetas.clear();
    }

    /**
     * Do Compaction: - Target file exists, do nothing. - Can do compaction: - Single file, do
     * atomic renaming, there are optimizations for FileSystem. - Multiple file, do reading and
     * writing.
     */
    private void doCompact(String partition, List<Path> paths) throws IOException {
        if (paths.size() == 0) {
        	//判斷
            return;
        }

        Path target = createCompactedFile(paths);
        if (fileSystem.exists(target)) {
        	//判斷
            return;
        }

        checkExist(paths);
		//獲取壓縮開始時間
        long startMillis = System.currentTimeMillis();

        boolean success = false;
        //如果檔案只有一個
        if (paths.size() == 1) {
            // optimizer for single file
            success = doSingleFileMove(paths.get(0), target);
        }
		//如果檔案是多個
        if (!success) {
            doMultiFilesCompact(partition, paths, target);
        }
		//完成壓縮
        double costSeconds = ((double) (System.currentTimeMillis() - startMillis)) / 1000;
        LOG.info(
                "Compaction time cost is '{}S', target file is '{}', input files are '{}'",
                costSeconds,
                target,
                paths);
    }
	......(下面就是壓縮的代碼,沒啥講的就是reader on write)
}

總結

HiveConnector到此基本上就告一段落了,基于其實作小檔案合并可以為當N次檢查點后合并最小的N+1個檔案 or 基于time合并檔案 or 離線部署任務合并檔案 思路有很多主要是要考慮性能問題以及是否符合場景要求

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/292895.html

標籤:其他

上一篇:5. Hadoop集群搭建-免密互通配置 | 更改主機名-基于CentOS7-【連載中】

下一篇:Linux之Zookeeper本地安裝

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more