Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ protected void autoCleanOnCommit() {
} else {
// Do not reuse instantTime for clean as metadata table requires all changes to have unique instant timestamps.
LOG.info("Auto cleaning is enabled. Running cleaner now");
clean();
clean(true);
}
}
}
Expand Down Expand Up @@ -570,16 +570,22 @@ public void restoreToSavepoint(String savepointTime) {
SavepointHelpers.validateSavepointRestore(table, savepointTime);
}

@Deprecated
public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
return rollback(commitInstantTime, false);
}

/**
* @Deprecated
* Rollback the inflight record changes with the given commit time. This
* will be removed in future in favor of {@link AbstractHoodieWriteClient#restoreToInstant(String)}
*
* @param commitInstantTime Instant time of the commit
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
* @throws HoodieRollbackException if rollback cannot be performed successfully
*/
@Deprecated
public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
public boolean rollback(final String commitInstantTime, boolean skipLocking) throws HoodieRollbackException {
LOG.info("Begin rollback of instant " + commitInstantTime);
final String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime();
final Timer.Context timerContext = this.metrics.getRollbackCtx();
Expand All @@ -590,10 +596,12 @@ public boolean rollback(final String commitInstantTime) throws HoodieRollbackExc
.findFirst());
if (commitInstantOpt.isPresent()) {
LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
Option<HoodieRollbackPlan> rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false);
Option<HoodieRollbackPlan> rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime,
commitInstantOpt.get(), false);
if (rollbackPlanOption.isPresent()) {
// execute rollback
HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true);
HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true,
skipLocking);
if (timerContext != null) {
long durationInMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
Expand Down Expand Up @@ -644,7 +652,19 @@ public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws H
* cleaned)
*/
public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
return clean(cleanInstantTime, true);
return clean(cleanInstantTime, true, false);
}

/**
* Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
* configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
* cleaned)
* @param cleanInstantTime instant time for clean.
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
* @return instance of {@link HoodieCleanMetadata}.
*/
public HoodieCleanMetadata clean(String cleanInstantTime, boolean skipLocking) throws HoodieIOException {
return clean(cleanInstantTime, true, skipLocking);
}

/**
Expand All @@ -653,17 +673,20 @@ public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOExcepti
* cleaned). This API provides the flexibility to schedule clean instant asynchronously via
* {@link AbstractHoodieWriteClient#scheduleTableService(String, Option, TableServiceType)} and disable inline scheduling
* of clean.
* @param cleanInstantTime instant time for clean.
* @param scheduleInline true if needs to be scheduled inline. false otherwise.
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
*/
public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline) throws HoodieIOException {
public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
if (scheduleInline) {
scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
}
LOG.info("Cleaner started");
final Timer.Context timerContext = metrics.getCleanCtx();
LOG.info("Cleaned failed attempts if any");
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime);
HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime, skipLocking);
if (timerContext != null && metadata != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
Expand All @@ -675,7 +698,17 @@ public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline
}

public HoodieCleanMetadata clean() {
return clean(HoodieActiveTimeline.createNewInstantTime());
return clean(false);
}

/**
* Triggers clean for the table. This refers to Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
* * configurations and CleaningPolicy used.
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
* @return instance of {@link HoodieCleanMetadata}.
*/
public HoodieCleanMetadata clean(boolean skipLocking) {
return clean(HoodieActiveTimeline.createNewInstantTime(), skipLocking);
}

/**
Expand Down Expand Up @@ -797,20 +830,29 @@ private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieT
* Rollback all failed writes.
*/
public Boolean rollbackFailedWrites() {
return rollbackFailedWrites(false);
}

/**
* Rollback all failed writes.
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
*/
public Boolean rollbackFailedWrites(boolean skipLocking) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy());
rollbackFailedWrites(instantsToRollback);
List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(),
Option.empty());
rollbackFailedWrites(instantsToRollback, skipLocking);
return true;
}

