@@ -234,12 +234,6 @@ public class HoodieCompactionConfig extends HoodieConfig {
       .withDocumentation("Used by org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy to denote the number of "
           + "latest partitions to compact during a compaction run.");
 
-  public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA = ConfigProperty
-      .key("hoodie.compaction.preserve.commit.metadata")
-      .defaultValue(false)
-      .sinceVersion("0.11.0")
-      .withDocumentation("When rewriting data, preserves existing hoodie_commit_time");
-
   /**
    * Configs related to specific table types.
    */
@@ -673,11 +667,6 @@ public Builder withLogFileSizeThresholdBasedCompaction(long logFileSizeThreshold
       return this;
     }
 
-    public Builder withPreserveCommitMetadata(boolean preserveCommitMetadata) {
-      compactionConfig.setValue(PRESERVE_COMMIT_METADATA, String.valueOf(preserveCommitMetadata));
-      return this;
-    }
-
     public Builder withCommitsArchivalBatchSize(int batchSize) {
       compactionConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
       return this;
@@ -1196,10 +1196,6 @@ public boolean isPreserveHoodieCommitMetadataForClustering() {
     return getBoolean(HoodieClusteringConfig.PRESERVE_COMMIT_METADATA);
   }
 
-  public boolean isPreserveHoodieCommitMetadataForCompaction() {
-    return getBoolean(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA);
-  }
-
   public boolean isClusteringEnabled() {
     // TODO: future support async clustering
     return inlineClusteringEnabled() || isAsyncClusteringEnabled();
@@ -97,7 +97,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
   protected HoodieFileWriter<IndexedRecord> fileWriter;
-  private boolean preserveMetadata = false;
 
   protected Path newFilePath;
   protected Path oldFilePath;
@@ -134,7 +133,6 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab
     super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
     this.keyToNewRecords = keyToNewRecords;
     this.useWriterSchema = true;
-    this.preserveMetadata = config.isPreserveHoodieCommitMetadataForCompaction();
     init(fileId, this.partitionPath, dataFileToBeMerged);
     validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
   }
@@ -293,11 +291,9 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord
     if (indexedRecord.isPresent() && !isDelete) {
       // Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
       IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get());
-      if (preserveMetadata) {
-        fileWriter.writeAvro(hoodieRecord.getRecordKey(), recordWithMetadataInSchema);
-      } else {
-        fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
-      }
+      // do not preserve FILENAME_METADATA_FIELD
+      recordWithMetadataInSchema.put(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD), newFilePath.getName());
+      fileWriter.writeAvro(hoodieRecord.getRecordKey(), recordWithMetadataInSchema);
       recordsWritten++;
     } else {
       recordsDeleted++;
@@ -39,9 +39,7 @@
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.Transformations;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
@@ -54,11 +52,14 @@
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
 import org.apache.avro.generic.GenericRecord;
+
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.mapred.JobConf;
+
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -201,13 +202,11 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
   }
 
   // TODO: Enable metadata virtual keys in this test once the feature HUDI-2593 is completed
-  @ParameterizedTest
-  @ValueSource(booleans = {false, true})
-  public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws Exception {
+  @Test
+  public void testLogFileCountsAfterCompaction() throws Exception {
     boolean populateMetaFields = true;
     // insert 100 records
-    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true, false, HoodieIndex.IndexType.BLOOM,
-        1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), preserveCommitMeta)
+    HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
     HoodieWriteConfig config = cfgBuilder.build();
@@ -288,7 +287,7 @@ public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws
       List<Row> rows = actual.collectAsList();
       assertEquals(updatedRecords.size(), rows.size());
       for (Row row: rows) {
-        assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), preserveCommitMeta ? newCommitTime : compactionInstantTime);
+        assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), newCommitTime);
       }
     }
   }
@@ -145,7 +145,7 @@ public void testInlineScheduleCompaction(boolean scheduleInlineCompaction) throw
 
     HoodieWriteConfig cfg = getConfigBuilder(false)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
-            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withPreserveCommitMetadata(true).withScheduleInlineCompaction(scheduleInlineCompaction).build())
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withScheduleInlineCompaction(scheduleInlineCompaction).build())
         .build();
     try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
 
Expand Up @@ -324,20 +324,20 @@ protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, HoodieI
}

protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig, false);
return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig);
}

protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) {
return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), false);
return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build());
}

protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType,
long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig, boolean preserveCommitMetaForCompaction) {
long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2)
.withAutoCommit(autoCommit)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(compactionSmallFileSize)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).withPreserveCommitMetadata(preserveCommitMetaForCompaction).build())
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
@@ -178,6 +178,7 @@ class TestMORDataSource extends HoodieClientTestBase {
       .options(commonOpts)
       .mode(SaveMode.Append)
       .save(basePath)
+    val commit3Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     val hudiSnapshotDF3 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
       .load(basePath + "/*/*/*/*")
@@ -218,6 +219,7 @@
       .options(commonOpts)
       .mode(SaveMode.Append)
       .save(basePath)
+    val commit4Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     val hudiSnapshotDF4 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
       .load(basePath + "/*/*/*/*")
@@ -268,8 +270,20 @@
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commit5Time)
       .option(DataSourceReadOptions.END_INSTANTTIME.key, commit6Time)
       .load(basePath)
-    // compaction updated 150 rows + inserted 2 new row
-    assertEquals(152, hudiIncDF6.count())
+    // compaction updated 150 rows and inserted 2 new rows, but only the 2 new records were upserted between commit5Time and commit6Time
+    assertEquals(2, hudiIncDF6.count())
+
+    // Test that compaction does not update _hoodie_commit_time.
+    val snapshot = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath)
+    assertEquals(snapshot.where(s"_hoodie_commit_time = $commit2Time").count(), 50)
+    assertEquals(snapshot.where(s"_hoodie_commit_time = $commit3Time").count(), 50)
+    assertEquals(snapshot.where(s"_hoodie_commit_time = $commit4Time").count(), 50)
+    assertEquals(snapshot.where(s"_hoodie_commit_time = $commit5Time").count(), 50)
+
+    // Test that compaction should update _hoodie_file_name. There are 150 rows compacted.
+    assertEquals(snapshot.where(s"_hoodie_file_name like '%$commit6Time%'").count(), 150)
   }
 
   @Test