From ce8cfbf8b83af05c782469856d2ad2d488c58846 Mon Sep 17 00:00:00 2001
From: wgzhao
Date: Tue, 29 Jun 2021 17:08:35 +0800
Subject: [PATCH] plugin hdfsreader: simplify parquet file reader codes

---
 .../plugin/reader/hdfsreader/DFSUtil.java | 310 ++++++++++--------
 1 file changed, 169 insertions(+), 141 deletions(-)

diff --git a/plugin/reader/hdfsreader/src/main/java/com/wgzhao/datax/plugin/reader/hdfsreader/DFSUtil.java b/plugin/reader/hdfsreader/src/main/java/com/wgzhao/datax/plugin/reader/hdfsreader/DFSUtil.java
index a43e355c3..7a86fa71f 100644
--- a/plugin/reader/hdfsreader/src/main/java/com/wgzhao/datax/plugin/reader/hdfsreader/DFSUtil.java
+++ b/plugin/reader/hdfsreader/src/main/java/com/wgzhao/datax/plugin/reader/hdfsreader/DFSUtil.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
@@ -74,6 +75,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -356,31 +358,23 @@ public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSlice
         String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
 
         try {
-            // 要读取的字段索引,存在读取指定的字段而不是全部
-            List<Integer> readColumnIndex = new ArrayList<>();
-
             Path orcFilePath = new Path(sourceOrcFilePath);
             Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(hadoopConf));
             TypeDescription schema = reader.getSchema();
-            // 判断是否读取所有列
-            // 1. 没有配置 column 字段
-            // 2. column 配置为空 list
-            // 3. column 配置为 ["*"]
-            if (null == column || column.isEmpty() || (column.size() == 1 && "*".equals(column.get(0).getValue()))) {
+            assert column != null;
+            if (column.isEmpty()) {
                 for (int i = 0; i < schema.getChildren().size(); i++) {
-                    readColumnIndex.add(i);
-                }
-            }
-            else {
-                for (ColumnEntry col : column) {
-                    readColumnIndex.add(col.getIndex());
+                    ColumnEntry columnEntry = new ColumnEntry();
+                    columnEntry.setIndex(i);
+                    columnEntry.setType(schema.getChildren().get(i).getCategory().getName());
+                    column.add(columnEntry);
                 }
             }
 
             VectorizedRowBatch rowBatch = schema.createRowBatch(1024);
             org.apache.orc.RecordReader rowIterator = reader.rows(reader.options().schema(schema));
             while (rowIterator.nextBatch(rowBatch)) {
-                transportOrcRecord(rowBatch, readColumnIndex, column, recordSender, taskPluginCollector, nullFormat);
+                transportOrcRecord(rowBatch, column, recordSender, taskPluginCollector, nullFormat);
             }
         }
         catch (Exception e) {
@@ -391,23 +385,35 @@ public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSlice
         }
     }
 
