Skip to content

Commit

Permalink
[improve][plugin][hdfsreader] Handle data type mismatch by converting to string when expected type is not met
Browse files Browse the repository at this point in the history
  • Loading branch information
wgzhao committed Sep 19, 2024
1 parent 216b08d commit ceefb94
Showing 1 changed file with 7 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -546,11 +546,6 @@ private static String getJavaType(org.apache.parquet.schema.Type field)
}
}

/*
* create a transport record for Parquet file
*
*
*/
private void transportParquetRecord(List<ColumnEntry> columnConfigs, Group gRecord, RecordSender recordSender,
TaskPluginCollector taskPluginCollector, String nullFormat)
{
Expand All @@ -568,7 +563,6 @@ private void transportParquetRecord(List<ColumnEntry> columnConfigs, Group gReco
continue;
}
Type type = Type.valueOf(columnType.toUpperCase());

try {
switch (type) {
case STRING:
Expand Down Expand Up @@ -608,10 +602,8 @@ private void transportParquetRecord(List<ColumnEntry> columnConfigs, Group gReco
String formatString = columnEntry.getFormat();
if (StringUtils.isNotBlank(formatString)) {
// 用户自己配置的格式转换
SimpleDateFormat format = new SimpleDateFormat(
formatString);
columnGenerated = new DateColumn(
format.parse(columnValue));
SimpleDateFormat format = new SimpleDateFormat(formatString);
columnGenerated = new DateColumn(format.parse(columnValue));
}
else {
// 框架尝试转换
Expand All @@ -627,14 +619,14 @@ private void transportParquetRecord(List<ColumnEntry> columnConfigs, Group gReco
columnGenerated = new BytesColumn(gRecord.getBinary(columnIndex, 0).getBytes());
break;
default:
String errorMessage = String.format("The column type [%s] is unsupported.", columnType);
LOG.error(errorMessage);
throw AddaxException.asAddaxException(StorageReaderErrorCode.NOT_SUPPORT_TYPE, errorMessage);
// try to convert it to string
LOG.debug("try to convert column type {} to String, ", columnType);
columnGenerated = new StringColumn(gRecord.getString(columnIndex, 0));
}
}
catch (Exception e) {
throw new IllegalArgumentException(String.format(
"类型转换错误, 无法将[%s] 转换为[%s], %s", gRecord.getString(columnIndex, 0), type, e));
"Can not convert column type %s to %s: %s", columnType, type, e));
}
record.addColumn(columnGenerated);
} // end for
Expand All @@ -648,7 +640,7 @@ private void transportParquetRecord(List<ColumnEntry> columnConfigs, Group gReco
if (e instanceof AddaxException) {
throw (AddaxException) e;
}
// 每一种转换失败都是脏数据处理,包括数字格式 & 日期格式
// cast failed means dirty data, including number format, date format, etc.
taskPluginCollector.collectDirtyRecord(record, e.getMessage());
}
}
Expand All @@ -658,7 +650,6 @@ private TypeDescription getOrcSchema(String filePath)
Path path = new Path(filePath);
try {
Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(hadoopConf));
// return reader.getTypes().get(0).getSubtypesCount()
return reader.getSchema();
}
catch (IOException e) {
Expand Down

0 comments on commit ceefb94

Please sign in to comment.