diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
index f69b04a91ba14..48efe56ed223e 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.metadata.FileMetaData
 import org.apache.spark.sql.HoodieSchemaUtils
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, UnsafeProjection}
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, FloatType, MapType, StringType, StructField, StructType}
 
 object HoodieParquetFileFormatHelper {
 
@@ -120,7 +120,15 @@ object HoodieParquetFileFormatHelper {
         val srcType = typeChangeInfos.get(i).getRight
         val dstType = typeChangeInfos.get(i).getLeft
         val needTimeZone = Cast.needsTimeZone(srcType, dstType)
-        Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
+
+        // Workaround for the case of casting float to double
+        if (srcType == FloatType && dstType == DoubleType) {
+          // First cast to string and then to double
+          val toStringAttr = Cast(attr, StringType, if (needTimeZone) timeZoneId else None)
+          Cast(toStringAttr, dstType, if (needTimeZone) timeZoneId else None)
+        } else {
+          Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
+        }
       } else attr
     }
     GenerateUnsafeProjection.generate(castSchema, newFullSchema)
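
Aside on the workaround above (illustration only, not part of the patch): a direct Cast from FloatType to DoubleType widens the float's binary value, so a column written as 101.01f reads back as 101.01000213623047. Routing through StringType first uses the float's shortest decimal rendering, which then parses to the double the reader expects; this is also why the test expectations further down change from getDouble("101.01", isMor) to a plain 101.01. A minimal, runnable Scala sketch of the underlying effect:

    object FloatToDoubleSketch extends App {
      val f: Float = 101.01f
      // Direct widening exposes the float's binary imprecision:
      println(f.toDouble)          // 101.01000213623047
      // Via the string form, mirroring the Cast chain in the patch:
      println(f.toString.toDouble) // 101.01
    }
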
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
index ef046389e1942..4ac5b65a96f8c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
@@ -20,6 +20,7 @@
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -27,6 +28,8 @@
 import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
 import org.apache.hudi.common.table.timeline.InstantFileNameParser;
 import org.apache.hudi.common.table.timeline.InstantGenerator;
+import org.apache.hudi.common.table.timeline.TimelineLayout;
+import org.apache.hudi.common.table.timeline.TimelinePathProvider;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.internal.schema.InternalSchema;
@@ -176,23 +179,27 @@ public static Pair<Option<String>, Option<String>> getInternalSchemaAndAvroSchem
    * @param tablePath table path
    * @param storage {@link HoodieStorage} instance.
    * @param validCommits current validate commits, use to make up the commit file path/verify the validity of the history schema files
-   * @param fileNameParser InstantFileNameParser
-   * @param commitMetadataSerDe CommitMetadataSerDe
-   * @param instantGenerator InstantGenerator
+   * @param timelineLayout {@link TimelineLayout} instance, used to get the {@link InstantFileNameParser}/{@link CommitMetadataSerDe}/{@link InstantGenerator}/{@link TimelinePathProvider} instances.
+   * @param tableConfig {@link HoodieTableConfig} instance, used to get the timeline path.
    * @return a internalSchema.
    */
   public static InternalSchema getInternalSchemaByVersionId(long versionId, String tablePath, HoodieStorage storage, String validCommits,
-                                                            InstantFileNameParser fileNameParser, CommitMetadataSerDe commitMetadataSerDe, InstantGenerator instantGenerator) {
+                                                            TimelineLayout timelineLayout, HoodieTableConfig tableConfig) {
+    InstantFileNameParser fileNameParser = timelineLayout.getInstantFileNameParser();
+    CommitMetadataSerDe commitMetadataSerDe = timelineLayout.getCommitMetadataSerDe();
+    InstantGenerator instantGenerator = timelineLayout.getInstantGenerator();
+    TimelinePathProvider timelinePathProvider = timelineLayout.getTimelinePathProvider();
+    StoragePath timelinePath = timelinePathProvider.getTimelinePath(tableConfig, new StoragePath(tablePath));
+
     String avroSchema = "";
     Set<String> commitSet = Arrays.stream(validCommits.split(",")).collect(Collectors.toSet());
     List<String> validateCommitList = commitSet.stream().map(fileNameParser::extractTimestamp).collect(Collectors.toList());
 
-    StoragePath hoodieMetaPath = new StoragePath(tablePath, HoodieTableMetaClient.METAFOLDER_NAME);
     //step1:
     StoragePath candidateCommitFile = commitSet.stream()
         .filter(fileName -> fileNameParser.extractTimestamp(fileName).equals(versionId + ""))
-        .findFirst().map(f -> new StoragePath(hoodieMetaPath, f)).orElse(null);
+        .findFirst().map(f -> new StoragePath(timelinePath, f)).orElse(null);
     if (candidateCommitFile != null) {
       try {
         HoodieCommitMetadata metadata;
@@ -231,12 +238,16 @@ public static InternalSchema getInternalSchemaByVersionId(long versionId, String
         : fileSchema;
   }
 
+  public static InternalSchema getInternalSchemaByVersionId(long versionId, String tablePath, HoodieStorage storage, String validCommits, TimelineLayout timelineLayout) {
+    return getInternalSchemaByVersionId(versionId, tablePath, storage, validCommits, timelineLayout, HoodieTableConfig.loadFromHoodieProps(storage, tablePath));
+  }
+
   public static InternalSchema getInternalSchemaByVersionId(long versionId, HoodieTableMetaClient metaClient) {
     InstantFileNameGenerator factory = metaClient.getInstantFileNameGenerator();
     String validCommitLists = metaClient
         .getCommitsAndCompactionTimeline().filterCompletedInstants().getInstantsAsStream().map(factory::getFileName).collect(Collectors.joining(","));
     return getInternalSchemaByVersionId(versionId, metaClient.getBasePath().toString(), metaClient.getStorage(),
-        validCommitLists, metaClient.getInstantFileNameParser(), metaClient.getCommitMetadataSerDe(), metaClient.getInstantGenerator());
+        validCommitLists, metaClient.getTimelineLayout(), metaClient.getTableConfig());
   }
 }
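
Usage sketch for the refactored lookup (illustration only, not part of the patch; metaClient, versionId, and validCommits are assumed to exist in the caller):

    // Callers holding a HoodieTableMetaClient can pass the layout and config directly:
    val layout = metaClient.getTimelineLayout
    val schema = InternalSchemaCache.getInternalSchemaByVersionId(
      versionId, metaClient.getBasePath.toString, metaClient.getStorage, validCommits,
      layout, metaClient.getTableConfig)

    // Callers without a table config at hand can use the new overload, which loads
    // hoodie.properties itself via HoodieTableConfig.loadFromHoodieProps:
    val schemaFromProps = InternalSchemaCache.getInternalSchemaByVersionId(
      versionId, metaClient.getBasePath.toString, metaClient.getStorage, validCommits, layout)
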
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
index cdc489e938ca5..35d0b49cf1a79 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.format;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
@@ -61,13 +62,14 @@ public class InternalSchemaManager implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
   public static final InternalSchemaManager DISABLED = new InternalSchemaManager(null, InternalSchema.getEmptyInternalSchema(), null, null,
-      TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION));
+      TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION), null);
 
   private final Configuration conf;
   private final InternalSchema querySchema;
   private final String validCommits;
   private final String tablePath;
   private final TimelineLayout layout;
+  private final HoodieTableConfig tableConfig;
 
   private transient org.apache.hadoop.conf.Configuration hadoopConf;
 
   public static InternalSchemaManager get(Configuration conf, HoodieTableMetaClient metaClient) {
@@ -86,16 +88,17 @@ public static InternalSchemaManager get(Configuration conf, HoodieTableMetaClien
         .getInstantsAsStream()
         .map(factory::getFileName)
         .collect(Collectors.joining(","));
-    return new InternalSchemaManager(conf, internalSchema.get(), validCommits, metaClient.getBasePath().toString(), metaClient.getTimelineLayout());
+    return new InternalSchemaManager(conf, internalSchema.get(), validCommits, metaClient.getBasePath().toString(), metaClient.getTimelineLayout(), metaClient.getTableConfig());
   }
 
   public InternalSchemaManager(Configuration conf, InternalSchema querySchema, String validCommits, String tablePath,
-      TimelineLayout layout) {
+      TimelineLayout layout, HoodieTableConfig tableConfig) {
     this.conf = conf;
     this.querySchema = querySchema;
     this.validCommits = validCommits;
     this.tablePath = tablePath;
     this.layout = layout;
+    this.tableConfig = tableConfig;
   }
 
   public InternalSchema getQuerySchema() {
@@ -121,8 +124,7 @@ InternalSchema getMergeSchema(String fileName) {
     InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(
         commitInstantTime, tablePath, new HoodieHadoopStorage(tablePath, getHadoopConf()),
-        validCommits, layout.getInstantFileNameParser(),
-        layout.getCommitMetadataSerDe(), layout.getInstantGenerator());
+        validCommits, layout, tableConfig);
     if (querySchema.equals(fileSchema)) {
       return InternalSchema.getEmptyInternalSchema();
     }
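
For Flink-side callers, constructing the manager now takes the table config as a trailing argument (sketch only; conf, internalSchema, validCommits, and metaClient are assumed to exist, and DISABLED passes null here as shown above):

    val manager = new InternalSchemaManager(
      conf, internalSchema, validCommits,
      metaClient.getBasePath.toString,
      metaClient.getTimelineLayout,
      metaClient.getTableConfig) // new trailing argument
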
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index eba2e62b52162..16f20b743237d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -294,8 +294,8 @@
       // test change column type float to double
       spark.sql(s"alter table $tableName alter column col2 type double")
       checkAnswer(s"select id, col1_new, col2 from $tableName where id = 1 or id = 2 order by id")(
-        Seq(1, null, getDouble("101.01", isMor)),
-        Seq(2, null, getDouble("102.02", isMor)))
+        Seq(1, null, 101.01),
+        Seq(2, null, 102.02))
       spark.sql(
         s"""
            | insert into $tableName values
@@ -308,11 +308,11 @@
           new java.math.BigDecimal("100001.0001"), "a000001",
           java.sql.Date.valueOf("2021-12-25"),
           java.sql.Timestamp.valueOf("2021-12-25 12:01:01"), true,
           java.sql.Date.valueOf("2021-12-25")),
-        Seq(2, null, 2, 12, 100002L, getDouble("102.02", isMor), 1002.0002,
+        Seq(2, null, 2, 12, 100002L, 102.02, 1002.0002,
           new java.math.BigDecimal("100002.0002"), "a000002",
           java.sql.Date.valueOf("2021-12-25"),
           java.sql.Timestamp.valueOf("2021-12-25 12:02:02"), true,
           java.sql.Date.valueOf("2021-12-25")),
-        Seq(3, null, 3, 13, 100003L, getDouble("103.03", isMor), 1003.0003,
+        Seq(3, null, 3, 13, 100003L, 103.03, 1003.0003,
           new java.math.BigDecimal("100003.0003"), "a000003",
           java.sql.Date.valueOf("2021-12-25"),
           java.sql.Timestamp.valueOf("2021-12-25 12:03:03"), false,
           java.sql.Date.valueOf("2021-12-25")),
@@ -366,7 +366,7 @@
       spark.sql(s"alter table $tableName alter column col2 type string")
       checkAnswer(s"select id, col1_new, col2 from $tableName where id = 1 or id = 2 order by id")(
         Seq(1, 3, "101.01"),
-        Seq(2, null, getDouble("102.02", isMor && runClustering).toString))
+        Seq(2, null, "102.02"))
       spark.sql(
         s"""
            | insert into $tableName values
@@ -375,7 +375,7 @@
            |""".stripMargin)
 
       checkAnswer(s"select id, col1_new, comb, col0, col1, col2, col3, col4, col5, "
-        + s"col6, col7, col8, par from $tableName")(getExpectedRowsSecondTime(isMor && runClustering): _*)
+        + s"col6, col7, col8, par from $tableName")(getExpectedRowsSecondTime(): _*)
       if (runCompaction) {
         // try schedule compact
         if (tableType == "mor") spark.sql(s"schedule compaction on $tableName")
@@ -398,7 +398,7 @@
       }
       // Data should not change after scheduling or running table services
       checkAnswer(s"select id, col1_new, comb, col0, col1, col2, col3, col4, col5, "
-        + s"col6, col7, col8, par from $tableName")(getExpectedRowsSecondTime(isMor): _*)
+        + s"col6, col7, col8, par from $tableName")(getExpectedRowsSecondTime(): _*)
       spark.sql(
         s"""
            | insert into $tableName values
@@ -410,7 +410,7 @@
         + s"where id = 1 or id = 6 or id = 2 or id = 11 order by id")(
         Seq(1, 3, "101.01"),
         Seq(11, 3, "101.01"),
-        Seq(2, null, getDouble("102.02", isMor).toString),
+        Seq(2, null, "102.02"),
         Seq(6, 6, "105.05"))
     }
     spark.sessionState.conf.unsetConf("spark.sql.storeAssignmentPolicy")
@@ -419,18 +419,18 @@
     }
   }
 
-  private def getExpectedRowsSecondTime(floatToDouble: Boolean): Seq[Seq[Any]] = {
+  private def getExpectedRowsSecondTime(): Seq[Seq[Any]] = {
     Seq(
       Seq(1, 3, 1, 11, 100001L, "101.01", 1001.0001,
         new java.math.BigDecimal("100001.00010000"), "a000001",
         java.sql.Date.valueOf("2021-12-25"),
         java.sql.Timestamp.valueOf("2021-12-25 12:01:01"), true,
         java.sql.Date.valueOf("2021-12-25")),
-      Seq(2, null, 2, 12, 100002L, getDouble("102.02", floatToDouble).toString,
+      Seq(2, null, 2, 12, 100002L, "102.02",
        1002.0002,
         new java.math.BigDecimal("100002.00020000"), "a000002",
         java.sql.Date.valueOf("2021-12-25"),
         java.sql.Timestamp.valueOf("2021-12-25 12:02:02"), true,
         java.sql.Date.valueOf("2021-12-25")),
-      Seq(3, null, 3, 13, 100003L, getDouble("103.03", floatToDouble).toString,
+      Seq(3, null, 3, 13, 100003L, "103.03",
        1003.0003,
         new java.math.BigDecimal("100003.00030000"), "a000003",
         java.sql.Date.valueOf("2021-12-25"),
         java.sql.Timestamp.valueOf("2021-12-25 12:03:03"), false,
@@ -449,16 +449,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
         java.sql.Date.valueOf("2021-12-26")))
   }
 
-  private def getDouble(value: String, convertFromFloat: Boolean): Double = {
-    // TODO(HUDI-8902): Investigate different read behavior on a field after promotion
-    // from float to double
-    if (convertFromFloat) {
-      value.toFloat.toDouble
-    } else {
-      value.toDouble
-    }
-  }
-
   test("Test Chinese table ") {
     withRecordType()(withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark3ParquetSchemaEvolutionUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark3ParquetSchemaEvolutionUtils.scala
index 47678214607c3..912879a602847 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark3ParquetSchemaEvolutionUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark3ParquetSchemaEvolutionUtils.scala
@@ -63,7 +63,7 @@ class Spark3ParquetSchemaEvolutionUtils(sharedConf: Configuration,
       val layout = TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION)
       InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath,
         new HoodieHadoopStorage(tablePath, sharedConf), if (validCommits == null) "" else validCommits,
-        layout.getInstantFileNameParser, layout.getCommitMetadataSerDe, layout.getInstantGenerator)
+        layout)
     } else {
       null
     }
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
index 02b2338d26d6e..db493c4629a88 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala
@@ -168,7 +168,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
           val storage = new HoodieHadoopStorage(tablePath, sharedConf)
           InternalSchemaCache.getInternalSchemaByVersionId(
             commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits,
-            layout.getInstantFileNameParser, layout.getCommitMetadataSerDe, layout.getInstantGenerator)
+            layout)
         } else {
           null
         }
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
index 09c9962fe3a9a..4a9067e7486e6 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark34LegacyHoodieParquetFileFormat.scala
@@ -179,7 +179,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
          val storage = new HoodieHadoopStorage(tablePath, sharedConf)
          InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, storage,
            if (validCommits == null) "" else validCommits,
-           layout.getInstantFileNameParser, layout.getCommitMetadataSerDe, layout.getInstantGenerator)
+           layout)
         } else {
           null
         }
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
index d482d258b6150..f72acc1179b17 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark35LegacyHoodieParquetFileFormat.scala
@@ -180,7 +180,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
           val layout = TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION)
           InternalSchemaCache.getInternalSchemaByVersionId(
             commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits,
-            layout.getInstantFileNameParser, layout.getCommitMetadataSerDe, layout.getInstantGenerator)
+            layout)
         } else {
           null
         }
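
Across the Spark 3.x call sites above, the lookup collapses to a single TimelineLayout argument; the shared shape now reads (sketch mirroring the call sites, not a new API; commitInstantTime, tablePath, storage, and validCommits come from the surrounding code):

    val layout = TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION)
    val fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(
      commitInstantTime, tablePath, storage,
      if (validCommits == null) "" else validCommits,
      layout) // this overload loads hoodie.properties to resolve the timeline path
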