DataX Secondary Development: Adding Parquet File Read/Write Support to the HdfsReader and HdfsWriter Plugins

2023-02-14 08:07:00 Database

1. Background

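    The official open-source release of DataX supports reading and writing HDFS files, but so far it does not support Parquet files. Thanks to DataX's strong data synchronization performance, most of our company's projects last year adopted it as their data synchronization tool. Two common scenarios were not covered, however: synchronizing Parquet files from a CDH cluster, and writing data from other sources to HDFS in Parquet format. We therefore extended the HdfsReader and HdfsWriter plugins ourselves to support reading and writing Parquet files.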

2. The HdfsReader Plugin

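    This plugin is fairly simple. It consists of five classes; their names and the corresponding changes are:

  • DFSUtil: add a method that checks whether a file is a Parquet file, and a method that reads Parquet files and converts their records.
  • HdfsConstant: add an entry for the Parquet file type.
  • HdfsReader: add a branch for the case where the configured file type is Parquet.
  • HdfsReaderErrorCode: no change needed.
  • Type: no change needed.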
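
As a rough illustration of the first DFSUtil change in the list above (detecting whether a file is Parquet), the sketch below checks the 4-byte `PAR1` magic that the Parquet format places at both the start and the end of a file. It is not the author's implementation: it reads a local file with `RandomAccessFile` rather than going through HDFS, and the names `ParquetMagicCheck` and `looksLikeParquet` are hypothetical, chosen only for this example.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

// Hypothetical helper, not part of DataX: detects Parquet files by magic bytes.
final class ParquetMagicCheck {
    // Parquet files begin and end with the ASCII bytes "PAR1".
    private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    public static boolean looksLikeParquet(Path file) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            long len = raf.length();
            // Too short to hold both the leading and the trailing magic.
            if (len < MAGIC.length * 2L) {
                return false;
            }
            byte[] head = new byte[MAGIC.length];
            byte[] tail = new byte[MAGIC.length];
            raf.readFully(head);              // first 4 bytes
            raf.seek(len - MAGIC.length);
            raf.readFully(tail);              // last 4 bytes
            return Arrays.equals(head, MAGIC) && Arrays.equals(tail, MAGIC);
        }
    }

    public static void main(String[] args) throws IOException {
        Path good = Files.createTempFile("sample", ".parquet");
        Path bad = Files.createTempFile("sample", ".txt");
        // Fake payloads: only the magic bytes matter for this check.
        Files.write(good, "PAR1....footer....PAR1".getBytes(StandardCharsets.US_ASCII));
        Files.write(bad, "plain text".getBytes(StandardCharsets.US_ASCII));
        System.out.println(looksLikeParquet(good)); // prints true
        System.out.println(looksLikeParquet(bad));  // prints false
        Files.delete(good);
        Files.delete(bad);
    }
}
```

In the plugin itself the same bytes would be read from an HDFS `FSDataInputStream`, much as the ORC and RC checks in DFSUtil seek to fixed offsets and compare magic values.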

    Only the classes marked as changed above need to be modified. The code is as follows:

    DFSUtil

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.alibaba.datax.common.base.Key;
import com.alibaba.datax.common.element.BoolColumn;
import com.alibaba.datax.common.element.BytesColumn;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.ColumnEntry;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.DoubleColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.storage.reader.StorageReaderErrorCode;
import com.alibaba.datax.storage.reader.StorageReaderUtil;
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.orc.TypeDescription;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.alibaba.datax.common.base.Key.COLUMN;
import static com.alibaba.datax.common.base.Key.NULL_FORMAT;


public class DFSUtil
{
    private static final Logger LOG = LoggerFactory.getLogger(DFSUtil.class);

    // Julian day number of the Unix epoch: 2440588 corresponds to 1970-01-01
    private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588;
    private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
    private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);

    private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
    private final org.apache.hadoop.conf.Configuration hadoopConf;
    private final boolean haveKerberos;
    private final HashSet<String> sourceHDFSAllFilesList = new HashSet<>();
    private String specifiedFileType = null;
    private String kerberosKeytabFilePath;
    private String kerberosPrincipal;

    public DFSUtil(Configuration taskConfig)
    {
        hadoopConf = new org.apache.hadoop.conf.Configuration();
        // io.file.buffer.size is a performance-related parameter
        //http://blog.csdn.net/yangjl38/article/details/7583374
        Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
        JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));
        if (null != hadoopSiteParams) {
            Set<String> paramKeys = hadoopSiteParams.getKeys();
            for (String each : paramKeys) {
                hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
            }
        }
        hadoopConf.set(HdfsConstant.HDFS_DEFAULT_KEY, taskConfig.getString(Key.DEFAULT_FS));

        // whether Kerberos authentication is enabled
        this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);
        if (haveKerberos) {
            this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
            this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
            this.hadoopConf.set(HdfsConstant.HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
        }
        this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);

        LOG.info("hadoopConfig details:{}", JSON.toJSONString(this.hadoopConf));
    }

    private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath)
    {
        if (haveKerberos && StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
            UserGroupInformation.setConfiguration(hadoopConf);
            try {
                UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
            }
            catch (Exception e) {
                String message = String.format("Kerberos authentication failed. Please make sure kerberosKeytabFilePath[%s] and kerberosPrincipal[%s] are set correctly",
                        kerberosKeytabFilePath, kerberosPrincipal);
                throw DataXException.asDataXException(HdfsReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);
            }
        }
    }

    /**
     * Get the absolute paths of all matching files under the given list of paths.
     *
     * @param srcPaths list of paths
     * @param specifiedFileType the specified file type
     * @return set of string
     */
    public Set<String> getAllFiles(List<String> srcPaths, String specifiedFileType)
    {

        this.specifiedFileType = specifiedFileType;

        if (!srcPaths.isEmpty()) {
            for (String eachPath : srcPaths) {
                LOG.info("get HDFS all files in path = [{}]", eachPath);
                getHDFSAllFiles(eachPath);
            }
        }
        return sourceHDFSAllFilesList;
    }

    private void addSourceFileIfNotEmpty(FileStatus f)
    {
        if (f.isFile()) {
            String filePath = f.getPath().toString();
            if (f.getLen() > 0) {
                addSourceFileByType(filePath);
            }
            else {
                LOG.warn("File [{}] has length 0 and will be skipped!", filePath);
            }
        }
    }

    public void getHDFSAllFiles(String hdfsPath)
    {

        try {
            FileSystem hdfs = FileSystem.get(hadoopConf);
            // check whether hdfsPath contains wildcard characters (* or ?)
            if (hdfsPath.contains("*") || hdfsPath.contains("?")) {
                Path path = new Path(hdfsPath);
                FileStatus[] stats = hdfs.globStatus(path);
                for (FileStatus f : stats) {
                    if (f.isFile()) {
                        addSourceFileIfNotEmpty(f);
                    }
                    else if (f.isDirectory()) {
                        getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
                    }
                }
            }
            else {
                getHDFSAllFilesNORegex(hdfsPath, hdfs);
            }
        }
        catch (IOException e) {
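            String message = String.format("Unable to read the files under path [%s]. Please check that fs.defaultFS and path are configured correctly, " +
                    "that you have read/write permission, and that the network is connected!", hdfsPath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.PATH_CONFIG_ERROR, message, e);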
        }
    }

    private void getHDFSAllFilesNORegex(String path, FileSystem hdfs)
            throws IOException
    {

        // root directory of the files to read
        Path listFiles = new Path(path);

        // If the network is disconnected, this method retries 45 times,
        // waiting 20 seconds between retries.
        // list the direct children (files and directories) of the root directory
        FileStatus[] stats = hdfs.listStatus(listFiles);

        for (FileStatus f : stats) {
            // if it is a directory, recurse into it
            if (f.isDirectory()) {
                LOG.info("[{}] is a directory; recursively listing the files under it", f.getPath());
                getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
            }
            else if (f.isFile()) {
                addSourceFileIfNotEmpty(f);
            }
            else {
                String message = String.format("Path [%s] is neither a directory nor a file; the plugin ignores it.", f.getPath());
                LOG.info(message);
            }
        }
    }

    // Add the file path to sourceHDFSAllFilesList if it matches the user-specified file type
    private void addSourceFileByType(String filePath)
    {
        // check whether the file's type matches the fileType configured by the user
        boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType);

        if (isMatchedFileType) {
            String msg = String.format("[%s] is a [%s] file; adding it to the source files list", filePath, this.specifiedFileType);
            LOG.info(msg);
            sourceHDFSAllFilesList.add(filePath);
        }
        else {
            String message = String.format("The type of file [%s] does not match the configured fileType. " +
                            "Please make sure all files under the configured directory are of type [%s]"
                    , filePath, this.specifiedFileType);
            LOG.error(message);
            throw DataXException.asDataXException(
                    HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message);
        }
    }

    public InputStream getInputStream(String filepath)
    {
        InputStream inputStream;
        Path path = new Path(filepath);
        try {
            FileSystem fs = FileSystem.get(hadoopConf);
            // If the network is disconnected, this method retries 45 times,
            // waiting 20 seconds between retries.
            inputStream = fs.open(path);
            return inputStream;
        }
        catch (IOException e) {
            String message = String.format("Error reading file [%s]. Please make sure file [%s] exists and the configured user has permission to read it", filepath, filepath);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
        }
    }

    public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector)
    {
        LOG.info("Start Read sequence file [{}].", sourceSequenceFilePath);

        Path seqFilePath = new Path(sourceSequenceFilePath);
        try (SequenceFile.Reader reader = new SequenceFile.Reader(this.hadoopConf,
                SequenceFile.Reader.file(seqFilePath))) {
            // obtain the SequenceFile.Reader instance
            // obtain the key and value
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), this.hadoopConf);
            Text value = new Text();
            while (reader.next(key, value)) {
                if (StringUtils.isNotBlank(value.toString())) {
                    StorageReaderUtil.transportOneRecord(recordSender, readerSliceConfig, taskPluginCollector, value.toString());
                }
            }
        }
        catch (Exception e) {
            String message = String.format("SequenceFile.Reader failed to read file [%s]", sourceSequenceFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_SEQUENCE_FILE_ERROR, message, e);
        }
    }

    public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector)
    {
        LOG.info("Start Read rc-file [{}].", sourceRcFilePath);
        List<ColumnEntry> column = StorageReaderUtil
                .getListColumnEntry(readerSliceConfig, COLUMN);
        // warn: no default value '\N'
        String nullFormat = readerSliceConfig.getString(NULL_FORMAT);

        Path rcFilePath = new Path(sourceRcFilePath);
        RCFileRecordReader recordReader = null;
        try (FileSystem fs = FileSystem.get(rcFilePath.toUri(), hadoopConf)) {
            long fileLen = fs.getFileStatus(rcFilePath).getLen();
            FileSplit split = new FileSplit(rcFilePath, 0, fileLen, (String[]) null);
            recordReader = new RCFileRecordReader(hadoopConf, split);
            LongWritable key = new LongWritable();
            BytesRefArrayWritable value = new BytesRefArrayWritable();
            Text txt = new Text();
            while (recordReader.next(key, value)) {
                String[] sourceLine = new String[value.size()];
                txt.clear();
                for (int i = 0; i < value.size(); i++) {
                    BytesRefWritable v = value.get(i);
                    txt.set(v.getData(), v.getStart(), v.getLength());
                    sourceLine[i] = txt.toString();
                }
                StorageReaderUtil.transportOneRecord(recordSender,
                        column, sourceLine, nullFormat, taskPluginCollector);
            }
        }
        catch (IOException e) {
            String message = String.format("Error reading file [%s]", sourceRcFilePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_RCFILE_ERROR, message, e);
        }
        finally {
            try {
                if (recordReader != null) {
                    recordReader.close();
                    LOG.info("Finally, Close RCFileRecordReader.");
                }
            }
            catch (IOException e) {
                LOG.warn(String.format("finally: failed to close RCFileRecordReader, %s", e.getMessage()));
            }
        }
    }

    public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector)
    {
        LOG.info("Start Read orc-file [{}].", sourceOrcFilePath);
        List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
        String nullFormat = readerSliceConfig.getString(NULL_FORMAT);

        try {
            Path orcFilePath = new Path(sourceOrcFilePath);
            Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(hadoopConf));
            TypeDescription schema = reader.getSchema();
            assert column != null;
            if (column.isEmpty()) {
                for (int i = 0; i < schema.getChildren().size(); i++) {
                    ColumnEntry columnEntry = new ColumnEntry();
                    columnEntry.setIndex(i);
                    columnEntry.setType(schema.getChildren().get(i).getCategory().getName());
                    column.add(columnEntry);
                }
            }

            VectorizedRowBatch rowBatch = schema.createRowBatch(1024);
            org.apache.orc.RecordReader rowIterator = reader.rows(reader.options().schema(schema));
            while (rowIterator.nextBatch(rowBatch)) {
                transportOrcRecord(rowBatch, column, recordSender, taskPluginCollector, nullFormat);
            }
        }
        catch (Exception e) {
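            String message = String.format("An exception occurred while reading data from orc-file path [%s]. Please contact your system administrator."
                    , sourceOrcFilePath);
            LOG.error(message);
            // pass the cause along so the original stack trace is not lost
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);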
        }
    }

    private void transportOrcRecord(VectorizedRowBatch rowBatch, List<ColumnEntry> columns, RecordSender recordSender,
            TaskPluginCollector taskPluginCollector, String nullFormat)
    {
        Record record;
        for (int row = 0; row < rowBatch.size; row++) {
            record = recordSender.createRecord();
            try {
                for (ColumnEntry column : columns) {

                    Column columnGenerated;
                    if (column.getValue() != null) {
                        if (!"null".equals(column.getValue())) {
                            columnGenerated = new StringColumn(column.getValue());
                        }
                        else {
                            columnGenerated = new StringColumn();
                        }
                        record.addColumn(columnGenerated);
                        continue;
                    }
                    int i = column.getIndex();
                    String columnType = column.getType().toUpperCase();
                    ColumnVector col = rowBatch.cols[i];
                    Type type = Type.valueOf(columnType);
                    if (col.isNull[row]) {
                        record.addColumn(new StringColumn(null));
                        continue;
                    }
                    switch (type) {
                        case INT:
                        case LONG:
                        case BOOLEAN:
                        case BIGINT:
                            columnGenerated = new LongColumn(((LongColumnVector) col).vector[row]);
                            break;
                        case DATE:
                            columnGenerated = new DateColumn(new Date(((LongColumnVector) col).vector[row]));
                            break;
                        case DOUBLE:
                            columnGenerated = new DoubleColumn(((DoubleColumnVector) col).vector[row]);
                            break;
                        case DECIMAL:
                            columnGenerated = new DoubleColumn(((DecimalColumnVector) col).vector[row].doubleValue());
                            break;
                        case BINARY:
                            BytesColumnVector b = (BytesColumnVector) col;
                            byte[] val = Arrays.copyOfRange(b.vector[row], b.start[row], b.start[row] + b.length[row]);
                            columnGenerated = new BytesColumn(val);
                            break;
                        case TIMESTAMP:
                            columnGenerated = new DateColumn(((TimestampColumnVector) col).getTime(row));
                            break;
                        default:
                            // type is string or other
                            String v = ((BytesColumnVector) col).toString(row);
                            columnGenerated = v.equals(nullFormat) ? new StringColumn() : new StringColumn(v);
                            break;
                    }
                    record.addColumn(columnGenerated);
                }
                recordSender.sendToWriter(record);
            }
            catch (Exception e) {
                if (e instanceof DataXException) {
                    throw (DataXException) e;
                }
                taskPluginCollector.collectDirtyRecord(record, e.getMessage());
            }
        }
    }

    public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig,
            RecordSender recordSender, TaskPluginCollector taskPluginCollector)
    {
        LOG.info("Start Read parquet-file [{}].", sourceParquetFilePath);
        List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
        String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
        Path parquetFilePath = new Path(sourceParquetFilePath);

        hadoopConf.set("parquet.avro.readInt96AsFixed", "true");
        JobConf conf = new JobConf(hadoopConf);

        GenericData decimalSupport = new GenericData();
        decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
        try (ParquetReader<GenericData.Record> reader = AvroParquetReader
                .<GenericData.Record>builder(HadoopInputFile.fromPath(parquetFilePath, hadoopConf))
                .withDataModel(decimalSupport)
                .withConf(conf)
                .build()) {
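            GenericData.Record gRecord = reader.read();
            if (gRecord == null) {
                // read() returns null when the file contains no records
                LOG.warn("Parquet file [{}] contains no records, skipping.", sourceParquetFilePath);
                return;
            }
            Schema schema = gRecord.getSchema();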

            if (null == column || column.isEmpty()) {
                column = new ArrayList<>(schema.getFields().size());
                String sType;
                // the user did not specify column details, so build them from the Parquet file's schema
                for (int i = 0; i < schema.getFields().size(); i++) {
                    ColumnEntry columnEntry = new ColumnEntry();
                    columnEntry.setIndex(i);
                    Schema type;
                    if (schema.getFields().get(i).schema().getType() == Schema.Type.UNION) {
                        type = schema.getFields().get(i).schema().getTypes().get(1);
                    }
                    else {
                        type = schema.getFields().get(i).schema();
                    }
                    sType = type.getProp("logicalType") != null ? type.getProp("logicalType") : type.getType().getName();
                    if (sType.startsWith("timestamp")) {
                        columnEntry.setType("timestamp");
                    }
                    else {
                        columnEntry.setType(sType);
                    }
                    column.add(columnEntry);
                }
            }
            while (gRecord != null) {
                transportParquetRecord(column, gRecord, recordSender, taskPluginCollector, nullFormat);
                gRecord = reader.read();
            }
        }
        catch (IOException e) {
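            String message = String.format("An exception occurred while reading data from parquet file path [%s]. Please contact your system administrator."
                    , sourceParquetFilePath);
            LOG.error(message);
            // pass the cause along so the original stack trace is not lost
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);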
        }
    }

    /*
     * create a transport record for Parquet file
     *
     *
     */
    private void transportParquetRecord(List<ColumnEntry> columnConfigs, GenericData.Record gRecord, RecordSender recordSender,
            TaskPluginCollector taskPluginCollector, String nullFormat)
    {
        Record record = recordSender.createRecord();
        Column columnGenerated;
        int scale = 10;
        try {
            for (ColumnEntry columnEntry : columnConfigs) {
                String columnType = columnEntry.getType();
                Integer columnIndex = columnEntry.getIndex();
                String columnConst = columnEntry.getValue();
                String columnValue = null;
                if (null != columnIndex) {
                    if (null != gRecord.get(columnIndex)) {
                        columnValue = gRecord.get(columnIndex).toString();
                    }
                    else {
                        record.addColumn(new StringColumn(null));
                        continue;
                    }
                }
                else {
                    columnValue = columnConst;
                }
                if (columnType.startsWith("decimal(")) {
                    String ps = columnType.replace("decimal(", "").replace(")", "");
                    columnType = "decimal";
                    if (ps.contains(",")) {
                        scale = Integer.parseInt(ps.split(",")[1].trim());
                    }
                    else {
                        scale = 0;
                    }
                }
                Type type = Type.valueOf(columnType.toUpperCase());
                if (StringUtils.equals(columnValue, nullFormat)) {
                    columnValue = null;
                }
                try {
                    switch (type) {
                        case STRING:
                            columnGenerated = new StringColumn(columnValue);
                            break;
                        case INT:
                        case LONG:
                            columnGenerated = new LongColumn(columnValue);
                            break;
                        case DOUBLE:
                            columnGenerated = new DoubleColumn(columnValue);
                            break;
                        case DECIMAL:
                            if (null == columnValue) {
                                columnGenerated = new DoubleColumn((Double) null);
                            }
                            else {
                                columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, RoundingMode.HALF_UP));
                            }
                            break;
                        case BOOLEAN:
                            columnGenerated = new BoolColumn(columnValue);
                            break;
                        case DATE:
                            if (columnValue == null) {
                                columnGenerated = new DateColumn((Date) null);
                            }
                            else {
                                String formatString = columnEntry.getFormat();
                                if (StringUtils.isNotBlank(formatString)) {
                                    // format conversion configured by the user
                                    SimpleDateFormat format = new SimpleDateFormat(
                                            formatString);
                                    columnGenerated = new DateColumn(
                                            format.parse(columnValue));
                                }
                                else {
                                    // let the framework attempt the conversion
                                    columnGenerated = new DateColumn(new StringColumn(columnValue).asDate());
                                }
                            }
                            break;
                        case TIMESTAMP:
                            if (null == columnValue) {
                                columnGenerated = new DateColumn();
                            }
                            else if (columnValue.startsWith("[")) {
                                // INT96 https://github.com/apache/parquet-mr/pull/901
                                GenericData.Fixed fixed = (GenericData.Fixed) gRecord.get(columnIndex);
                                Date date = new Date(getTimestampMills(fixed.bytes()));
                                columnGenerated = new DateColumn(date);
                            }
                            else {
                                columnGenerated = new DateColumn(Long.parseLong(columnValue) * 1000);
                            }
                            break;
                        case BINARY:
                            columnGenerated = new BytesColumn(((ByteBuffer) gRecord.get(columnIndex)).array());
                            break;
                        default:
                            String errorMessage = String.format("The configured column type is not supported yet: [%s]", columnType);
                            LOG.error(errorMessage);
                            throw DataXException.asDataXException(StorageReaderErrorCode.NOT_SUPPORT_TYPE, errorMessage);
                    }
                }
                catch (Exception e) {
                    throw new IllegalArgumentException(String.format(
                            "Type conversion error: cannot convert [%s] to [%s], %s", columnValue, type, e));
                }
                record.addColumn(columnGenerated);
            } // end for

            recordSender.sendToWriter(record);
        }
        catch (IllegalArgumentException | IndexOutOfBoundsException iae) {
            taskPluginCollector.collectDirtyRecord(record, iae.getMessage());
        }
        catch (Exception e) {
            if (e instanceof DataXException) {
                throw (DataXException) e;
            }
            // every conversion failure is handled as dirty data, including number and date format errors
            taskPluginCollector.collectDirtyRecord(record, e.getMessage());
        }
    }

    private TypeDescription getOrcSchema(String filePath)
    {
        Path path = new Path(filePath);
        try {
            Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(hadoopConf));
//            return reader.getTypes().get(0).getSubtypesCount()
            return reader.getSchema();
        }
        catch (IOException e) {
            String message = "Failed to read the column count of the orc-file. Please contact your system administrator";
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
        }
    }

    public boolean checkHdfsFileType(String filepath, String specifiedFileType)
    {

        Path file = new Path(filepath);

        try (FileSystem fs = FileSystem.get(hadoopConf); FSDataInputStream in = fs.open(file)) {
            if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.ORC)) {
                return isORCFile(file, fs, in);
            }
            else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.RC)) {
                return isRCFile(filepath, in);
            }
            else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.SEQ)) {

                return isSequenceFile(file, in);
            }
            else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.PARQUET)) {
                return isParquetFile(file);
            }
            else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.CSV)
                    || StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.TEXT)) {
                return true;
            }
        }
        catch (Exception e) {
            String message = String.format("Failed to check the type of file [%s]. Currently supported file formats: %s. " +
                    "Please check that your file type and file are correct.", filepath, HdfsConstant.SUPPORT_FILE_TYPE);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e);
        }
        return false;
    }

    // check whether the file is an ORC file
    private boolean isORCFile(Path file, FileSystem fs, FSDataInputStream in)
    {
        try {
            // figure out the size of the file using the option or filesystem
            long size = fs.getFileStatus(file).getLen();

            //read last bytes into buffer to get PostScript
            int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
            in.seek(size - readSize);
            ByteBuffer buffer = ByteBuffer.allocate(readSize);
            in.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(),
                    buffer.remaining());

            //read the PostScript
            //get length of PostScript
            int psLen = buffer.get(readSize - 1) & 0xff;
            String orcMagic = org.apache.orc.OrcFile.MAGIC;
            int len = orcMagic.length();
            if (psLen < len + 1) {
                return false;
            }
            int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1
                    - len;
            byte[] array = buffer.array();
            // now look for the magic string at the end of the postscript.
            if (Text.decode(array, offset, len).equals(orcMagic)) {
                return true;
            }
            else {
                // If it isn't there, this may be the 0.11.0 version of ORC.
                // Read the first 3 bytes of the file to check for the header
                in.seek(0);
                byte[] header = new byte[len];
                in.readFully(header, 0, len);
                // if it isn't there, this isn't an ORC file
                if (Text.decode(header, 0, len).equals(orcMagic)) {
                    return true;
                }
            }
        }
        catch (IOException e) {
            LOG.info("File type check: [{}] is not an ORC file.", file);
        }
        return false;
    }

    // Check whether the file is an RC file
    private boolean isRCFile(String filepath, FSDataInputStream in)
    {

        // The first version of RCFile used the sequence file header.
        final byte[] originalMagic = {(byte) 'S', (byte) 'E', (byte) 'Q'};
        // The 'magic' bytes at the beginning of the RCFile
        final byte[] rcMagic = {(byte) 'R', (byte) 'C', (byte) 'F'};
        // the version that was included with the original magic, which is mapped
        // into ORIGINAL_VERSION
        final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6;
        // All the versions should be place in this list.
        final int ORIGINAL_VERSION = 0;  // version with SEQ
        // version with RCF
        // final int NEW_MAGIC_VERSION = 1
        // final int CURRENT_VERSION = NEW_MAGIC_VERSION
        final int CURRENT_VERSION = 1;
        byte version;

        byte[] magic = new byte[rcMagic.length];
        try {
            in.seek(0);
            in.readFully(magic);

            if (Arrays.equals(magic, originalMagic)) {
                if (in.readByte() != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
                    return false;
                }
                version = ORIGINAL_VERSION;
            }
            else {
                if (!Arrays.equals(magic, rcMagic)) {
                    return false;
                }

                // Set 'version'
                version = in.readByte();
                if (version > CURRENT_VERSION) {
                    return false;
                }
            }

            if (version == ORIGINAL_VERSION) {
                try {
                    Class<?> keyCls = hadoopConf.getClassByName(Text.readString(in));
                    Class<?> valCls = hadoopConf.getClassByName(Text.readString(in));
                    if (!keyCls.equals(RCFile.KeyBuffer.class) || !valCls.equals(RCFile.ValueBuffer.class)) {
                        return false;
                    }
                }
                catch (ClassNotFoundException e) {
                    return false;
                }
            }
//            boolean decompress = in.readBoolean(); // is compressed?
            if (version == ORIGINAL_VERSION) {
                // is block-compressed? it should be always false.
                boolean blkCompressed = in.readBoolean();
                return !blkCompressed;
            }
            return true;
        }
        catch (IOException e) {
            LOG.info("File type check: [{}] is not an RC file.", filepath);
        }
        return false;
    }

    // Check whether the file is a Sequence file
    private boolean isSequenceFile(Path filepath, FSDataInputStream in)
    {
        final byte[] seqMagic = {(byte) 'S', (byte) 'E', (byte) 'Q'};
        byte[] magic = new byte[seqMagic.length];
        try {
            in.seek(0);
            in.readFully(magic);
            return Arrays.equals(magic, seqMagic);
        }
        catch (IOException e) {
            LOG.info("File type check: [{}] is not a Sequence file.", filepath);
        }
        return false;
    }

    // Check whether the file is a Parquet file by trying to read one record from it.
    // (A possible refinement: check that the Parquet file's schema is non-empty instead.)
    private boolean isParquetFile(Path file)
    {
        GroupReadSupport readSupport = new GroupReadSupport();
        // try-with-resources closes the reader once the check is done
        try (ParquetReader<Group> reader = ParquetReader.builder(readSupport, file).build()) {
            if (reader.read() != null) {
                return true;
            }
        }
        catch (IOException e) {
            LOG.info("File type check: [{}] is not a Parquet file.", file);
        }
        return false;
    }

    /**
     * Returns GMT's timestamp from binary encoded parquet timestamp (12 bytes - julian date + time of day nanos).
     *
     * @param timestampBinary INT96 parquet timestamp
     * @return timestamp in millis, GMT timezone
     */
    public static long getTimestampMillis(Binary timestampBinary)
    {
        if (timestampBinary.length() != 12) {
            return 0;
        }
        byte[] bytes = timestampBinary.getBytes();

        return getTimestampMills(bytes);
    }

    public static long getTimestampMills(byte[] bytes)
    {
        assert bytes.length == 12;
        // little endian encoding - need to invert byte order
        long timeOfDayNanos = Longs.fromBytes(bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]);
        int julianDay = Ints.fromBytes(bytes[11], bytes[10], bytes[9], bytes[8]);

        return julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND);
    }

    private static long julianDayToMillis(int julianDay)
    {
        return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
    }
}
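
    The two timestamp helpers at the end of DFSUtil decode Parquet's 12-byte INT96 layout: 8 bytes of nanoseconds within the day followed by 4 bytes of Julian day number, both little-endian. The sketch below reproduces that arithmetic with only the JDK so it can be tried in isolation. The three constants are defined outside this excerpt, so the values used here are assumptions (2440588 is the Julian day number of 1970-01-01), and the class name and the encode helper are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class Int96TimestampSketch {
    // Assumed values: the real constants live in a part of DFSUtil not shown here.
    static final long JULIAN_EPOCH_OFFSET_DAYS = 2_440_588L; // Julian day number of 1970-01-01
    static final long MILLIS_IN_DAY = 86_400_000L;
    static final long NANOS_PER_MILLISECOND = 1_000_000L;

    /** Decodes a 12-byte INT96 value into milliseconds since the Unix epoch. */
    static long toEpochMillis(byte[] int96) {
        if (int96.length != 12) {
            throw new IllegalArgumentException("INT96 timestamps are exactly 12 bytes");
        }
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long nanosOfDay = buf.getLong(); // bytes 0..7
        int julianDay = buf.getInt();    // bytes 8..11
        return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY
                + nanosOfDay / NANOS_PER_MILLISECOND;
    }

    /** Hypothetical inverse, used only to build sample input. */
    static byte[] encode(int julianDay, long nanosOfDay) {
        return ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
                .putLong(nanosOfDay).putInt(julianDay).array();
    }

    public static void main(String[] args) {
        // One day after the epoch plus 1.5 seconds.
        byte[] sample = encode(2_440_589, 1_500_000_000L);
        System.out.println(toEpochMillis(sample)); // prints 86401500
    }
}
```

    Doing the subtraction in long arithmetic, as above, avoids an int overflow when the day difference is multiplied by the number of milliseconds per day. Note also that getTimestampMillis in the listing returns 0 for a value that is not 12 bytes long, whereas this sketch throws.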

HdfsConstant

package com.alibaba.datax.plugin.reader.hdfsreader;

import com.alibaba.datax.common.base.Constant;
import java.util.Arrays;
import java.util.List;

public class HdfsConstant
        extends Constant
{

    public static final String SOURCE_FILES = "sourceFiles";
    public static final String TEXT = "TEXT";
    public static final String ORC = "ORC";
    public static final String CSV = "CSV";
    public static final String SEQ = "SEQ";
    public static final String RC = "RC";
    public static final String PARQUET = "PARQUET"; // newly added Parquet file type
    public static final String HDFS_DEFAULT_KEY = "fs.defaultFS";
    public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
    protected static final List<String> SUPPORT_FILE_TYPE =
            Arrays.asList(HdfsConstant.CSV, HdfsConstant.ORC, HdfsConstant.RC, HdfsConstant.SEQ, HdfsConstant.TEXT, HdfsConstant.PARQUET);

    private HdfsConstant() {}
}
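
    Once PARQUET is an accepted file type, every candidate file still has to be checked. In DFSUtil, isSequenceFile and isRCFile compare magic bytes, while isParquetFile builds a full reader and reads a record. As an alternative that the listing does not use, a Parquet file can also be recognised by its magic: the 4 ASCII bytes PAR1 at both the start and the end of the file. The following is a minimal sketch of that idea against a local file, using only the JDK; the class and method names are hypothetical, and a real plugin would read through the Hadoop FileSystem instead.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Arrays;

class ParquetMagicSketch {
    static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);
    // Smallest possible file: leading magic + 4-byte footer length + trailing magic.
    static final int MIN_LENGTH = MAGIC.length + 4 + MAGIC.length;

    /** Pure check on the file length and its first and last 4 bytes. */
    static boolean matchesMagic(long fileLength, byte[] head, byte[] tail) {
        return fileLength >= MIN_LENGTH
                && Arrays.equals(head, MAGIC)
                && Arrays.equals(tail, MAGIC);
    }

    /** Reads only 8 bytes of a local file and applies the check above. */
    static boolean looksLikeParquet(Path file) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            long length = raf.length();
            if (length < MIN_LENGTH) {
                return false;
            }
            byte[] head = new byte[MAGIC.length];
            byte[] tail = new byte[MAGIC.length];
            raf.seek(0);
            raf.readFully(head);
            raf.seek(length - MAGIC.length);
            raf.readFully(tail);
            return matchesMagic(length, head, tail);
        }
    }
}
```

    Unlike the record-reading check, this does not reject a valid Parquet file that happens to contain zero rows, but it also does not prove that the footer is parseable.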

HdfsReader

package com.alibaba.datax.plugin.reader.hdfsreader;

import com.alibaba.datax.common.base.Key;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.storage.reader.StorageReaderUtil;
import com.alibaba.datax.storage.util.FileHelper;
import org.apache.commons.io.Charsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.nio.charset.UnsupportedCharsetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

import static com.alibaba.datax.common.base.Key.COLUMN;
import static com.alibaba.datax.common.base.Key.ENCODING;
import static com.alibaba.datax.common.base.Key.INDEX;
import static com.alibaba.datax.common.base.Key.TYPE;
import static com.alibaba.datax.common.base.Key.VALUE;

public class HdfsReader
        extends Reader
{

    /**
     * Methods in Job run only once; methods in Task are run in parallel by the multiple Task threads that the framework starts.
     * <p>
     * The overall Reader execution flow is:
     * <pre>
     * Job: init--&gt;prepare--&gt;split
     * Task: init--&gt;prepare--&gt;startRead--&gt;post--&gt;destroy
     * Task: init--&gt;prepare--&gt;startRead--&gt;post--&gt;destroy
     * Job: post--&gt;destroy
     * </pre>
     */
    public static class Job
            extends Reader.Job
    {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        private Configuration readerOriginConfig = null;
        private HashSet<String> sourceFiles;
        private String specifiedFileType = null;
        private DFSUtil dfsUtil = null;
        private List<String> path = null;

        @Override
        public void init()
        {

            LOG.info("init() begin...");
            this.readerOriginConfig = getPluginJobConf();
            validate();
            dfsUtil = new DFSUtil(readerOriginConfig);
            LOG.info("init() ok and end...");
        }

        public void validate()
        {
            readerOriginConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsReaderErrorCode.DEFAULT_FS_NOT_FIND_ERROR);

            // path check
            String pathInString = readerOriginConfig.getNecessaryValue(Key.PATH, HdfsReaderErrorCode.REQUIRED_VALUE);
            if (!pathInString.startsWith("[") && !pathInString.endsWith("]")) {
                path = Collections.singletonList(pathInString);
            }
            else {
                path = readerOriginConfig.getList(Key.PATH, String.class);
                if (null == path || path.isEmpty()) {
                    throw DataXException.asDataXException(HdfsReaderErrorCode.REQUIRED_VALUE, "You must specify the source directory or file to read");
                }
                for (String eachPath : path) {
                    if (!eachPath.startsWith("/")) {
                        String message = String.format("Please check the path parameter: [%s]; it must be an absolute path", eachPath);
                        LOG.error(message);
                        throw DataXException.asDataXException(HdfsReaderErrorCode.ILLEGAL_VALUE, message);
                    }
                }
            }

            specifiedFileType = readerOriginConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE).toUpperCase();
            if (!HdfsConstant.SUPPORT_FILE_TYPE.contains(specifiedFileType)) {
                String message = "The HdfsReader plugin currently supports the following file types: " + HdfsConstant.SUPPORT_FILE_TYPE;
                throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_ERROR, message);
            }

            String encoding = this.readerOriginConfig.getString(ENCODING, "UTF-8");

            try {
                Charsets.toCharset(encoding);
            }
            catch (UnsupportedCharsetException uce) {
                throw DataXException.asDataXException(
                        HdfsReaderErrorCode.ILLEGAL_VALUE,
                        String.format("Unsupported encoding: [%s]", encoding), uce);
            }
            catch (Exception e) {
                throw DataXException.asDataXException(
                        HdfsReaderErrorCode.ILLEGAL_VALUE,
                        String.format("Invalid runtime configuration: %s", e.getMessage()), e);
            }
            //check Kerberos
            boolean haveKerberos = readerOriginConfig.getBool(Key.HAVE_KERBEROS, false);
            if (haveKerberos) {
                readerOriginConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, HdfsReaderErrorCode.REQUIRED_VALUE);
                readerOriginConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HdfsReaderErrorCode.REQUIRED_VALUE);
            }

            // validate the Columns
            validateColumns();

            // validate compress
            String compress = readerOriginConfig.getString(Key.COMPRESS, "NONE");
            if ("gzip".equalsIgnoreCase(compress)) {
                // correct to gz
                readerOriginConfig.set(Key.COMPRESS, "gz");
            }
        }

        private void validateColumns()
        {

            // Check whether column is ["*"]; if so, replace it with an empty list
            List<Configuration> column = this.readerOriginConfig.getListConfiguration(COLUMN);
            if (null != column && 1 == column.size()
                    && ("\"*\"".equals(column.get(0).toString()) || "'*'".equals(column.get(0).toString()))) {
                readerOriginConfig.set(COLUMN, new ArrayList<String>());
            }
            else {
                // column: 1. index + type; 2. value + type; 3. when type is Date, a dateFormat value may also be present
                List<Configuration> columns = readerOriginConfig.getListConfiguration(COLUMN);

                if (null == columns || columns.isEmpty()) {
                    throw DataXException.asDataXException(HdfsReaderErrorCode.CONFIG_INVALID_EXCEPTION, "You must specify columns");
                }

                for (Configuration eachColumnConf : columns) {
                    eachColumnConf.getNecessaryValue(TYPE, HdfsReaderErrorCode.REQUIRED_VALUE);
                    Integer columnIndex = eachColumnConf.getInt(INDEX);
                    String columnValue = eachColumnConf.getString(VALUE);

                    if (null == columnIndex && null == columnValue) {
                        throw DataXException.asDataXException(
                                HdfsReaderErrorCode.NO_INDEX_VALUE,
                                "Because type is configured, at least one of index or value must also be configured. Current configuration: " + eachColumnConf);
                    }

                    if (null != columnIndex && null != columnValue) {
                        throw DataXException.asDataXException(HdfsReaderErrorCode.MIXED_INDEX_VALUE,
                                "Both index and value are configured; each column may use only one of them");
                    }
                }
            }
        }

        @Override
        public void prepare()
        {
            LOG.info("prepare(), start to getAllFiles...");
            this.sourceFiles = (HashSet<String>) dfsUtil.getAllFiles(path, specifiedFileType);
            LOG.info("Number of files to read: [{}]; file list: [{}]", sourceFiles.size(), sourceFiles);
        }

        @Override
        public List<Configuration> split(int adviceNumber)
        {

            LOG.info("split() begin...");
            List<Configuration> readerSplitConfigs = new ArrayList<>();
            // warn: each slice reads one and only one file.
            int splitNumber = sourceFiles.size();
            if (0 == splitNumber) {
                throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION,
                        String.format("No files to read were found; please check your path setting: %s", readerOriginConfig.getString(Key.PATH)));
            }

            List<List<String>> splitSourceFiles = FileHelper.splitSourceFiles(new ArrayList<>(sourceFiles), splitNumber);
            for (List<String> files : splitSourceFiles) {
                Configuration splitConfig = readerOriginConfig.clone();
                splitConfig.set(HdfsConstant.SOURCE_FILES, files);
                readerSplitConfigs.add(splitConfig);
            }

            return readerSplitConfigs;
        }

        @Override
        public void post()
        {
            //
        }

        @Override
        public void destroy()
        {
            //
        }
    }

    public static class Task
            extends Reader.Task
    {

        private static final Logger LOG = LoggerFactory.getLogger(Task.class);
        private Configuration taskConfig;
        private List<String> sourceFiles;
        private String specifiedFileType;
        private DFSUtil dfsUtil = null;

        @Override
        public void init()
        {

            this.taskConfig = getPluginJobConf();
            this.sourceFiles = taskConfig.getList(HdfsConstant.SOURCE_FILES, String.class);
            this.specifiedFileType = taskConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE);
            this.dfsUtil = new DFSUtil(taskConfig);
        }

        @Override
        public void prepare()
        {
            //
        }

        @Override
        public void startRead(RecordSender recordSender)
        {

            LOG.info("read start");
            for (String sourceFile : this.sourceFiles) {
                LOG.info("reading file : [{}]", sourceFile);

                if (specifiedFileType.equalsIgnoreCase(HdfsConstant.TEXT) || specifiedFileType.equalsIgnoreCase(HdfsConstant.CSV)) {
                    InputStream inputStream = dfsUtil.getInputStream(sourceFile);
                    StorageReaderUtil.readFromStream(inputStream, sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.ORC)) {

                    dfsUtil.orcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.SEQ)) {

                    dfsUtil.sequenceFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.RC)) {

                    dfsUtil.rcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.PARQUET)) {
                    dfsUtil.parquetFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector());
                }
                else {
                    String message = "The HdfsReader plugin currently supports six file types: ORC, TEXT, CSV, SEQ, RC and PARQUET. " +
                            "Please set fileType to one of ORC, TEXT, CSV, SEQ, RC or PARQUET";
                    throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message);
                }

                if (recordSender != null) {
                    recordSender.flush();
                }
            }

            LOG.info("end read source files...");
        }

        @Override
        public void post()
        {
            //
        }

        @Override
        public void destroy()
        {
            //
        }
    }
}
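
    The rule enforced by validateColumns in the listing above is easy to miss among the error handling: every column entry needs a type, plus exactly one of index (read a field from the file) or value (emit a constant). Here is a small standalone sketch of that rule, assuming a plain Map in place of DataX's Configuration; the class and helper names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class ColumnRuleSketch {
    /** Returns null when the column entry is acceptable, otherwise a short reason. */
    static String check(Map<String, Object> column) {
        if (column.get("type") == null) {
            return "type is required";
        }
        boolean hasIndex = column.get("index") != null;
        boolean hasValue = column.get("value") != null;
        if (!hasIndex && !hasValue) {
            return "one of index or value is required";
        }
        if (hasIndex && hasValue) {
            return "index and value are mutually exclusive";
        }
        return null;
    }

    /** Hypothetical helper that builds one column entry for the demo. */
    static Map<String, Object> column(String type, Integer index, String value) {
        Map<String, Object> entry = new HashMap<>();
        entry.put("type", type);
        entry.put("index", index);
        entry.put("value", value);
        return entry;
    }

    public static void main(String[] args) {
        System.out.println(check(column("long", 0, null)));          // field taken from the file
        System.out.println(check(column("string", null, "const")));  // constant column
        System.out.println(check(column("string", null, null)));     // rejected: neither given
        System.out.println(check(column("string", 1, "const")));     // rejected: both given
    }
}
```

    The plugin itself throws a DataXException with a specific error code for each failure; returning a reason string here just keeps the sketch free of DataX dependencies.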

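3. HdfsWriter plugin

    This plugin is also fairly simple. It has five classes; the class names and the changes made to each are as follows:

  • HdfsHelper: adds the Parquet record conversion method (transportParRecord) and the Parquet schema generation method (generateParquetSchemaFromColumnAndType).
  • HdfsWriter: adds handling for the case where the configured file type is Parquet.
  • SupportHiveDataType: no change needed.
  • HdfsWriterErrorCode: no change needed.
  • Type: no change needed.

    Only the first two classes need to be modified. The code is as follows: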
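
    One detail in the HdfsHelper listing that follows deserves a worked example: DECIMAL columns are declared as FIXED_LEN_BYTE_ARRAY, and their length comes from HdfsUtil.computeMinBytesForPrecision, whose source is not shown in this article. The sketch below is therefore an assumption about what such a helper computes, namely the smallest number of bytes whose two's-complement range can hold every unscaled value with the given number of decimal digits. The class and method names are hypothetical.

```java
import java.math.BigInteger;

class DecimalWidthSketch {
    /** Smallest byte count n such that 2^(8n-1) >= 10^precision. */
    static int minBytesForPrecision(int precision) {
        if (precision < 1) {
            throw new IllegalArgumentException("precision must be at least 1");
        }
        // Largest unscaled magnitude with this many digits, e.g. 9999999999 for precision 10.
        BigInteger maxUnscaled = BigInteger.TEN.pow(precision).subtract(BigInteger.ONE);
        // bitLength() excludes the sign bit, so add one bit for two's complement.
        int bits = maxUnscaled.bitLength() + 1;
        return (bits + 7) / 8; // round up to whole bytes
    }

    public static void main(String[] args) {
        System.out.println(minBytesForPrecision(10)); // prints 5
        System.out.println(minBytesForPrecision(18)); // prints 8
        System.out.println(minBytesForPrecision(38)); // prints 16
    }
}
```

    Under this assumption, the default DecimalInfo(10, 2) used in generateParquetSchemaFromColumnAndType would occupy 5 bytes per value.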

HdfsHelper

package com.alibaba.datax.plugin.writer.hdfswriter;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.datax.common.base.Constant;
import com.alibaba.datax.common.base.Key;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.unstructuredstorage.util.ColumnTypeUtil;
import com.alibaba.datax.unstructuredstorage.util.HdfsUtil;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.*;

public class HdfsHelper
{
    public static final Logger LOG = LoggerFactory.getLogger(HdfsHelper.class);
    public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
    public static final String HDFS_DEFAULT_FS_KEY = "fs.defaultFS";
    private FileSystem fileSystem = null;
    private JobConf conf = null;
    private org.apache.hadoop.conf.Configuration hadoopConf = null;
    // Kerberos
    private boolean haveKerberos = false;
    private String kerberosKeytabFilePath;
    private String kerberosPrincipal;
    private String krb5ConfPath;

    public static MutablePair<Text, Boolean> transportOneRecord(
            Record record, char fieldDelimiter, List<Configuration> columnsConfiguration, TaskPluginCollector taskPluginCollector)
    {
        MutablePair<List<Object>, Boolean> transportResultList = transportOneRecord(record, columnsConfiguration, taskPluginCollector);
        // Holds <converted data, whether the record is dirty>
        MutablePair<Text, Boolean> transportResult = new MutablePair<>();
        transportResult.setRight(false);
        Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter));
        transportResult.setRight(transportResultList.getRight());
        transportResult.setLeft(recordResult);
        return transportResult;
    }

    public static MutablePair<List<Object>, Boolean> transportOneRecord(
            Record record, List<Configuration> columnsConfiguration,
            TaskPluginCollector taskPluginCollector)
    {

        MutablePair<List<Object>, Boolean> transportResult = new MutablePair<>();
        transportResult.setRight(false);
        List<Object> recordList = new ArrayList<>();
        int recordLength = record.getColumnNumber();
        if (0 != recordLength) {
            Column column;
            for (int i = 0; i < recordLength; i++) {
                column = record.getColumn(i);
                if (null != column.getRawData()) {
                    String rowData = column.getRawData().toString();
                    SupportHiveDataType columnType = SupportHiveDataType.valueOf(
                            columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase());
                    // Convert the value according to the type configured on the writer side
                    try {
                        switch (columnType) {
                            case TINYINT:
                                recordList.add(Byte.valueOf(rowData));
                                break;
                            case SMALLINT:
                                recordList.add(Short.valueOf(rowData));
                                break;
                            case INT:
                            case INTEGER:
                                recordList.add(Integer.valueOf(rowData));
                                break;
                            case BIGINT:
                                recordList.add(column.asLong());
                                break;
                            case FLOAT:
                                recordList.add(Float.valueOf(rowData));
                                break;
                            case DOUBLE:
                                recordList.add(column.asDouble());
                                break;
                            case STRING:
                            case VARCHAR:
                            case CHAR:
                                recordList.add(column.asString());
                                break;
                            case DECIMAL:
                                recordList.add(HiveDecimal.create(column.asBigDecimal()));
                                break;
                            case BOOLEAN:
                                recordList.add(column.asBoolean());
                                break;
                            case DATE:
                                recordList.add(org.apache.hadoop.hive.common.type.Date.valueOf(column.asString()));
                                break;
                            case TIMESTAMP:
                                recordList.add(Timestamp.valueOf(column.asString()));
                                break;
                            case BINARY:
                                recordList.add(column.asBytes());
                                break;
                            default:
                                throw DataXException
                                        .asDataXException(
                                                HdfsWriterErrorCode.ILLEGAL_VALUE,
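                                                String.format(
                                                        "The column configuration in your job file is invalid: this field type is not supported for writing. Field name: [%s], field type: [%s]. Please change the field's type in the table or exclude the field from synchronization.",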
                                                        columnsConfiguration.get(i).getString(Key.NAME),
                                                        columnsConfiguration.get(i).getString(Key.TYPE)));
                        }
                    }
                    catch (Exception e) {
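                        // warn: the record is treated as dirty data here
                        LOG.warn("Field type conversion failed", e);
                        String message = String.format(
                                "Field type conversion error: the target field type is [%s], but the actual field value is [%s].",
                                columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData());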
                        taskPluginCollector.collectDirtyRecord(record, message);
                        transportResult.setRight(true);
                        break;
                    }
                }
                else {
                    // warn: it's all ok if nullFormat is null
                    recordList.add(null);
                }
            }
        }
        transportResult.setLeft(recordList);
        return transportResult;
    }

    public static GenericRecord transportParRecord(
            Record record, List<Configuration> columnsConfiguration,
            TaskPluginCollector taskPluginCollector, GenericRecordBuilder builder)
    {

        int recordLength = record.getColumnNumber();
        if (0 != recordLength) {
            Column column;
            for (int i = 0; i < recordLength; i++) {
                column = record.getColumn(i);
                String colName = columnsConfiguration.get(i).getString(Key.NAME);
                String typename = columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase();
                if (null == column || column.getRawData() == null) {
                    builder.set(colName, null);
                }
                else {
                    String rowData = column.getRawData().toString();
                    SupportHiveDataType columnType = SupportHiveDataType.valueOf(typename);
                    // Convert the value according to the type configured on the writer side
                    try {
                        switch (columnType) {
                            case INT:
                            case INTEGER:
                                builder.set(colName, Integer.valueOf(rowData));
                                break;
                            case LONG:
                                builder.set(colName, column.asLong());
                                break;
                            case FLOAT:
                                builder.set(colName, Float.valueOf(rowData));
                                break;
                            case DOUBLE:
                                builder.set(colName, column.asDouble());
                                break;
                            case STRING:
                                builder.set(colName, column.asString());
                                break;
                            case DECIMAL:
                                builder.set(colName, new BigDecimal(column.asString()).setScale(columnsConfiguration.get(i).getInt(Key.SCALE), java.math.RoundingMode.HALF_UP));
                                break;
                            case BOOLEAN:
                                builder.set(colName, column.asBoolean());
                                break;
                            case BINARY:
                                builder.set(colName, column.asBytes());
                                break;
                            case TIMESTAMP:
                                builder.set(colName, column.asLong() / 1000);
                                break;
                            default:
                                throw DataXException
                                        .asDataXException(
                                                HdfsWriterErrorCode.ILLEGAL_VALUE,
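                                                String.format(
                                                        "The column configuration in your job file is invalid: this field type is not supported for writing. Field name: [%s], field type: [%s]. Please change the field's type in the table or exclude the field from synchronization.",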
                                                        columnsConfiguration.get(i).getString(Key.NAME),
                                                        columnsConfiguration.get(i).getString(Key.TYPE)));
                        }
                    }
                    catch (Exception e) {
                        // warn: the record is treated as dirty data here
                        String message = String.format(
                                "Field type conversion error: the target field type is [%s], but the actual field value is [%s].",
                                columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData());
                        taskPluginCollector.collectDirtyRecord(record, message);
                        break;
                    }
                }
            }
        }
        return builder.build();
    }

    public static String generateParquetSchemaFromColumnAndType(List<Configuration> columns) {
        Map<String, ColumnTypeUtil.DecimalInfo> decimalColInfo = new HashMap<>(16);
        ColumnTypeUtil.DecimalInfo PARQUET_DEFAULT_DECIMAL_INFO = new ColumnTypeUtil.DecimalInfo(10, 2);
        Types.MessageTypeBuilder typeBuilder = Types.buildMessage();
        for (Configuration column : columns) {
            String name = column.getString("name");
            String colType = column.getString("type");
            Validate.notNull(name, "column.name can't be null");
            Validate.notNull(colType, "column.type can't be null");
            switch (colType.toLowerCase()) {
                case "tinyint":
                case "smallint":
                case "int":
                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(name);
                    break;
                case "bigint":
                case "long":
                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(name);
                    break;
                case "float":
                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.FLOAT).named(name);
                    break;
                case "double":
                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).named(name);
                    break;
                case "binary":
                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(name);
                    break;
                case "char":
                case "varchar":
                case "string":
                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(name);
                    break;
                case "boolean":
                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BOOLEAN).named(name);
                    break;
                case "timestamp":
                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT96).named(name);
                    break;
                case "date":
                    typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).as(OriginalType.DATE).named(name);
                    break;
                default:
                    if (ColumnTypeUtil.isDecimalType(colType)) {
                        ColumnTypeUtil.DecimalInfo decimalInfo = ColumnTypeUtil.getDecimalInfo(colType, PARQUET_DEFAULT_DECIMAL_INFO);
                        typeBuilder.optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
                                .as(OriginalType.DECIMAL)
                                .precision(decimalInfo.getPrecision())
                                .scale(decimalInfo.getScale())
                                .length(HdfsUtil.computeMinBytesForPrecision(decimalInfo.getPrecision()))
                                .named(name);

                        decimalColInfo.put(name, decimalInfo);
                    } else {
                        typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(name);
                    }
                    break;
            }
        }
        return typeBuilder.named("m").toString();
    }

    public FileSystem getFileSystem(String defaultFS, Configuration taskConfig)
    {
        this.hadoopConf = new org.apache.hadoop.conf.Configuration();

        Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
        JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));
        if (null != hadoopSiteParams) {
            Set<String> paramKeys = hadoopSiteParams.getKeys();
            for (String each : paramKeys) {
                hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
            }
        }
        this.hadoopConf.set(HDFS_DEFAULT_FS_KEY, defaultFS);

        // Check whether Kerberos authentication is enabled
        this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);
        if (haveKerberos) {
            this.krb5ConfPath = taskConfig.getString(Key.KRB5_CONF_FILE_PATH);
            this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
            this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
            LOG.info("krb5.conf path: [{}], keytab path: [{}], principal: [{}]",
                    krb5ConfPath, kerberosKeytabFilePath, kerberosPrincipal);
            LOG.info("Kerberos authentication is enabled, logging in");

            // System.setProperty throws a NullPointerException on a null value,
            // so only override the krb5.conf location when one is configured.
            if (StringUtils.isNotBlank(krb5ConfPath)) {
                System.setProperty("java.security.krb5.conf", krb5ConfPath);
                System.setProperty("sun.security.krb5.Config", krb5ConfPath);
            }
            refreshConfig();
            // Only switch the Hadoop configuration to Kerberos when the job asks for it
            this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath, hadoopConf, this.krb5ConfPath);
        }
        conf = new JobConf(hadoopConf);
        try {
            fileSystem = FileSystem.get(conf);
        }
        catch (IOException e) {
            String message = String.format("A network IO exception occurred while getting the FileSystem; please check your network. HDFS address: [message:defaultFS = %s]",
                    defaultFS);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
        }
        catch (Exception e) {
            String message = String.format("Failed to get the FileSystem; please check that the HDFS address is correct: [%s]",
                    "message:defaultFS =" + defaultFS);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
        }

        if (null == fileSystem) {
            String message = String.format("Failed to get the FileSystem; please check that the HDFS address is correct: [message:defaultFS = %s]",
                    defaultFS);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, message);
        }

        return fileSystem;
    }

    /** Refresh the cached Kerberos (krb5) configuration. */
    public static void refreshConfig() {
        try {
            sun.security.krb5.Config.refresh();
            Field defaultRealmField = KerberosName.class.getDeclaredField("defaultRealm");
            defaultRealmField.setAccessible(true);
            defaultRealmField.set(
                    null,
                    org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm());
            // reload java.security.auth.login.config
            javax.security.auth.login.Configuration.setConfiguration(null);
        } catch (Exception e) {
            LOG.warn(
                    "resetting default realm failed, current default realm will still be used.", e);
        }
    }

    public void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf, String krb5ConfPath) {
        hadoopConf.set("hadoop.security.authentication", "kerberos");
        hadoopConf.set("hive.security.authentication", "kerberos");
        hadoopConf.set("hadoop.security.authorization", "true");
        hadoopConf.set("dfs.permissions", "false");
        // NOTE: this rule hard-codes the realm CDHDEV.COM; adjust it to your cluster's realm.
        hadoopConf.set("hadoop.security.auth_to_local", "RULE:[2:$1@$0](.*@CDHDEV.COM)s/.*/hadoop/ \n" +
                "        DEFAULT");

        if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
            UserGroupInformation.setConfiguration(hadoopConf);
            KerberosName.resetDefaultRealm();
            try {
                LOG.info("Starting Kerberos login");
                UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
            }
            catch (Exception e) {
                String message = String.format("Kerberos authentication failed; please check " +
                                "kerberosKeytabFilePath[%s] and kerberosPrincipal[%s]",
                        kerberosKeytabFilePath, kerberosPrincipal);
                // log at error level with the stack trace instead of printStackTrace()
                LOG.error(message, e);
                throw DataXException.asDataXException(HdfsWriterErrorCode.KERBEROS_LOGIN_ERROR, message, e);
            }
        }
    }

    /**
     * List the files under the given directory.
     *
     * @param dir the directory to search
     * @return array of files, each as a full path,
     * e.g. hdfs://10.101.204.12:9000/user/hive/warehouse/writer.db/text/test.textfile
     */
    public Path[] hdfsDirList(String dir)
    {
        Path path = new Path(dir);
        Path[] files;
        try {
            FileStatus[] status = fileSystem.listStatus(path);
            files = new Path[status.length];
            for (int i = 0; i < status.length; i++) {
                files[i] = status[i].getPath();
            }
        }
        catch (IOException e) {
            String message = String.format("A network IO exception occurred while listing files under directory [%s]; please check your network.", dir);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
        }
        return files;
    }

//    public boolean isPathExists(String filePath) {
//
//        Path path = new Path(filePath);
//        boolean exist;
//        try {
//            exist = fileSystem.exists(path);
//        }
//        catch (IOException e) {
//            String message = String.format("A network IO exception occurred while checking whether path [%s] exists; please check your network.",
//                    "message:filePath =" + filePath);
//            e.printStackTrace();
//            LOG.error(message);
//            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
//        }
//        return exist;
//    }

    public boolean isPathDir(String filePath)
    {
        Path path = new Path(filePath);
        boolean isDir;
        try {
            isDir = fileSystem.getFileStatus(path).isDirectory();
        }
        catch (IOException e) {
            String message = String.format("A network IO exception occurred while checking whether path [%s] is a directory; please check your network.", filePath);
            LOG.error(message);
            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
        }
        return isDir;
    }

    public void deleteFilesFromDir(Path dir)
    {
        try {
            final RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(dir, false);
            while (files.hasNext()) {
                final LocatedFileStatus next = files.next();
                // deleteOnExit only marks the path; the file is removed when this FileSystem is closed
                fileSystem.deleteOnExit(next.getPath());
            }
        }
        catch (FileNotFoundException fileNotFoundException) {
            throw new DataXException(HdfsWriterErrorCode.FILE_NOT_FOUND, fileNotFoundException.getMessage());
        }
        catch (IOException ioException) {
            throw new DataXException(HdfsWriterErrorCode.IO_ERROR, ioException.getMessage());
        }
    }

    public void deleteDir(Path path)
    {
        LOG.info("start delete tmp dir [{}] .", path);
        try {
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true);
            }
        }
        catch (Exception e) {
            LOG.error("IO exception while deleting temporary directory [{}]; please check your network.", path);
            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
        }
        LOG.info("finish delete tmp dir [{}] .", path);
    }

    /**
     * move all files in sourceDir to targetDir
     *
     * @param sourceDir the source directory
     * @param targetDir the target directory
     */
    public void moveFilesToDest(Path sourceDir, Path targetDir)
    {
        try {
            final FileStatus[] fileStatuses = fileSystem.listStatus(sourceDir);
            for (FileStatus file : fileStatuses) {
                if (file.isFile() && file.getLen() > 0) {
                    LOG.info("start move file [{}] to dir [{}].", file.getPath(), targetDir.getName());
                    fileSystem.rename(file.getPath(), new Path(targetDir, file.getPath().getName()));
                }
            }
        }
        catch (IOException e) {
            throw DataXException.asDataXException(HdfsWriterErrorCode.IO_ERROR, e);
        }
        LOG.info("finish move file(s).");
    }

    // Close the FileSystem
    public void closeFileSystem()
    {
        try {
            fileSystem.close();
        }
        catch (IOException e) {
            LOG.error("IO exception while closing the FileSystem; please check your network.");
            throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
        }
    }

    // Write a text-format file
    public void textFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
                                   TaskPluginCollector taskPluginCollector)
    {
        char fieldDelimiter = config.getChar(Key.FIELD_DELIMITER);
        List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
        String compress = config.getString(Key.COMPRESS, "NONE").toUpperCase().trim();

        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
        String attempt = "attempt_" + dateFormat.format(new Date()) + "_0001_m_000000_0";
        conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
        if (!"NONE".equals(compress)) {
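            // strip the suffix from fileName, because FileOutputFormat appends the codec's own suffix;
            // only a dot inside the last path segment counts, and a name without a dot is left unchanged
            int lastSlash = fileName.lastIndexOf('/');
            int lastDot = fileName.lastIndexOf('.');
            if (lastDot > lastSlash + 1) {
                fileName = fileName.substring(0, lastDot);
            }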
            Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
            if (null != codecClass) {
                FileOutputFormat.setOutputCompressorClass(conf, codecClass);
            }
        }
        Path outputPath = new Path(fileName);
        FileOutputFormat.setOutputPath(conf, outputPath);
        FileOutputFormat.setWorkOutputPath(conf, outputPath);
        try {
            RecordWriter<NullWritable, Text> writer = new TextOutputFormat<NullWritable, Text>()
                    .getRecordWriter(fileSystem, conf, outputPath.toString(), Reporter.NULL);
            Record record;
            while ((record = lineReceiver.getFromReader()) != null) {
                MutablePair<Text, Boolean> transportResult = transportOneRecord(record, fieldDelimiter, columns, taskPluginCollector);
                if (Boolean.FALSE.equals(transportResult.getRight())) {
                    writer.write(NullWritable.get(), transportResult.getLeft());
                }
            }
            writer.close(Reporter.NULL);
        }
        catch (Exception e) {
            LOG.error("IO exception while writing file [{}]; please check your network.", fileName);
            Path path = new Path(fileName);
            deleteDir(path.getParent());
            throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);
        }
    }

    // compress is normalized to upper case here, so callers may pass any case
    public Class<? extends CompressionCodec> getCompressCodec(String compress)
    {
        compress = compress.toUpperCase();
        Class<? extends CompressionCodec> codecClass;
        switch (compress) {
            case "GZIP":
                codecClass = org.apache.hadoop.io.compress.GzipCodec.class;
                break;
            case "BZIP2":
                codecClass = org.apache.hadoop.io.compress.BZip2Codec.class;
                break;
            case "SNAPPY":
                codecClass = org.apache.hadoop.io.compress.SnappyCodec.class;
                break;
            case "LZ4":
                codecClass = org.apache.hadoop.io.compress.Lz4Codec.class;
                break;
            case "ZSTD":
                codecClass = org.apache.hadoop.io.compress.ZStandardCodec.class;
                break;
            case "DEFLATE":
            case "ZLIB":
                codecClass = org.apache.hadoop.io.compress.DeflateCodec.class;
                break;
            default:
                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                        String.format("The configured compress mode [%s] is not supported", compress));
        }
        return codecClass;
    }

    /*
     * Write a Parquet file.
     * The records are described by an Avro schema like the following:
     * {
     *    "type": "record",
     *    "name": "testFile",
     *    "doc":  "test records",
     *    "fields":
     *      [{
     *        "name": "id",
     *        "type": ["null", "int"]
     *
     *      },
     *      {
     *        "name": "empName",
     *        "type": "string"
     *      }
     *    ]
     *  }
     * "null" in the type union means the field is nullable
     */
    public void parquetFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
                                      TaskPluginCollector taskPluginCollector)
    {

        List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
        String compress = config.getString(Key.COMPRESS, "UNCOMPRESSED").toUpperCase().trim();
        if ("NONE".equals(compress)) {
            compress = "UNCOMPRESSED";
        }
        // construct parquet schema
        Schema schema = generateParquetSchema(columns);

        Path path = new Path(fileName);
        LOG.info("write parquet file {}", fileName);
        CompressionCodecName codecName = CompressionCodecName.fromConf(compress);

        GenericData decimalSupport = new GenericData();
        decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());

        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(path)
                .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE)
                .withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE)
                .withSchema(schema)
                .withConf(hadoopConf)
                .withCompressionCodec(codecName)
                .withValidation(false)
                .withDictionaryEncoding(false)
                .withDataModel(decimalSupport)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .build()) {

            Record record;
            while ((record = lineReceiver.getFromReader()) != null) {
                GenericRecordBuilder builder = new GenericRecordBuilder(schema);
                GenericRecord transportResult = transportParRecord(record, columns, taskPluginCollector, builder);
                writer.write(transportResult);
            }
        }
        catch (Exception e) {
            LOG.error("IO exception while writing file [{}]; please check your network.", fileName);
            deleteDir(path.getParent());
            throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);
        }
    }

    private Schema generateParquetSchema(List<Configuration> columns)
    {
        List<Schema.Field> fields = new ArrayList<>();
        String fieldName;
        String type;
        List<Schema> unionList = new ArrayList<>(2);
        for (Configuration column : columns) {
            unionList.clear();
            fieldName = column.getString(Key.NAME);
            type = column.getString(Key.TYPE).trim().toUpperCase();
            unionList.add(Schema.create(Schema.Type.NULL));
            switch (type) {
                case "DECIMAL":
                    Schema dec = LogicalTypes
                            .decimal(column.getInt(Key.PRECISION, Constant.DEFAULT_DECIMAL_MAX_PRECISION),
                                    column.getInt(Key.SCALE, Constant.DEFAULT_DECIMAL_MAX_SCALE))
                            .addToSchema(Schema.createFixed(fieldName, null, null, 16));
                    unionList.add(dec);
                    break;
                case "DATE":
                    Schema date = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
                    unionList.add(date);
                    break;
                case "TIMESTAMP":
                    Schema ts = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
                    unionList.add(ts);
                    break;
                case "UUID":
                    Schema uuid = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING));
                    unionList.add(uuid);
                    break;
                case "BINARY":
                    unionList.add(Schema.create(Schema.Type.BYTES));
                    break;
                default:
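                    // other types must match an Avro Schema.Type name (e.g. INT, LONG, STRING);
                    // any other name makes valueOf throw IllegalArgumentException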
                    unionList.add(Schema.create(Schema.Type.valueOf(type)));
                    break;
            }
            fields.add(new Schema.Field(fieldName, Schema.createUnion(unionList), null, null));
        }
        Schema schema = Schema.createRecord("dataxTestParquet", null, "parquet", false);
        schema.setFields(fields);
        return schema;
    }

    private void setRow(VectorizedRowBatch batch, int row, Record record, List<Configuration> columns,
                        TaskPluginCollector taskPluginCollector)
    {
        for (int i = 0; i < columns.size(); i++) {
            Configuration eachColumnConf = columns.get(i);
            String type = eachColumnConf.getString(Key.TYPE).trim().toUpperCase();
            SupportHiveDataType columnType;
            ColumnVector col = batch.cols[i];
            if (type.startsWith("DECIMAL")) {
                columnType = SupportHiveDataType.DECIMAL;
            }
            else {
                columnType = SupportHiveDataType.valueOf(type);
            }
            if (record.getColumn(i) == null || record.getColumn(i).getRawData() == null) {
                col.isNull[row] = true;
                col.noNulls = false;
                continue;
            }

            try {
                switch (columnType) {
                    case TINYINT:
                    case SMALLINT:
                    case INT:
                    case BIGINT:
                    case BOOLEAN:
                    case DATE:
                        ((LongColumnVector) col).vector[row] = record.getColumn(i).asLong();
                        break;
                    case FLOAT:
                    case DOUBLE:
                        ((DoubleColumnVector) col).vector[row] = record.getColumn(i).asDouble();
                        break;
                    case DECIMAL:
                        HiveDecimalWritable hdw = new HiveDecimalWritable();
                        hdw.set(HiveDecimal.create(record.getColumn(i).asBigDecimal())
                                .setScale(eachColumnConf.getInt(Key.SCALE), HiveDecimal.ROUND_HALF_UP));
                        ((DecimalColumnVector) col).set(row, hdw);
                        break;
                    case TIMESTAMP:
                        ((TimestampColumnVector) col).set(row, java.sql.Timestamp.valueOf(record.getColumn(i).asString()));
                        break;
                    case STRING:
                    case VARCHAR:
                    case CHAR:
                        byte[] buffer;
                        if (record.getColumn(i).getType() == Column.Type.BYTES) {
                            //convert bytes to base64 string
                            buffer = Base64.getEncoder().encode((byte[]) record.getColumn(i).getRawData());
                        }
                        else {
                            buffer = record.getColumn(i).getRawData().toString().getBytes(StandardCharsets.UTF_8);
                        }
                        ((BytesColumnVector) col).setRef(row, buffer, 0, buffer.length);
                        break;
                    case BINARY:
                        byte[] content = (byte[]) record.getColumn(i).getRawData();
                        ((BytesColumnVector) col).setRef(row, content, 0, content.length);
                        break;
                    default:
                        throw DataXException
                                .asDataXException(
                                        HdfsWriterErrorCode.ILLEGAL_VALUE,
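                                        String.format("The column configuration in your job file is invalid: writing this field type is not supported. Field name: [%s], field type: [%s]. " +
                                                        "Please change the field's type in the table or exclude the field from synchronization.",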
                                                eachColumnConf.getString(Key.NAME),
                                                eachColumnConf.getString(Key.TYPE)));
                }
            }
            catch (Exception e) {
                taskPluginCollector.collectDirtyRecord(record, e.getMessage());
                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                        String.format("Failed to set ORC row value, source column type: %s, target primitive type: %s, target Hive column type: %s, field name: %s, source value: %s, root cause:%n%s",
                                record.getColumn(i).getType(), columnType, eachColumnConf.getString(Key.TYPE),
                                eachColumnConf.getString(Key.NAME),
                                record.getColumn(i).getRawData(), e));
            }
        }
    }

    /*
     * Write an ORC file
     */
    public void orcFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
                                  TaskPluginCollector taskPluginCollector)
    {
        List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
        String compress = config.getString(Key.COMPRESS, "NONE").toUpperCase();
        StringJoiner joiner = new StringJoiner(",");
        for (Configuration column : columns) {
            if ("decimal".equalsIgnoreCase(column.getString(Key.TYPE))) {
                joiner.add(String.format("%s:%s(%s,%s)", column.getString(Key.NAME), "decimal",
                        column.getInt(Key.PRECISION, Constant.DEFAULT_DECIMAL_MAX_PRECISION),
                        column.getInt(Key.SCALE, Constant.DEFAULT_DECIMAL_MAX_SCALE)));
            }
            else if ("date".equalsIgnoreCase(column.getString(Key.TYPE))) {
                joiner.add(String.format("%s:bigint", column.getString(Key.NAME)));
            }
            else {
                joiner.add(String.format("%s:%s", column.getString(Key.NAME), column.getString(Key.TYPE)));
            }
        }
        TypeDescription schema = TypeDescription.fromString("struct<" + joiner + ">");
        try (Writer writer = OrcFile.createWriter(new Path(fileName),
                OrcFile.writerOptions(conf)
                        .setSchema(schema)
                        .compress(CompressionKind.valueOf(compress)))) {
            Record record;
            VectorizedRowBatch batch = schema.createRowBatch(1024);
            while ((record = lineReceiver.getFromReader()) != null) {
                int row = batch.size++;
                setRow(batch, row, record, columns, taskPluginCollector);
                if (batch.size == batch.getMaxSize()) {
                    writer.addRowBatch(batch);
                    batch.reset();
                }
            }
            if (batch.size != 0) {
                writer.addRowBatch(batch);
                batch.reset();
            }
        }
        catch (IOException e) {
            LOG.error("IO exception while writing file [{}]; please check your network.", fileName);
            Path path = new Path(fileName);
            deleteDir(path.getParent());
            throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);
        }
    }
}
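
The Parquet schema builder above stores DECIMAL columns as FIXED_LEN_BYTE_ARRAY and takes the array length from HdfsUtil.computeMinBytesForPrecision, whose body is not shown in this post. The sketch below is an assumption about what such a helper computes, not the plugin's actual code: the smallest number of bytes whose signed range covers every value with the given number of decimal digits, plus the sign-extended big-endian encoding that Parquet expects for the unscaled value. The class and method names (DecimalBytesSketch, minBytesForPrecision, toFixedBytes) are hypothetical.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of DataX or the plugin above.
public class DecimalBytesSketch {

    /** Smallest n such that a signed n-byte integer holds every value with `precision` decimal digits. */
    public static int minBytesForPrecision(int precision) {
        if (precision < 1) {
            throw new IllegalArgumentException("precision must be >= 1");
        }
        // every unscaled value has magnitude below 10^precision
        BigInteger limit = BigInteger.TEN.pow(precision);
        int numBytes = 1;
        // a signed n-byte integer covers [-2^(8n-1), 2^(8n-1) - 1]
        while (BigInteger.ONE.shiftLeft(8 * numBytes - 1).compareTo(limit) < 0) {
            numBytes++;
        }
        return numBytes;
    }

    /** Encode the unscaled value as big-endian two's complement, sign-extended to `length` bytes. */
    public static byte[] toFixedBytes(BigDecimal value, int scale, int length) {
        BigInteger unscaled = value.setScale(scale, RoundingMode.HALF_UP).unscaledValue();
        byte[] raw = unscaled.toByteArray(); // minimal big-endian two's complement
        if (raw.length > length) {
            throw new ArithmeticException("value does not fit in " + length + " bytes");
        }
        byte[] out = new byte[length];
        // pad on the left with 0xFF for negative values and 0x00 otherwise
        Arrays.fill(out, 0, length - raw.length, (byte) (unscaled.signum() < 0 ? 0xFF : 0x00));
        System.arraycopy(raw, 0, out, length - raw.length, raw.length);
        return out;
    }

    public static void main(String[] args) {
        int length = minBytesForPrecision(10);
        byte[] bytes = toFixedBytes(new BigDecimal("-1.5"), 2, length);
        System.out.println(length + " bytes: " + Arrays.toString(bytes));
    }
}
```

With the default decimal(10, 2) used by the schema builder, this sketch yields a 5-byte array, and decimal(38, x) needs 16 bytes, which matches the fixed size of 16 that generateParquetSchema passes to Schema.createFixed.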

HdfsWriter

package com.alibaba.datax.plugin.writer.hdfswriter;

import com.alibaba.datax.common.base.Constant;
import com.alibaba.datax.common.base.Key;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.storage.util.FileHelper;
import org.apache.commons.io.Charsets;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.datax.unstructuredstorage.util.ColumnTypeUtil;
import com.alibaba.datax.unstructuredstorage.util.HdfsUtil;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.CompressionKind;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class HdfsWriter
        extends Writer
{
    public static class Job
            extends Writer.Job
    {
        private static final Logger LOG = LoggerFactory.getLogger(Job.class);

        // Temporary directory for written files; it must be deleted once writing completes
        private String tmpStorePath;
        private Configuration writerSliceConfig = null;
        private String defaultFS;
        private String path;
        private String fileName;
        private String writeMode;
        private HdfsHelper hdfsHelper = null;

        private FileSystem filsSystem;
        public static final Set<String> SUPPORT_FORMAT = new HashSet<>(Arrays.asList("ORC", "PARQUET", "TEXT"));

        @Override
        public void init()
        {
            this.writerSliceConfig = this.getPluginJobConf();
            this.validateParameter();

            hdfsHelper = new HdfsHelper();

            filsSystem = hdfsHelper.getFileSystem(defaultFS, this.writerSliceConfig);
        }

        private void validateParameter()
        {
            this.defaultFS = this.writerSliceConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsWriterErrorCode.REQUIRED_VALUE);
            //fileType check
            String fileType = this.writerSliceConfig.getNecessaryValue(Key.FILE_TYPE, HdfsWriterErrorCode.REQUIRED_VALUE).toUpperCase();
            if (!SUPPORT_FORMAT.contains(fileType)) {
                String message = String.format("File format [%s] is not supported; HdfsWriter currently supports only %s", fileType, SUPPORT_FORMAT);
                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message);
            }
            //path
            this.path = this.writerSliceConfig.getNecessaryValue(Key.PATH, HdfsWriterErrorCode.REQUIRED_VALUE);
            if (!path.startsWith("/")) {
                String message = String.format("Please check the path parameter [%s]: it must be an absolute path", path);
                LOG.error(message);
                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message);
            }
            if (path.contains("*") || path.contains("?")) {
                String message = String.format("Please check the path parameter [%s]: it must not contain special characters such as * or ?", path);
                LOG.error(message);
                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message);
            }
            //fileName
            this.fileName = this.writerSliceConfig.getNecessaryValue(Key.FILE_NAME, HdfsWriterErrorCode.REQUIRED_VALUE);
            //columns check
            List<Configuration> columns = this.writerSliceConfig.getListConfiguration(Key.COLUMN);
            if (null == columns || columns.isEmpty()) {
                throw DataXException.asDataXException(HdfsWriterErrorCode.REQUIRED_VALUE, "You must specify columns");
            }
            else {
                boolean rewriteFlag = false;
                for (int i = 0; i < columns.size(); i++) {
                    Configuration eachColumnConf = columns.get(i);
                    eachColumnConf.getNecessaryValue(Key.NAME, HdfsWriterErrorCode.COLUMN_REQUIRED_VALUE);
                    eachColumnConf.getNecessaryValue(Key.TYPE, HdfsWriterErrorCode.COLUMN_REQUIRED_VALUE);
                    if (eachColumnConf.getString(Key.TYPE).toUpperCase().startsWith("DECIMAL")) {
                        String type = eachColumnConf.getString(Key.TYPE);
                        eachColumnConf.set(Key.TYPE, "decimal");
                        eachColumnConf.set(Key.PRECISION, getDecimalPrecision(type));
                        eachColumnConf.set(Key.SCALE, getDecimalScale(type));
                        columns.set(i, eachColumnConf);
                        rewriteFlag = true;
                    }
                }
                if (rewriteFlag) {
                    this.writerSliceConfig.set(Key.COLUMN, columns);
                }
            }
            //writeMode check
            this.writeMode = this.writerSliceConfig.getNecessaryValue(Key.WRITE_MODE, HdfsWriterErrorCode.REQUIRED_VALUE);
            if (!Constant.SUPPORTED_WRITE_MODE.contains(writeMode)) {
                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                        String.format("Only the append, nonConflict and overwrite modes are supported; the configured writeMode [%s] is not",
                                writeMode));
            }
            if ("TEXT".equals(fileType)) {
                //fieldDelimiter check
                String fieldDelimiter = this.writerSliceConfig.getString(Key.FIELD_DELIMITER, null);
                if (StringUtils.isEmpty(fieldDelimiter)) {
                    throw DataXException.asDataXException(HdfsWriterErrorCode.REQUIRED_VALUE,
                            String.format("Writing a TEXT file requires a valid [%s] parameter.", Key.FIELD_DELIMITER));
                }

                if (1 != fieldDelimiter.length()) {
                    // the delimiter, when present, must be exactly one character
                    throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                            String.format("Only a single-character field delimiter is supported; you configured: [%s]", fieldDelimiter));
                }
            }

            //compress check
            String compress = this.writerSliceConfig.getString(Key.COMPRESS, "NONE").toUpperCase().trim();
            if ("ORC".equals(fileType)) {
                try {
                    CompressionKind.valueOf(compress);
                }
                catch (IllegalArgumentException e) {
                    throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                            String.format("The ORC format currently supports only %s compression; the configured compress mode [%s] is not supported.",
                                    Arrays.toString(CompressionKind.values()), compress));
                }
            }
            if ("PARQUET".equals(fileType)) {
                // Parquet names its no-compression codec UNCOMPRESSED rather than the more common NONE, so a configured NONE is mapped to UNCOMPRESSED here
                if ("NONE".equals(compress)) {
                    compress = "UNCOMPRESSED";
                }
                try {
                    CompressionCodecName.fromConf(compress);
                }
                catch (Exception e) {
                    throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                            String.format("The PARQUET format currently supports only %s compression; the configured compress mode [%s] is not supported.",
                                    Arrays.toString(CompressionCodecName.values()), compress));
                }
            }

            boolean haveKerberos = this.writerSliceConfig.getBool(Key.HAVE_KERBEROS, false);
            if (haveKerberos) {
                this.writerSliceConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, HdfsWriterErrorCode.REQUIRED_VALUE);
                this.writerSliceConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HdfsWriterErrorCode.REQUIRED_VALUE);
            }

            // encoding check
            String encoding = this.writerSliceConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING);
            try {
                encoding = encoding.trim();
                this.writerSliceConfig.set(Key.ENCODING, encoding);
                Charsets.toCharset(encoding);
            }
            catch (Exception e) {
                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                        String.format("The configured encoding [%s] is not supported.", encoding), e);
            }
        }


        public boolean isPathExists(String filePath) {

            Path path = new Path(filePath);
            boolean exist;
            try {
                exist = hdfsHelper.getFileSystem(this.defaultFS, this.writerSliceConfig).exists(path);
            }
            catch (IOException e) {
                String message = String.format("A network I/O exception occurred while checking whether the path [%s] exists; please check your network connection.",
                        filePath);
                // log the message together with the stack trace instead of calling printStackTrace()
                LOG.error(message, e);
                throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
            }
            return exist;
        }
        @Override
        public void prepare()
        {
            // temporary storage path
            this.tmpStorePath = buildTmpFilePath(path);

            // if the path already exists, check that it is a directory
            if (isPathExists(path)) {
                if (!hdfsHelper.isPathDir(path)) {
                    throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                            String.format("The configured path [%s] is not a valid directory; please check for a file with the same name or an illegal directory name.", path));
                }

                // handle the files already present in the directory according to writeMode:
                // writeMode decides whether they are kept, rejected, or overwritten
                Path[] existFilePaths = hdfsHelper.hdfsDirList(path);

                boolean isExistFile = existFilePaths.length > 0;
                if ("append".equals(writeMode)) {
                    LOG.info("writeMode is append, so no cleanup is done before writing; files will be written to the directory [{}] with the name prefix [{}]", path, fileName);
                }
                else if ("nonConflict".equals(writeMode) && isExistFile) {
                    throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                            String.format("writeMode is nonConflict, but the configured path [%s] is not empty; it contains other files or directories: %s",
                                    path, String.join(",", Arrays.stream(existFilePaths).map(Path::getName).collect(Collectors.toSet()))));
                }
            }
            else {
                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                        String.format("The configured path [%s] does not exist; please create the directory first.", path));
            }
        }

        @Override
        public void post()
        {
            if ("overwrite".equals(writeMode)) {
                hdfsHelper.deleteFilesFromDir(new Path(path));
            }

            hdfsHelper.moveFilesToDest(new Path(this.tmpStorePath), new Path(this.path));

            // delete the temporary directory
            hdfsHelper.deleteDir(new Path(tmpStorePath));
        }

        @Override
        public void destroy()
        {
            hdfsHelper.closeFileSystem();
        }

        @Override
        public List<Configuration> split(int mandatoryNumber)
        {
            LOG.info("begin splitting ...");

            List<Configuration> writerSplitConfigs = new ArrayList<>();
            String filePrefix = fileName;

            // collect all files that already exist under the path
            Set<String> allFiles = Arrays.stream(hdfsHelper.hdfsDirList(path)).map(Path::toString).collect(Collectors.toSet());

            String fileType = this.writerSliceConfig.getString(Key.FILE_TYPE, "txt").toLowerCase();
            String tmpFullFileName;
            String endFullFileName;
            for (int i = 0; i < mandatoryNumber; i++) {
                // handle same file name
                Configuration splitTaskConfig = this.writerSliceConfig.clone();
                // generate the middle part once so the temporary name and the final name refer to the same file
                String middleName = FileHelper.generateFileMiddleName();
                tmpFullFileName = String.format("%s/%s_%s.%s", tmpStorePath, filePrefix, middleName, fileType);
                endFullFileName = String.format("%s/%s_%s.%s", path, filePrefix, middleName, fileType);

                // if the file already exists, generate a new file name
                while (allFiles.contains(endFullFileName)) {
                    middleName = FileHelper.generateFileMiddleName();
                    tmpFullFileName = String.format("%s/%s_%s.%s", tmpStorePath, filePrefix, middleName, fileType);
                    endFullFileName = String.format("%s/%s_%s.%s", path, filePrefix, middleName, fileType);
                }
                allFiles.add(endFullFileName);

                splitTaskConfig.set(Key.FILE_NAME, tmpFullFileName);

                LOG.info("split wrote file name:[{}]", tmpFullFileName);

                writerSplitConfigs.add(splitTaskConfig);
            }
            LOG.info("end splitting.");
            return writerSplitConfigs;
        }

        /**
         * Build the temporary directory path.
         * Under the given directory, pick a folder whose name starts with a dot followed by a UUID;
         * it temporarily holds the files being written.
         *
         * @param userPath hdfs path
         * @return temporary path
         */
        private String buildTmpFilePath(String userPath)
        {
            String tmpDir;
            String tmpFilePath;

            // keep generating candidates until one is found that does not exist yet
            while (true) {
                tmpDir = "." + UUID.randomUUID().toString().replace('-', '_');
                tmpFilePath = Paths.get(userPath, tmpDir).toString();
                if (!isPathExists(tmpFilePath)) {
                    return tmpFilePath;
                }
            }
        }

        /**
         * get decimal type precision
         * if not specified, Constant.DEFAULT_DECIMAL_MAX_PRECISION is used as the default
         * example:
         * <pre>
         *  decimal -&gt; 38
         *  decimal(10) -&gt; 10
         *  </pre>
         *
         * @param type decimal type including precision and scale (if present)
         * @return decimal precision
         */
        private static int getDecimalPrecision(String type)
        {
            if (!type.contains("(")) {
                return Constant.DEFAULT_DECIMAL_MAX_PRECISION;
            }
            else {
                String regEx = "[^0-9]";
                Pattern p = Pattern.compile(regEx);
                Matcher m = p.matcher(type);
                return Integer.parseInt(m.replaceAll(" ").trim().split(" ")[0]);
            }
        }

        /**
         * get decimal type scale
         * if the precision is not present, return Constant.DEFAULT_DECIMAL_MAX_SCALE
         * if the precision is present but the scale is not specified, return 0
         * example:
         * <pre>
         *  decimal -&gt; 10
         *  decimal(8) -&gt; 0
         *  decimal(8,2) -&gt; 2
         *  </pre>
         *
         * @param type decimal type string, including precision and scale (if present)
         * @return decimal scale
         */
        private static int getDecimalScale(String type)
        {
            if (!type.contains("(")) {
                return Constant.DEFAULT_DECIMAL_MAX_SCALE;
            }
            if (!type.contains(",")) {
                return 0;
            }
            else {
                return Integer.parseInt(type.split(",")[1].replace(")", "").trim());
            }
        }

        public void unitizeParquetConfig(Configuration writerSliceConfig) {
            String parquetSchema = writerSliceConfig.getString(Key.PARQUET_SCHEMA);
            if (StringUtils.isNotBlank(parquetSchema)) {
                LOG.info("parquetSchema has config. use parquetSchema:\n{}", parquetSchema);
                return;
            }

            List<Configuration> columns = writerSliceConfig.getListConfiguration(Key.COLUMN);
            if (columns == null || columns.isEmpty()) {
                throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_BLANK,"parquetSchema or column can't be blank!");
            }

            parquetSchema = generateParquetSchemaFromColumn(columns);
            // Keep the previous logic for backward compatibility; if the schema it generates cannot be parsed, fall back to the new logic
            try {
                MessageTypeParser.parseMessageType(parquetSchema);
            } catch (Throwable e) {
                LOG.warn("The generated parquetSchema {} is illegal, try to generate parquetSchema in another way", parquetSchema);
                parquetSchema = HdfsHelper.generateParquetSchemaFromColumnAndType(columns);
                LOG.info("The last generated parquet schema is {}", parquetSchema);
            }
            writerSliceConfig.set(Key.PARQUET_SCHEMA, parquetSchema);
            LOG.info("DataxParquetMode use default fields.");
            writerSliceConfig.set(Key.PARQUET_MODE, "fields");
        }

        private String generateParquetSchemaFromColumn(List<Configuration> columns) {
            // StringBuilder suffices: the builder never leaves this method, so no synchronization is needed
            StringBuilder parquetSchemaBuilder = new StringBuilder();
            parquetSchemaBuilder.append("message m {");
            for (Configuration column: columns) {
                String name = column.getString("name");
                Validate.notNull(name, "column.name can't be null");

                String type = column.getString("type");
                Validate.notNull(type, "column.type can't be null");

                String parquetColumn = String.format("optional %s %s;", type, name);
                parquetSchemaBuilder.append(parquetColumn);
            }
            parquetSchemaBuilder.append("}");
            String parquetSchema = parquetSchemaBuilder.toString();
            LOG.info("generate parquetSchema:\n{}", parquetSchema);
            return parquetSchema;
        }


    }

    public static class Task
            extends Writer.Task
    {
        private static final Logger LOG = LoggerFactory.getLogger(Task.class);

        private Configuration writerSliceConfig;

        private String fileType;
        private String fileName;

        private HdfsHelper hdfsHelper = null;

        @Override
        public void init()
        {
            this.writerSliceConfig = this.getPluginJobConf();

            String defaultFS = this.writerSliceConfig.getString(Key.DEFAULT_FS);
            this.fileType = this.writerSliceConfig.getString(Key.FILE_TYPE).toUpperCase();

            hdfsHelper = new HdfsHelper();
            hdfsHelper.getFileSystem(defaultFS, writerSliceConfig);
            // the file name obtained here is already an absolute path, e.g. /user/hive/warehouse/writer.db/text/test.snappy
            this.fileName = this.writerSliceConfig.getString(Key.FILE_NAME);
        }

        @Override
        public void prepare()
        {
            //
        }

        @Override
        public void startWrite(RecordReceiver lineReceiver)
        {
            LOG.info("write to file : [{}]", this.fileName);
            if ("TEXT".equals(fileType)) {
                // write a TEXT file
                hdfsHelper.textFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector());
            }
            else if ("ORC".equals(fileType)) {
                // write an ORC file
                hdfsHelper.orcFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector());
            }
            else if ("PARQUET".equals(fileType)) {
                // write a Parquet file
                hdfsHelper.parquetFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector());
            }

            LOG.info("end do write");
        }

        @Override
        public void post()
        {
            //
        }

        @Override
        public void destroy()
        {
            //
        }
    }
}
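
    The decimal handling in the Job class is easier to follow in isolation. Below is a minimal standalone sketch, not the plugin's own code: the class name DecimalTypeSketch and the method parse are hypothetical, and the defaults 38 and 10 are assumed from the Javadoc examples in the listing rather than read from the real Constant class. It also swaps the digit-stripping approach of getDecimalPrecision for a single regular expression, which additionally rejects strings that are not decimal types at all.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Standalone sketch (hypothetical class): parse precision and scale from a decimal type string. */
public class DecimalTypeSketch {
    // Assumed defaults, taken from the Javadoc examples ("decimal -> 38" and "decimal -> 10").
    public static final int DEFAULT_PRECISION = 38;
    public static final int DEFAULT_SCALE = 10;

    // Matches "decimal", "decimal(p)" and "decimal(p,s)", ignoring case and inner spaces.
    private static final Pattern DECIMAL = Pattern.compile(
            "^\\s*decimal\\s*(?:\\(\\s*(\\d+)\\s*(?:,\\s*(\\d+)\\s*)?\\))?\\s*$",
            Pattern.CASE_INSENSITIVE);

    /** Returns a two-element array {precision, scale} for the given type string. */
    public static int[] parse(String type) {
        Matcher m = DECIMAL.matcher(type);
        if (!m.matches()) {
            throw new IllegalArgumentException("not a decimal type: " + type);
        }
        if (m.group(1) == null) {
            // bare "decimal": fall back to both defaults
            return new int[] {DEFAULT_PRECISION, DEFAULT_SCALE};
        }
        int precision = Integer.parseInt(m.group(1));
        // precision given without a scale: the scale is 0
        int scale = m.group(2) == null ? 0 : Integer.parseInt(m.group(2));
        return new int[] {precision, scale};
    }

    public static void main(String[] args) {
        for (String type : new String[] {"decimal", "decimal(8)", "decimal(8,2)"}) {
            int[] ps = parse(type);
            System.out.println(type + " -> precision=" + ps[0] + ", scale=" + ps[1]);
        }
    }
}
```

    With these assumed defaults, the three inputs from the Javadoc yield (38, 10), (8, 0) and (8, 2), which matches the rewrite that Job.init applies to each decimal column before storing Key.PRECISION and Key.SCALE.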

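    The classes above are the ones to modify or extend with new methods in order to support reading and writing Parquet files. This code has been running stably in production for over a year without errors; feel free to contact me if you run into problems.

Please credit the source when reposting. Article link: https://www.uj5u.com/shujuku/543822.html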
