From 87bddc62ddb9e67716e5ceda4b69c24934522b81 Mon Sep 17 00:00:00 2001
From: wgzhao
Date: Mon, 21 Oct 2024 16:59:23 +0800
Subject: [PATCH] [bugfix][plugin][hdfsreader] Fix exception when reading
 optional fields with null values in Parquet files

This change addresses the issue where attempting to read optional fields
with null values in Parquet files would result in exceptions. By checking
for field presence before reading a value, we ensure more reliable data
processing.
---
 .../plugin/reader/hdfsreader/DFSUtil.java | 46 +++++++++++--------
 1 file changed, 26 insertions(+), 20 deletions(-)

diff --git a/plugin/reader/hdfsreader/src/main/java/com/wgzhao/addax/plugin/reader/hdfsreader/DFSUtil.java b/plugin/reader/hdfsreader/src/main/java/com/wgzhao/addax/plugin/reader/hdfsreader/DFSUtil.java
index 7367bb9c0..a37b6b8e6 100644
--- a/plugin/reader/hdfsreader/src/main/java/com/wgzhao/addax/plugin/reader/hdfsreader/DFSUtil.java
+++ b/plugin/reader/hdfsreader/src/main/java/com/wgzhao/addax/plugin/reader/hdfsreader/DFSUtil.java
@@ -77,6 +77,7 @@
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -427,7 +428,7 @@ record = recordSender.createRecord();
             int i = column.getIndex();
             String columnType = column.getType().toUpperCase();
             ColumnVector col = rowBatch.cols[i];
-            Type type = Type.valueOf(columnType);
+            JavaType type = JavaType.valueOf(columnType);
             if (col.isNull[row]) {
                 record.addColumn(new StringColumn(null));
                 continue;
             }
@@ -494,23 +495,23 @@ public void parquetFileStartRead(String sourceParquetFilePath, Configuration rea
                 .withConf(conf)
                 .build()) {
             Group group = reader.read();
+            MessageType schema = ParquetFileReader.open(HadoopInputFile.fromPath(parquetFilePath, hadoopConf)).getFileMetaData().getSchema();
             if (null == column || column.isEmpty()) {
-                MessageType schema = ParquetFileReader.open(HadoopInputFile.fromPath(parquetFilePath, hadoopConf)).getFooter().getFileMetaData().getSchema();
+                List<Type> fields = schema.getFields();
                 column = new ArrayList<>(fields.size());
-                String sType;
                 // the user did not specify column info, so build it from the parquet schema
                 for (int i = 0; i < schema.getFields().size(); i++) {
                     ColumnEntry columnEntry = new ColumnEntry();
                     columnEntry.setIndex(i);
-                    columnEntry.setType(getJavaType(fields.get(i)));
+                    columnEntry.setType(getJavaType(fields.get(i)).name());
                     column.add(columnEntry);
                 }
             }
             while (group != null) {
-                transportParquetRecord(column, group, recordSender, taskPluginCollector, nullFormat);
+                transportParquetRecord(column, group, schema.getFields(), recordSender, taskPluginCollector, nullFormat);
                 group = reader.read();
             }
         }
@@ -521,36 +522,34 @@ public void parquetFileStartRead(String sourceParquetFilePath, Configuration rea
         }
     }
 
-    private static String getJavaType(org.apache.parquet.schema.Type field)
+    private static JavaType getJavaType(Type field)
     {
         if (field.isPrimitive()) {
             switch (field.asPrimitiveType().getPrimitiveTypeName()) {
-                case BINARY:
-                    return Type.BINARY.name();
                 case INT32:
-                    return Type.INT.name();
+                    return JavaType.INT;
                 case INT64:
-                    return Type.LONG.name();
+                    return JavaType.LONG;
                 case INT96:
-                    return Type.TIMESTAMP.name();
+                    return JavaType.TIMESTAMP;
                 case FLOAT:
-                    return Type.FLOAT.name();
+                    return JavaType.FLOAT;
                 case DOUBLE:
-                    return Type.DOUBLE.name();
+                    return JavaType.DOUBLE;
                 case BOOLEAN:
-                    return Type.BOOLEAN.name();
+                    return JavaType.BOOLEAN;
                 case FIXED_LEN_BYTE_ARRAY:
-                    return Type.BINARY.name();
+                    return JavaType.BINARY;
                 default:
-                    return Type.STRING.name();
+                    return JavaType.STRING; // BINARY is treated as string
             }
         }
         else {
-            return Type.STRING.name();
+            return JavaType.STRING;
         }
     }
 
-    private void transportParquetRecord(List<ColumnEntry> columnConfigs, Group gRecord, RecordSender recordSender,
+    private void transportParquetRecord(List<ColumnEntry> columnConfigs, Group gRecord, List<Type> fields, RecordSender recordSender,
             TaskPluginCollector taskPluginCollector, String nullFormat)
     {
         Record record = recordSender.createRecord();
@@ -566,7 +565,14 @@ private void transportParquetRecord(List<ColumnEntry> columnConfigs, Group gReco
                 record.addColumn(new StringColumn(columnConst));
                 continue;
             }
-            Type type = Type.valueOf(columnType.toUpperCase());
+
+            if (fields.get(columnIndex).getRepetition() == Type.Repetition.OPTIONAL
+                    && gRecord.getFieldRepetitionCount(columnIndex) == 0) {
+                record.addColumn(new StringColumn(nullFormat));
+                continue;
+            }
+
+            JavaType type = JavaType.valueOf(columnType.toUpperCase());
             try {
                 switch (type) {
                     case STRING:
@@ -873,7 +879,7 @@ private static long julianDayToMillis(int julianDay)
         return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
     }
 
-    private enum Type
+    private enum JavaType
     {
         TINYINT,
         SMALLINT,
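
Reviewer note: below is a minimal, self-contained sketch of the guard this
patch introduces, for trying the behaviour outside Addax. The class and
method names (OptionalFieldGuard, readField) and the "\N" null marker are
illustrative assumptions, not identifiers from DFSUtil.java; only the
combination of Type.Repetition.OPTIONAL and Group.getFieldRepetitionCount()
mirrors the actual change.

import java.util.List;

import org.apache.parquet.example.data.Group;
import org.apache.parquet.schema.Type;

class OptionalFieldGuard
{
    // Hypothetical null marker, standing in for the plugin's nullFormat setting.
    static final String NULL_FORMAT = "\\N";

    // Reads one field of a Parquet record as a string. Calling
    // Group.getValueToString(index, 0) on an optional field that holds no
    // value throws a RuntimeException, which is the crash this patch fixes;
    // checking the repetition count first lets us emit a null marker instead.
    static String readField(Group record, List<Type> fields, int index)
    {
        Type field = fields.get(index);
        if (field.getRepetition() == Type.Repetition.OPTIONAL
                && record.getFieldRepetitionCount(index) == 0) {
            return NULL_FORMAT;
        }
        return record.getValueToString(index, 0);
    }
}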