Merged
Changes from 2 commits
@@ -21,7 +21,6 @@

 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -150,11 +149,7 @@ void testCOWToMORConvertedTableRollback(boolean rollbackUsingMarkers) throws Exc
 void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
   // NOTE: First writer will have Metadata table DISABLED
   HoodieWriteConfig.Builder cfgBuilder =
-      getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE)
-          .withMetadataConfig(
-              HoodieMetadataConfig.newBuilder()
-                  .enable(false)
-                  .build());
+      getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE);

   addConfigsForPopulateMetaFields(cfgBuilder, true);
   HoodieWriteConfig cfg = cfgBuilder.build();
@@ -209,7 +204,7 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro
   final String commitTime1 = "002";
   // WriteClient with custom config (disable small file handling)
   // NOTE: Second writer will have Metadata table ENABLED
-  try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(false));) {
+  try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(true));) {
     secondClient.startCommitWithTime(commitTime1);

     List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -245,8 +240,8 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro
   /*
    * Write 3 (inserts + updates - testing successful delta commit)
    */
-  final String commitTime2 = "002";
-  try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(cfg);) {
+  final String commitTime2 = "003";
+  try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(true));) {
     thirdClient.startCommitWithTime(commitTime2);

     List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -287,7 +282,7 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro
   /*
    * Write 4 (updates)
    */
-  newCommitTime = "003";
+  newCommitTime = "004";
   thirdClient.startCommitWithTime(newCommitTime);

   writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);
@@ -123,6 +123,8 @@ private boolean isLocalViewBehind(Context ctx) {
   String timelineHashFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, "");
   HoodieTimeline localTimeline =
       viewManager.getFileSystemView(basePath).getTimeline().filterCompletedAndCompactionInstants();
+  String localLastKnownInstant = localTimeline.lastInstant().isPresent() ? localTimeline.lastInstant().get().getTimestamp()
+      : HoodieTimeline.INVALID_INSTANT_TS;
   if (LOG.isDebugEnabled()) {
     LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient
         + "], localTimeline=" + localTimeline.getInstants().collect(Collectors.toList()));
@@ -134,7 +136,9 @@ private boolean isLocalViewBehind(Context ctx) {
   }

   String localTimelineHash = localTimeline.getTimelineHash();
-  if (!localTimelineHash.equals(timelineHashFromClient)) {
+  // refresh if timeline hash mismatches and if local's last known instant < client's last known instant
+  if (!localTimelineHash.equals(timelineHashFromClient)
+      && HoodieTimeline.compareTimestamps(localLastKnownInstant, HoodieTimeline.LESSER_THAN, lastKnownInstantFromClient)) {
Contributor
@danny0405 danny0405 Mar 3, 2022
Finally I got the reason:

We have a start-commit method that may generate a rollback instant with a greater timestamp than the actual passed-in instant time:

public void startCommitWithTime(String instantTime, String actionType) {

And unfortunately, Flink uses that (Spark uses it too); here is how the problem arises:

delta_commit compaction delta_commit rollback_commit
--- t1 --------------- t2 ------------ t3 ------------- t4 ------------

The rollback instant t4 was created before t3 was created, and it carried the highest timestamp t4; then the following sequence happens:

  1. the rollback action refreshes the remote timeline service with the latest timestamp t4 (call the fs view at this point V1)
  2. the t3 delta commit starts to execute and commits; say the commit is successful
  3. then we want to trigger the compaction after the commit of t3

And the tricky thing happens:

The compaction scheduler takes the client; the client uses the latest timestamp on the timeline and tries to fetch all the file slices. But because the client timestamp t4 equals the remote timeline time t4, the view does not sync: we still get the V1 fs view here, and we cannot find any compaction plan because there were no log files in that view.

Here is my fix patch to make sure the rollback timestamp is not greater than the delta commit time.

HUDI-2761.patch.zip
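The ordering problem described above can be sketched in isolation. This is a hypothetical, self-contained simplification (plain Java, no actual Hudi classes): Hudi instant times are fixed-width digit strings whose lexicographic order matches chronological order, and the rollback instant "004" is created before the delta commit "003" completes, so once "003" lands, client and remote both report "004" as the latest instant even though the remote file-system view (V1) was built before "003" existed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class RollbackOrderingSketch {

    // True when a latest-timestamp-only comparison sees the two views as in sync.
    static boolean latestInstantsMatch(List<String> remote, List<String> client) {
        return Collections.max(remote).equals(Collections.max(client));
    }

    public static void main(String[] args) {
        // Remote view V1: refreshed when the rollback instant "004" appeared.
        List<String> remote = new ArrayList<>(List.of("001", "002", "004"));
        // Client timeline: the delta commit "003" completes afterwards.
        List<String> client = new ArrayList<>(remote);
        client.add("003");
        // The stale remote view looks "in sync", so no refresh is triggered.
        System.out.println(latestInstantsMatch(remote, client)); // prints "true"
    }
}
```

Because the comparison only looks at the maximum timestamp, the delta commit slipping in behind the rollback instant is invisible, which is why the compaction scheduler ends up planning against the stale V1 view.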

     return true;
   }
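The guarded refresh condition in the diff above can be sketched as a standalone predicate. This is a simplified stand-in with hypothetical names, not the real Hudi `RequestHandler`: since instant timestamps are digit strings, a plain `compareTo` substitutes here for `HoodieTimeline.compareTimestamps` with `LESSER_THAN`.

```java
public class ViewSyncGuard {

    // Refresh the local view only when the timeline hashes differ AND the local
    // last-known instant is strictly older than the client's; a hash mismatch
    // with an equal latest instant no longer forces a refresh.
    static boolean isLocalViewBehind(String localHash, String clientHash,
                                     String localLastInstant, String clientLastInstant) {
        return !localHash.equals(clientHash)
            && localLastInstant.compareTo(clientLastInstant) < 0;
    }

    public static void main(String[] args) {
        // Hash mismatch and local strictly behind: refresh.
        System.out.println(isLocalViewBehind("h1", "h2", "003", "004")); // prints "true"
        // Hash mismatch but identical latest instant: no refresh.
        System.out.println(isLocalViewBehind("h1", "h2", "004", "004")); // prints "false"
        // Hashes agree: views already consistent.
        System.out.println(isLocalViewBehind("h1", "h1", "003", "004")); // prints "false"
    }
}
```

The second case is the interesting one: it is exactly the t4 == t4 situation from the review comment, where the hash alone would have triggered a refresh but the timestamp guard suppresses it.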
