@@ -273,7 +273,7 @@ private String printAllCompactions(HoodieDefaultTimeline timeline,
int limit,
boolean headerOnly) {

-    Stream<HoodieInstant> instantsStream = timeline.getCommitsAndCompactionTimeline().getReverseOrderedInstants();
+    Stream<HoodieInstant> instantsStream = timeline.getWriteTimeline().getReverseOrderedInstants();
List<Pair<HoodieInstant, HoodieCompactionPlan>> compactionPlans = instantsStream
.map(instant -> Pair.of(instant, compactionPlanReader.apply(instant)))
.filter(pair -> pair.getRight() != null)
@@ -249,7 +249,7 @@ private HoodieTableFileSystemView buildFileSystemView(String globRegex, String m
} else if (excludeCompaction) {
timeline = metaClient.getActiveTimeline().getCommitsTimeline();
} else {
-      timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline();
+      timeline = metaClient.getActiveTimeline().getWriteTimeline();
}

if (!includeInflight) {
@@ -25,10 +25,12 @@
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
@@ -165,8 +167,10 @@ public String deleteSavepoint(@CliOption(key = {"commit"}, help = "Delete a save

private static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, String basePath) throws Exception {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()).build();
-    return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config, false);
+    return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config);
}

}
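The HoodieFailedWritesCleaningPolicy used above replaces the old rollbackPending boolean on the write client. The enum itself is not shown in this diff; the sketch below is a hedged reconstruction from what does appear here (the NEVER value above, the import path at the top of this file, and the isEager()/isLazy() predicates used in AbstractHoodieWriteClient further down). The real enum may carry more members.

// Sketch reconstructed from usages in this PR; the real class lives in
// org.apache.hudi.common.model and is not part of this diff.
public enum HoodieFailedWritesCleaningPolicy {
  EAGER,   // roll back failed writes before every new commit starts
  LAZY,    // roll back only writes whose heartbeat has expired (multi-writer friendly)
  NEVER;   // leave failed writes alone (the CLI savepoint command above opts for this)

  public boolean isEager() {
    return this == EAGER;
  }

  public boolean isLazy() {
    return this == LAZY;
  }
}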
8 changes: 8 additions & 0 deletions hudi-client/hudi-client-common/pom.xml
@@ -146,6 +146,14 @@
</exclusions>
</dependency>

+    <!-- Other Utils -->
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <version>3.1.2</version>
+      <scope>test</scope>
+    </dependency>
+
<!-- Test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
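The awaitility dependency added above is a polling-assertion library, presumably supporting the new heartbeat tests. A self-contained sketch of the polling style it enables; the expiry rule (interval times tolerable misses) is an assumption mirroring the heartbeat config getters seen later in this diff, not code from the PR's tests.

import static org.awaitility.Awaitility.await;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class AwaitilitySketch {
  public static void main(String[] args) {
    long intervalMs = 100L;
    int tolerableMisses = 2;
    AtomicLong lastBeatMs = new AtomicLong(System.currentTimeMillis());
    // Simulate a writer that has stopped heartbeating: lastBeatMs is never refreshed,
    // so the condition below eventually holds and the await() completes.
    await().atMost(5, TimeUnit.SECONDS).until(
        () -> System.currentTimeMillis() - lastBeatMs.get() > intervalMs * (tolerableMisses + 1));
  }
}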
@@ -21,6 +21,7 @@
import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -48,6 +49,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
protected final transient Configuration hadoopConf;
protected final HoodieWriteConfig config;
protected final String basePath;
+  protected final HoodieHeartbeatClient heartbeatClient;

/**
* Timeline Server has the same lifetime as that of Client. Any operations done on the same timeline service will be
@@ -70,6 +72,8 @@ protected AbstractHoodieClient(HoodieEngineContext context, HoodieWriteConfig cl
this.config = clientConfig;
this.timelineServer = timelineServer;
shouldStopTimelineServer = !timelineServer.isPresent();
+    this.heartbeatClient = new HoodieHeartbeatClient(this.fs, this.basePath,
+        clientConfig.getHoodieClientHeartbeatIntervalInMs(), clientConfig.getHoodieClientHeartbeatTolerableMisses());
startEmbeddedServerView();
initWrapperFSMetrics();
}
@@ -136,4 +140,8 @@ protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoa
public Option<EmbeddedTimelineService> getTimelineServer() {
return timelineServer;
}

+  public HoodieHeartbeatClient getHeartbeatClient() {
+    return heartbeatClient;
+  }
}
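The HoodieHeartbeatClient constructed above is not itself part of this diff. As a rough illustration of the lifecycle the PR wires up (start on commit start under the lazy policy, stop in postCommit and close, expiry checks during rollback), here is a hypothetical in-memory stand-in; the real client persists heartbeats as files under the table base path so that other writers can observe them, and its field names and expiry rule below are assumptions.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the contract used by this patch, not the real class.
public class HeartbeatClientSketch {
  private final long intervalMs;      // cf. getHoodieClientHeartbeatIntervalInMs()
  private final int tolerableMisses;  // cf. getHoodieClientHeartbeatTolerableMisses()
  private final Map<String, Long> lastBeatMs = new ConcurrentHashMap<>();

  public HeartbeatClientSketch(long intervalMs, int tolerableMisses) {
    this.intervalMs = intervalMs;
    this.tolerableMisses = tolerableMisses;
  }

  // startCommit() calls start(instantTime) when the cleaning policy is LAZY.
  public void start(String instantTime) {
    lastBeatMs.put(instantTime, System.currentTimeMillis());
  }

  // postCommit() calls stop(instantTime) in a finally block.
  public void stop(String instantTime) {
    lastBeatMs.remove(instantTime);
  }

  // close() calls stop() to tear down every heartbeat owned by this client.
  public void stop() {
    lastBeatMs.clear();
  }

  // Expiry rule assumed: a writer is presumed dead after missing tolerableMisses
  // consecutive intervals. The real check reads heartbeat file timestamps.
  public boolean isHeartbeatExpired(String instantTime) {
    Long last = lastBeatMs.get(instantTime);
    return last == null
        || System.currentTimeMillis() - last > intervalMs * (tolerableMisses + 1);
  }
}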
@@ -29,8 +29,10 @@
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.client.heartbeat.HeartbeatUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -39,6 +41,7 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
@@ -48,6 +51,7 @@
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRestoreException;
import org.apache.hudi.exception.HoodieRollbackException;
@@ -96,43 +100,29 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I

private transient WriteOperationType operationType;
private transient HoodieWriteCommitCallback commitCallback;
-  private transient AsyncCleanerService asyncCleanerService;
-  protected final boolean rollbackPending;
-
-  /**
-   * Create a write client, without cleaning up failed/inflight commits.
-   *
-   * @param context HoodieEngineContext
-   * @param clientConfig instance of HoodieWriteConfig
-   */
-  public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
-    this(context, clientConfig, false);
-  }
+  protected transient AsyncCleanerService asyncCleanerService;

/**
* Create a write client, with new hudi index.
*
* @param context HoodieEngineContext
* @param writeConfig instance of HoodieWriteConfig
-   * @param rollbackPending whether need to cleanup pending commits
*/
-  public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
-    this(context, writeConfig, rollbackPending, Option.empty());
+  @Deprecated
+  public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
+    this(context, writeConfig, Option.empty());
}

/**
* Create a write client, allows to specify all parameters.
*
* @param context HoodieEngineContext
* @param writeConfig instance of HoodieWriteConfig
-   * @param rollbackPending whether need to cleanup pending commits
* @param timelineService Timeline Service that runs as part of write client.
*/
-  public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
+  @Deprecated
+  public AbstractHoodieWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService);
this.metrics = new HoodieMetrics(config, config.getTableName());
-    this.rollbackPending = rollbackPending;
this.index = createIndex(writeConfig);
}
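With the rollbackPending flag removed from the constructors above, existing callers express the same intent through the write config instead. A migration sketch, assuming EAGER reproduces the old rollbackPending=true semantics and NEVER the false case; jsc and basePath are placeholders supplied by the caller, and SparkRDDWriteClient is the concrete subclass touched elsewhere in this PR.

import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaSparkContext;

public class MigrationSketch {
  // Before: new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config, true);
  // After: the rollback intent moves into the compaction config.
  static SparkRDDWriteClient createClient(JavaSparkContext jsc, String basePath) {
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
            .build())
        .build();
    return new SparkRDDWriteClient(new HoodieSparkEngineContext(jsc), config);
  }
}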

@@ -181,7 +171,7 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, partitionToReplaceFileIds, extraMetadata, operationType, config.getSchema(), commitActionType);
// Finalize write
    finalizeWrite(table, instantTime, stats);
-
+    HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, heartbeatClient, config);
try {
activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
@@ -236,17 +226,16 @@ protected void syncTableMetadata() {
* Main API to run bootstrap to hudi.
*/
public void bootstrap(Option<Map<String, String>> extraMetadata) {
-    if (rollbackPending) {
-      rollBackInflightBootstrap();
-    }
+    // TODO : MULTIWRITER -> check if failed bootstrap files can be cleaned later
    HoodieTable<T, I, K, O> table = getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
+    rollbackFailedBootstrap();
table.bootstrap(context, extraMetadata);
}

/**
-   * Main API to rollback pending bootstrap.
+   * Main API to rollback failed bootstrap.
*/
-  protected void rollBackInflightBootstrap() {
+  public void rollbackFailedBootstrap() {
LOG.info("Rolling back pending bootstrap if present");
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
@@ -258,7 +247,6 @@ protected void rollBackInflightBootstrap() {
table.rollbackBootstrap(context, HoodieActiveTimeline.createNewInstantTime());
LOG.info("Finished rolling back pending bootstrap");
}

}

/**
@@ -369,9 +357,8 @@ public abstract O bulkInsertPreppedRecords(I preppedRecords, final String instan

/**
* Common method containing steps to be performed before write (upsert/insert/...
*
-   * @param instantTime Instant Time
-   * @return Write Status
+   * @param instantTime
+   * @param writeOperationType
*/
protected void preWrite(String instantTime, WriteOperationType writeOperationType) {
setOperationType(writeOperationType);
@@ -424,15 +411,16 @@ protected void postCommit(HoodieTable<T, I, K, O> table, HoodieCommitMetadata me
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, table);
archiveLog.archiveIfRequired(context);
      autoCleanOnCommit();
-
+      syncTableMetadata();
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
+    } finally {
+      this.heartbeatClient.stop(instantTime);
}
}

protected void runAnyPendingCompactions(HoodieTable<T, I, K, O> table) {
-    table.getActiveTimeline().getCommitsAndCompactionTimeline().filterPendingCompactionTimeline().getInstants()
+    table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().getInstants()
.forEach(instant -> {
LOG.info("Running previously failed inflight compaction at instant " + instant);
compact(instant.getTimestamp(), true);
@@ -533,11 +521,14 @@ public void restoreToSavepoint(String savepointTime) {
}

/**
-   * Rollback the inflight record changes with the given commit time.
+   * @Deprecated
+   * Rollback the inflight record changes with the given commit time. This
+   * will be removed in future in favor of {@link AbstractHoodieWriteClient#restoreToInstant(String)}
*
* @param commitInstantTime Instant time of the commit
* @throws HoodieRollbackException if rollback cannot be performed successfully
*/
+  @Deprecated
public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
LOG.info("Begin rollback of instant " + commitInstantTime);
final String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime();
@@ -598,6 +589,9 @@ public HoodieRestoreMetadata restoreToInstant(final String instantTime) throws H
public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
LOG.info("Cleaner started");
final Timer.Context timerContext = metrics.getCleanCtx();
+    LOG.info("Cleaned failed attempts if any");
+    CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
+        HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime);
if (timerContext != null && metadata != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
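CleanerUtils.rollbackFailedWrites itself is not part of this diff. From its two call sites (CLEAN_ACTION here, COMMIT_ACTION in startCommit below), one plausible shape of the dispatch is sketched here; the signature and the exact matching rules are assumptions, not the library's actual API.

import java.util.function.Supplier;

import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

final class CleanerUtilsSketch {
  // Assumed behavior: the rollback callback only fires when the configured
  // policy matches the action that triggered the check.
  static void rollbackFailedWrites(HoodieFailedWritesCleaningPolicy policy,
      String actionType, Supplier<Boolean> rollbackFailedWrites) {
    if (policy.isEager() && HoodieTimeline.COMMIT_ACTION.equals(actionType)) {
      // EAGER: clean up failed writes synchronously before a new commit starts.
      rollbackFailedWrites.get();
    } else if (policy.isLazy() && HoodieTimeline.CLEAN_ACTION.equals(actionType)) {
      // LAZY: defer cleanup to clean(), gated on heartbeat expiry downstream.
      rollbackFailedWrites.get();
    }
    // NEVER: no automatic rollback on either path.
  }
}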
@@ -617,11 +611,8 @@ public HoodieCleanMetadata clean() {
* Provides a new commit time for a write operation (insert/update/delete).
*/
public String startCommit() {
-    // NOTE : Need to ensure that rollback is done before a new commit is started
-    if (rollbackPending) {
-      // Only rollback pending commit/delta-commits. Do not touch compaction commits
-      rollbackPendingCommits();
-    }
+    CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
+        HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());
Review comment (Contributor): Not part of this patch, but just curious: why doesn't this method (startCommit) in turn call into startCommitWithTime()?

Review comment (Contributor): If it did, we wouldn't need to call CleanerUtils.rollbackFailedWrites in two different places (613, 643). Guessing you might have run into some issues; interested to understand the reasoning.

Reply (Contributor, Author): Let's take this offline. We can probably refactor, but not in this PR.
String instantTime = HoodieActiveTimeline.createNewInstantTime();
HoodieTableMetaClient metaClient = createMetaClient(true);
startCommit(instantTime, metaClient.getCommitActionType(), metaClient);
@@ -650,11 +641,8 @@ public void startCommitWithTime(String instantTime, String actionType) {
* Completes a new commit time for a write operation (insert/update/delete) with specified action.
*/
private void startCommitWithTime(String instantTime, String actionType, HoodieTableMetaClient metaClient) {
-    // NOTE : Need to ensure that rollback is done before a new commit is started
-    if (rollbackPending) {
-      // Only rollback inflight commit/delta-commits. Do not touch compaction commits
-      rollbackPendingCommits();
-    }
+    CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
+        HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());
startCommit(instantTime, actionType, metaClient);
}

@@ -666,6 +654,9 @@ private void startCommit(String instantTime, String actionType, HoodieTableMetaC
HoodieTimeline.compareTimestamps(latestPending.getTimestamp(), HoodieTimeline.LESSER_THAN, instantTime),
"Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"
+ latestPending + ", Ingesting at " + instantTime));
+    if (config.getFailedWritesCleanPolicy().isLazy()) {
+      this.heartbeatClient.start(instantTime);
+    }
metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
instantTime));
}
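Taken together with the heartbeat start above, the lazy path this PR wires up can be traced end to end. A comment-level sketch of the intended multi-writer flow; writerA and writerB are hypothetical callers, while the call chain in the comments is the one visible in this diff.

import org.apache.hudi.client.SparkRDDWriteClient;

class LazyCleaningFlowSketch {
  void sketch(SparkRDDWriteClient writerA, SparkRDDWriteClient writerB) {
    // Writer A, configured with HoodieFailedWritesCleaningPolicy.LAZY:
    String instantTime = writerA.startCommit();
    // startCommit() sees the lazy policy and calls heartbeatClient.start(instantTime).
    // ... writer A crashes mid-write; its heartbeat is never refreshed again ...

    // Writer B, some time later:
    writerB.clean();
    // clean() -> CleanerUtils.rollbackFailedWrites(LAZY, CLEAN_ACTION, () -> rollbackFailedWrites())
    //         -> getInstantsToRollback(table), which under LAZY selects only inflight
    //            instants whose heartbeats have expired (writer A's), and rolls them back.
  }
}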
@@ -721,7 +712,7 @@ protected abstract void completeCompaction(HoodieCommitMetadata metadata, O writ

/**
* Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
-   *
+   * TODO : Deprecate this method and make it protected
* @param inflightInstant Inflight Compaction Instant
* @param table Hoodie Table
*/
@@ -749,22 +740,49 @@ private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieT
}

/**
-   * Cleanup all pending commits.
+   * Rollback all failed writes.
*/
-  private void rollbackPendingCommits() {
+  public Boolean rollbackFailedWrites() {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
-    HoodieTimeline inflightTimeline = getInflightTimelineExcludeCompactionAndClustering(table);
-    List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
-        .collect(Collectors.toList());
-    for (String commit : commits) {
-      if (HoodieTimeline.compareTimestamps(commit, HoodieTimeline.LESSER_THAN_OR_EQUALS,
+    List<String> instantsToRollback = getInstantsToRollback(table);
+    for (String instant : instantsToRollback) {
+      if (HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS,
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
-        rollBackInflightBootstrap();
+        rollbackFailedBootstrap();
break;
} else {
-        rollback(commit);
+        rollback(instant);
}
}
+    // Delete any heartbeat files for already rolled back commits
+    try {
+      HeartbeatUtils.cleanExpiredHeartbeats(this.heartbeatClient.getAllExistingHeartbeatInstants(),
+          createMetaClient(true), basePath);
+    } catch (IOException io) {
+      LOG.error("Unable to delete heartbeat files", io);
+    }
+    return true;
}

+  private List<String> getInstantsToRollback(HoodieTable<T, I, K, O> table) {
+    if (config.getFailedWritesCleanPolicy().isEager()) {
+      HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
+      return inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
+          .collect(Collectors.toList());
+    } else if (config.getFailedWritesCleanPolicy() == HoodieFailedWritesCleaningPolicy.NEVER) {
+      return Collections.EMPTY_LIST;
+    } else if (config.getFailedWritesCleanPolicy().isLazy()) {
+      return table.getMetaClient().getActiveTimeline()
+          .getCommitsTimeline().filterInflights().getReverseOrderedInstants().filter(instant -> {
+            try {
+              return heartbeatClient.isHeartbeatExpired(instant.getTimestamp());
+            } catch (IOException io) {
+              throw new HoodieException("Failed to check heartbeat for instant " + instant, io);
+            }
+          }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+    } else {
+      throw new IllegalArgumentException("Invalid Failed Writes Cleaning Policy " + config.getFailedWritesCleanPolicy());
+    }
+  }
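HeartbeatUtils.cleanExpiredHeartbeats, called in rollbackFailedWrites above, is also not shown in this diff. A rough sketch of what it plausibly does; the heartbeat file layout under .hoodie/.heartbeat/ and the exact timeline filtering are assumptions for illustration.

import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;

final class HeartbeatUtilsSketch {
  // Deletes heartbeat files for instants that are no longer pending on the
  // timeline, so stale heartbeats don't accumulate after rollbacks complete.
  static void cleanExpiredHeartbeats(List<String> heartbeatInstants,
      HoodieTableMetaClient metaClient, String basePath) throws IOException {
    Set<String> stillPending = metaClient.getActiveTimeline().filterInflightsAndRequested()
        .getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
    FileSystem fs = metaClient.getFs();
    for (String instant : heartbeatInstants) {
      if (!stillPending.contains(instant)) {
        // Assumed layout: <basePath>/.hoodie/.heartbeat/<instantTime>
        fs.delete(new Path(basePath + "/.hoodie/.heartbeat/" + instant), false);
      }
    }
  }
}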

/**
@@ -912,5 +930,6 @@ public void close() {
// Calling this here releases any resources used by your index, so make sure to finish any related operations
// before this point
this.index.close();
+    this.heartbeatClient.stop();
}
}