protected void rollbackFailedWrites(List<String> instantsToRollback) {
protected void rollbackFailedWrites(List<String> instantsToRollback, boolean skipLocking) {
for (String instant : instantsToRollback) {
if (HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS,
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
rollbackFailedBootstrap();
break;
} else {
rollback(instant);
rollback(instant, skipLocking);
}
}
// Delete any heartbeat files for already rolled back commits
Expand All @@ -822,11 +864,17 @@ protected void rollbackFailedWrites(List<String> instantsToRollback) {
}
}

protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy) {
protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option<String> curInstantTime) {
Stream<HoodieInstant> inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(metaClient)
.getReverseOrderedInstants();
if (cleaningPolicy.isEager()) {
return inflightInstantsStream.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
if (curInstantTime.isPresent()) {
return !entry.equals(curInstantTime.get());
} else {
return true;
}
}).collect(Collectors.toList());
} else if (cleaningPolicy.isLazy()) {
return inflightInstantsStream.filter(instant -> {
try {
Expand Down Expand Up @@ -975,7 +1023,7 @@ protected Option<String> inlineCluster(Option<Map<String, String>> extraMetadata
protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
String commitTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleRollback(context, commitTime, inflightInstant, false);
table.rollback(context, commitTime, inflightInstant, false);
table.rollback(context, commitTime, inflightInstant, false, false);
table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ public abstract Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext c
*
* @return information on cleaned file slices
*/
public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime);
public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking);

/**
* Schedule rollback for the instant time.
Expand Down Expand Up @@ -452,7 +452,8 @@ public abstract Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext
public abstract HoodieRollbackMetadata rollback(HoodieEngineContext context,
String rollbackInstantTime,
HoodieInstant commitInstant,
boolean deleteInstants);
boolean deleteInstants,
boolean skipLocking);

/**
* Create a savepoint at the specified instant, so that the table can be restored
Expand Down Expand Up @@ -480,7 +481,7 @@ public abstract HoodieRestoreMetadata restore(HoodieEngineContext context,
public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
String commitTime = HoodieActiveTimeline.createNewInstantTime();
scheduleRollback(context, commitTime, inflightInstant, false);
rollback(context, commitTime, inflightInstant, false);
rollback(context, commitTime, inflightInstant, false, false);
getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,16 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(CleanActionExecutor.class);
private final TransactionManager txnManager;
private final boolean skipLocking;

public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
this(context, config, table, instantTime, false);
}

public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime, boolean skipLocking) {
super(context, config, table, instantTime);
this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
this.skipLocking = skipLocking;
}

static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
Expand Down Expand Up @@ -214,11 +220,17 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
* @param cleanMetadata instance of {@link HoodieCleanMetadata} to be applied to metadata.
*/
private void writeMetadata(HoodieCleanMetadata cleanMetadata) {
try {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
writeTableMetadata(cleanMetadata);
} finally {
this.txnManager.endTransaction();
if (config.isMetadataTableEnabled()) {
try {
if (!skipLocking) {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
}
writeTableMetadata(cleanMetadata);
} finally {
if (!skipLocking) {
this.txnManager.endTransaction();
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback
instantToRollback,
true,
true,
false,
false);
return rollbackActionExecutor.execute();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ protected HoodieRollbackMetadata rollbackInstant(HoodieInstant instantToRollback
instantToRollback,
true,
true,
false,
false);

// TODO : Get file status and create a rollback stat and file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,17 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
protected final boolean skipTimelinePublish;
protected final boolean useMarkerBasedStrategy;
private final TransactionManager txnManager;
private final boolean skipLocking;

public BaseRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant instantToRollback,
boolean deleteInstants) {
boolean deleteInstants,
boolean skipLocking) {
this(context, config, table, instantTime, instantToRollback, deleteInstants,
false, config.shouldRollbackUsingMarkers());
false, config.shouldRollbackUsingMarkers(), skipLocking);
}

public BaseRollbackActionExecutor(HoodieEngineContext context,
Expand All @@ -77,7 +79,8 @@ public BaseRollbackActionExecutor(HoodieEngineContext context,
HoodieInstant instantToRollback,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
boolean useMarkerBasedStrategy,
boolean skipLocking) {
super(context, config, table, instantTime);
this.instantToRollback = instantToRollback;
this.deleteInstants = deleteInstants;
Expand All @@ -87,6 +90,7 @@ public BaseRollbackActionExecutor(HoodieEngineContext context,
ValidationUtils.checkArgument(!instantToRollback.isCompleted(),
"Cannot use marker based rollback strategy on completed instant:" + instantToRollback);
}
this.skipLocking = skipLocking;
this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
}

Expand Down Expand Up @@ -265,11 +269,17 @@ protected void finishRollback(HoodieInstant inflightInstant, HoodieRollbackMetad
* @param rollbackMetadata instance of {@link HoodieRollbackMetadata} to be applied to metadata.
*/
private void writeToMetadata(HoodieRollbackMetadata rollbackMetadata) {
try {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
writeTableMetadata(rollbackMetadata);
} finally {
this.txnManager.endTransaction();
if (config.isMetadataTableEnabled()) {
try {
if (!skipLocking) {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
}
writeTableMetadata(rollbackMetadata);
} finally {
if (!skipLocking) {
this.txnManager.endTransaction();
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
super(context, config, table, instantTime, commitInstant, deleteInstants);
boolean deleteInstants,
boolean skipLocking) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipLocking);
}

public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
Expand All @@ -54,8 +55,9 @@ public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
boolean useMarkerBasedStrategy,
boolean skipLocking) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy, skipLocking);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
super(context, config, table, instantTime, commitInstant, deleteInstants);
boolean deleteInstants,
boolean skipLocking) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipLocking);
}

public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
Expand All @@ -54,8 +55,9 @@ public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
boolean useMarkerBasedStrategy,
boolean skipLocking) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy, skipLocking);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,14 @@ public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
}

@Override
public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking) {
return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
}

@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant,
boolean deleteInstants, boolean skipLocking) {
return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
}

@Override
Expand Down
Loading