一、研發背景
DataX官方開源的版本支持HDFS檔案的讀寫,但截至目前,並不支持Parquet檔案的讀寫。得益於DataX出色的資料同步性能,去年公司的專案大部分採用了DataX作為資料同步工具,但從CDH集群同步Parquet檔案,以及將其他資料源的資料以Parquet格式寫入HDFS,這兩個常用場景均未得到支持。因此只能自己動手,補充HdfsReader和HdfsWriter插件,以支持Parquet檔案的讀寫。
二、HdfsReader插件
本插件比較簡單,一共五個類,具體類名及對應修改項如下:
- DFSUtil:增加判斷檔案是否為Parquet型別的方法,以及Parquet檔案的讀取轉換方法;
- HdfsConstant:增加Parquet檔案型別的常量;
- HdfsReader:增加配置為Parquet檔案型別時的判斷分支;
- HdfsReaderErrorCode:無需更改;
- Type:無需更改。
按需修改其中三個類(DFSUtil、HdfsConstant、HdfsReader)即可,具體代碼如下:
DFSUtil
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.alibaba.datax.common.base.Key; import com.alibaba.datax.common.element.BoolColumn; import com.alibaba.datax.common.element.BytesColumn; import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.ColumnEntry; import com.alibaba.datax.common.element.DateColumn; import com.alibaba.datax.common.element.DoubleColumn; import com.alibaba.datax.common.element.LongColumn; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.element.StringColumn; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.storage.reader.StorageReaderErrorCode; import com.alibaba.datax.storage.reader.StorageReaderUtil; import org.apache.avro.Conversions; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.RCFileRecordReader; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.Reader; import 
org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.ReflectionUtils; import org.apache.orc.TypeDescription; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.example.data.Group; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.io.api.Binary; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; import java.math.BigDecimal; import java.math.RoundingMode; import java.nio.ByteBuffer; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import static com.alibaba.datax.common.base.Key.COLUMN; import static com.alibaba.datax.common.base.Key.NULL_FORMAT; public class DFSUtil { private static final Logger LOG = LoggerFactory.getLogger(DFSUtil.class); // the offset of julian, 2440588 is 1970/1/1 private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588; private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1); private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1); private static final int DIRECTORY_SIZE_GUESS = 16 * 1024; private final org.apache.hadoop.conf.Configuration hadoopConf; private final boolean haveKerberos; private final HashSet<String> sourceHDFSAllFilesList = new HashSet<>(); private String specifiedFileType = null; private String 
kerberosKeytabFilePath; private String kerberosPrincipal; public DFSUtil(Configuration taskConfig) { hadoopConf = new org.apache.hadoop.conf.Configuration(); //io.file.buffer.size 性能引數 //http://blog.csdn.net/yangjl38/article/details/7583374 Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG); JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG)); if (null != hadoopSiteParams) { Set<String> paramKeys = hadoopSiteParams.getKeys(); for (String each : paramKeys) { hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each)); } } hadoopConf.set(HdfsConstant.HDFS_DEFAULT_KEY, taskConfig.getString(Key.DEFAULT_FS)); //是否有Kerberos認證 this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false); if (haveKerberos) { this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH); this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL); this.hadoopConf.set(HdfsConstant.HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos"); } this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath); LOG.info("hadoopConfig details:{}", JSON.toJSONString(this.hadoopConf)); } private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath) { if (haveKerberos && StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) { UserGroupInformation.setConfiguration(hadoopConf); try { UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath); } catch (Exception e) { String message = String.format("kerberos認證失敗,請確定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填寫正確", kerberosKeytabFilePath, kerberosPrincipal); throw DataXException.asDataXException(HdfsReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e); } } } /** * 獲取指定路徑串列下符合條件的所有檔案的絕對路徑 * * @param srcPaths 路徑串列 * @param specifiedFileType 指定檔案型別 * @return set of string */ public Set<String> getAllFiles(List<String> srcPaths, String 
specifiedFileType) { this.specifiedFileType = specifiedFileType; if (!srcPaths.isEmpty()) { for (String eachPath : srcPaths) { LOG.info("get HDFS all files in path = [{}]", eachPath); getHDFSAllFiles(eachPath); } } return sourceHDFSAllFilesList; } private void addSourceFileIfNotEmpty(FileStatus f) { if (f.isFile()) { String filePath = f.getPath().toString(); if (f.getLen() > 0) { addSourceFileByType(filePath); } else { LOG.warn("檔案[{}]長度為0,將會跳過不作處理!", filePath); } } } public void getHDFSAllFiles(String hdfsPath) { try { FileSystem hdfs = FileSystem.get(hadoopConf); //判斷hdfsPath是否包含正則符號 if (hdfsPath.contains("*") || hdfsPath.contains("?")) { Path path = new Path(hdfsPath); FileStatus[] stats = hdfs.globStatus(path); for (FileStatus f : stats) { if (f.isFile()) { addSourceFileIfNotEmpty(f); } else if (f.isDirectory()) { getHDFSAllFilesNORegex(f.getPath().toString(), hdfs); } } } else { getHDFSAllFilesNORegex(hdfsPath, hdfs); } } catch (IOException e) { String message = String.format("無法讀取路徑[%s]下的所有檔案,請確認您的配置項fs.defaultFS, path的值是否正確," + "是否有讀寫權限,網路是否已斷開!", hdfsPath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.PATH_CONFIG_ERROR, e); } } private void getHDFSAllFilesNORegex(String path, FileSystem hdfs) throws IOException { // 獲取要讀取的檔案的根目錄 Path listFiles = new Path(path); // If the network disconnected, this method will retry 45 times // each time the retry interval for 20 seconds // 獲取要讀取的檔案的根目錄的所有二級子檔案目錄 FileStatus[] stats = hdfs.listStatus(listFiles); for (FileStatus f : stats) { // 判斷是不是目錄,如果是目錄,遞回呼叫 if (f.isDirectory()) { LOG.info("[{}] 是目錄, 遞回獲取該目錄下的檔案", f.getPath()); getHDFSAllFilesNORegex(f.getPath().toString(), hdfs); } else if (f.isFile()) { addSourceFileIfNotEmpty(f); } else { String message = String.format("該路徑[%s]檔案型別既不是目錄也不是檔案,插件自動忽略,", f.getPath()); LOG.info(message); } } } // 根據用戶指定的檔案型別,將指定的檔案型別的路徑加入sourceHDFSAllFilesList private void addSourceFileByType(String filePath) { // 檢查file的型別和用戶配置的fileType型別是否一致 boolean 
isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType); if (isMatchedFileType) { String msg = String.format("[%s]是[%s]型別的檔案, 將該檔案加入source files串列", filePath, this.specifiedFileType); LOG.info(msg); sourceHDFSAllFilesList.add(filePath); } else { String message = String.format("檔案[%s]的型別與用戶配置的fileType型別不一致," + "請確認您配置的目錄下面所有檔案的型別均為[%s]" , filePath, this.specifiedFileType); LOG.error(message); throw DataXException.asDataXException( HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message); } } public InputStream getInputStream(String filepath) { InputStream inputStream; Path path = new Path(filepath); try { FileSystem fs = FileSystem.get(hadoopConf); //If the network disconnected, this method will retry 45 times //each time the retry interval for 20 seconds inputStream = fs.open(path); return inputStream; } catch (IOException e) { String message = String.format("讀取檔案 : [%s] 時出錯,請確認檔案:[%s]存在且配置的用戶有權限讀取", filepath, filepath); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e); } } public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) { LOG.info("Start Read sequence file [{}].", sourceSequenceFilePath); Path seqFilePath = new Path(sourceSequenceFilePath); try (SequenceFile.Reader reader = new SequenceFile.Reader(this.hadoopConf, SequenceFile.Reader.file(seqFilePath))) { //獲取SequenceFile.Reader實體 //獲取key 與 value Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), this.hadoopConf); Text value = new Text(); while (reader.next(key, value)) { if (StringUtils.isNotBlank(value.toString())) { StorageReaderUtil.transportOneRecord(recordSender, readerSliceConfig, taskPluginCollector, value.toString()); } } } catch (Exception e) { String message = String.format("SequenceFile.Reader讀取檔案[%s]時出錯", sourceSequenceFilePath); LOG.error(message); throw 
DataXException.asDataXException(HdfsReaderErrorCode.READ_SEQUENCE_FILE_ERROR, message, e); } } public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) { LOG.info("Start Read rc-file [{}].", sourceRcFilePath); List<ColumnEntry> column = StorageReaderUtil .getListColumnEntry(readerSliceConfig, COLUMN); // warn: no default value '\N' String nullFormat = readerSliceConfig.getString(NULL_FORMAT); Path rcFilePath = new Path(sourceRcFilePath); RCFileRecordReader recordReader = null; try (FileSystem fs = FileSystem.get(rcFilePath.toUri(), hadoopConf)) { long fileLen = fs.getFileStatus(rcFilePath).getLen(); FileSplit split = new FileSplit(rcFilePath, 0, fileLen, (String[]) null); recordReader = new RCFileRecordReader(hadoopConf, split); LongWritable key = new LongWritable(); BytesRefArrayWritable value = new BytesRefArrayWritable(); Text txt = new Text(); while (recordReader.next(key, value)) { String[] sourceLine = new String[value.size()]; txt.clear(); for (int i = 0; i < value.size(); i++) { BytesRefWritable v = value.get(i); txt.set(v.getData(), v.getStart(), v.getLength()); sourceLine[i] = txt.toString(); } StorageReaderUtil.transportOneRecord(recordSender, column, sourceLine, nullFormat, taskPluginCollector); } } catch (IOException e) { String message = String.format("讀取檔案[%s]時出錯", sourceRcFilePath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_RCFILE_ERROR, message, e); } finally { try { if (recordReader != null) { recordReader.close(); LOG.info("Finally, Close RCFileRecordReader."); } } catch (IOException e) { LOG.warn(String.format("finally: 關閉RCFileRecordReader失敗, %s", e.getMessage())); } } } public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) { LOG.info("Start Read orc-file [{}].", sourceOrcFilePath); List<ColumnEntry> column = 
StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN); String nullFormat = readerSliceConfig.getString(NULL_FORMAT); try { Path orcFilePath = new Path(sourceOrcFilePath); Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(hadoopConf)); TypeDescription schema = reader.getSchema(); assert column != null; if (column.isEmpty()) { for (int i = 0; i < schema.getChildren().size(); i++) { ColumnEntry columnEntry = new ColumnEntry(); columnEntry.setIndex(i); columnEntry.setType(schema.getChildren().get(i).getCategory().getName()); column.add(columnEntry); } } VectorizedRowBatch rowBatch = schema.createRowBatch(1024); org.apache.orc.RecordReader rowIterator = reader.rows(reader.options().schema(schema)); while (rowIterator.nextBatch(rowBatch)) { transportOrcRecord(rowBatch, column, recordSender, taskPluginCollector, nullFormat); } } catch (Exception e) { String message = String.format("從orc-file檔案路徑[%s]中讀取資料發生例外,請聯系系統管理員," , sourceOrcFilePath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message); } } private void transportOrcRecord(VectorizedRowBatch rowBatch, List<ColumnEntry> columns, RecordSender recordSender, TaskPluginCollector taskPluginCollector, String nullFormat) { Record record; for (int row = 0; row < rowBatch.size; row++) { record = recordSender.createRecord(); try { for (ColumnEntry column : columns) { Column columnGenerated; if (column.getValue() != null) { if (!"null".equals(column.getValue())) { columnGenerated = new StringColumn(column.getValue()); } else { columnGenerated = new StringColumn(); } record.addColumn(columnGenerated); continue; } int i = column.getIndex(); String columnType = column.getType().toUpperCase(); ColumnVector col = rowBatch.cols[i]; Type type = Type.valueOf(columnType); if (col.isNull[row]) { record.addColumn(new StringColumn(null)); continue; } switch (type) { case INT: case LONG: case BOOLEAN: case BIGINT: columnGenerated = new 
LongColumn(((LongColumnVector) col).vector[row]); break; case DATE: columnGenerated = new DateColumn(new Date(((LongColumnVector) col).vector[row])); break; case DOUBLE: columnGenerated = new DoubleColumn(((DoubleColumnVector) col).vector[row]); break; case DECIMAL: columnGenerated = new DoubleColumn(((DecimalColumnVector) col).vector[row].doubleValue()); break; case BINARY: BytesColumnVector b = (BytesColumnVector) col; byte[] val = Arrays.copyOfRange(b.vector[row], b.start[row], b.start[row] + b.length[row]); columnGenerated = new BytesColumn(val); break; case TIMESTAMP: columnGenerated = new DateColumn(((TimestampColumnVector) col).getTime(row)); break; default: // type is string or other String v = ((BytesColumnVector) col).toString(row); columnGenerated = v.equals(nullFormat) ? new StringColumn() : new StringColumn(v); break; } record.addColumn(columnGenerated); } recordSender.sendToWriter(record); } catch (Exception e) { if (e instanceof DataXException) { throw (DataXException) e; } taskPluginCollector.collectDirtyRecord(record, e.getMessage()); } } } public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector) { LOG.info("Start Read parquet-file [{}].", sourceParquetFilePath); List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN); String nullFormat = readerSliceConfig.getString(NULL_FORMAT); Path parquetFilePath = new Path(sourceParquetFilePath); hadoopConf.set("parquet.avro.readInt96AsFixed", "true"); JobConf conf = new JobConf(hadoopConf); GenericData decimalSupport = new GenericData(); decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion()); try (ParquetReader<GenericData.Record> reader = AvroParquetReader .<GenericData.Record>builder(HadoopInputFile.fromPath(parquetFilePath, hadoopConf)) .withDataModel(decimalSupport) .withConf(conf) .build()) { GenericData.Record gRecord = reader.read(); Schema 
schema = gRecord.getSchema(); if (null == column || column.isEmpty()) { column = new ArrayList<>(schema.getFields().size()); String sType; // 用戶沒有填寫具體的欄位資訊,需要從parquet檔案構建 for (int i = 0; i < schema.getFields().size(); i++) { ColumnEntry columnEntry = new ColumnEntry(); columnEntry.setIndex(i); Schema type; if (schema.getFields().get(i).schema().getType() == Schema.Type.UNION) { type = schema.getFields().get(i).schema().getTypes().get(1); } else { type = schema.getFields().get(i).schema(); } sType = type.getProp("logicalType") != null ? type.getProp("logicalType") : type.getType().getName(); if (sType.startsWith("timestamp")) { columnEntry.setType("timestamp"); } else { columnEntry.setType(sType); } column.add(columnEntry); } } while (gRecord != null) { transportParquetRecord(column, gRecord, recordSender, taskPluginCollector, nullFormat); gRecord = reader.read(); } } catch (IOException e) { String message = String.format("從parquet file檔案路徑[%s]中讀取資料發生例外,請聯系系統管理員," , sourceParquetFilePath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message); } } /* * create a transport record for Parquet file * * */ private void transportParquetRecord(List<ColumnEntry> columnConfigs, GenericData.Record gRecord, RecordSender recordSender, TaskPluginCollector taskPluginCollector, String nullFormat) { Record record = recordSender.createRecord(); Column columnGenerated; int scale = 10; try { for (ColumnEntry columnEntry : columnConfigs) { String columnType = columnEntry.getType(); Integer columnIndex = columnEntry.getIndex(); String columnConst = columnEntry.getValue(); String columnValue = null; if (null != columnIndex) { if (null != gRecord.get(columnIndex)) { columnValue = gRecord.get(columnIndex).toString(); } else { record.addColumn(new StringColumn(null)); continue; } } else { columnValue = columnConst; } if (columnType.startsWith("decimal(")) { String ps = columnType.replace("decimal(", "").replace(")", ""); columnType = 
"decimal"; if (ps.contains(",")) { scale = Integer.parseInt(ps.split(",")[1].trim()); } else { scale = 0; } } Type type = Type.valueOf(columnType.toUpperCase()); if (StringUtils.equals(columnValue, nullFormat)) { columnValue = null; } try { switch (type) { case STRING: columnGenerated = new StringColumn(columnValue); break; case INT: case LONG: columnGenerated = new LongColumn(columnValue); break; case DOUBLE: columnGenerated = new DoubleColumn(columnValue); break; case DECIMAL: if (null == columnValue) { columnGenerated = new DoubleColumn((Double) null); } else { columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, RoundingMode.HALF_UP)); } break; case BOOLEAN: columnGenerated = new BoolColumn(columnValue); break; case DATE: if (columnValue =https://www.cnblogs.com/wxm2270/archive/2023/02/13/= null) { columnGenerated = new DateColumn((Date) null); } else { String formatString = columnEntry.getFormat(); if (StringUtils.isNotBlank(formatString)) { // 用戶自己配置的格式轉換 SimpleDateFormat format = new SimpleDateFormat( formatString); columnGenerated = new DateColumn( format.parse(columnValue)); } else { // 框架嘗試轉換 columnGenerated = new DateColumn(new StringColumn(columnValue).asDate()); } } break; case TIMESTAMP: if (null == columnValue) { columnGenerated = new DateColumn(); } else if (columnValue.startsWith("[")) { // INT96 https://github.com/apache/parquet-mr/pull/901 GenericData.Fixed fixed = (GenericData.Fixed) gRecord.get(columnIndex); Date date = new Date(getTimestampMills(fixed.bytes())); columnGenerated = new DateColumn(date); } else { columnGenerated = new DateColumn(Long.parseLong(columnValue) * 1000); } break; case BINARY: columnGenerated = new BytesColumn(((ByteBuffer) gRecord.get(columnIndex)).array()); break; default: String errorMessage = String.format("您配置的列型別暫不支持 : [%s]", columnType); LOG.error(errorMessage); throw DataXException.asDataXException(StorageReaderErrorCode.NOT_SUPPORT_TYPE, errorMessage); } } catch (Exception e) { throw 
new IllegalArgumentException(String.format( "型別轉換錯誤, 無法將[%s] 轉換為[%s], %s", columnValue, type, e)); } record.addColumn(columnGenerated); } // end for recordSender.sendToWriter(record); } catch (IllegalArgumentException | IndexOutOfBoundsException iae) { taskPluginCollector.collectDirtyRecord(record, iae.getMessage()); } catch (Exception e) { if (e instanceof DataXException) { throw (DataXException) e; } // 每一種轉換失敗都是臟資料處理,包括數字格式 & 日期格式 taskPluginCollector.collectDirtyRecord(record, e.getMessage()); } } private TypeDescription getOrcSchema(String filePath) { Path path = new Path(filePath); try { Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(hadoopConf)); // return reader.getTypes().get(0).getSubtypesCount() return reader.getSchema(); } catch (IOException e) { String message = "讀取orc-file column列數失敗,請聯系系統管理員"; throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message); } } public boolean checkHdfsFileType(String filepath, String specifiedFileType) { Path file = new Path(filepath); try (FileSystem fs = FileSystem.get(hadoopConf); FSDataInputStream in = fs.open(file)) { if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.ORC)) { return isORCFile(file, fs, in); } else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.RC)) { return isRCFile(filepath, in); } else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.SEQ)) { return isSequenceFile(file, in); } else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.PARQUET)) { return isParquetFile(file); } else if (StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.CSV) || StringUtils.equalsIgnoreCase(specifiedFileType, HdfsConstant.TEXT)) { return true; } } catch (Exception e) { String message = String.format("檢查檔案[%s]型別失敗,目前支持 %s 格式的檔案," + "請檢查您檔案型別和檔案是否正確,", filepath, HdfsConstant.SUPPORT_FILE_TYPE); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message, e); } return 
false; } // 判斷file是否是ORC File private boolean isORCFile(Path file, FileSystem fs, FSDataInputStream in) { try { // figure out the size of the file using the option or filesystem long size = fs.getFileStatus(file).getLen(); //read last bytes into buffer to get PostScript int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS); in.seek(size - readSize); ByteBuffer buffer = ByteBuffer.allocate(readSize); in.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); //read the PostScript //get length of PostScript int psLen = buffer.get(readSize - 1) & 0xff; String orcMagic = org.apache.orc.OrcFile.MAGIC; int len = orcMagic.length(); if (psLen < len + 1) { return false; } int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1 - len; byte[] array = buffer.array(); // now look for the magic string at the end of the postscript. if (Text.decode(array, offset, len).equals(orcMagic)) { return true; } else { // If it isn't there, this may be the 0.11.0 version of ORC. // Read the first 3 bytes of the file to check for the header in.seek(0); byte[] header = new byte[len]; in.readFully(header, 0, len); // if it isn't there, this isn't an ORC file if (Text.decode(header, 0, len).equals(orcMagic)) { return true; } } } catch (IOException e) { LOG.info("檢查檔案型別: [{}] 不是ORC File.", file); } return false; } // 判斷file是否是RC file private boolean isRCFile(String filepath, FSDataInputStream in) { // The first version of RCFile used the sequence file header. final byte[] originalMagic = {(byte) 'S', (byte) 'E', (byte) 'Q'}; // The 'magic' bytes at the beginning of the RCFile final byte[] rcMagic = {(byte) 'R', (byte) 'C', (byte) 'F'}; // the version that was included with the original magic, which is mapped // into ORIGINAL_VERSION final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = https://www.cnblogs.com/wxm2270/archive/2023/02/13/6; // All the versions should be place in this list. 
final int ORIGINAL_VERSION = 0; // version with SEQ // version with RCF // final int NEW_MAGIC_VERSION = 1 // final int CURRENT_VERSION = NEW_MAGIC_VERSION final int CURRENT_VERSION = 1; byte version; byte[] magic = new byte[rcMagic.length]; try { in.seek(0); in.readFully(magic); if (Arrays.equals(magic, originalMagic)) { if (in.readByte() != ORIGINAL_MAGIC_VERSION_WITH_METADATA) { return false; } version = ORIGINAL_VERSION; } else { if (!Arrays.equals(magic, rcMagic)) { return false; } // Set 'version' version = in.readByte(); if (version > CURRENT_VERSION) { return false; } } if (version == ORIGINAL_VERSION) { try { Class<?> keyCls = hadoopConf.getClassByName(Text.readString(in)); Class<?> valCls = hadoopConf.getClassByName(Text.readString(in)); if (!keyCls.equals(RCFile.KeyBuffer.class) || !valCls.equals(RCFile.ValueBuffer.class)) { return false; } } catch (ClassNotFoundException e) { return false; } } // boolean decompress = in.readBoolean(); // is compressed? if (version == ORIGINAL_VERSION) { // is block-compressed? it should be always false. 
boolean blkCompressed = in.readBoolean(); return !blkCompressed; } return true; } catch (IOException e) { LOG.info("檢查檔案型別: [{}] 不是RC File.", filepath); } return false; } // 判斷file是否是Sequence file private boolean isSequenceFile(Path filepath, FSDataInputStream in) { final byte[] seqMagic = {(byte) 'S', (byte) 'E', (byte) 'Q'}; byte[] magic = new byte[seqMagic.length]; try { in.seek(0); in.readFully(magic); return Arrays.equals(magic, seqMagic); } catch (IOException e) { LOG.info("檢查檔案型別: [{}] 不是Sequence File.", filepath); } return false; } //判斷是否為parquet(考慮判斷parquet檔案的schema是否不為空) private boolean isParquetFile(Path file) { try { GroupReadSupport readSupport = new GroupReadSupport(); ParquetReader.Builder<Group> reader = ParquetReader.builder(readSupport, file); ParquetReader<Group> build = reader.build(); if (build.read() != null) { return true; } } catch (IOException e) { LOG.info("檢查檔案型別: [{}] 不是Parquet File.", file); } return false; } /** * Returns GMT's timestamp from binary encoded parquet timestamp (12 bytes - julian date + time of day nanos). * * @param timestampBinary INT96 parquet timestamp * @return timestamp in millis, GMT timezone */ public static long getTimestampMillis(Binary timestampBinary) { if (timestampBinary.length() != 12) { return 0; } byte[] bytes = timestampBinary.getBytes(); return getTimestampMills(bytes); } public static long getTimestampMills(byte[] bytes) { assert bytes.length == 12; // little endian encoding - need to invert byte order long timeOfDayNanos = Longs.fromBytes(bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]); int julianDay = Ints.fromBytes(bytes[11], bytes[10], bytes[9], bytes[8]); return julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND); } private static long julianDayToMillis(int julianDay) { return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY; } }
HdfsConstant
package com.alibaba.datax.plugin.reader.hdfsreader; import com.alibaba.datax.common.base.Constant; import java.util.Arrays; import java.util.List; public class HdfsConstant extends Constant { public static final String SOURCE_FILES = "sourceFiles"; public static final String TEXT = "TEXT"; public static final String ORC = "ORC"; public static final String CSV = "CSV"; public static final String SEQ = "SEQ"; public static final String RC = "RC"; public static final String PARQUET = "PARQUET"; //新增parquet檔案型別 public static final String HDFS_DEFAULT_KEY = "fs.defaultFS"; public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication"; protected static final List<String> SUPPORT_FILE_TYPE = Arrays.asList(HdfsConstant.CSV, HdfsConstant.ORC, HdfsConstant.RC, HdfsConstant.SEQ, HdfsConstant.TEXT, HdfsConstant.PARQUET); private HdfsConstant() {} }
HdfsReader
package com.alibaba.datax.plugin.reader.hdfsreader; import com.alibaba.datax.common.base.Key; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.spi.Reader; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.storage.reader.StorageReaderUtil; import com.alibaba.datax.storage.util.FileHelper; import org.apache.commons.io.Charsets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.InputStream; import java.nio.charset.UnsupportedCharsetException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import static com.alibaba.datax.common.base.Key.COLUMN; import static com.alibaba.datax.common.base.Key.ENCODING; import static com.alibaba.datax.common.base.Key.INDEX; import static com.alibaba.datax.common.base.Key.TYPE; import static com.alibaba.datax.common.base.Key.VALUE; public class HdfsReader extends Reader { /** * Job 中的方法僅執行一次,Task 中方法會由框架啟動多個 Task 執行緒并行執行, * <p> * 整個 Reader 執行流程是: * <pre> * Job類init-->prepare-->split * Task類init-->prepare-->startRead-->post-->destroy * Task類init-->prepare-->startRead-->post-->destroy * Job類post-->destroy * </pre> */ public static class Job extends Reader.Job { private static final Logger LOG = LoggerFactory.getLogger(Job.class); private Configuration readerOriginConfig = null; private HashSet<String> sourceFiles; private String specifiedFileType = null; private DFSUtil dfsUtil = null; private List<String> path = null; @Override public void init() { LOG.info("init() begin..."); this.readerOriginConfig = getPluginJobConf(); validate(); dfsUtil = new DFSUtil(readerOriginConfig); LOG.info("init() ok and end..."); } public void validate() { readerOriginConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsReaderErrorCode.DEFAULT_FS_NOT_FIND_ERROR); // path check String pathInString = readerOriginConfig.getNecessaryValue(Key.PATH, 
HdfsReaderErrorCode.REQUIRED_VALUE); if (!pathInString.startsWith("[") && !pathInString.endsWith("]")) { path = Collections.singletonList(pathInString); } else { path = readerOriginConfig.getList(Key.PATH, String.class); if (null == path || path.isEmpty()) { throw DataXException.asDataXException(HdfsReaderErrorCode.REQUIRED_VALUE, "您需要指定待讀取的源目錄或檔案"); } for (String eachPath : path) { if (!eachPath.startsWith("/")) { String message = String.format("請檢查引數path:[%s],需要配置為絕對路徑", eachPath); LOG.error(message); throw DataXException.asDataXException(HdfsReaderErrorCode.ILLEGAL_VALUE, message); } } } specifiedFileType = readerOriginConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE).toUpperCase(); if (!HdfsConstant.SUPPORT_FILE_TYPE.contains(specifiedFileType)) { String message = "HdfsReader插件目前支持 " + HdfsConstant.SUPPORT_FILE_TYPE + " 幾種格式的檔案"; throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_ERROR, message); } String encoding = this.readerOriginConfig.getString(ENCODING, "UTF-8"); try { Charsets.toCharset(encoding); } catch (UnsupportedCharsetException uce) { throw DataXException.asDataXException( HdfsReaderErrorCode.ILLEGAL_VALUE, String.format("不支持的編碼格式 : [%s]", encoding), uce); } catch (Exception e) { throw DataXException.asDataXException( HdfsReaderErrorCode.ILLEGAL_VALUE, String.format("運行配置例外 : %s", e.getMessage()), e); } //check Kerberos boolean haveKerberos = readerOriginConfig.getBool(Key.HAVE_KERBEROS, false); if (haveKerberos) { readerOriginConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, HdfsReaderErrorCode.REQUIRED_VALUE); readerOriginConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HdfsReaderErrorCode.REQUIRED_VALUE); } // validate the Columns validateColumns(); // validate compress String compress = readerOriginConfig.getString(Key.COMPRESS, "NONE"); if ("gzip".equalsIgnoreCase(compress)) { // correct to gz readerOriginConfig.set(Key.COMPRESS, "gz"); } } private void validateColumns() { // 檢測是column 是否為 ["*"] 
若是則填為空 List<Configuration> column = this.readerOriginConfig.getListConfiguration(COLUMN); if (null != column && 1 == column.size() && ("\"*\"".equals(column.get(0).toString()) || "'*'".equals(column.get(0).toString()))) { readerOriginConfig.set(COLUMN, new ArrayList<String>()); } else { // column: 1. index type 2.value type 3.when type is Data, may be has dateFormat value List<Configuration> columns = readerOriginConfig.getListConfiguration(COLUMN); if (null == columns || columns.isEmpty()) { throw DataXException.asDataXException(HdfsReaderErrorCode.CONFIG_INVALID_EXCEPTION, "您需要指定 columns"); } for (Configuration eachColumnConf : columns) { eachColumnConf.getNecessaryValue(TYPE, HdfsReaderErrorCode.REQUIRED_VALUE); Integer columnIndex = eachColumnConf.getInt(INDEX); String columnValue = eachColumnConf.getString(VALUE); if (null == columnIndex && null == columnValue) { throw DataXException.asDataXException( HdfsReaderErrorCode.NO_INDEX_VALUE, "由于您配置了type, 則至少需要配置 index 或 value, 當前配置為:" + eachColumnConf); } if (null != columnIndex && null != columnValue) { throw DataXException.asDataXException(HdfsReaderErrorCode.MIXED_INDEX_VALUE, "您混合配置了index, value, 每一列同時僅能選擇其中一種"); } } } } @Override public void prepare() { LOG.info("prepare(), start to getAllFiles..."); this.sourceFiles = (HashSet<String>) dfsUtil.getAllFiles(path, specifiedFileType); LOG.info("您即將讀取的檔案數為: [{}], 串列為: [{}]", sourceFiles.size(), sourceFiles); } @Override public List<Configuration> split(int adviceNumber) { LOG.info("split() begin..."); List<Configuration> readerSplitConfigs = new ArrayList<>(); // warn:每個slice拖且僅拖一個檔案, int splitNumber = sourceFiles.size(); if (0 == splitNumber) { throw DataXException.asDataXException(HdfsReaderErrorCode.EMPTY_DIR_EXCEPTION, String.format("未能找到待讀取的檔案,請確認您的配置項path: %s", readerOriginConfig.getString(Key.PATH))); } List<List<String>> splitSourceFiles = FileHelper.splitSourceFiles(new ArrayList<>(sourceFiles), splitNumber); for (List<String> files : splitSourceFiles) { 
Configuration splitConfig = readerOriginConfig.clone(); splitConfig.set(HdfsConstant.SOURCE_FILES, files); readerSplitConfigs.add(splitConfig); } return readerSplitConfigs; } @Override public void post() { // } @Override public void destroy() { // } } public static class Task extends Reader.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration taskConfig; private List<String> sourceFiles; private String specifiedFileType; private DFSUtil dfsUtil = null; @Override public void init() { this.taskConfig = getPluginJobConf(); this.sourceFiles = taskConfig.getList(HdfsConstant.SOURCE_FILES, String.class); this.specifiedFileType = taskConfig.getNecessaryValue(Key.FILE_TYPE, HdfsReaderErrorCode.REQUIRED_VALUE); this.dfsUtil = new DFSUtil(taskConfig); } @Override public void prepare() { // } @Override public void startRead(RecordSender recordSender) { LOG.info("read start"); for (String sourceFile : this.sourceFiles) { LOG.info("reading file : [{}]", sourceFile); if (specifiedFileType.equalsIgnoreCase(HdfsConstant.TEXT) || specifiedFileType.equalsIgnoreCase(HdfsConstant.CSV)) { InputStream inputStream = dfsUtil.getInputStream(sourceFile); StorageReaderUtil.readFromStream(inputStream, sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.ORC)) { dfsUtil.orcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.SEQ)) { dfsUtil.sequenceFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.RC)) { dfsUtil.rcFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else if (specifiedFileType.equalsIgnoreCase(HdfsConstant.PARQUET)) { dfsUtil.parquetFileStartRead(sourceFile, taskConfig, recordSender, getTaskPluginCollector()); } else { String message = "HdfsReader插件目前支持ORC, TEXT, 
CSV, SEQUENCE, RC、PARQUET等六種格式的檔案," + "請將fileType選項的值配置為ORC, TEXT, CSV, SEQUENCE, RC, PARQUET"; throw DataXException.asDataXException(HdfsReaderErrorCode.FILE_TYPE_UNSUPPORTED, message); } if (recordSender != null) { recordSender.flush(); } } LOG.info("end read source files..."); } @Override public void post() { // } @Override public void destroy() { // } } }
三、HdfsWriter插件
本插件比較簡單,一共五個類,具體類名及對應修改項如下:
- HdfsHelper:增加Parquet Schema生成方法(generateParquetSchemaFromColumnAndType)、增加Parquet記錄轉換方法(transportParRecord),用于將DataX記錄寫入Parquet檔案,
- HdfsWriter:增加Parquet檔案類的列舉項,
- SupportHiveDataType:無需更改,
- HdfsWriterErrorCode:無需更改,
- Type:無需更改,
按需修改其中兩個類(HdfsHelper、HdfsWriter)即可,具體代碼如下:
HdfsHelper
package com.alibaba.datax.plugin.writer.hdfswriter; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.datax.common.base.Constant; import com.alibaba.datax.common.base.Key; import com.alibaba.datax.common.element.Column; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.unstructuredstorage.util.ColumnTypeUtil; import com.alibaba.datax.unstructuredstorage.util.HdfsUtil; import org.apache.avro.Conversions; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.util.KerberosName; import org.apache.orc.CompressionKind; import org.apache.orc.OrcFile; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; import org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Field; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.util.*; public class HdfsHelper { public static final Logger LOG = LoggerFactory.getLogger(HdfsHelper.class); public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication"; public static final String HDFS_DEFAULT_FS_KEY = "fs.defaultFS"; private FileSystem fileSystem = null; private JobConf conf = null; private org.apache.hadoop.conf.Configuration hadoopConf = null; // Kerberos private boolean haveKerberos = false; private String kerberosKeytabFilePath; private String kerberosPrincipal; private String krb5ConfPath; public static MutablePair<Text, Boolean> transportOneRecord( Record record, char fieldDelimiter, List<Configuration> columnsConfiguration, TaskPluginCollector taskPluginCollector) { MutablePair<List<Object>, Boolean> transportResultList = transportOneRecord(record, columnsConfiguration, 
taskPluginCollector); //保存<轉換后的資料,是否是臟資料> MutablePair<Text, Boolean> transportResult = new MutablePair<>(); transportResult.setRight(false); Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter)); transportResult.setRight(transportResultList.getRight()); transportResult.setLeft(recordResult); return transportResult; } public static MutablePair<List<Object>, Boolean> transportOneRecord( Record record, List<Configuration> columnsConfiguration, TaskPluginCollector taskPluginCollector) { MutablePair<List<Object>, Boolean> transportResult = new MutablePair<>(); transportResult.setRight(false); List<Object> recordList = new ArrayList<>(); int recordLength = record.getColumnNumber(); if (0 != recordLength) { Column column; for (int i = 0; i < recordLength; i++) { column = record.getColumn(i); if (null != column.getRawData()) { String rowData = column.getRawData().toString(); SupportHiveDataType columnType = SupportHiveDataType.valueOf( columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase()); //根據writer端型別配置做型別轉換 try { switch (columnType) { case TINYINT: recordList.add(Byte.valueOf(rowData)); break; case SMALLINT: recordList.add(Short.valueOf(rowData)); break; case INT: case INTEGER: recordList.add(Integer.valueOf(rowData)); break; case BIGINT: recordList.add(column.asLong()); break; case FLOAT: recordList.add(Float.valueOf(rowData)); break; case DOUBLE: recordList.add(column.asDouble()); break; case STRING: case VARCHAR: case CHAR: recordList.add(column.asString()); break; case DECIMAL: recordList.add(HiveDecimal.create(column.asBigDecimal())); break; case BOOLEAN: recordList.add(column.asBoolean()); break; case DATE: recordList.add(org.apache.hadoop.hive.common.type.Date.valueOf(column.asString())); break; case TIMESTAMP: recordList.add(Timestamp.valueOf(column.asString())); break; case BINARY: recordList.add(column.asBytes()); break; default: throw DataXException .asDataXException( HdfsWriterErrorCode.ILLEGAL_VALUE, 
String.format( "您的組態檔中的列配置資訊有誤. 不支持資料庫寫入這種欄位型別. 欄位名:[%s], 欄位型別:[%s]. 請修改表中該欄位的型別或者不同步該欄位.", columnsConfiguration.get(i).getString(Key.NAME), columnsConfiguration.get(i).getString(Key.TYPE))); } } catch (Exception e) { // warn: 此處認為臟資料 e.printStackTrace(); String message = String.format( "欄位型別轉換錯誤:你目標欄位為[%s]型別,實際欄位值為[%s].", columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData()); taskPluginCollector.collectDirtyRecord(record, message); transportResult.setRight(true); break; } } else { // warn: it's all ok if nullFormat is null recordList.add(null); } } } transportResult.setLeft(recordList); return transportResult; } public static GenericRecord transportParRecord( Record record, List<Configuration> columnsConfiguration, TaskPluginCollector taskPluginCollector, GenericRecordBuilder builder) { int recordLength = record.getColumnNumber(); if (0 != recordLength) { Column column; for (int i = 0; i < recordLength; i++) { column = record.getColumn(i); String colName = columnsConfiguration.get(i).getString(Key.NAME); String typename = columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase(); if (null == column || column.getRawData() == null) { builder.set(colName, null); } else { String rowData = column.getRawData().toString(); SupportHiveDataType columnType = SupportHiveDataType.valueOf(typename); //根據writer端型別配置做型別轉換 try { switch (columnType) { case INT: case INTEGER: builder.set(colName, Integer.valueOf(rowData)); break; case LONG: builder.set(colName, column.asLong()); break; case FLOAT: builder.set(colName, Float.valueOf(rowData)); break; case DOUBLE: builder.set(colName, column.asDouble()); break; case STRING: builder.set(colName, column.asString()); break; case DECIMAL: builder.set(colName, new BigDecimal(column.asString()).setScale(columnsConfiguration.get(i).getInt(Key.SCALE), BigDecimal.ROUND_HALF_UP)); break; case BOOLEAN: builder.set(colName, column.asBoolean()); break; case BINARY: builder.set(colName, column.asBytes()); break; case TIMESTAMP: 
builder.set(colName, column.asLong() / 1000); break; default: throw DataXException .asDataXException( HdfsWriterErrorCode.ILLEGAL_VALUE, String.format( "您的組態檔中的列配置資訊有誤. 不支持資料庫寫入這種欄位型別. 欄位名:[%s], 欄位型別:[%s]. 請修改表中該欄位的型別或者不同步該欄位.", columnsConfiguration.get(i).getString(Key.NAME), columnsConfiguration.get(i).getString(Key.TYPE))); } } catch (Exception e) { // warn: 此處認為臟資料 String message = String.format( "欄位型別轉換錯誤:目標欄位為[%s]型別,實際欄位值為[%s].", columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData()); taskPluginCollector.collectDirtyRecord(record, message); break; } } } } return builder.build(); } public static String generateParquetSchemaFromColumnAndType(List<Configuration> columns) { Map<String, ColumnTypeUtil.DecimalInfo> decimalColInfo = new HashMap<>(16); ColumnTypeUtil.DecimalInfo PARQUET_DEFAULT_DECIMAL_INFO = new ColumnTypeUtil.DecimalInfo(10, 2); Types.MessageTypeBuilder typeBuilder = Types.buildMessage(); for (Configuration column : columns) { String name = column.getString("name"); String colType = column.getString("type"); Validate.notNull(name, "column.name can't be null"); Validate.notNull(colType, "column.type can't be null"); switch (colType.toLowerCase()) { case "tinyint": case "smallint": case "int": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(name); break; case "bigint": case "long": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(name); break; case "float": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.FLOAT).named(name); break; case "double": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.DOUBLE).named(name); break; case "binary": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(name); break; case "char": case "varchar": case "string": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(name); break; case "boolean": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BOOLEAN).named(name); break; case "timestamp": 
typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT96).named(name); break; case "date": typeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).as(OriginalType.DATE).named(name); break; default: if (ColumnTypeUtil.isDecimalType(colType)) { ColumnTypeUtil.DecimalInfo decimalInfo = ColumnTypeUtil.getDecimalInfo(colType, PARQUET_DEFAULT_DECIMAL_INFO); typeBuilder.optional(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) .as(OriginalType.DECIMAL) .precision(decimalInfo.getPrecision()) .scale(decimalInfo.getScale()) .length(HdfsUtil.computeMinBytesForPrecision(decimalInfo.getPrecision())) .named(name); decimalColInfo.put(name, decimalInfo); } else { typeBuilder.optional(PrimitiveType.PrimitiveTypeName.BINARY).named(name); } break; } } return typeBuilder.named("m").toString(); } public FileSystem getFileSystem(String defaultFS, Configuration taskConfig) { this.hadoopConf = new org.apache.hadoop.conf.Configuration(); Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG); JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG)); if (null != hadoopSiteParams) { Set<String> paramKeys = hadoopSiteParams.getKeys(); for (String each : paramKeys) { hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each)); } } this.hadoopConf.set(HDFS_DEFAULT_FS_KEY, defaultFS); //是否有Kerberos認證 this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false); if (haveKerberos) { LOG.info("krb5.conf路徑:【{}】 \n keytab路徑:【{}】 \n principal:【{}】\n", taskConfig.getString(Key. KRB5_CONF_FILE_PATH), taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH), taskConfig.getString(Key.KERBEROS_PRINCIPAL)); this.krb5ConfPath = taskConfig.getString(Key. 
KRB5_CONF_FILE_PATH); this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH); this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL); LOG.info("檢測到kerberos認證,正在進行認證"); } System.setProperty("java.security.krb5.conf",krb5ConfPath); System.setProperty("sun.security.krb5.Config",krb5ConfPath); refreshConfig(); this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath,hadoopConf,this.krb5ConfPath); conf = new JobConf(hadoopConf); try { fileSystem = FileSystem.get(conf); } catch (IOException e) { String message = String.format("獲取FileSystem時發生網路IO例外,請檢查您的網路是否正常!HDFS地址:[message:defaultFS = %s]", defaultFS); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } catch (Exception e) { String message = String.format("獲取FileSystem失敗,請檢查HDFS地址是否正確: [%s]", "message:defaultFS =" + defaultFS); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } if (null == fileSystem) { String message = String.format("獲取FileSystem失敗,請檢查HDFS地址是否正確: [message:defaultFS = %s]", defaultFS); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, message); } return fileSystem; } /** 重繪krb內容資訊 */ public static void refreshConfig() { try { sun.security.krb5.Config.refresh(); Field defaultRealmField = KerberosName.class.getDeclaredField("defaultRealm"); defaultRealmField.setAccessible(true); defaultRealmField.set( null, org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm()); // reload java.security.auth.login.config javax.security.auth.login.Configuration.setConfiguration(null); } catch (Exception e) { LOG.warn( "resetting default realm failed, current default realm will still be used.", e); } } public void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf,String krb5ConfPath) { 
hadoopConf.set("hadoop.security.authentication", "kerberos"); hadoopConf.set("hive.security.authentication", "kerberos"); hadoopConf.set("hadoop.security.authorization", "true"); hadoopConf.set("dfs.permissions","false"); hadoopConf.set("hadoop.security.auth_to_local","RULE:[2:$1@$0](.*@CDHDEV.COM)s/.*/hadoop/ \n" + " DEFAULT"); if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) { UserGroupInformation.setConfiguration(hadoopConf); KerberosName.resetDefaultRealm(); try { LOG.info("開始認證"); UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath); } catch (Exception e) { LOG.info("kerberos認證失敗"); String message = String.format("kerberos認證失敗,請檢查 " + "kerberosKeytabFilePath[%s] 和 kerberosPrincipal[%s]", kerberosKeytabFilePath, kerberosPrincipal); e.printStackTrace(); throw DataXException.asDataXException(HdfsWriterErrorCode.KERBEROS_LOGIN_ERROR, message, e); } } } /** * 獲取指定目錄下的檔案串列 * * @param dir 需要搜索的目錄 * @return 檔案陣列,檔案是全路徑, * eg:hdfs://10.101.204.12:9000/user/hive/warehouse/writer.db/text/test.textfile */ public Path[] hdfsDirList(String dir) { Path path = new Path(dir); Path[] files; try { FileStatus[] status = fileSystem.listStatus(path); files = new Path[status.length]; for (int i = 0; i < status.length; i++) { files[i] = status[i].getPath(); } } catch (IOException e) { String message = String.format("獲取目錄[%s]檔案串列時發生網路IO例外,請檢查您的網路是否正常!", dir); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } return files; } // public boolean isPathExists(String filePath) { // // Path path = new Path(filePath); // boolean exist; // try { // exist = fileSystem.exists(path); // } // catch (IOException e) { // String message = String.format("判斷檔案路徑[%s]是否存在時發生網路IO例外,請檢查您的網路是否正常!", // "message:filePath =" + filePath); // e.printStackTrace(); // LOG.error(message); // throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); // } 
// return exist; // } public boolean isPathDir(String filePath) { Path path = new Path(filePath); boolean isDir; try { isDir = fileSystem.getFileStatus(path).isDirectory(); } catch (IOException e) { String message = String.format("判斷路徑[%s]是否是目錄時發生網路IO例外,請檢查您的網路是否正常!", filePath); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } return isDir; } public void deleteFilesFromDir(Path dir) { try { final RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(dir, false); while (files.hasNext()) { final LocatedFileStatus next = files.next(); fileSystem.deleteOnExit(next.getPath()); } } catch (FileNotFoundException fileNotFoundException) { throw new DataXException(HdfsWriterErrorCode.FILE_NOT_FOUND, fileNotFoundException.getMessage()); } catch (IOException ioException) { throw new DataXException(HdfsWriterErrorCode.IO_ERROR, ioException.getMessage()); } } public void deleteDir(Path path) { LOG.info("start delete tmp dir [{}] .", path); try { if (fileSystem.exists(path)) { fileSystem.delete(path, true); } } catch (Exception e) { LOG.error("洗掉臨時目錄[{}]時發生IO例外,請檢查您的網路是否正常!", path); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } LOG.info("finish delete tmp dir [{}] .", path); } /** * move all files in sourceDir to targetDir * * @param sourceDir the source directory * @param targetDir the target directory */ public void moveFilesToDest(Path sourceDir, Path targetDir) { try { final FileStatus[] fileStatuses = fileSystem.listStatus(sourceDir); for (FileStatus file : fileStatuses) { if (file.isFile() && file.getLen() > 0) { LOG.info("start move file [{}] to dir [{}].", file.getPath(), targetDir.getName()); fileSystem.rename(file.getPath(), new Path(targetDir, file.getPath().getName())); } } } catch (IOException e) { throw DataXException.asDataXException(HdfsWriterErrorCode.IO_ERROR, e); } LOG.info("finish move file(s)."); } //關閉FileSystem public void closeFileSystem() { try { 
fileSystem.close(); } catch (IOException e) { LOG.error("關閉FileSystem時發生IO例外,請檢查您的網路是否正常!"); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } } // 寫text file型別檔案 public void textFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName, TaskPluginCollector taskPluginCollector) { char fieldDelimiter = config.getChar(Key.FIELD_DELIMITER); List<Configuration> columns = config.getListConfiguration(Key.COLUMN); String compress = config.getString(Key.COMPRESS, "NONE").toUpperCase().trim(); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm"); String attempt = "attempt_" + dateFormat.format(new Date()) + "_0001_m_000000_0"; conf.set(JobContext.TASK_ATTEMPT_ID, attempt); if (!"NONE".equals(compress)) { // fileName must remove suffix, because the FileOutputFormat will add suffix fileName = fileName.substring(0, fileName.lastIndexOf(".")); Class<? extends CompressionCodec> codecClass = getCompressCodec(compress); if (null != codecClass) { FileOutputFormat.setOutputCompressorClass(conf, codecClass); } } Path outputPath = new Path(fileName); FileOutputFormat.setOutputPath(conf, outputPath); FileOutputFormat.setWorkOutputPath(conf, outputPath); try { RecordWriter<NullWritable, Text> writer = new TextOutputFormat<NullWritable, Text>() .getRecordWriter(fileSystem, conf, outputPath.toString(), Reporter.NULL); Record record; while ((record = lineReceiver.getFromReader()) != null) { MutablePair<Text, Boolean> transportResult = transportOneRecord(record, fieldDelimiter, columns, taskPluginCollector); if (Boolean.FALSE.equals(transportResult.getRight())) { writer.write(NullWritable.get(), transportResult.getLeft()); } } writer.close(Reporter.NULL); } catch (Exception e) { LOG.error("寫檔案檔案[{}]時發生IO例外,請檢查您的網路是否正常!", fileName); Path path = new Path(fileName); deleteDir(path.getParent()); throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e); } } // compress 已經轉為大寫 public Class<? 
extends CompressionCodec> getCompressCodec(String compress) { compress = compress.toUpperCase(); Class<? extends CompressionCodec> codecClass; switch (compress) { case "GZIP": codecClass = org.apache.hadoop.io.compress.GzipCodec.class; break; case "BZIP2": codecClass = org.apache.hadoop.io.compress.BZip2Codec.class; break; case "SNAPPY": codecClass = org.apache.hadoop.io.compress.SnappyCodec.class; break; case "LZ4": codecClass = org.apache.hadoop.io.compress.Lz4Codec.class; break; case "ZSTD": codecClass = org.apache.hadoop.io.compress.ZStandardCodec.class; break; case "DEFLATE": case "ZLIB": codecClass = org.apache.hadoop.io.compress.DeflateCodec.class; break; default: throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("目前不支持您配置的 compress 模式 : [%s]", compress)); } return codecClass; } /* * 寫Parquet file型別檔案 * 一個parquet檔案的schema類似如下: * { * "type": "record", * "name": "testFile", * "doc": "test records", * "fields": * [{ * "name": "id", * "type": ["null", "int"] * * }, * { * "name": "empName", * "type": "string" * } * ] * } * "null" 表示該欄位允許為空 */ public void parquetFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName, TaskPluginCollector taskPluginCollector) { List<Configuration> columns = config.getListConfiguration(Key.COLUMN); String compress = config.getString(Key.COMPRESS, "UNCOMPRESSED").toUpperCase().trim(); if ("NONE".equals(compress)) { compress = "UNCOMPRESSED"; } // construct parquet schema Schema schema = generateParquetSchema(columns); Path path = new Path(fileName); LOG.info("write parquet file {}", fileName); CompressionCodecName codecName = CompressionCodecName.fromConf(compress); GenericData decimalSupport = new GenericData(); decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion()); try (ParquetWriter<GenericRecord> writer = AvroParquetWriter .<GenericRecord>builder(path) .withRowGroupSize(ParquetWriter.DEFAULT_BLOCK_SIZE) 
.withPageSize(ParquetWriter.DEFAULT_PAGE_SIZE) .withSchema(schema) .withConf(hadoopConf) .withCompressionCodec(codecName) .withValidation(false) .withDictionaryEncoding(false) .withDataModel(decimalSupport) .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0) .build()) { Record record; while ((record = lineReceiver.getFromReader()) != null) { GenericRecordBuilder builder = new GenericRecordBuilder(schema); GenericRecord transportResult = transportParRecord(record, columns, taskPluginCollector, builder); writer.write(transportResult); } } catch (Exception e) { LOG.error("寫檔案檔案[{}]時發生IO例外,請檢查您的網路是否正常!", fileName); deleteDir(path.getParent()); throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e); } } private Schema generateParquetSchema(List<Configuration> columns) { List<Schema.Field> fields = new ArrayList<>(); String fieldName; String type; List<Schema> unionList = new ArrayList<>(2); for (Configuration column : columns) { unionList.clear(); fieldName = column.getString(Key.NAME); type = column.getString(Key.TYPE).trim().toUpperCase(); unionList.add(Schema.create(Schema.Type.NULL)); switch (type) { case "DECIMAL": Schema dec = LogicalTypes .decimal(column.getInt(Key.PRECISION, Constant.DEFAULT_DECIMAL_MAX_PRECISION), column.getInt(Key.SCALE, Constant.DEFAULT_DECIMAL_MAX_SCALE)) .addToSchema(Schema.createFixed(fieldName, null, null, 16)); unionList.add(dec); break; case "DATE": Schema date = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); unionList.add(date); break; case "TIMESTAMP": Schema ts = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); unionList.add(ts); break; case "UUID": Schema uuid = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)); unionList.add(uuid); break; case "BINARY": unionList.add(Schema.create(Schema.Type.BYTES)); break; default: // other types unionList.add(Schema.create(Schema.Type.valueOf(type))); break; } fields.add(new 
Schema.Field(fieldName, Schema.createUnion(unionList), null, null)); } Schema schema = Schema.createRecord("dataxTestParquet", null, "parquet", false); schema.setFields(fields); return schema; } private void setRow(VectorizedRowBatch batch, int row, Record record, List<Configuration> columns, TaskPluginCollector taskPluginCollector) { for (int i = 0; i < columns.size(); i++) { Configuration eachColumnConf = columns.get(i); String type = eachColumnConf.getString(Key.TYPE).trim().toUpperCase(); SupportHiveDataType columnType; ColumnVector col = batch.cols[i]; if (type.startsWith("DECIMAL")) { columnType = SupportHiveDataType.DECIMAL; } else { columnType = SupportHiveDataType.valueOf(type); } if (record.getColumn(i) == null || record.getColumn(i).getRawData() == null) { col.isNull[row] = true; col.noNulls = false; continue; } try { switch (columnType) { case TINYINT: case SMALLINT: case INT: case BIGINT: case BOOLEAN: case DATE: ((LongColumnVector) col).vector[row] = record.getColumn(i).asLong(); break; case FLOAT: case DOUBLE: ((DoubleColumnVector) col).vector[row] = record.getColumn(i).asDouble(); break; case DECIMAL: HiveDecimalWritable hdw = new HiveDecimalWritable(); hdw.set(HiveDecimal.create(record.getColumn(i).asBigDecimal()) .setScale(eachColumnConf.getInt(Key.SCALE), HiveDecimal.ROUND_HALF_UP)); ((DecimalColumnVector) col).set(row, hdw); break; case TIMESTAMP: ((TimestampColumnVector) col).set(row, java.sql.Timestamp.valueOf(record.getColumn(i).asString())); break; case STRING: case VARCHAR: case CHAR: byte[] buffer; if (record.getColumn(i).getType() == Column.Type.BYTES) { //convert bytes to base64 string buffer = Base64.getEncoder().encode((byte[]) record.getColumn(i).getRawData()); } else { buffer = record.getColumn(i).getRawData().toString().getBytes(StandardCharsets.UTF_8); } ((BytesColumnVector) col).setRef(row, buffer, 0, buffer.length); break; case BINARY: byte[] content = (byte[]) record.getColumn(i).getRawData(); ((BytesColumnVector) 
col).setRef(row, content, 0, content.length); break; default: throw DataXException .asDataXException( HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("您的組態檔中的列配置資訊有誤. 不支持資料庫寫入這種欄位型別. 欄位名:[%s], 欄位型別:[%s]. " + "請修改表中該欄位的型別或者不同步該欄位.", eachColumnConf.getString(Key.NAME), eachColumnConf.getString(Key.TYPE))); } } catch (Exception e) { taskPluginCollector.collectDirtyRecord(record, e.getMessage()); throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("設定Orc資料行失敗,源列型別: %s, 目的原始型別:%s, 目的列Hive型別: %s, 欄位名稱: %s, 源值: %s, 錯誤根源:%n%s", record.getColumn(i).getType(), columnType, eachColumnConf.getString(Key.TYPE), eachColumnConf.getString(Key.NAME), record.getColumn(i).getRawData(), e)); } } } /* * 寫orcfile型別檔案 */ public void orcFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName, TaskPluginCollector taskPluginCollector) { List<Configuration> columns = config.getListConfiguration(Key.COLUMN); String compress = config.getString(Key.COMPRESS, "NONE").toUpperCase(); StringJoiner joiner = new StringJoiner(","); for (Configuration column : columns) { if ("decimal".equals(column.getString(Key.TYPE))) { joiner.add(String.format("%s:%s(%s,%s)", column.getString(Key.NAME), "decimal", column.getInt(Key.PRECISION, Constant.DEFAULT_DECIMAL_MAX_PRECISION), column.getInt(Key.SCALE, Constant.DEFAULT_DECIMAL_MAX_SCALE))); } else if ("date".equalsIgnoreCase(column.getString(Key.TYPE))) { joiner.add(String.format("%s:bigint", column.getString(Key.NAME))); } else { joiner.add(String.format("%s:%s", column.getString(Key.NAME), column.getString(Key.TYPE))); } } TypeDescription schema = TypeDescription.fromString("struct<" + joiner + ">"); try (Writer writer = OrcFile.createWriter(new Path(fileName), OrcFile.writerOptions(conf) .setSchema(schema) .compress(CompressionKind.valueOf(compress)))) { Record record; VectorizedRowBatch batch = schema.createRowBatch(1024); while ((record = lineReceiver.getFromReader()) != null) { int row = 
batch.size++; setRow(batch, row, record, columns, taskPluginCollector); if (batch.size == batch.getMaxSize()) { writer.addRowBatch(batch); batch.reset(); } } if (batch.size != 0) { writer.addRowBatch(batch); batch.reset(); } } catch (IOException e) { LOG.error("寫檔案檔案[{}]時發生IO例外,請檢查您的網路是否正常!", fileName); Path path = new Path(fileName); deleteDir(path.getParent()); throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e); } } }
HdfsWriter
package com.alibaba.datax.plugin.writer.hdfswriter; import com.alibaba.datax.common.base.Constant; import com.alibaba.datax.common.base.Key; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.storage.util.FileHelper; import org.apache.commons.io.Charsets; import org.apache.commons.lang3.StringUtils; import com.alibaba.datax.unstructuredstorage.util.ColumnTypeUtil; import com.alibaba.datax.unstructuredstorage.util.HdfsUtil; import org.apache.commons.lang3.Validate; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.orc.CompressionKind; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageTypeParser; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.file.Paths; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; public class HdfsWriter extends Writer { public static class Job extends Writer.Job { private static final Logger LOG = LoggerFactory.getLogger(Job.class); // 寫入檔案的臨時目錄,完成寫入后,該目錄需要洗掉 private String tmpStorePath; private Configuration writerSliceConfig = null; private String defaultFS; private String path; private String fileName; private String writeMode; private HdfsHelper hdfsHelper = null; private FileSystem filsSystem; public static final Set<String> SUPPORT_FORMAT = new HashSet<>(Arrays.asList("ORC", "PARQUET", "TEXT")); @Override public void init() { this.writerSliceConfig = this.getPluginJobConf(); this.validateParameter(); hdfsHelper = new HdfsHelper(); filsSystem = hdfsHelper.getFileSystem(defaultFS, this.writerSliceConfig); } 
private void validateParameter() { this.defaultFS = this.writerSliceConfig.getNecessaryValue(Key.DEFAULT_FS, HdfsWriterErrorCode.REQUIRED_VALUE); //fileType check String fileType = this.writerSliceConfig.getNecessaryValue(Key.FILE_TYPE, HdfsWriterErrorCode.REQUIRED_VALUE).toUpperCase(); if (!SUPPORT_FORMAT.contains(fileType)) { String message = String.format("[%s] 檔案格式不支持, HdfsWriter插件目前僅支持 %s, ", fileType, SUPPORT_FORMAT); throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message); } //path this.path = this.writerSliceConfig.getNecessaryValue(Key.PATH, HdfsWriterErrorCode.REQUIRED_VALUE); if (!path.startsWith("/")) { String message = String.format("請檢查引數path:[%s],需要配置為絕對路徑", path); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message); } if (path.contains("*") || path.contains("?")) { String message = String.format("請檢查引數path:[%s],不能包含*,?等特殊字符", path); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, message); } //fileName this.fileName = this.writerSliceConfig.getNecessaryValue(Key.FILE_NAME, HdfsWriterErrorCode.REQUIRED_VALUE); //columns check List<Configuration> columns = this.writerSliceConfig.getListConfiguration(Key.COLUMN); if (null == columns || columns.isEmpty()) { throw DataXException.asDataXException(HdfsWriterErrorCode.REQUIRED_VALUE, "您需要指定 columns"); } else { boolean rewriteFlag = false; for (int i = 0; i < columns.size(); i++) { Configuration eachColumnConf = columns.get(i); eachColumnConf.getNecessaryValue(Key.NAME, HdfsWriterErrorCode.COLUMN_REQUIRED_VALUE); eachColumnConf.getNecessaryValue(Key.TYPE, HdfsWriterErrorCode.COLUMN_REQUIRED_VALUE); if (eachColumnConf.getString(Key.TYPE).toUpperCase().startsWith("DECIMAL")) { String type = eachColumnConf.getString(Key.TYPE); eachColumnConf.set(Key.TYPE, "decimal"); eachColumnConf.set(Key.PRECISION, getDecimalPrecision(type)); eachColumnConf.set(Key.SCALE, getDecimalScale(type)); 
columns.set(i, eachColumnConf); rewriteFlag = true; } } if (rewriteFlag) { this.writerSliceConfig.set(Key.COLUMN, columns); } } //writeMode check this.writeMode = this.writerSliceConfig.getNecessaryValue(Key.WRITE_MODE, HdfsWriterErrorCode.REQUIRED_VALUE); if (!Constant.SUPPORTED_WRITE_MODE.contains(writeMode)) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("僅支持append, nonConflict, overwrite三種模式, 不支持您配置的 writeMode 模式 : [%s]", writeMode)); } if ("TEXT".equals(fileType)) { //fieldDelimiter check String fieldDelimiter = this.writerSliceConfig.getString(Key.FIELD_DELIMITER, null); if (StringUtils.isEmpty(fieldDelimiter)) { throw DataXException.asDataXException(HdfsWriterErrorCode.REQUIRED_VALUE, String.format("寫TEXT格式檔案,必須提供有效的[%s] 引數.", Key.FIELD_DELIMITER)); } if (1 != fieldDelimiter.length()) { // warn: if it has, length must be one throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("僅僅支持單字符切分, 您配置的切分為 : [%s]", fieldDelimiter)); } } //compress check String compress = this.writerSliceConfig.getString(Key.COMPRESS, "NONE").toUpperCase().trim(); if ("ORC".equals(fileType)) { try { CompressionKind.valueOf(compress); } catch (IllegalArgumentException e) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("目前ORC 格式僅支持 %s 壓縮,不支持您配置的 compress 模式 : [%s]", Arrays.toString(CompressionKind.values()), compress)); } } if ("PARQUET".equals(fileType)) { // parquet 默認的非壓縮標志是 UNCOMPRESSED ,而不是常見的 NONE,這里統一為 NONE if ("NONE".equals(compress)) { compress = "UNCOMPRESSED"; } try { CompressionCodecName.fromConf(compress); } catch (Exception e) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("目前PARQUET 格式僅支持 %s 壓縮, 不支持您配置的 compress 模式 : [%s]", Arrays.toString(CompressionCodecName.values()), compress)); } } boolean haveKerberos = this.writerSliceConfig.getBool(Key.HAVE_KERBEROS, false); if (haveKerberos) { 
this.writerSliceConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, HdfsWriterErrorCode.REQUIRED_VALUE); this.writerSliceConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HdfsWriterErrorCode.REQUIRED_VALUE); } // encoding check String encoding = this.writerSliceConfig.getString(Key.ENCODING, Constant.DEFAULT_ENCODING); try { encoding = encoding.trim(); this.writerSliceConfig.set(Key.ENCODING, encoding); Charsets.toCharset(encoding); } catch (Exception e) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("不支持您配置的編碼格式:[%s]", encoding), e); } } public boolean isPathExists(String filePath) { Path path = new Path(filePath); boolean exist; try { exist = hdfsHelper.getFileSystem(this.defaultFS,this.writerSliceConfig).exists(path); } catch (IOException e) { String message = String.format("判斷檔案路徑[%s]是否存在時發生網路IO例外,請檢查您的網路是否正常!", "message:filePath =" + filePath); e.printStackTrace(); LOG.error(message); throw DataXException.asDataXException(HdfsWriterErrorCode.CONNECT_HDFS_IO_ERROR, e); } return exist; } @Override public void prepare() { //臨時存放路徑 this.tmpStorePath = buildTmpFilePath(path); //若路徑已經存在,檢查path是否是目錄 if (isPathExists(path)) { if (!hdfsHelper.isPathDir(path)) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("您配置的path: [%s] 不是一個合法的目錄, 請您注意檔案重名, 不合法目錄名等情況.", path)); } //根據writeMode對目錄下檔案進行處理 // 寫入之前,當前目錄下已有的檔案,根據writeMode判斷是否覆寫 Path[] existFilePaths = hdfsHelper.hdfsDirList(path); boolean isExistFile = existFilePaths.length > 0; if ("append".equals(writeMode)) { LOG.info("由于您配置了writeMode = append, 寫入前不做清理作業, [{}] 目錄下寫入相應檔案名前綴 [{}] 的檔案", path, fileName); } else if ("nonConflict".equals(writeMode) && isExistFile) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("由于您配置了writeMode= nonConflict,但您配置的 path: [%s] 目錄不為空, 下面存在其他檔案或檔案夾: %s", path, String.join(",", Arrays.stream(existFilePaths).map(Path::getName).collect(Collectors.toSet())))); } } else { 
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("您配置的path: [%s] 不存在, 請先創建目錄.", path)); } } @Override public void post() { if ("overwrite".equals(writeMode)) { hdfsHelper.deleteFilesFromDir(new Path(path)); } hdfsHelper.moveFilesToDest(new Path(this.tmpStorePath), new Path(this.path)); // 洗掉臨時目錄 hdfsHelper.deleteDir(new Path(tmpStorePath)); } @Override public void destroy() { hdfsHelper.closeFileSystem(); } @Override public List<Configuration> split(int mandatoryNumber) { LOG.info("begin splitting ..."); List<Configuration> writerSplitConfigs = new ArrayList<>(); String filePrefix = fileName; //獲取該路徑下的所有已有檔案串列 Set<String> allFiles = Arrays.stream(hdfsHelper.hdfsDirList(path)).map(Path::toString).collect(Collectors.toSet()); String fileType = this.writerSliceConfig.getString(Key.FILE_TYPE, "txt").toLowerCase(); String tmpFullFileName; String endFullFileName; for (int i = 0; i < mandatoryNumber; i++) { // handle same file name Configuration splitTaskConfig = this.writerSliceConfig.clone(); tmpFullFileName = String.format("%s/%s_%s.%s", tmpStorePath, filePrefix, FileHelper.generateFileMiddleName(), fileType); endFullFileName = String.format("%s/%s_%s.%s", path, filePrefix, FileHelper.generateFileMiddleName(), fileType); // 如果檔案已經存在,則重新生成檔案名 while (allFiles.contains(endFullFileName)) { tmpFullFileName = String.format("%s/%s_%s.%s", tmpStorePath, filePrefix, FileHelper.generateFileMiddleName(), fileType); endFullFileName = String.format("%s/%s_%s.%s", path, filePrefix, FileHelper.generateFileMiddleName(), fileType); } allFiles.add(endFullFileName); splitTaskConfig.set(Key.FILE_NAME, tmpFullFileName); LOG.info("split wrote file name:[{}]", tmpFullFileName); writerSplitConfigs.add(splitTaskConfig); } LOG.info("end splitting."); return writerSplitConfigs; } /** * 創建臨時目錄 * 在給定目錄的下,創建一個已點開頭,uuid為名字的檔案夾,用于臨時存盤寫入的檔案 * * @param userPath hdfs path * @return temporary path */ private String buildTmpFilePath(String userPath) { String tmpDir; 
String tmpFilePath; // while (true) { tmpDir = "." + UUID.randomUUID().toString().replace('-', '_'); tmpFilePath = Paths.get(userPath, tmpDir).toString(); if (isPathExists(tmpFilePath)) { return tmpFilePath; } else { return null; } //} } /** * get decimal type precision * if not specified, use DECIMAL_DEFAULT_PRECISION as default * example: * <pre> * decimal -> 38 * decimal(10) -> 10 * </pre> * * @param type decimal type including precision and scale (if present) * @return decimal precision */ private static int getDecimalPrecision(String type) { if (!type.contains("(")) { return Constant.DEFAULT_DECIMAL_MAX_PRECISION; } else { String regEx = "[^0-9]"; Pattern p = Pattern.compile(regEx); Matcher m = p.matcher(type); return Integer.parseInt(m.replaceAll(" ").trim().split(" ")[0]); } } /** * get decimal type scale * if precision is not present, return DECIMAL_DEFAULT_SCALE * if precision is present and not specify scale, return 0 * example: * <pre> * decimal -> 10 * decimal(8) -> 0 * decimal(8,2) -> 2 * </pre> * * @param type decimal type string, including precision and scale (if present) * @return decimal scale */ private static int getDecimalScale(String type) { if (!type.contains("(")) { return Constant.DEFAULT_DECIMAL_MAX_SCALE; } if (!type.contains(",")) { return 0; } else { return Integer.parseInt(type.split(",")[1].replace(")", "").trim()); } } public void unitizeParquetConfig(Configuration writerSliceConfig) { String parquetSchema = writerSliceConfig.getString(Key.PARQUET_SCHEMA); if (StringUtils.isNotBlank(parquetSchema)) { LOG.info("parquetSchema has config. 
use parquetSchema:\n{}", parquetSchema); return; } List<Configuration> columns = writerSliceConfig.getListConfiguration(Key.COLUMN); if (columns == null || columns.isEmpty()) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_BLANK,"parquetSchema or column can't be blank!"); } parquetSchema = generateParquetSchemaFromColumn(columns); // 為了兼容歷史邏輯,對之前的邏輯做保留,但是如果配置的時候報錯,則走新邏輯 try { MessageTypeParser.parseMessageType(parquetSchema); } catch (Throwable e) { LOG.warn("The generated parquetSchema {} is illegal, try to generate parquetSchema in another way", parquetSchema); parquetSchema = HdfsHelper.generateParquetSchemaFromColumnAndType(columns); LOG.info("The last generated parquet schema is {}", parquetSchema); } writerSliceConfig.set(Key.PARQUET_SCHEMA, parquetSchema); LOG.info("DataxParquetMode use default fields."); writerSliceConfig.set(Key.PARQUET_MODE, "fields"); } private String generateParquetSchemaFromColumn(List<Configuration> columns) { StringBuffer parquetSchemaStringBuffer = new StringBuffer(); parquetSchemaStringBuffer.append("message m {"); for (Configuration column: columns) { String name = column.getString("name"); Validate.notNull(name, "column.name can't be null"); String type = column.getString("type"); Validate.notNull(type, "column.type can't be null"); String parquetColumn = String.format("optional %s %s;", type, name); parquetSchemaStringBuffer.append(parquetColumn); } parquetSchemaStringBuffer.append("}"); String parquetSchema = parquetSchemaStringBuffer.toString(); LOG.info("generate parquetSchema:\n{}", parquetSchema); return parquetSchema; } } public static class Task extends Writer.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration writerSliceConfig; private String fileType; private String fileName; private HdfsHelper hdfsHelper = null; @Override public void init() { this.writerSliceConfig = this.getPluginJobConf(); String defaultFS = 
this.writerSliceConfig.getString(Key.DEFAULT_FS); this.fileType = this.writerSliceConfig.getString(Key.FILE_TYPE).toUpperCase(); hdfsHelper = new HdfsHelper(); hdfsHelper.getFileSystem(defaultFS, writerSliceConfig); //得當的已經是絕對路徑,eg:/user/hive/warehouse/writer.db/text/test.snappy this.fileName = this.writerSliceConfig.getString(Key.FILE_NAME); } @Override public void prepare() { // } @Override public void startWrite(RecordReceiver lineReceiver) { LOG.info("write to file : [{}]", this.fileName); if ("TEXT".equals(fileType)) { //寫TEXT FILE hdfsHelper.textFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector()); } else if ("ORC".equals(fileType)) { //寫ORC FILE hdfsHelper.orcFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector()); } else if ("PARQUET".equals(fileType)) { //寫Parquet FILE hdfsHelper.parquetFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector()); } LOG.info("end do write"); } @Override public void post() { // } @Override public void destroy() { // } } }
以上類需修改或增加方法,以支持Parquet檔案的讀寫。當前代碼已在生產環境穩定運行一年有餘,未遇到報錯問題,大家如有問題可聯繫我。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/543822.html
標籤:其他
上一篇:DataX同步mysql資料報錯 無法連接mysql
下一篇:MySQL存儲引擎
