@@ -218,6 +218,8 @@ protected void commit(HoodieTable table, String commitActionType, String instant
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Finalize write
finalizeWrite(table, instantTime, stats);
// update Metadata table
writeTableMetadata(table, instantTime, commitActionType, metadata);
activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}
@@ -244,16 +246,24 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
}

/**
* Any pre-commit actions like conflict resolution or updating the metadata table go here.
* Any pre-commit actions like conflict resolution go here.
* @param inflightInstant instant of inflight operation.
* @param metadata commit metadata for which pre commit is being invoked.
*/
protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
// Create a Hoodie table after starting the transaction which encapsulated the commits and files visible.
// Important to create this after the lock to ensure latest commits show up in the timeline without need for reload
HoodieTable table = createTable(config, hadoopConf);
table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, inflightInstant.getTimestamp(),
table.isTableServiceAction(inflightInstant.getAction())));
// To be overridden by specific engines to perform conflict resolution if any.
}

/**
* Write the HoodieCommitMetadata to metadata table if available.
* @param table {@link HoodieTable} of interest.
* @param instantTime instant time of the commit.
* @param actionType action type of the commit.
* @param metadata instance of {@link HoodieCommitMetadata}.
*/
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, instantTime,
table.isTableServiceAction(actionType)));
}

/**
@@ -258,10 +258,10 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
}

@Override
protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata metadata) {
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
this.metadataWriterOption.ifPresent(w -> {
w.initTableMetadata(); // refresh the timeline
w.update(metadata, inflightInstant.getTimestamp(), getHoodieTable().isTableServiceAction(inflightInstant.getAction()));
w.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
});
}

@@ -362,9 +362,9 @@ public void completeCompaction(
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime));
// commit to data table after committing to metadata table.
finalizeWrite(table, compactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime));
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);

@@ -45,7 +45,6 @@
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.table.BulkInsertPartitioner;
@@ -314,9 +313,9 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime));
// commit to data table after committing to metadata table.
finalizeWrite(table, compactionCommitTime, writeStats);
writeTableMetadataForTableServices(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime));
// commit to data table after committing to metadata table.
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
@@ -386,8 +385,8 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
throw new HoodieClusteringException("Clustering failed to write to files:"
+ writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
}
writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime));
finalizeWrite(table, clusteringCommitTime, writeStats);
writeTableMetadataForTableServices(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime));
try {
// try to save statistics info to hudi
if (config.isDataSkippingEnabled() && config.isLayoutOptimizationEnabled() && !config.getClusteringSortColumns().isEmpty()) {
@@ -415,8 +414,8 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}

private void writeTableMetadata(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata commitMetadata,
HoodieInstant hoodieInstant) {
private void writeTableMetadataForTableServices(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata commitMetadata,
HoodieInstant hoodieInstant) {
try {
this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty());
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
@@ -497,8 +496,6 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
HoodieTable table = createTable(config, hadoopConf);
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner());
table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, inflightInstant.getTimestamp(),
table.isTableServiceAction(inflightInstant.getAction())));
}

@Override
@@ -96,6 +96,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -121,6 +122,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Tag("functional")
@@ -224,7 +226,7 @@ public void testOnlyValidPartitionsAdded(HoodieTableType tableType) throws Excep
@ParameterizedTest
@MethodSource("bootstrapAndTableOperationTestArgs")
public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception {
init(tableType, true, enableFullScan, false);
init(tableType, true, enableFullScan, false, false);
doWriteInsertAndUpsert(testTable);

// trigger an upsert
@@ -482,7 +484,7 @@ private Long getNextCommitTime(long curCommitTime) {
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType) throws Exception {
init(tableType, true, true, true);
init(tableType, true, true, true, false);
long baseCommitTime = Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
for (int i = 1; i < 25; i += 7) {
long commitTime1 = getNextCommitTime(baseCommitTime);
@@ -541,6 +543,34 @@ public void testFirstCommitRollback(HoodieTableType tableType) throws Exception
}
}

/**
* Tests handling of spurious deletes in the metadata payload.
* Say a commit was applied to the metadata table and later explicitly rolled back. Due to Spark task failures, the rollback
* metadata may list more files than the original commit metadata. When the payload consistency check is enabled, validation throws an exception; if not, it succeeds.
* @throws Exception
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMetadataPayloadSpuriousDeletes(boolean ignoreSpuriousDeletes) throws Exception {
tableType = COPY_ON_WRITE;
init(tableType, true, true, false, ignoreSpuriousDeletes);
doWriteInsertAndUpsert(testTable);
// trigger an upsert
doWriteOperationAndValidate(testTable, "0000003");

// trigger a commit and rollback
doWriteOperation(testTable, "0000004");
// add extra files in rollback to check for payload consistency
Map<String, List<String>> extraFiles = new HashMap<>();
extraFiles.put("p1", Collections.singletonList("f10"));
extraFiles.put("p2", Collections.singletonList("f12"));
testTable.doRollbackWithExtraFiles("0000004", "0000005", extraFiles);
if (!ignoreSpuriousDeletes) {
assertThrows(HoodieMetadataException.class, () -> validateMetadata(testTable));
} else {
validateMetadata(testTable);
}
}
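
For intuition, the check this test exercises can be sketched as the following self-contained illustration. The class, method, and exception below are hypothetical stand-ins (in Hudi the failure surfaces as the HoodieMetadataException asserted above), not the actual metadata payload implementation:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: a rollback may ask to delete files the metadata table never recorded as added.
final class SpuriousDeleteSketch {
  static Set<String> applyRollback(Set<String> trackedFiles, List<String> filesToDelete, boolean ignoreSpuriousDeletes) {
    Set<String> remaining = new HashSet<>(trackedFiles);
    for (String file : filesToDelete) {
      boolean wasTracked = remaining.remove(file);
      if (!wasTracked && !ignoreSpuriousDeletes) {
        // strict mode: deleting a file that was never added indicates an inconsistent payload
        throw new IllegalStateException("Spurious delete requested for untracked file: " + file);
      }
      // lenient mode (the default): the unknown file is silently ignored
    }
    return remaining;
  }
}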

/**
* Test several table operations with restore. This test uses SparkRDDWriteClient.
@@ -1101,7 +1131,7 @@ public void testRollbackOfPartiallyFailedCommitWithNewPartitions() throws Except
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);

try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(),
true)) {
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
@@ -1132,7 +1162,7 @@ public void testRollbackOfPartiallyFailedCommitWithNewPartitions() throws Except
}

try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(),
true)) {
String newCommitTime = client.startCommit();
// Next insert
@@ -1154,7 +1184,7 @@ public void testErrorCases() throws Exception {
// TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table
// should be rolled back to last valid commit.
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(),
true)) {
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
@@ -1178,7 +1208,7 @@
}

try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(),
true)) {
String newCommitTime = client.startCommit();
// Next insert
@@ -2079,7 +2079,7 @@ public void testConsistencyCheckDuringFinalize(boolean enableOptimisticConsisten

private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard,
boolean populateMetaFields) throws Exception {
String instantTime = "000";
String instantTime = "00000000000010";
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();

Properties properties = new Properties();
@@ -75,10 +75,11 @@ public void init(HoodieTableType tableType) throws IOException {
}

public void init(HoodieTableType tableType, boolean enableMetadataTable) throws IOException {
init(tableType, enableMetadataTable, true, false);
init(tableType, enableMetadataTable, true, false, false);
}

public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan, boolean enableMetrics) throws IOException {
public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan, boolean enableMetrics, boolean
validateMetadataPayloadStateConsistency) throws IOException {
this.tableType = tableType;
initPath();
initSparkContexts("TestHoodieMetadata");
@@ -88,7 +89,7 @@ public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean
initTestDataGenerator();
metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, enableMetrics,
enableFullScan, true).build();
enableFullScan, true, validateMetadataPayloadStateConsistency).build();
initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable);
}

@@ -266,11 +267,12 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, bo

protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
boolean enableMetrics) {
return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true, true);
return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true, true, false);
}

protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
boolean enableMetrics, boolean enableFullScan, boolean useRollbackUsingMarkers) {
boolean enableMetrics, boolean enableFullScan, boolean useRollbackUsingMarkers,
boolean validateMetadataPayloadConsistency) {
Properties properties = new Properties();
properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
@@ -290,6 +292,7 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesClea
.enableFullScan(enableFullScan)
.enableMetrics(enableMetrics)
.withPopulateMetaFields(false)
.ignoreSpuriousDeletes(validateMetadataPayloadConsistency)
.build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(true).build())
@@ -138,6 +138,13 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("0.10.0")
.withDocumentation("When enabled, populates all meta fields. When disabled, no meta fields are populated.");

public static final ConfigProperty<Boolean> IGNORE_SPURIOUS_DELETES = ConfigProperty
.key("_" + METADATA_PREFIX + ".ignore.spurious.deletes")
.defaultValue(true)
.sinceVersion("0.10.10")
.withDocumentation("There are cases when extra files are requested to be deleted from metadata table which was never added before. This config"
+ "determines how to handle such spurious deletes");

private HoodieMetadataConfig() {
super();
}
@@ -174,6 +181,10 @@ public boolean populateMetaFields() {
return getBooleanOrDefault(HoodieMetadataConfig.POPULATE_META_FIELDS);
}

public boolean ignoreSpuriousDeletes() {
return getBoolean(IGNORE_SPURIOUS_DELETES);
}

public static class Builder {

private EngineType engineType = EngineType.SPARK;
@@ -252,6 +263,11 @@ public Builder enableFullScan(boolean enableFullScan) {
return this;
}

public Builder ignoreSpuriousDeletes(boolean validateMetadataPayloadConsistency) {
metadataConfig.setValue(IGNORE_SPURIOUS_DELETES, String.valueOf(validateMetadataPayloadConsistency));
return this;
}

public Builder withEngineType(EngineType engineType) {
this.engineType = engineType;
return this;
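
For reference, a rough sketch of how a writer might opt into the strict behavior controlled by this new config. Only ignoreSpuriousDeletes(...) comes from this change; the surrounding builder calls are assumed from the existing HoodieWriteConfig and HoodieMetadataConfig builders, and the helper class and path are illustrative:

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.config.HoodieWriteConfig;

// Illustrative helper: disable the lenient default so that spurious deletes fail metadata validation.
final class StrictMetadataConfigExample {
  static HoodieWriteConfig buildWriteConfig(String basePath) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
            .enable(true)
            .ignoreSpuriousDeletes(false) // false = validate; a delete for a never-added file fails the update
            .build())
        .build();
  }
}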