Skip to content

Commit

Permalink
[bugfix][plugin][hdfsreader] Fix exception when reading optional fields with null values in Parquet files
Browse files Browse the repository at this point in the history

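This change fixes an exception that was thrown when reading optional fields containing null values in Parquet files. Before reading a value, the reader now checks whether the optional field is actually present in the record (its repetition count is non-zero); if it is absent, the configured null format is emitted for that column instead, which makes data processing more reliable.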
  • Loading branch information
wgzhao committed Oct 21, 2024
1 parent 2ad2a5b commit 87bddc6
Showing 1 changed file with 26 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -427,7 +428,7 @@ record = recordSender.createRecord();
int i = column.getIndex();
String columnType = column.getType().toUpperCase();
ColumnVector col = rowBatch.cols[i];
Type type = Type.valueOf(columnType);
JavaType type = JavaType.valueOf(columnType);
if (col.isNull[row]) {
record.addColumn(new StringColumn(null));
continue;
Expand Down Expand Up @@ -494,23 +495,23 @@ public void parquetFileStartRead(String sourceParquetFilePath, Configuration rea
.withConf(conf)
.build()) {
Group group = reader.read();
MessageType schema = ParquetFileReader.open(HadoopInputFile.fromPath(parquetFilePath, hadoopConf)).getFileMetaData().getSchema();

if (null == column || column.isEmpty()) {
MessageType schema = ParquetFileReader.open(HadoopInputFile.fromPath(parquetFilePath, hadoopConf)).getFooter().getFileMetaData().getSchema();

List<org.apache.parquet.schema.Type> fields = schema.getFields();
column = new ArrayList<>(fields.size());

String sType;
// The user did not specify any column information, so build the column list from the Parquet file schema
for (int i = 0; i < schema.getFields().size(); i++) {
ColumnEntry columnEntry = new ColumnEntry();
columnEntry.setIndex(i);
columnEntry.setType(getJavaType(fields.get(i)));
columnEntry.setType(getJavaType(fields.get(i)).name());
column.add(columnEntry);
}
}
while (group != null) {
transportParquetRecord(column, group, recordSender, taskPluginCollector, nullFormat);
transportParquetRecord(column, group, schema.getFields(), recordSender, taskPluginCollector, nullFormat);
group = reader.read();
}
}
Expand All @@ -521,36 +522,34 @@ public void parquetFileStartRead(String sourceParquetFilePath, Configuration rea
}
}

private static String getJavaType(org.apache.parquet.schema.Type field)
private static JavaType getJavaType(Type field)
{
if (field.isPrimitive()) {
switch (field.asPrimitiveType().getPrimitiveTypeName()) {
case BINARY:
return Type.BINARY.name();
case INT32:
return Type.INT.name();
return JavaType.INT;
case INT64:
return Type.LONG.name();
return JavaType.LONG;
case INT96:
return Type.TIMESTAMP.name();
return JavaType.TIMESTAMP;
case FLOAT:
return Type.FLOAT.name();
return JavaType.FLOAT;
case DOUBLE:
return Type.DOUBLE.name();
return JavaType.DOUBLE;
case BOOLEAN:
return Type.BOOLEAN.name();
return JavaType.BOOLEAN;
case FIXED_LEN_BYTE_ARRAY:
return Type.BINARY.name();
return JavaType.BINARY;
default:
return Type.STRING.name();
return JavaType.STRING; //Binary as string
}
}
else {
return Type.STRING.name();
return JavaType.STRING;
}
}

private void transportParquetRecord(List<ColumnEntry> columnConfigs, Group gRecord, RecordSender recordSender,
private void transportParquetRecord(List<ColumnEntry> columnConfigs, Group gRecord, List<Type> fields, RecordSender recordSender,
TaskPluginCollector taskPluginCollector, String nullFormat)
{
Record record = recordSender.createRecord();
Expand All @@ -566,7 +565,14 @@ private void transportParquetRecord(List<ColumnEntry> columnConfigs, Group gReco
record.addColumn(new StringColumn(columnConst));
continue;
}
Type type = Type.valueOf(columnType.toUpperCase());

if (fields.get(columnIndex).getRepetition() == Type.Repetition.OPTIONAL
&& gRecord.getFieldRepetitionCount(columnIndex)== 0) {
record.addColumn(new StringColumn(nullFormat));
continue;
}

JavaType type = JavaType.valueOf(columnType.toUpperCase());
try {
switch (type) {
case STRING:
Expand Down Expand Up @@ -873,7 +879,7 @@ private static long julianDayToMillis(int julianDay)
return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
}

private enum Type
private enum JavaType
{
TINYINT,
SMALLINT,
Expand Down

0 comments on commit 87bddc6

Please sign in to comment.