-    private void transportOrcRecord(VectorizedRowBatch rowBatch, List<Integer> columnIndex, List<ColumnEntry> columns, RecordSender recordSender,
-            TaskPluginCollector taskPluginCollector, String nullFormat)
+    private void transportOrcRecord(VectorizedRowBatch rowBatch, List<ColumnEntry> columns, RecordSender recordSender, TaskPluginCollector taskPluginCollector, String nullFormat)
     {
         Record record;
 
         for (int row = 0; row < rowBatch.size; row++) {
             record = recordSender.createRecord();
             try {
-                for (int i : columnIndex) {
+                for (ColumnEntry column : columns) {
+                    Column columnGenerated;
+                    if (column.getValue() != null) {
+                        if (!"null".equals(column.getValue())) {
+                            columnGenerated = new StringColumn(column.getValue());
+                        }
+                        else {
+                            columnGenerated = null;
+                        }
+                        record.addColumn(columnGenerated);
+                        continue;
+                    }
+                    int i = column.getIndex();
+                    String columnType = column.getType().toUpperCase();
                     ColumnVector col = rowBatch.cols[i];
-                    String columnType = columns.get(i).getType().toUpperCase();
                     Type type = Type.valueOf(columnType);
                     if (col.isNull[row]) {
                         record.addColumn(null);
                         continue;
                     }
                     switch (type) {
+                        case INT:
                         case LONG:
                         case DATE:
                         case BOOLEAN:
@@ -416,8 +422,8 @@ record = recordSender.createRecord();
                         case DOUBLE:
                            columnGenerated = new DoubleColumn(((DoubleColumnVector) col).vector[row]);
                            break;
-                        case STRING:
-                            columnGenerated = new StringColumn(((BytesColumnVector) col).toString(row));
+                        case DECIMAL:
+                            columnGenerated = new DoubleColumn(((DecimalColumnVector) col).vector[row].doubleValue());
                            break;
                         case BINARY:
                            BytesColumnVector b = (BytesColumnVector) col;
@@ -425,10 +431,12 @@ record = recordSender.createRecord();
                            columnGenerated = new BytesColumn(val);
                            break;
                         case TIMESTAMP:
-                            columnGenerated = new DateColumn(((TimestampColumnVector) col).getTimestampAsLong(row) * 1000);
+                            columnGenerated = new DateColumn(((TimestampColumnVector) col).getTime(row));
                            break;
                         default:
-                            columnGenerated = new StringColumn(((BytesColumnVector) col).toString(row));
+                            // type is string or other
+                            String v = ((BytesColumnVector) col).toString(row);
+                            columnGenerated = v.equals(nullFormat) ? null : new StringColumn(v);
                            break;
                     }
                     record.addColumn(columnGenerated);
@@ -448,49 +456,58 @@ public void parquetFileStartRead(String sourceParquestFilePath, Configuration re
             RecordSender recordSender, TaskPluginCollector taskPluginCollector)
     {
         LOG.info("Start Read orcfile [{}].", sourceParquestFilePath);
-        List<ColumnEntry> column = StorageReaderUtil
-                .getListColumnEntry(readerSliceConfig, COLUMN);
+        List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
         String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
-        boolean isReadAllColumns = false;
-        int columnIndexMax;
+        Path parquetFilePath = new Path(sourceParquestFilePath);
+        boolean isReadAllColumns = null == column || column.isEmpty();
         // 判断是否读取所有列
-        if (null == column || column.isEmpty()) {
-            int allColumnsCount = getAllColumnsCount(sourceParquestFilePath);
-            columnIndexMax = allColumnsCount - 1;
-            isReadAllColumns = true;
-        }
-        else {
-            columnIndexMax = getMaxIndex(column);
-        }
-        if (columnIndexMax >= 0) {
-            JobConf conf = new JobConf(hadoopConf);
-            Path parquetFilePath = new Path(sourceParquestFilePath);
-//            InputFile inputFile =
-            GenericData decimalSupport = new GenericData();
-            decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
-            try (ParquetReader<GenericData.Record> reader = AvroParquetReader
-                    .<GenericData.Record>builder(parquetFilePath)
-                    .withDataModel(decimalSupport)
-                    .withConf(conf)
-                    .build()) {
-                GenericData.Record gRecord = reader.read();
-                Schema schema = gRecord.getSchema();
-                while (gRecord != null) {
-                    transportOneRecord(column, gRecord, schema, recordSender, taskPluginCollector, isReadAllColumns, nullFormat
-                    );
-                    gRecord = reader.read();
+
+        JobConf conf = new JobConf(hadoopConf);
+
+        GenericData decimalSupport = new GenericData();
+        decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
+        try (ParquetReader<GenericData.Record> reader = AvroParquetReader
+                .<GenericData.Record>builder(parquetFilePath)
+                .withDataModel(decimalSupport)
+                .withConf(conf)
+                .build()) {
+            GenericData.Record gRecord = reader.read();
+            Schema schema = gRecord.getSchema();
+
+            if (null == column || column.isEmpty()) {
+                column = new ArrayList<>(schema.getFields().size());
+                String stype;
+                // 用户没有填写具体的字段信息,需要从parquet文件构建
+                for (int i = 0; i < schema.getFields().size(); i++) {
+                    ColumnEntry columnEntry = new ColumnEntry();
+                    columnEntry.setIndex(i);
+                    Schema type;
+                    if (schema.getFields().get(i).schema().getType() == Schema.Type.UNION) {
+                        type = schema.getFields().get(i).schema().getTypes().get(1);
+                    }
+                    else {
+                        type = schema.getFields().get(i).schema();
+                    }
+                    stype = type.getProp("logicalType") != null ? type.getProp("logicalType") : type.getType().getName();
type.getProp("logicalType") : type.getType().getName(); + if (stype.startsWith("timestamp")) { + columnEntry.setType("timestamp"); + } else{ + columnEntry.setType(stype); + } + column.add(columnEntry); } } - catch (IOException e) { - String message = String.format("从parquetfile文件路径[%s]中读取数据发生异常,请联系系统管理员。" - , sourceParquestFilePath); - LOG.error(message); - throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message); + while (gRecord != null) { + transportOneRecord(column, gRecord, schema, recordSender, taskPluginCollector, isReadAllColumns, nullFormat + ); + gRecord = reader.read(); } } - else { - String message = String.format("请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s", JSON.toJSONString(column)); - throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message); + catch (IOException e) { + String message = String.format("从parquetfile文件路径[%s]中读取数据发生异常,请联系系统管理员。" + , sourceParquestFilePath); + LOG.error(message); + throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message); } } @@ -504,95 +521,106 @@ private void transportOneRecord(List columnConfigs, GenericData.Rec { Record record = recordSender.createRecord(); Column columnGenerated; + int scale = 10; try { - if (isReadAllColumns) { - for (int i = 0; i < schema.getFields().size(); i++) { - record.addColumn(new StringColumn((String) gRecord.get(i))); - } - } - else { - for (ColumnEntry columnEntry : columnConfigs) { - String columnType = columnEntry.getType(); - Integer columnIndex = columnEntry.getIndex(); - String columnConst = columnEntry.getValue(); - String columnValue = null; - if (null != columnIndex) { - if (null != gRecord.get(columnIndex)) { - columnValue = gRecord.get(columnIndex).toString(); - } else { - record.addColumn(null); - continue; - } + for (ColumnEntry columnEntry : columnConfigs) { + String columnType = columnEntry.getType(); + Integer columnIndex = columnEntry.getIndex(); + String columnConst = columnEntry.getValue(); + String columnValue = null; + if (null != columnIndex) { + if (null != gRecord.get(columnIndex)) { + columnValue = gRecord.get(columnIndex).toString(); } else { - columnValue = columnConst; + record.addColumn(null); + continue; + } + } + else { + columnValue = columnConst; + } + if (columnType.startsWith("decimal(")) { + String ps = columnType.replace("decimal(", "").replace(")", ""); + columnType = "decimal"; + if (ps.contains(",")) { + scale = Integer.parseInt(ps.split(",")[1].trim()); } - Type type = Type.valueOf(columnType.toUpperCase()); - if (StringUtils.equals(columnValue, nullFormat)) { - columnValue = null; + else { + scale = 0; } - try { - switch (type) { - case STRING: - columnGenerated = new StringColumn(columnValue); - break; - case LONG: - columnGenerated = new LongColumn(columnValue); - break; - case DOUBLE: - columnGenerated = new DoubleColumn(columnValue); - break; - case BOOLEAN: - columnGenerated = new BoolColumn(columnValue); - break; - case DATE: - if (columnValue == null) { - columnGenerated = new DateColumn((Date) null); + } + Type type = Type.valueOf(columnType.toUpperCase()); + if (StringUtils.equals(columnValue, nullFormat)) { + columnValue = null; + } + try { + switch (type) { + case STRING: + columnGenerated = new StringColumn(columnValue); + break; + case INT: + case LONG: + columnGenerated = new LongColumn(columnValue); + break; + case DOUBLE: + columnGenerated = new DoubleColumn(columnValue); + break; + case DECIMAL: + columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, 
+                            break;
+                        case BOOLEAN:
+                            columnGenerated = new BoolColumn(columnValue);
+                            break;
+                        case DATE:
+                            if (columnValue == null) {
+                                columnGenerated = new DateColumn((Date) null);
+                            }
+                            else {
+                                String formatString = columnEntry.getFormat();
+                                if (StringUtils.isNotBlank(formatString)) {
+                                    // 用户自己配置的格式转换
+                                    SimpleDateFormat format = new SimpleDateFormat(
+                                            formatString);
+                                    columnGenerated = new DateColumn(
+                                            format.parse(columnValue));
+                                }
+                                else {
+                                    // 框架尝试转换
+                                    columnGenerated = new DateColumn(
+                                            new StringColumn(columnValue)
+                                                    .asDate());
+                                }
+                            }
+                            break;
+                        case TIMESTAMP:
+                            if (null == columnValue) {
+                                columnGenerated = null;
+                            }
+                            else {
+                                columnGenerated = new DateColumn(Long.parseLong(columnValue) * 1000);
+                            }
+                            break;
+                        case BINARY:
+                            columnGenerated = new BytesColumn(((ByteBuffer) gRecord.get(columnIndex)).array());
+                            break;
+                        default:
+                            String errorMessage = String.format(
+                                    "您配置的列类型暂不支持 : [%s]", columnType);
+                            LOG.error(errorMessage);
+                            throw DataXException
+                                    .asDataXException(
+                                            StorageReaderErrorCode.NOT_SUPPORT_TYPE,
+                                            errorMessage);
+                    }
+                }
+                catch (Exception e) {
+                    throw new IllegalArgumentException(String.format(
+                            "类型转换错误, 无法将[%s] 转换为[%s], %s", columnValue, type, e));
+                }
+                record.addColumn(columnGenerated);
+            } // end for
             recordSender.sendToWriter(record);
         }
         catch (IllegalArgumentException | IndexOutOfBoundsException iae) {
@@ -830,6 +858,6 @@ private boolean isParquetFile(Path file, FSDataInputStream in)
 
     private enum Type
     {
-        STRING, LONG, BOOLEAN, DOUBLE, DATE, BINARY, TIMESTAMP,
+        INT, STRING, LONG, BOOLEAN, DOUBLE, DATE, BINARY, TIMESTAMP, DECIMAL,
     }
 }