diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 5c9bc862f9253..336d8ad47b067 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -233,8 +233,11 @@ public HoodieMetadataPayload(Option<GenericRecord> recordOpt) {
       columnStatMetadata = HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
           .setFileName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME))
           .setColumnName((String) columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME))
-          .setMinValue(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))
-          .setMaxValue(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))
+          // AVRO-2377 (Avro 1.9.2) changed org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet,
+          // which Kryo fails to deserialize when it is reachable from a GenericRecord; see HUDI-5484.
+          // Avoid holding raw GenericRecord values here: unwrap and re-wrap them into serializable wrapper types.
+          .setMinValue(wrapStatisticValue(unwrapStatisticValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
+          .setMaxValue(wrapStatisticValue(unwrapStatisticValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
           .setValueCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
           .setNullCount((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
           .setTotalSize((Long) columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index 26786e08bbc6c..d5a99dfc1446a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -22,15 +22,18 @@
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SerializationUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.avro.Conversions;
@@ -39,6 +42,7 @@
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -59,6 +63,7 @@
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.util.ArrayList;
@@ -323,4 +328,28 @@ private static Stream<Arguments> testAutoModifyParquetWriteLegacyFormatParameter
         {false, true, true}, {false, false, false}
     }).map(Arguments::of);
   }
+
+  @Test
+  public void testSerHoodieMetadataPayload() throws IOException {
+    String partitionPath = "2022/10/01";
+    String fileName = "file.parquet";
+    String targetColName = "c1";
+
+    HoodieColumnRangeMetadata<Comparable> columnStatsRecord =
+        HoodieColumnRangeMetadata.<Comparable>create(fileName, targetColName, 0, 500, 0, 100, 12345, 12345);
+
+    HoodieRecord<HoodieMetadataPayload> hoodieMetadataPayload =
+        HoodieMetadataPayload.createColumnStatsRecords(partitionPath, Collections.singletonList(columnStatsRecord), false)
+            .findFirst().get();
+
+    IndexedRecord record = hoodieMetadataPayload.getData().getInsertValue(null).get();
+    byte[] recordToBytes = HoodieAvroUtils.indexedRecordToBytes(record);
+    GenericRecord genericRecord = HoodieAvroUtils.bytesToAvro(recordToBytes, record.getSchema());
+
+    HoodieMetadataPayload genericRecordHoodieMetadataPayload = new HoodieMetadataPayload(Option.of(genericRecord));
+    byte[] bytes = SerializationUtils.serialize(genericRecordHoodieMetadataPayload);
+    HoodieMetadataPayload deserGenericRecordHoodieMetadataPayload = SerializationUtils.deserialize(bytes);
+
+    assertEquals(genericRecordHoodieMetadataPayload, deserGenericRecordHoodieMetadataPayload);
+  }
 }
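
Reviewer note, not part of the patch: the test above round-trips the payload through Avro's binary encoding (HoodieAvroUtils.indexedRecordToBytes / bytesToAvro) before handing it to Kryo, which is the core workaround for the unserializable Schema internals. Below is a minimal, self-contained sketch of that same byte round-trip using only standard Avro APIs; the schema, field names, and class name are illustrative assumptions, not taken from the patch.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroRoundTripSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical schema standing in for the column-stats record.
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"ColStat\",\"fields\":["
            + "{\"name\":\"fileName\",\"type\":\"string\"},"
            + "{\"name\":\"valueCount\",\"type\":\"long\"}]}");

    GenericRecord record = new GenericData.Record(schema);
    record.put("fileName", "file.parquet");
    record.put("valueCount", 100L);

    // Encode the record to plain bytes (the role HoodieAvroUtils.indexedRecordToBytes
    // plays in the test); the resulting byte[] is trivially Kryo-serializable.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();

    // Decode the bytes back into a GenericRecord (the role of HoodieAvroUtils.bytesToAvro).
    Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord decoded = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    System.out.println(decoded);
  }
}

The point of the sketch: serializing the encoded byte[] sidesteps Kryo entirely touching org.apache.avro.Schema, whose unmodifiable internal collections (AVRO-2377, Avro 1.9.2) are what break Kryo deserialization in HUDI-5484.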