diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index ca91a51b89220..f1595d8100d86 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -1296,6 +1296,9 @@ public static boolean recordNeedsRewriteForExtendedAvroTypePromotion(Schema writ
       case STRING:
       case BYTES:
         return needsRewriteToString(writerSchema, false);
+      case DOUBLE:
+        // To maintain precision, the value must be converted Float -> String -> Double
+        return writerSchema.getType().equals(Schema.Type.FLOAT);
       default:
         return false;
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 16f20b743237d..ee13b61dd71c2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.ddl
 
 import org.apache.hudi.{DataSourceWriteOptions, DefaultSparkRecordMerger, QuickstartUtils}
 import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.TableSchemaResolver
 import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -30,14 +30,15 @@ import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 
 import org.apache.hadoop.fs.Path
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.functions.{arrays_zip, col, expr, lit}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType, StringType, StructField, StructType}
 import org.junit.jupiter.api.Assertions.assertEquals
 
 import scala.collection.JavaConverters._
 
 class TestSpark3DDL extends HoodieSparkSqlTestBase {
@@ -183,6 +184,84 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test float to double evolution") {
+    withTempDir { tmp =>
+      Seq(HoodieTableType.COPY_ON_WRITE, HoodieTableType.MERGE_ON_READ).foreach { tableType =>
+        val tableName = generateTableName
+        val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}"
+        spark.sql("set hoodie.schema.on.read.enable=false")
+
+        val structType = StructType(Array(
+          StructField("id", StringType, true),
+          StructField("ts", IntegerType, true),
+          StructField("partition", StringType, true),
+          StructField("col", FloatType, true)
+        ))
+
+        val data = Seq(Row("r1", 0, "p1", 1.01f), Row("r2", 0, "p1", 2.02f), Row("r3", 0, "p2", 3.03f))
+        val rowRdd: RDD[Row] = spark.sparkContext.parallelize(data)
+        val df = spark.createDataFrame(rowRdd, structType)
+        df.write.format("hudi")
+          .option("hoodie.datasource.write.recordkey.field", "id")
+          .option("hoodie.datasource.write.precombine.field", "ts")
.option("hoodie.datasource.write.partitionpath.field", "partition") + .option("hoodie.table.name", tableName) + .option("hoodie.datasource.write.table.type", tableType.name()) + .mode(SaveMode.Overwrite) + .save(tablePath) + + checkAnswer(spark.read.format("hudi").load(tablePath).select("id", "col").orderBy("id").collect())( + Seq("r1", 1.01f), + Seq("r2", 2.02f), + Seq("r3", 3.03f) + ) + + + val data2 = Seq(Row("r2", 1, "p1", 2.03f)) + val rowRdd2: RDD[Row] = spark.sparkContext.parallelize(data2) + val df2 = spark.createDataFrame(rowRdd2, structType) + df2.write.format("hudi") + .option("hoodie.datasource.write.recordkey.field", "id") + .option("hoodie.datasource.write.precombine.field", "ts") + .option("hoodie.datasource.write.partitionpath.field", "partition") + .option("hoodie.table.name", tableName) + .option("hoodie.datasource.write.table.type", tableType.name()) + .mode(SaveMode.Append) + .save(tablePath) + + + checkAnswer(spark.read.format("hudi").load(tablePath).select("id", "col").orderBy("id").collect())( + Seq("r1", 1.01f), + Seq("r2", 2.03f), + Seq("r3", 3.03f) + ) + + val structType3 = StructType(Array( + StructField("id", StringType, true), + StructField("ts", IntegerType, true), + StructField("partition", StringType, true), + StructField("col", DoubleType, true) + )) + + val data3 = Seq(Row("r1", 2, "p1", 1.000000000001d)) + val rowRdd3: RDD[Row] = spark.sparkContext.parallelize(data3) + val df3 = spark.createDataFrame(rowRdd3, structType3) + df3.write.format("hudi") + .option("hoodie.datasource.write.recordkey.field", "id") + .option("hoodie.datasource.write.precombine.field", "ts") + .option("hoodie.datasource.write.partitionpath.field", "partition") + .option("hoodie.table.name", tableName) + .option("hoodie.datasource.write.table.type", tableType.name()) + .mode(SaveMode.Append) + .save(tablePath) + + checkAnswer(spark.read.format("hudi").load(tablePath).select("id", "col").orderBy("id").collect())( + Seq("r1", 1.000000000001d), + Seq("r2", 2.03d), + Seq("r3", 3.03d) + ) + } + } + } + test("Test Enable and Disable Schema on read") { withTempDir { tmp => Seq("cow", "mor").foreach { tableType =>