diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index 640f0cb1826a4..c42294f12c047 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -224,6 +224,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("Used by org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy to denote the number of "
           + "latest partitions to compact during a compaction run.");
 
+  public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA = ConfigProperty
+      .key("hoodie.compaction.preserve.commit.metadata")
+      .defaultValue(false)
+      .sinceVersion("0.11.0")
+      .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
+
   /**
    * Configs related to specific table types.
    */
@@ -621,6 +627,11 @@ public Builder withLogFileSizeThresholdBasedCompaction(long logFileSizeThreshold
       return this;
     }
 
+    public Builder withPreserveCommitMetadata(boolean preserveCommitMetadata) {
+      compactionConfig.setValue(PRESERVE_COMMIT_METADATA, String.valueOf(preserveCommitMetadata));
+      return this;
+    }
+
     public Builder withCommitsArchivalBatchSize(int batchSize) {
       compactionConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 3571da17231cb..ba9c24801641b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1161,10 +1161,14 @@ public boolean isAsyncClusteringEnabled() {
     return getBoolean(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE);
   }
 
-  public boolean isPreserveHoodieCommitMetadata() {
+  public boolean isPreserveHoodieCommitMetadataForClustering() {
     return getBoolean(HoodieClusteringConfig.PRESERVE_COMMIT_METADATA);
   }
 
+  public boolean isPreserveHoodieCommitMetadataForCompaction() {
+    return getBoolean(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA);
+  }
+
   public boolean isClusteringEnabled() {
     // TODO: future support async clustering
     return inlineClusteringEnabled() || isAsyncClusteringEnabled();
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index d1d67efff4b96..87a8d133f0dd5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -97,6 +97,7 @@ public class HoodieMergeHandle extends H
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
   protected HoodieFileWriter fileWriter;
+  private boolean preserveMetadata = false;
 
   protected Path newFilePath;
   protected Path oldFilePath;
@@ -133,6 +134,7 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab
     super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
     this.keyToNewRecords = keyToNewRecords;
     this.useWriterSchema = true;
+    this.preserveMetadata = config.isPreserveHoodieCommitMetadataForCompaction();
     init(fileId, this.partitionPath, dataFileToBeMerged);
     validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
   }
@@ -291,7 +293,11 @@ protected boolean writeRecord(HoodieRecord hoodieRecord, Option
 generateClusteringPlan() {
       .setInputGroups(clusteringGroups)
       .setExtraMetadata(getExtraMetadata())
       .setVersion(getPlanVersion())
-      .setPreserveHoodieMetadata(getWriteConfig().isPreserveHoodieCommitMetadata())
+      .setPreserveHoodieMetadata(getWriteConfig().isPreserveHoodieCommitMetadataForClustering())
       .build());
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 566db224e61ed..ef474b00aa093 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1602,7 +1602,7 @@ private HoodieWriteMetadata> performClustering(HoodieCluste
     SparkRDDWriteClient client = getHoodieWriteClient(config);
     String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
     HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = client.cluster(clusteringCommitTime, completeClustering);
-    if (config.isPreserveHoodieCommitMetadata() && config.populateMetaFields()) {
+    if (config.isPreserveHoodieCommitMetadataForClustering() && config.populateMetaFields()) {
       verifyRecordsWrittenWithPreservedMetadata(new HashSet<>(allRecords.getRight()), allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect());
     } else {
       verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect(), config);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 7674c3489072a..8b8df197ba78b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -38,12 +38,15 @@
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.Transformations;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.action.deltacommit.AbstractSparkDeltaCommitActionExecutor;
 import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor;
+import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
 import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
@@ -53,6 +56,8 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -194,10 +199,12 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
 
   // TODO: Enable metadata virtual keys in this test once the feature HUDI-2593 is completed
   @ParameterizedTest
-  @ValueSource(booleans = {true})
-  public void testLogFileCountsAfterCompaction(boolean populateMetaFields) throws Exception {
+  @ValueSource(booleans = {false, true})
+  public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws Exception {
+    boolean populateMetaFields = true;
     // insert 100 records
-    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true)
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true, false, HoodieIndex.IndexType.BLOOM,
+        1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), preserveCommitMeta)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
     HoodieWriteConfig config = cfgBuilder.build();
@@ -268,6 +275,18 @@ public void testLogFileCountsAfterCompaction(boolean populateMetaFields) throws
       List<WriteStatus> writeStatuses = result.collect();
       assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
     }
+
+    // Check the entire dataset has all records still
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath(), dataGen.getPartitionPaths()[i]);
+    }
+    Dataset<Row> actual = HoodieClientTestUtils.read(jsc(), basePath(), sqlContext(), fs(), fullPartitionPaths);
+    List<Row> rows = actual.collectAsList();
+    assertEquals(updatedRecords.size(), rows.size());
+    for (Row row: rows) {
+      assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), preserveCommitMeta ? newCommitTime : compactionInstantTime);
+    }
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index 1ecdd336464d0..fb19a63259e19 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -305,20 +305,20 @@ protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, HoodieI
   }
 
   protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
-    return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig);
+    return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig, false);
   }
 
   protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) {
-    return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build());
+    return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), false);
  }
 
   protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType,
-                                                       long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
+                                                       long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig, boolean preserveCommitMetaForCompaction) {
     return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
         .withDeleteParallelism(2)
         .withAutoCommit(autoCommit)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(compactionSmallFileSize)
-            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).withPreserveCommitMetadata(preserveCommitMetaForCompaction).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
         .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
         .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
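
Usage sketch (not part of the patch): a minimal example of wiring the new flag through the write config builder, mirroring the test-harness change above. The base path is a placeholder and TRIP_EXAMPLE_SCHEMA is the test schema referenced in SparkClientFunctionalTestHarness. With hoodie.compaction.preserve.commit.metadata enabled, records rewritten by compaction keep their original _hoodie_commit_time instead of being stamped with the compaction instant.

    // Hypothetical usage, assuming the builder methods added/used in this patch.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table")          // placeholder base path
        .withSchema(TRIP_EXAMPLE_SCHEMA)               // placeholder schema
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(true)
            .withMaxNumDeltaCommitsBeforeCompaction(1)
            .withPreserveCommitMetadata(true)          // hoodie.compaction.preserve.commit.metadata
            .build())
        .build();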