@@ -224,6 +224,12 @@ public class HoodieCompactionConfig extends HoodieConfig {
.withDocumentation("Used by org.apache.hudi.io.compact.strategy.DayBasedCompactionStrategy to denote the number of "
+ "latest partitions to compact during a compaction run.");

public static final ConfigProperty<Boolean> PRESERVE_COMMIT_METADATA = ConfigProperty
.key("hoodie.compaction.preserve.commit.metadata")
.defaultValue(false)
Review comment (Member): Compaction should not change the original _hoodie_commit_time or _hoodie_commit_seqno values at all. So we should look into making that the default behavior, as @YannByron suggested.
.sinceVersion("0.11.0")
.withDocumentation("When rewriting data, preserves existing hoodie_commit_time");

/**
* Configs related to specific table types.
*/
@@ -621,6 +627,11 @@ public Builder withLogFileSizeThresholdBasedCompaction(long logFileSizeThreshold
return this;
}

public Builder withPreserveCommitMetadata(boolean preserveCommitMetadata) {
compactionConfig.setValue(PRESERVE_COMMIT_METADATA, String.valueOf(preserveCommitMetadata));
return this;
}

public Builder withCommitsArchivalBatchSize(int batchSize) {
compactionConfig.setValue(COMMITS_ARCHIVAL_BATCH_SIZE, String.valueOf(batchSize));
return this;
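
The two hunks above work together: the ConfigProperty defines the knob and the builder exposes it. A minimal sketch of wiring it up through HoodieWriteConfig, using only entry points that appear in this diff (the table path and the inline-compaction setting are illustrative):

import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

// Sketch: opt a writer into preserving commit metadata across compaction.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi/trips") // hypothetical table path
    .withCompactionConfig(HoodieCompactionConfig.newBuilder()
        .withInlineCompaction(true) // illustrative; any compaction mode works
        .withPreserveCommitMetadata(true) // the new flag; default stays false in this PR
        .build())
    .build();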
@@ -1161,10 +1161,14 @@ public boolean isAsyncClusteringEnabled() {
return getBoolean(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE);
}

public boolean isPreserveHoodieCommitMetadata() {
public boolean isPreserveHoodieCommitMetadataForClustering() {
return getBoolean(HoodieClusteringConfig.PRESERVE_COMMIT_METADATA);
}

public boolean isPreserveHoodieCommitMetadataForCompaction() {
return getBoolean(HoodieCompactionConfig.PRESERVE_COMMIT_METADATA);
}

public boolean isClusteringEnabled() {
// TODO: future support async clustering
return inlineClusteringEnabled() || isAsyncClusteringEnabled();
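
The renamed accessor pair keeps the clustering and compaction flags independent. Both can also be set as plain string options; a sketch, where the compaction key is taken from this diff and the clustering key (mirroring HoodieClusteringConfig.PRESERVE_COMMIT_METADATA) is an assumption:

import java.util.HashMap;
import java.util.Map;

// Sketch: supplying the flags by key, e.g. to a datasource writer.
Map<String, String> options = new HashMap<>();
options.put("hoodie.compaction.preserve.commit.metadata", "true"); // key introduced in this PR
options.put("hoodie.clustering.preserve.commit.metadata", "true"); // assumed clustering-side key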
@@ -97,6 +97,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
protected Map<String, HoodieRecord<T>> keyToNewRecords;
protected Set<String> writtenRecordKeys;
protected HoodieFileWriter<IndexedRecord> fileWriter;
private boolean preserveMetadata = false;

protected Path newFilePath;
protected Path oldFilePath;
@@ -133,6 +134,7 @@ public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTab
super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
this.keyToNewRecords = keyToNewRecords;
this.useWriterSchema = true;
this.preserveMetadata = config.isPreserveHoodieCommitMetadataForCompaction();
init(fileId, this.partitionPath, dataFileToBeMerged);
validateAndSetAndKeyGenProps(keyGeneratorOpt, config.populateMetaFields());
}
@@ -291,7 +293,11 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord
if (indexedRecord.isPresent() && !isDelete) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get());
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
if (preserveMetadata) {
Review comment (Member): Let's see. _hoodie_file_name could technically change to the base file?
fileWriter.writeAvro(hoodieRecord.getRecordKey(), recordWithMetadataInSchema);
} else {
fileWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
}
recordsWritten++;
} else {
recordsDeleted++;
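
The branch above is the crux of the change. Conceptually (a sketch of intent, not the writer internals), writeAvroWithMetadata re-stamps the meta columns with the compaction instant, while writeAvro leaves whatever the record already carries:

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieRecord;

final class MetaStampSketch {
  // Conceptual sketch: what the non-preserving path effectively does before writing.
  // The preserving path (writeAvro) skips this, so the original _hoodie_commit_time
  // and _hoodie_commit_seqno from the delta commit survive the rewrite.
  static void stamp(GenericRecord rec, String compactionInstant, String baseFileName) {
    rec.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, compactionInstant); // compaction instant wins
    rec.put(HoodieRecord.FILENAME_METADATA_FIELD, baseFileName); // per the reviewer: the file name does change to the new base file
  }
}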
@@ -101,7 +101,7 @@ public Option<HoodieClusteringPlan> generateClusteringPlan() {
.setInputGroups(clusteringGroups)
.setExtraMetadata(getExtraMetadata())
.setVersion(getPlanVersion())
.setPreserveHoodieMetadata(getWriteConfig().isPreserveHoodieCommitMetadata())
.setPreserveHoodieMetadata(getWriteConfig().isPreserveHoodieCommitMetadataForClustering())
Review comment (Member): Clustering should not change the commit time either.
.build());
}
}
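
For symmetry, the clustering-side flag that the rename disambiguates lives on HoodieClusteringConfig. A sketch of enabling it; the builder method names here (withInlineClustering, withPreserveHoodieCommitMetadata) are assumptions, not taken from this diff:

import org.apache.hudi.config.HoodieClusteringConfig;

// Sketch: the clustering analogue of the new compaction flag.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
    .withInlineClustering(true) // assumed builder hook
    .withPreserveHoodieCommitMetadata(true) // assumed hook backing PRESERVE_COMMIT_METADATA
    .build();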
@@ -1602,7 +1602,7 @@ private HoodieWriteMetadata<JavaRDD<WriteStatus>> performClustering(HoodieCluste
SparkRDDWriteClient client = getHoodieWriteClient(config);
String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = client.cluster(clusteringCommitTime, completeClustering);
if (config.isPreserveHoodieCommitMetadata() && config.populateMetaFields()) {
if (config.isPreserveHoodieCommitMetadataForClustering() && config.populateMetaFields()) {
verifyRecordsWrittenWithPreservedMetadata(new HashSet<>(allRecords.getRight()), allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect());
} else {
verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords.getLeft(), clusterMetadata.getWriteStatuses().collect(), config);
@@ -38,12 +38,15 @@
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.action.deltacommit.AbstractSparkDeltaCommitActionExecutor;
import org.apache.hudi.table.action.deltacommit.SparkDeleteDeltaCommitActionExecutor;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
@@ -53,6 +56,8 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -194,10 +199,12 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {

// TODO: Enable metadata virtual keys in this test once the feature HUDI-2593 is completed
@ParameterizedTest
@ValueSource(booleans = {true})
public void testLogFileCountsAfterCompaction(boolean populateMetaFields) throws Exception {
@ValueSource(booleans = {false, true})
public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws Exception {
boolean populateMetaFields = true;
// insert 100 records
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true)
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true, false, HoodieIndex.IndexType.BLOOM,
1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), preserveCommitMeta)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build());
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig config = cfgBuilder.build();
@@ -268,6 +275,18 @@ public void testLogFileCountsAfterCompaction(boolean populateMetaFields) throws
List<WriteStatus> writeStatuses = result.collect();
assertTrue(writeStatuses.stream().anyMatch(writeStatus -> writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
}

// Check the entire dataset has all records still
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
for (int i = 0; i < fullPartitionPaths.length; i++) {
fullPartitionPaths[i] = String.format("%s/%s/*", basePath(), dataGen.getPartitionPaths()[i]);
}
Dataset<Row> actual = HoodieClientTestUtils.read(jsc(), basePath(), sqlContext(), fs(), fullPartitionPaths);
List<Row> rows = actual.collectAsList();
assertEquals(updatedRecords.size(), rows.size());
for (Row row: rows) {
assertEquals(row.getAs(HoodieRecord.COMMIT_TIME_METADATA_FIELD), preserveCommitMeta ? newCommitTime : compactionInstantTime);
}
}
}
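
The new assertion pins down the observable contract: with preserveCommitMeta on, _hoodie_commit_time still carries the delta commit's instant (newCommitTime) after compaction; with it off, rows carry the compaction instant. The same check can be made outside tests with a datasource read; a sketch, with the session, path, and partition glob as assumptions:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch: inspect commit times after a compaction run.
SparkSession spark = SparkSession.builder().getOrCreate();
Dataset<Row> df = spark.read().format("hudi").load("/tmp/hudi/trips/*/*/*"); // hypothetical glob
df.groupBy("_hoodie_commit_time").count().show();
// preserve=true  -> commit times predate the compaction instant
// preserve=false -> commit times equal the compaction instant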

@@ -305,20 +305,20 @@ protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, HoodieI
}

protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig);
return getConfigBuilder(autoCommit, false, HoodieIndex.IndexType.BLOOM, compactionSmallFileSize, clusteringConfig, false);
}

protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType) {
return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build());
return getConfigBuilder(autoCommit, rollbackUsingMarkers, indexType, 1024 * 1024 * 1024L, HoodieClusteringConfig.newBuilder().build(), false);
}

protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit, Boolean rollbackUsingMarkers, HoodieIndex.IndexType indexType,
long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig) {
long compactionSmallFileSize, HoodieClusteringConfig clusteringConfig, boolean preserveCommitMetaForCompaction) {
return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withDeleteParallelism(2)
.withAutoCommit(autoCommit)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(compactionSmallFileSize)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).withPreserveCommitMetadata(preserveCommitMetaForCompaction).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).build())
.withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")
.withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()