diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java
index 1f4e8cc1dc8c2..3ff7e1055cf7c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/util/DataTypeUtils.java
@@ -126,24 +126,23 @@ private static <T> HashSet<T> newHashSet(T... ts) {
   }
 
   /**
-   * Try to find current sparktype whether contains that DecimalType which's scale < Decimal.MAX_LONG_DIGITS().
-   *
-   * @param sparkType spark schema.
-   * @return found result.
+   * Checks whether the provided {@link DataType} contains a {@link DecimalType} whose precision is less than
+   * {@link Decimal#MAX_LONG_DIGITS()}.
    */
-  public static boolean foundSmallPrecisionDecimalType(DataType sparkType) {
+  public static boolean hasSmallPrecisionDecimalType(DataType sparkType) {
     if (sparkType instanceof StructType) {
       StructField[] fields = ((StructType) sparkType).fields();
-      return Arrays.stream(fields).anyMatch(f -> foundSmallPrecisionDecimalType(f.dataType()));
+      return Arrays.stream(fields).anyMatch(f -> hasSmallPrecisionDecimalType(f.dataType()));
     } else if (sparkType instanceof MapType) {
       MapType map = (MapType) sparkType;
-      return foundSmallPrecisionDecimalType(map.keyType()) || foundSmallPrecisionDecimalType(map.valueType());
+      return hasSmallPrecisionDecimalType(map.keyType()) || hasSmallPrecisionDecimalType(map.valueType());
     } else if (sparkType instanceof ArrayType) {
-      return foundSmallPrecisionDecimalType(((ArrayType) sparkType).elementType());
+      return hasSmallPrecisionDecimalType(((ArrayType) sparkType).elementType());
     } else if (sparkType instanceof DecimalType) {
       DecimalType decimalType = (DecimalType) sparkType;
       return decimalType.precision() < Decimal.MAX_LONG_DIGITS();
     }
+    return false;
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index 4042f431d7d56..c5b18576999d4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
@@ -45,10 +48,6 @@
 import org.apache.hudi.sync.common.HoodieSyncConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.util.DataTypeUtils;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -327,14 +326,39 @@ public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String b
     return hiveSyncConfig;
   }
 
-  // Now by default ParquetWriteSupport will write DecimalType to parquet as int32/int64 when the scale of decimalType < Decimal.MAX_LONG_DIGITS(),
-  // but AvroParquetReader which used by HoodieParquetReader cannot support read int32/int64 as DecimalType.
-  // try to find current schema whether contains that DecimalType, and auto set the value of "hoodie.parquet.writelegacyformat.enabled"
-  public static void mayBeOverwriteParquetWriteLegacyFormatProp(Map<String, String> properties, StructType schema) {
-    if (DataTypeUtils.foundSmallPrecisionDecimalType(schema)
-        && !Boolean.parseBoolean(properties.getOrDefault(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(), "false"))) {
+
+  /**
+   * Checks whether the default value (false) of "hoodie.parquet.writelegacyformat.enabled" should be
+   * overridden in case:
+   *
+   * <ul>
+   *   <li>Property has not been explicitly set by the writer</li>
+   *   <li>Data schema contains a {@code DecimalType} that would be affected by it</li>
+   * </ul>
+   *
+   * If both of the aforementioned conditions are true, this method overrides the default value of the config
+   * (by explicitly setting it) to make sure that the produced Parquet data files can be
+   * read by {@code AvroParquetReader}.
+   *
+   * @param properties properties specified by the writer
+   * @param schema schema of the dataset being written
+   */
+  public static void tryOverrideParquetWriteLegacyFormatProperty(Map<String, String> properties, StructType schema) {
+    if (DataTypeUtils.hasSmallPrecisionDecimalType(schema)
+        && properties.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key()) == null) {
+      // ParquetWriteSupport writes DecimalType to Parquet as INT32/INT64 when the precision of the DecimalType
+      // is less than {@code Decimal.MAX_LONG_DIGITS}, but {@code AvroParquetReader}, which is used by
+      // {@code HoodieParquetReader}, does not support reading DecimalType encoded as INT32/INT64.
+      //
+      // To work around this problem we're checking whether
+      //   - Schema contains any decimals that could be encoded as INT32/INT64
+      //   - {@code HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} has not been explicitly
+      //     set by the writer
+      //
+      // If both of these conditions are true, then we override the default value of {@code
+      // HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED} and set it to "true"
+      LOG.warn("Small Decimal Type found in the persisted schema, reverting default value of 'hoodie.parquet.writelegacyformat.enabled' to true");
       properties.put(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(), "true");
-      LOG.warn("Small Decimal Type found in current schema, auto set the value of hoodie.parquet.writelegacyformat.enabled to true");
     }
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
index af5bbe7717959..41c061d2bc91c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java
@@ -18,6 +18,12 @@
 
 package org.apache.hudi;
 
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.common.config.TypedProperties;
@@ -27,21 +33,16 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
-
-import org.apache.avro.Conversions;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.DecimalType;
 import org.apache.spark.sql.types.DecimalType$;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
@@ -51,7 +52,8 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
@@ -61,11 +63,13 @@
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
-import static org.apache.hudi.DataSourceUtils.mayBeOverwriteParquetWriteLegacyFormatProp;
+import static org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 import static org.apache.hudi.hive.ddl.HiveSyncMode.HMS;
 import static org.hamcrest.CoreMatchers.containsString;
@@ -313,31 +317,39 @@ public boolean arePartitionRecordsSorted() {
   }
 
   @ParameterizedTest
-  @CsvSource({"true, false", "true, true", "false, true", "false, false"})
-  public void testAutoModifyParquetWriteLegacyFormatParameter(boolean smallDecimal, boolean defaultWriteValue) {
-    // create test StructType
-    List<StructField> structFields = new ArrayList<>();
+  @MethodSource("testAutoModifyParquetWriteLegacyFormatParameterParams")
+  public void testAutoModifyParquetWriteLegacyFormatParameter(boolean smallDecimal, Boolean propValue, Boolean expectedPropValue) {
+    DecimalType decimalType;
     if (smallDecimal) {
-      structFields.add(StructField.apply("d1", DecimalType$.MODULE$.apply(10, 2), false, Metadata.empty()));
+      decimalType = DecimalType$.MODULE$.apply(10, 2);
     } else {
-      structFields.add(StructField.apply("d1", DecimalType$.MODULE$.apply(38, 10), false, Metadata.empty()));
+      decimalType = DecimalType$.MODULE$.apply(38, 10);
     }
-    StructType structType = StructType$.MODULE$.apply(structFields);
-    // create write options
-    Map<String, String> options = new HashMap<>();
-    options.put("hoodie.parquet.writelegacyformat.enabled", String.valueOf(defaultWriteValue));
-    // start test
-    mayBeOverwriteParquetWriteLegacyFormatProp(options, structType);
+    StructType structType = StructType$.MODULE$.apply(
+        Arrays.asList(
+            StructField.apply("d1", decimalType, false, Metadata.empty())
+        )
+    );
 
-    // check result
-    boolean res = Boolean.parseBoolean(options.get("hoodie.parquet.writelegacyformat.enabled"));
-    if (smallDecimal) {
-      // should auto modify "hoodie.parquet.writelegacyformat.enabled" = "true".
-      assertEquals(true, res);
-    } else {
-      // should not modify the value of "hoodie.parquet.writelegacyformat.enabled".
-      assertEquals(defaultWriteValue, res);
-    }
+    Map<String, String> options = propValue != null
+        ? Collections.singletonMap(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(), String.valueOf(propValue))
+        : new HashMap<>();
+
+    tryOverrideParquetWriteLegacyFormatProperty(options, structType);
+
+    Boolean finalPropValue =
+        Option.ofNullable(options.get(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key()))
+            .map(Boolean::parseBoolean)
+            .orElse(null);
+    assertEquals(expectedPropValue, finalPropValue);
+  }
+
+  private static Stream<Arguments> testAutoModifyParquetWriteLegacyFormatParameterParams() {
+    return Arrays.stream(new Object[][] {
+        {true, null, true}, {false, null, null},
+        {true, false, false}, {true, true, true},
+        {false, true, true}, {false, false, false}
+    }).map(Arguments::of);
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
index 4866a5be5c583..3b3b8eafb86fe 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/DefaultSource.java
@@ -36,7 +36,7 @@
 import java.util.Map;
 import java.util.Optional;
 
-import static org.apache.hudi.DataSourceUtils.mayBeOverwriteParquetWriteLegacyFormatProp;
+import static org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty;
 
 /**
  * DataSource V2 implementation for managing internal write logic. Only called internally.
@@ -69,7 +69,7 @@ public Optional<DataSourceWriter> createWriter(String writeUUID, StructType sche
         HoodieTableConfig.POPULATE_META_FIELDS.defaultValue());
     Map<String, String> properties = options.asMap();
     // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
-    mayBeOverwriteParquetWriteLegacyFormatProp(properties, schema);
+    tryOverrideParquetWriteLegacyFormatProperty(properties, schema);
     // 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
     HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(options.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()).get(), path, tblName, properties);
     boolean arePartitionRecordsSorted = HoodieInternalConfig.getBulkInsertIsPartitionRecordsSorted(
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
index 4f7ff89a90a33..ab2f16703bcff 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/DefaultSource.java
@@ -34,7 +34,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.hudi.DataSourceUtils.mayBeOverwriteParquetWriteLegacyFormatProp;
+import static org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty;
 
 /**
  * DataSource V2 implementation for managing internal write logic. Only called internally.
@@ -59,7 +59,7 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String,
     Map<String, String> newProps = new HashMap<>(properties);
     // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
-    mayBeOverwriteParquetWriteLegacyFormatProp(newProps, schema);
+    tryOverrideParquetWriteLegacyFormatProperty(newProps, schema);
     // 1st arg to createHoodieConfig is not really required to be set. but passing it anyways.
     HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(newProps.get(HoodieWriteConfig.AVRO_SCHEMA_STRING.key()), path, tblName, newProps);
     return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
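
For reviewers, a minimal usage sketch (not part of the patch) of the renamed helper, showing the behavior the new tests assert. The wrapper class and variable names are hypothetical; the helper, the config key, and the Spark type builders are the ones touched by this diff.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.spark.sql.types.DecimalType$;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;

public class LegacyFormatOverrideExample {
  public static void main(String[] args) {
    // Schema with a "small precision" decimal: precision 10 < Decimal.MAX_LONG_DIGITS() (18),
    // so ParquetWriteSupport would encode it as INT32/INT64 unless legacy format is enabled.
    StructType schema = StructType$.MODULE$.apply(
        Arrays.asList(
            StructField.apply("d1", DecimalType$.MODULE$.apply(10, 2), false, Metadata.empty())));

    // Case 1: the writer did not set the property -> the helper overrides the default to "true".
    Map<String, String> options = new HashMap<>();
    DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty(options, schema);
    // options.get("hoodie.parquet.writelegacyformat.enabled") now returns "true"

    // Case 2: the writer explicitly set the property -> the explicit value is left untouched.
    Map<String, String> explicitOptions = new HashMap<>();
    explicitOptions.put(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED.key(), "false");
    DataSourceUtils.tryOverrideParquetWriteLegacyFormatProperty(explicitOptions, schema);
    // explicitOptions keeps "false": the helper only fills in the value when it is absent
  }
}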