@@ -32,9 +32,13 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieNotSupportedException;

import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -46,7 +50,7 @@
* Flink hoodie backed table metadata writer.
*/
public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {

private static final Logger LOG = LoggerFactory.getLogger(FlinkHoodieBackedTableMetadataWriter.class);
private transient BaseHoodieWriteClient writeClient;

public static HoodieTableMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig,
@@ -118,33 +122,31 @@ private void doCommit(String instantTime, Map<MetadataPartitionType, HoodieData<

if (!metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
// if this is a new commit being applied to metadata for the first time
writeClient.startCommitWithTime(instantTime);
metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
LOG.info("New commit at " + instantTime + " being applied to MDT.");
} else {
Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
if (alreadyCompletedInstant.isPresent()) {
// this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
// for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
// when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
// are upserts to metadata table and so only a new delta commit will be created.
// once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
// already part of completed commit. So, we have to manually remove the completed instant and proceed.
// and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant.get());
metadataMetaClient.reloadActiveTimeline();
// this code path refers to a re-attempted commit that:
// 1. got committed to metadata table, but failed in datatable.
// 2. failed while committing to metadata table
// for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
// when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
// are upserts to metadata table and so only a new delta commit will be created.
// once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
// already part of completed commit. So, we have to manually rollback the completed instant and proceed.
Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime))
.lastInstant();
LOG.info(String.format("%s completed commit at %s being applied to MDT.",
alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime));

// Rollback the previous commit
if (!writeClient.rollback(instantTime)) {
throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
}
// If the alreadyCompletedInstant is empty, that means there is a requested or inflight
// instant with the same instant time. This happens for data table clean action which
// reuses the same instant time without rollback first. It is a no-op here as the
// clean plan is the same, so we don't need to delete the requested and inflight instant
// files in the active timeline.

// The metadata writer uses LAZY cleaning strategy without auto commit,
// write client then checks the heartbeat expiration when committing the instant,
// sets up the heartbeat explicitly to make the check pass.
writeClient.getHeartbeatClient().start(instantTime);
metadataMetaClient.reloadActiveTimeline();
}

writeClient.startCommitWithTime(instantTime);
metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);

List<WriteStatus> statuses = isInitializing
? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, Option.empty())
: writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
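For quick orientation, the revised commit path in FlinkHoodieBackedTableMetadataWriter.doCommit boils down to roughly the following. This is a condensed sketch of the hunk above, assuming the class fields writeClient and metadataMetaClient and an abbreviated method body; it is illustrative, not the exact implementation.

  // Condensed sketch (illustrative only) of the new doCommit flow shown above.
  if (metadataMetaClient.getActiveTimeline().containsInstant(instantTime)) {
    // The instant was already attempted on the MDT, either fully committed (and later failed on the
    // data table) or partially committed: roll it back instead of deleting the completed instant file.
    if (!writeClient.rollback(instantTime)) {
      throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
    }
    // LAZY failed-writes cleaning without auto commit checks heartbeat expiration at commit time,
    // so the heartbeat for this instant is started explicitly before re-committing.
    writeClient.getHeartbeatClient().start(instantTime);
    metadataMetaClient.reloadActiveTimeline();
  }
  // In either case, a fresh delta commit is started and transitioned to inflight before writing.
  writeClient.startCommitWithTime(instantTime);
  metadataMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
  List<WriteStatus> statuses = isInitializing
      ? writeClient.bulkInsertPreppedRecords(preppedRecordList, instantTime, Option.empty())
      : writeClient.upsertPreppedRecords(preppedRecordList, instantTime);
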
@@ -29,13 +29,13 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.table.BulkInsertPartitioner;
@@ -144,27 +144,29 @@ private void commitInternal(String instantTime, Map<MetadataPartitionType, Hoodi

if (!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime)) {
// if this is a new commit being applied to metadata for the first time
writeClient.startCommitWithTime(instantTime);
LOG.info("New commit at " + instantTime + " being applied to MDT.");
} else {
Contributor:
to MDT -> to MDT.

Member (Author):
Fixed

Contributor:
Can we sync the logic also to FlinkHoodieBackedTableMetadataWriter?

Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant();
if (alreadyCompletedInstant.isPresent()) {
// this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
// for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
// when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
// are upserts to metadata table and so only a new delta commit will be created.
// once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
// already part of completed commit. So, we have to manually remove the completed instant and proceed.
// and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant.get());
metadataMetaClient.reloadActiveTimeline();
// this code path refers to a re-attempted commit that:
// 1. got committed to metadata table, but failed in datatable.
// 2. failed while committing to metadata table
// for e.g., let's say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
// when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
// are upserts to metadata table and so only a new delta commit will be created.
// once rollback is complete in datatable, compaction will be retried again, which will eventually hit this code block where the respective commit is
// already part of completed commit. So, we have to manually rollback the completed instant and proceed.
Option<HoodieInstant> alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime))
.lastInstant();
LOG.info(String.format("%s completed commit at %s being applied to MDT.",
alreadyCompletedInstant.isPresent() ? "Already" : "Partially", instantTime));

// Rollback the previous commit
if (!writeClient.rollback(instantTime)) {
Contributor:
Trying to gauge if we really need this.

I guess in the next couple of patches you are going to add the change below:

  • Any rollback in DT will be an actual rollback in MDT as well.

Having said that, let's go through this use-case.

Compaction commit C5 is inflight in DT and succeeded in MDT, but crashed in DT. So on restart, a rollback is triggered in DT, which, when it gets into MDT territory, will roll back the succeeded commit in MDT. So, it will be automatically taken care of.

After rollback of C5 is completed, C5 will be re-attempted in DT, and when it gets into MDT territory, there won't be any traces of DC5 at all. So, wondering when exactly we will hit this case?

Contributor:
If we are talking about a partially failed commit in MDT:

Compaction commit C5 is inflight in DT and DC5 in MDT is also partially committed and crashed. On restart, when any new operation in DT gets into MDT territory and detects a partial commit in MDT, a rollback will be triggered eagerly. Ref:

So, this case is also taken care of.

Contributor:
> will rollback the succeeded commit in MDT. So, it will be automatically taken care of.

The current code on master just removes the .complete metadata file; is that the rollback you are mentioning? To keep in sync with regular rollback, I think triggering a real rollback action is necessary.

Contributor:
The older solution of removing the completed action and reattempting won't work in all scenarios. We have to consider the following:
(1) c1.commit failed on the main dataset; on MDT, c1.deltacommit was completed.
(a) With record index enabled, a new log block was added to the log file by c1.deltacommit. Simply removing the deltacommit may not be enough and will require an additional action to roll back the log block, to keep the log file consistent.
(2) c1.clean was attempted. c1.deltacommit was completed. When the clean is retried, the second attempt could bring in some of the files that were in the "failed" list of the first attempt (vs. the "success" list).
(3) c1.rollback was attempted. c1.deltacommit was completed. (We fixed an issue with the incomplete-rollback scenario where MDT was updated with the deltacommit; this change played a role in that scenario as well.)

Contributor:
Yeah, fixing the rollback in sync with the normal DT can avoid many potential bugs; +1 for this direction.

throw new HoodieMetadataException("Failed to rollback deltacommit at " + instantTime + " from MDT");
}
// If the alreadyCompletedInstant is empty, that means there is a requested or inflight
// instant with the same instant time. This happens for data table clean action which
// reuses the same instant time without rollback first. It is a no-op here as the
// clean plan is the same, so we don't need to delete the requested and inflight instant
// files in the active timeline.
metadataMetaClient.reloadActiveTimeline();
}

writeClient.startCommitWithTime(instantTime);
if (bulkInsertPartitioner.isPresent()) {
writeClient.bulkInsertPreppedRecords(preppedRecordRDD, instantTime, bulkInsertPartitioner).collect();
} else {
@@ -19,6 +19,7 @@
package org.apache.hudi.client.functional;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.client.SparkRDDWriteClient;
@@ -2029,7 +2030,9 @@ public void testReattemptOfFailedClusteringCommit() throws Exception {

HoodieWriteConfig newWriteConfig = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
.withAutoCommit(false)
.withClusteringConfig(clusteringConfig).build();
.withClusteringConfig(clusteringConfig)
.withRollbackUsingMarkers(false)
.build();

// trigger clustering
SparkRDDWriteClient newClient = getHoodieWriteClient(newWriteConfig);
@@ -2821,6 +2824,69 @@ public void testDuplicatesDuringRecordIndexBootstrap() throws Exception {
}
}

/**
* Test duplicate operation with same instant timestamp.
*
* This can happen if the commit on the MDT succeeds but fails on the dataset. For some table services like clean,
* compaction, replace commit, the operation will be retried with the same timestamp (taken from inflight). Hence,
* metadata table will see an additional commit with the same timestamp as a previously completed deltacommit.
*/
@Test
public void testRepeatedActionWithSameInstantTime() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);

Properties props = new Properties();
props.put(HoodieCleanConfig.ALLOW_MULTIPLE_CLEANS.key(), "false");
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false).withProps(props).build();

// Perform three writes so we can initiate a clean
int index = 0;
final String partition = "2015/03/16";
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
for (; index < 3; ++index) {
String newCommitTime = "00" + index;
List<HoodieRecord> records = index == 0 ? dataGen.generateInsertsForPartition(newCommitTime, 10, partition)
: dataGen.generateUniqueUpdates(newCommitTime, 5);
client.startCommitWithTime(newCommitTime);
client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
}
}
assertEquals(metaClient.reloadActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants(), 3);

try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
// Perform a clean
String cleanInstantTime = "00" + index++;
HoodieCleanMetadata cleanMetadata = client.clean(cleanInstantTime);
// 1 partition should be cleaned
assertEquals(cleanMetadata.getPartitionMetadata().size(), 1);
// 1 file cleaned
assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 1);
assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 0);
assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1);

// To simulate failed clean on the main dataset, we will delete the completed clean instant
String cleanInstantFileName = HoodieTimeline.makeCleanerFileName(cleanInstantTime);
assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
cleanInstantFileName), false));
assertEquals(metaClient.reloadActiveTimeline().getCleanerTimeline().filterInflights().countInstants(), 1);
assertEquals(metaClient.reloadActiveTimeline().getCleanerTimeline().filterCompletedInstants().countInstants(), 0);

// Initiate another clean. The previous leftover clean will be attempted and no other clean will be scheduled.
String newCleanInstantTime = "00" + index++;
cleanMetadata = client.clean(newCleanInstantTime);

// 1 partition should be cleaned
assertEquals(cleanMetadata.getPartitionMetadata().size(), 1);
// 1 file cleaned but was already deleted so will be a failed delete
assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getSuccessDeleteFiles().size(), 0);
assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getFailedDeleteFiles().size(), 1);
assertEquals(cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns().size(), 1);

validateMetadata(client);
}
}

private void doPreBootstrapOperations(HoodieTestTable testTable) throws Exception {
doPreBootstrapOperations(testTable, "0000001", "0000002");
}
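As a complement to testRepeatedActionWithSameInstantTime, the scenario it exercises can be pictured with a small standalone toy that models the two timelines as plain sets. This uses only java.util collections, not Hudi APIs, and the instant names are invented for illustration.

import java.util.LinkedHashSet;
import java.util.Set;

// Toy model of the data table (DT) and metadata table (MDT) timelines for a clean whose MDT
// deltacommit completed but whose DT clean never finished, so the clean is retried with the
// same instant time. Not Hudi code; purely illustrative.
public class RepeatedCleanScenario {
  public static void main(String[] args) {
    Set<String> dt = new LinkedHashSet<>();
    Set<String> mdt = new LinkedHashSet<>();

    // Three regular commits succeed on both tables.
    for (String t : new String[] {"000", "001", "002"}) {
      dt.add(t + ".commit");
      mdt.add(t + ".deltacommit");
    }

    // Clean at instant 003: applied to the MDT as a deltacommit, but only the requested/inflight
    // clean remains on the DT because the completed clean file was lost.
    mdt.add("003.deltacommit");
    dt.add("003.clean.inflight");

    // On retry, the DT reuses instant 003, so the MDT sees 003 a second time and, with the new
    // logic, rolls back and re-applies its deltacommit instead of deleting the instant file.
    boolean seenBefore = mdt.contains("003.deltacommit");
    System.out.println("MDT already has 003? " + seenBefore); // prints: MDT already has 003? true
  }
}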