diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index ee900fb36c1bb..f8387ffd06814 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -234,12 +234,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("Used by org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy to denote the number of "
           + "latest partitions to compact during a compaction run.");
 
-  public static final ConfigProperty PRESERVE_COMMIT_METADATA = ConfigProperty
-      .key("hoodie.compaction.preserve.commit.metadata")
-      .defaultValue(false)
-      .sinceVersion("0.11.0")
-      .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
-
   /**
    * Configs related to specific table types.
    */
@@ -673,11 +667,6 @@ public Builder withLogFileSizeThresholdBasedCompaction(long logFileSizeThreshold
       return this;
     }
 
-    public Builder withPreserveCommitMetadata(boolean preserveCommitMetadata) {
-      compactionConfig.setValue(PRESERVE_COMMIT_METADATA, String.valueOf(preserveCommitMetadata));
-      return this;
-    }
-
     public Builder withCommitsArchivalBatchSize(int batchSize) {
       compactionConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 1cdad6c565607..f81025a4a1e1c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1196,10 +1196,6 @@ public boolean isPreserveHoodieCommitMetadataForClustering() {
     return getBoolean(HoodieClusteringConfig.PRESERVE_COMMIT_METADATA);
   }
 
-  public boolean isPreserveHoodieCommitMetadataForCompaction() {
-    return getBoolean(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA);
-  }
-
   public boolean isClusteringEnabled() {
     // TODO: future support async clustering
     return inlineClusteringEnabled() || isAsyncClusteringEnabled();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 32d4ec2a6d794..130d289ed2522 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -97,7 +97,6 @@ public class HoodieMergeHandle extends H
   protected Map> keyToNewRecords;
   protected Set writtenRecordKeys;
   protected HoodieFileWriter fileWriter;
-  private boolean preserveMetadata = false;
   protected Path newFilePath;
   protected Path oldFilePath;
@@ -134,7 +133,6 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab
     super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
     this.keyToNewRecords = keyToNewRecords;
     this.useWriterSchema = true;
-    this.preserveMetadata = config.isPreserveHoodieCommitMetadataForCompaction();
     init(fileId, this.partitionPath, dataFileToBeMerged);
     validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
   }
@@ -293,11 +291,9 @@ protected boolean writeRecord(HoodieRecord hoodieRecord, Option
      rows = actual.collectAsList();
      assertEquals(updatedRecords.size(), rows.size());
      for (Row row: rows) {
-       assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), preserveCommitMeta ? newCommitTime : compactionInstantTime);
+       assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), newCommitTime);
      }
    }
  }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 63f6e46542371..3e209dbd1970c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -145,7 +145,7 @@ public void testInlineScheduleCompaction(boolean scheduleInlineCompaction) throw
 
     HoodieWriteConfig cfg = getConfigBuilder(false)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
-            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withPreserveCommitMetadata(true).withScheduleInlineCompaction(scheduleInlineCompaction).build())
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withScheduleInlineCompaction(scheduleInlineCompaction).build())
         .build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index 94e080cae4804..d4fbcc6259b14 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -324,20 +324,20 @@ protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, HoodieI
   }
 
   protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
-    return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig, false);
+    return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig);
   }
 
   protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) {
-    return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), false);
+    return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build());
  }
 
   protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType,
-                                                       long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig, boolean preserveCommitMetaForCompaction) {
+                                                       long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
     return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withDeleteParallelism(2)
         .withAutoCommit(autoCommit)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(compactionSmallFileSize)
-            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).withPreserveCommitMetadata(preserveCommitMetaForCompaction).build())
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
         .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
         .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index f420b296e2b3a..dc10a1aaef3fd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -178,6 +178,7 @@ class TestMORDataSource extends HoodieClientTestBase {
       .options(commonOpts)
       .mode(SaveMode.Append)
       .save(basePath)
+    val commit3Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     val hudiSnapshotDF3 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
       .load(basePath + "/*/*/*/*")
@@ -218,6 +219,7 @@ class TestMORDataSource extends HoodieClientTestBase {
       .options(commonOpts)
       .mode(SaveMode.Append)
       .save(basePath)
+    val commit4Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     val hudiSnapshotDF4 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
       .load(basePath + "/*/*/*/*")
@@ -268,8 +270,20 @@ class TestMORDataSource extends HoodieClientTestBase {
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commit5Time)
       .option(DataSourceReadOptions.END_INSTANTTIME.key, commit6Time)
       .load(basePath)
-    // compaction updated 150 rows + inserted 2 new row
-    assertEquals(152, hudiIncDF6.count())
+    // compaction updated 150 rows + inserted 2 new rows, but only the 2 new records were upserted between commit5Time and commit6Time
+    assertEquals(2, hudiIncDF6.count())
+
+    // Test that compaction can't update _hoodie_commit_time.
+    val snapshot = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath)
+    assertEquals(snapshot.where(s"_hoodie_commit_time = $commit2Time").count(), 50)
+    assertEquals(snapshot.where(s"_hoodie_commit_time = $commit3Time").count(), 50)
+    assertEquals(snapshot.where(s"_hoodie_commit_time = $commit4Time").count(), 50)
+    assertEquals(snapshot.where(s"_hoodie_commit_time = $commit5Time").count(), 50)
+
+    // Test that compaction should update _hoodie_file_name. There are 150 rows compacted.
+    assertEquals(snapshot.where(s"_hoodie_file_name like '%$commit6Time%'").count(), 150)
   }
 
   @Test
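
Note for callers of these builders: HoodieCompactionConfig.Builder no longer exposes withPreserveCommitMetadata, and per the assertions added in TestMORDataSource above, compaction keeps the original _hoodie_commit_time of each record and only rewrites _hoodie_file_name. A minimal sketch of the adjusted write-config construction follows; the class name, base path, and schema arguments are illustrative and not part of this patch, while every builder call shown appears elsewhere in the diff.

// Illustrative sketch only (not part of the patch): building a compaction-enabled
// write config after this change. withPreserveCommitMetadata(...) is gone from
// HoodieCompactionConfig.Builder, so upgrading callers simply drop that call.
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class CompactionConfigSketch {  // hypothetical helper class, for illustration
  static HoodieWriteConfig buildConfig(String basePath, String schema) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(schema)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .compactionSmallFileSize(1024 * 1024 * 1024)
            .withInlineCompaction(false)
            .withMaxNumDeltaCommitsBeforeCompaction(2)
            // .withPreserveCommitMetadata(true)  // removed by this patch; no longer configurable
            .build())
        .build();
  }
}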