@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.spark.sql.HoodieSchemaUtils
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, UnsafeProjection}
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, FloatType, MapType, StringType, StructField, StructType}

object HoodieParquetFileFormatHelper {

@@ -120,7 +120,15 @@ object HoodieParquetFileFormatHelper {
val srcType = typeChangeInfos.get(i).getRight
val dstType = typeChangeInfos.get(i).getLeft
val needTimeZone = Cast.needsTimeZone(srcType, dstType)
Cast(attr, dstType, if (needTimeZone) timeZoneId else None)

// workaround for casting float to double
if (srcType == FloatType && dstType == DoubleType) {
Review comment (Member): ideally the double cast should be avoided, but given that Spark's type conversion loses precision, and it is not as if we are going to do this for every record every time, I am ok with this change.

// first cast to string and then to double
val toStringAttr = Cast(attr, StringType, if (needTimeZone) timeZoneId else None)
Cast(toStringAttr, dstType, if (needTimeZone) timeZoneId else None)
} else {
Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
}
} else attr
}
GenerateUnsafeProjection.generate(castSchema, newFullSchema)
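The review comment above turns on a precision detail worth spelling out: widening a binary float directly to double keeps the float's inexact binary value, whereas routing through the decimal string representation recovers the value the user originally wrote. A minimal plain-Scala sketch of that difference (not part of this PR; names are illustrative):

object FloatToDoublePrecisionSketch {
  def main(args: Array[String]): Unit = {
    val f: Float = 101.01f

    // Direct widening keeps the binary float value, which is not exactly 101.01.
    val direct: Double = f.toDouble
    println(direct)    // 101.01000213623047

    // Going through the shortest decimal string representation recovers 101.01,
    // which is what the string-then-double Cast in the projection above relies on.
    val viaString: Double = f.toString.toDouble
    println(viaString) // 101.01
  }
}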
@@ -20,13 +20,16 @@

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.table.timeline.InstantFileNameParser;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.TimelineLayout;
import org.apache.hudi.common.table.timeline.TimelinePathProvider;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.InternalSchema;
@@ -176,23 +179,27 @@ public static Pair<Option<String>, Option<String>> getInternalSchemaAndAvroSchem
* @param tablePath table path
* @param storage {@link HoodieStorage} instance.
* @param validCommits current validate commits, use to make up the commit file path/verify the validity of the history schema files
* @param fileNameParser InstantFileNameParser
* @param commitMetadataSerDe CommitMetadataSerDe
* @param instantGenerator InstantGenerator
* @param timelineLayout {@link TimelineLayout} instance, used to get {@link InstantFileNameParser}/{@link CommitMetadataSerDe}/{@link InstantGenerator}/{@link TimelinePathProvider} instances.
* @param tableConfig {@link HoodieTableConfig} instance, used to get the timeline path.
* @return a internalSchema.
*/
public static InternalSchema getInternalSchemaByVersionId(long versionId, String tablePath, HoodieStorage storage, String validCommits,
InstantFileNameParser fileNameParser, CommitMetadataSerDe commitMetadataSerDe, InstantGenerator instantGenerator) {
TimelineLayout timelineLayout, HoodieTableConfig tableConfig) {
InstantFileNameParser fileNameParser = timelineLayout.getInstantFileNameParser();
CommitMetadataSerDe commitMetadataSerDe = timelineLayout.getCommitMetadataSerDe();
InstantGenerator instantGenerator = timelineLayout.getInstantGenerator();
TimelinePathProvider timelinePathProvider = timelineLayout.getTimelinePathProvider();
StoragePath timelinePath = timelinePathProvider.getTimelinePath(tableConfig, new StoragePath(tablePath));

String avroSchema = "";
Set<String> commitSet = Arrays.stream(validCommits.split(",")).collect(Collectors.toSet());
List<String> validateCommitList =
commitSet.stream().map(fileNameParser::extractTimestamp).collect(Collectors.toList());

StoragePath hoodieMetaPath = new StoragePath(tablePath, HoodieTableMetaClient.METAFOLDER_NAME);
//step1:
StoragePath candidateCommitFile = commitSet.stream()
.filter(fileName -> fileNameParser.extractTimestamp(fileName).equals(versionId + ""))
.findFirst().map(f -> new StoragePath(hoodieMetaPath, f)).orElse(null);
.findFirst().map(f -> new StoragePath(timelinePath, f)).orElse(null);
if (candidateCommitFile != null) {
try {
HoodieCommitMetadata metadata;
@@ -231,12 +238,16 @@ public static InternalSchema getInternalSchemaByVersionId(long versionId, String
: fileSchema;
}

public static InternalSchema getInternalSchemaByVersionId(long versionId, String tablePath, HoodieStorage storage, String validCommits, TimelineLayout timelineLayout) {
return getInternalSchemaByVersionId(versionId, tablePath, storage, validCommits, timelineLayout, HoodieTableConfig.loadFromHoodieProps(storage, tablePath));
}

public static InternalSchema getInternalSchemaByVersionId(long versionId, HoodieTableMetaClient metaClient) {
InstantFileNameGenerator factory = metaClient.getInstantFileNameGenerator();
String validCommitLists = metaClient
.getCommitsAndCompactionTimeline().filterCompletedInstants().getInstantsAsStream().map(factory::getFileName).collect(Collectors.joining(","));
return getInternalSchemaByVersionId(versionId, metaClient.getBasePath().toString(), metaClient.getStorage(),
validCommitLists, metaClient.getInstantFileNameParser(), metaClient.getCommitMetadataSerDe(), metaClient.getInstantGenerator());
validCommitLists, metaClient.getTimelineLayout(), metaClient.getTableConfig());
}
}
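The refactored entry point above collapses the three timeline collaborators into a single TimelineLayout argument and resolves the timeline path from the table config (the new five-argument overload loads it from hoodie.properties). A hedged caller sketch, mirroring the updated call sites later in this diff; the import paths are assumptions inferred from the classes referenced here, not verified against a specific Hudi release:

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.table.timeline.TimelineLayout
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion // assumed package
import org.apache.hudi.common.util.InternalSchemaCache                        // assumed package
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage                     // assumed package

object InternalSchemaLookupSketch {
  def lookup(commitInstantTime: Long,
             tablePath: String,
             validCommits: String,
             hadoopConf: Configuration): InternalSchema = {
    val layout = TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION)
    val storage = new HoodieHadoopStorage(tablePath, hadoopConf)
    // The file-name parser, commit metadata SerDe, and instant generator are now
    // derived internally from the layout, and the table config (hence the timeline
    // path) is loaded from hoodie.properties by this overload.
    InternalSchemaCache.getInternalSchemaByVersionId(
      commitInstantTime, tablePath, storage,
      if (validCommits == null) "" else validCommits,
      layout)
  }
}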

@@ -19,6 +19,7 @@
package org.apache.hudi.table.format;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
@@ -61,13 +62,14 @@ public class InternalSchemaManager implements Serializable {
private static final long serialVersionUID = 1L;

public static final InternalSchemaManager DISABLED = new InternalSchemaManager(null, InternalSchema.getEmptyInternalSchema(), null, null,
TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION));
TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION), null);

private final Configuration conf;
private final InternalSchema querySchema;
private final String validCommits;
private final String tablePath;
private final TimelineLayout layout;
private final HoodieTableConfig tableConfig;
private transient org.apache.hadoop.conf.Configuration hadoopConf;

public static InternalSchemaManager get(Configuration conf, HoodieTableMetaClient metaClient) {
@@ -86,16 +88,17 @@ public static InternalSchemaManager get(Configuration conf, HoodieTableMetaClien
.getInstantsAsStream()
.map(factory::getFileName)
.collect(Collectors.joining(","));
return new InternalSchemaManager(conf, internalSchema.get(), validCommits, metaClient.getBasePath().toString(), metaClient.getTimelineLayout());
return new InternalSchemaManager(conf, internalSchema.get(), validCommits, metaClient.getBasePath().toString(), metaClient.getTimelineLayout(), metaClient.getTableConfig());
}

public InternalSchemaManager(Configuration conf, InternalSchema querySchema, String validCommits, String tablePath,
TimelineLayout layout) {
TimelineLayout layout, HoodieTableConfig tableConfig) {
this.conf = conf;
this.querySchema = querySchema;
this.validCommits = validCommits;
this.tablePath = tablePath;
this.layout = layout;
this.tableConfig = tableConfig;
}

public InternalSchema getQuerySchema() {
@@ -121,8 +124,7 @@ InternalSchema getMergeSchema(String fileName) {
InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(
commitInstantTime, tablePath,
new HoodieHadoopStorage(tablePath, getHadoopConf()),
validCommits, layout.getInstantFileNameParser(),
layout.getCommitMetadataSerDe(), layout.getInstantGenerator());
validCommits, layout, tableConfig);
if (querySchema.equals(fileSchema)) {
return InternalSchema.getEmptyInternalSchema();
}
@@ -294,8 +294,8 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
// test change column type float to double
spark.sql(s"alter table $tableName alter column col2 type double")
checkAnswer(s"select id, col1_new, col2 from $tableName where id = 1 or id = 2 order by id")(
Seq(1, null, getDouble("101.01", isMor)),
Seq(2, null, getDouble("102.02", isMor)))
Seq(1, null, 101.01),
Seq(2, null, 102.02))
spark.sql(
s"""
| insert into $tableName values
@@ -308,11 +308,11 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
new java.math.BigDecimal("100001.0001"), "a000001", java.sql.Date.valueOf("2021-12-25"),
java.sql.Timestamp.valueOf("2021-12-25 12:01:01"), true,
java.sql.Date.valueOf("2021-12-25")),
Seq(2, null, 2, 12, 100002L, getDouble("102.02", isMor), 1002.0002,
Seq(2, null, 2, 12, 100002L, 102.02, 1002.0002,
new java.math.BigDecimal("100002.0002"), "a000002", java.sql.Date.valueOf("2021-12-25"),
java.sql.Timestamp.valueOf("2021-12-25 12:02:02"), true,
java.sql.Date.valueOf("2021-12-25")),
Seq(3, null, 3, 13, 100003L, getDouble("103.03", isMor), 1003.0003,
Seq(3, null, 3, 13, 100003L, 103.03, 1003.0003,
new java.math.BigDecimal("100003.0003"), "a000003", java.sql.Date.valueOf("2021-12-25"),
java.sql.Timestamp.valueOf("2021-12-25 12:03:03"), false,
java.sql.Date.valueOf("2021-12-25")),
@@ -366,7 +366,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
spark.sql(s"alter table $tableName alter column col2 type string")
checkAnswer(s"select id, col1_new, col2 from $tableName where id = 1 or id = 2 order by id")(
Seq(1, 3, "101.01"),
Seq(2, null, getDouble("102.02", isMor && runClustering).toString))
Seq(2, null, "102.02"))
spark.sql(
s"""
| insert into $tableName values
Expand All @@ -375,7 +375,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
|""".stripMargin)

checkAnswer(s"select id, col1_new, comb, col0, col1, col2, col3, col4, col5, "
+ s"col6, col7, col8, par from $tableName")(getExpectedRowsSecondTime(isMor && runClustering): _*)
+ s"col6, col7, col8, par from $tableName")(getExpectedRowsSecondTime(): _*)
if (runCompaction) {
// try schedule compact
if (tableType == "mor") spark.sql(s"schedule compaction on $tableName")
@@ -398,7 +398,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
// Data should not change after scheduling or running table services
checkAnswer(s"select id, col1_new, comb, col0, col1, col2, col3, col4, col5, "
+ s"col6, col7, col8, par from $tableName")(getExpectedRowsSecondTime(isMor): _*)
+ s"col6, col7, col8, par from $tableName")(getExpectedRowsSecondTime(): _*)
spark.sql(
s"""
| insert into $tableName values
@@ -410,7 +410,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
+ s"where id = 1 or id = 6 or id = 2 or id = 11 order by id")(
Seq(1, 3, "101.01"),
Seq(11, 3, "101.01"),
Seq(2, null, getDouble("102.02", isMor).toString),
Seq(2, null, "102.02"),
Seq(6, 6, "105.05"))
}
spark.sessionState.conf.unsetConf("spark.sql.storeAssignmentPolicy")
@@ -419,18 +419,18 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
}
}

private def getExpectedRowsSecondTime(floatToDouble: Boolean): Seq[Seq[Any]] = {
private def getExpectedRowsSecondTime(): Seq[Seq[Any]] = {
Seq(
Seq(1, 3, 1, 11, 100001L, "101.01", 1001.0001, new java.math.BigDecimal("100001.00010000"),
"a000001", java.sql.Date.valueOf("2021-12-25"),
java.sql.Timestamp.valueOf("2021-12-25 12:01:01"), true,
java.sql.Date.valueOf("2021-12-25")),
Seq(2, null, 2, 12, 100002L, getDouble("102.02", floatToDouble).toString,
Seq(2, null, 2, 12, 100002L, "102.02",
1002.0002, new java.math.BigDecimal("100002.00020000"),
"a000002", java.sql.Date.valueOf("2021-12-25"),
java.sql.Timestamp.valueOf("2021-12-25 12:02:02"), true,
java.sql.Date.valueOf("2021-12-25")),
Seq(3, null, 3, 13, 100003L, getDouble("103.03", floatToDouble).toString,
Seq(3, null, 3, 13, 100003L, "103.03",
1003.0003, new java.math.BigDecimal("100003.00030000"),
"a000003", java.sql.Date.valueOf("2021-12-25"),
java.sql.Timestamp.valueOf("2021-12-25 12:03:03"), false,
Expand All @@ -449,16 +449,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
java.sql.Date.valueOf("2021-12-26")))
}

private def getDouble(value: String, convertFromFloat: Boolean): Double = {
// TODO(HUDI-8902): Investigate different read behavior on a field after promotion
// from float to double
if (convertFromFloat) {
value.toFloat.toDouble
} else {
value.toDouble
}
}
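Dropping getDouble reflects the user-visible effect of the projection change: after a float column is promoted to double, reads now return the exact decimal value that was written instead of the widened binary float. A condensed sketch of the scenario these expectations assert, assuming an active SparkSession with the Hudi SQL extensions and schema-on-read evolution enabled (e.g. hoodie.schema.on.read.enable=true); the table name, columns, and path are illustrative:

// Minimal single-column variant of the TestSpark3DDL flow above.
spark.sql(
  """create table float_promotion_demo (id int, col2 float, ts long)
    |using hudi
    |tblproperties (primaryKey = 'id', preCombineField = 'ts')
    |location '/tmp/float_promotion_demo'""".stripMargin)
spark.sql("insert into float_promotion_demo values (1, cast(101.01 as float), 1000)")
spark.sql("alter table float_promotion_demo alter column col2 type double")
// Expected after this PR: 101.01 exactly, not 101.01000213623047.
spark.sql("select id, col2 from float_promotion_demo where id = 1").show()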

test("Test Chinese table ") {
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
@@ -63,7 +63,7 @@ class Spark3ParquetSchemaEvolutionUtils(sharedConf: Configuration,
val layout = TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION)
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath,
new HoodieHadoopStorage(tablePath, sharedConf), if (validCommits == null) "" else validCommits,
layout.getInstantFileNameParser, layout.getCommitMetadataSerDe, layout.getInstantGenerator)
layout)
} else {
null
}
@@ -168,7 +168,7 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val storage = new HoodieHadoopStorage(tablePath, sharedConf)
InternalSchemaCache.getInternalSchemaByVersionId(
commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits,
layout.getInstantFileNameParser, layout.getCommitMetadataSerDe, layout.getInstantGenerator)
layout)
} else {
null
}
@@ -179,7 +179,7 @@ class Spark34LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val storage = new HoodieHadoopStorage(tablePath, sharedConf)
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, storage,
if (validCommits == null) "" else validCommits,
layout.getInstantFileNameParser, layout.getCommitMetadataSerDe, layout.getInstantGenerator)
layout)
} else {
null
}
@@ -180,7 +180,7 @@ class Spark35LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
val layout = TimelineLayout.fromVersion(TimelineLayoutVersion.CURR_LAYOUT_VERSION)
InternalSchemaCache.getInternalSchemaByVersionId(
commitInstantTime, tablePath, storage, if (validCommits == null) "" else validCommits,
layout.getInstantFileNameParser, layout.getCommitMetadataSerDe, layout.getInstantGenerator)
layout)
} else {
null
}