diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 04ffc98e84055..a6b59a5fb36c7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -456,6 +456,16 @@ public Option getInsertValue(Schema schema) throws IOException { return getInsertValue(schema, new Properties()); } + public Option toHoodieMetadataRecord() { + if (key == null || this.isDeletedRecord) { + return Option.empty(); + } + + HoodieMetadataRecord record = new HoodieMetadataRecord(key, type, filesystemMetadata, bloomFilterMetadata, + columnStatMetadata, recordIndexMetadata); + return Option.of(record); + } + /** * Returns the list of filenames added as part of this record. */ diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java index 0593187660317..e41fc40437cec 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsIndices.java @@ -27,7 +27,6 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Tuple3; import org.apache.hudi.common.util.hash.ColumnIndexID; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.metadata.HoodieMetadataPayload; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataUtil; @@ -43,7 +42,6 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; -import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -300,12 +298,7 @@ private static List readColumnStatsIndexByColumns( return records.collectAsList().stream().parallel().map(record -> { // schema and props are ignored for generating metadata record from the payload // instead, the underlying file system, or bloom filter, or columns stats metadata (part of payload) are directly used - GenericRecord genericRecord; - try { - genericRecord = (GenericRecord) record.getData().getInsertValue(null, null).orElse(null); - } catch (IOException e) { - throw new HoodieException("Exception while getting insert value from metadata payload"); - } + GenericRecord genericRecord = (GenericRecord) record.getData().toHoodieMetadataRecord().orElse(null); return (RowData) converter.convert(genericRecord); } ).collect(Collectors.toList()); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala index dd76aee2f187b..a82b4857b590c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala @@ -325,7 +325,7 @@ class ColumnStatsIndexSupport(spark: SparkSession, val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] = // NOTE: Explicit conversion is required for Scala 2.11 metadataRecords.map(JFunction.toJavaSerializableFunction(record => { - toScalaOption(record.getData.getInsertValue(null, null)) + toScalaOption(record.getData.toHoodieMetadataRecord) .map(metadataRecord => metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata) .orNull }))