@@ -515,13 +515,25 @@ private Stream<HoodieInstant> getInstantsToArchive() {
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
LOG.info("Deleting instants " + archivedInstants);
boolean success = true;
List<String> instantFiles = archivedInstants.stream().map(archivedInstant ->
new Path(metaClient.getMetaPath(), archivedInstant.getFileName())
Contributor

The archivedInstants seem to already be sorted, because the instants from the timeline are already sorted by instant timestamp and state. In

Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction = rawActiveTimeline.getInstants()

we group by the (timestamp, action) pairs, but the sequence within a single instant should still be preserved:
one instant is sorted by state as requested -> inflight -> complete,
so in theory, the files are also cleaned in this sequence.
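
To make the claimed ordering concrete, here is a minimal, hypothetical sketch of that comparison (not Hudi's actual comparator; the class and enum names are made up for illustration):

import java.util.Comparator;

class InstantOrderingSketch {
  // Natural enum order matches the sequence described above.
  enum State { REQUESTED, INFLIGHT, COMPLETED }

  static class Instant {
    final String timestamp;
    final State state;
    Instant(String timestamp, State state) { this.timestamp = timestamp; this.state = state; }
  }

  // Sort by timestamp first, then by state, so one instant's files
  // order as requested -> inflight -> completed.
  static final Comparator<Instant> BY_TIMESTAMP_THEN_STATE =
      Comparator.<Instant, String>comparing(i -> i.timestamp)
          .thenComparing((Instant i) -> i.state);
}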

Contributor Author

Yes, that's right. But I'm not sure whether deleteFilesParallelize deletes the files in order. It uses FSUtils.parallelizeFilesProcess to delete files, and the behavior depends on the implementation in each engine.

public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
  return data.stream().parallel().map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
}

For example, in the Flink engine context, it uses a parallel stream to delete files.
This is my understanding; please ping me if I am wrong.
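
As an illustration of that concern, here is a small stand-alone sketch (the file names are made up) showing that a parallel stream gives no ordering guarantee for side effects such as deletes:

import java.util.Arrays;
import java.util.List;

public class ParallelDeleteOrderDemo {
  public static void main(String[] args) {
    List<String> files = Arrays.asList(
        "001.commit.requested", "001.inflight", "001.commit");

    // Even though the source list is ordered, forEach on a parallel stream
    // may "delete" 001.commit before 001.commit.requested.
    files.parallelStream().forEach(f ->
        System.out.println(Thread.currentThread().getName() + " deleting " + f));
  }
}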

Contributor

Yes, the parallel deletion may cause the files to be deleted out of order.

But there is still one confusion that needs to be resolved here:
the archiver only archives old/completed instants on the timeline, while the Flink writer only checks the latest inflight instant, which should not be archived yet, so why is the Flink client affected here?

Contributor Author
@BruceKellan BruceKellan May 13, 2022

I had this happen last time. The Flink application's JobManager was force-killed because of memory, and it left some instant files behind:

  • 20220513170152006.commit.requested
  • 20220513170152006.inflight
  • 20220513170202566.commit
  • 20220513170202566.commit.requested
  • 20220513170202566.inflight

In this case, the Flink application will execute this code:

public void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
  fs.delete(path, true);
  fs.mkdirs(path);
  metaClient.getActiveTimeline().getCommitsTimeline().filterPendingExcludingCompaction()
      .lastInstant().ifPresent(instant -> startInstant(instant.getTimestamp()));
}

It will scan for the last pending instant and create an in-flight file in .hoodie/.aux/ckp_meta (20220513170152006 in this case).

The final result is an exception:

Caused by: org.apache.hudi.exception.HoodieRollbackException: Found commits after time :20220513170152006, please rollback greater commits first

if (!HoodieHeartbeatClient.heartbeatExists(table.getMetaClient().getFs(),
    config.getBasePath(), instantTimeToRollback)) {
  throw new HoodieRollbackException(
      "Found commits after time :" + instantTimeToRollback + ", please rollback greater commits first");
}
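
For what it's worth, here is a stand-alone sketch in plain Java (not the Hudi API) of why the leftover files above resolve to 20220513170152006 as the last pending instant:

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class LeftoverInstantDemo {
  public static void main(String[] args) {
    // The meta files left behind by the crash described above.
    List<String> metaFiles = Arrays.asList(
        "20220513170152006.commit.requested",
        "20220513170152006.inflight",
        "20220513170202566.commit",
        "20220513170202566.commit.requested",
        "20220513170202566.inflight");

    // Treat an instant as pending if no ".commit" file exists for its timestamp.
    Optional<String> lastPending = metaFiles.stream()
        .map(f -> f.substring(0, f.indexOf('.')))
        .distinct()
        .filter(ts -> !metaFiles.contains(ts + ".commit"))
        .reduce((first, second) -> second); // the list is sorted; keep the last

    // Prints 20220513170152006; rolling it back then fails because
    // 20220513170202566 committed after it.
    lastPending.ifPresent(System.out::println);
  }
}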

Contributor

Cool, the logic to fetch the pending instant is problematic, but the error trace you pasted mainly comes from collecting the instants to roll back in:

private HoodieTimeline getInflightTimelineExcludeCompactionAndClustering(HoodieTableMetaClient metaClient) {

We do not consider that there can be inflight instants (previously completed, but inflight again because of the archiving) on the timeline, cc @nsivabalan :)
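
A hypothetical sketch of the kind of guard being described (not the actual Hudi fix; all names here are made up): when collecting rollback candidates, skip instants that only look pending because their completed file was already archived:

import java.util.List;
import java.util.stream.Collectors;

class RollbackCandidateSketch {
  // pendingTimestamps: instants with no completed file on the active timeline.
  // archivedCompletedTimestamps: instants whose completed file lives in the archive.
  static List<String> candidates(List<String> pendingTimestamps,
                                 List<String> archivedCompletedTimestamps) {
    return pendingTimestamps.stream()
        .filter(ts -> !archivedCompletedTimestamps.contains(ts)) // completed before; archiving removed it
        .collect(Collectors.toList());
  }
}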

Contributor

I have a patch to fix the pending instant time fetch, but it does not solve your problem :)
fix.patch.zip

).map(Path::toString).collect(Collectors.toList());

List<HoodieInstant> nonCommitInstants = new ArrayList<>();
List<HoodieInstant> commitInstant = new ArrayList<>();
for (HoodieInstant hoodieInstant : archivedInstants) {
if (hoodieInstant.isCompleted()) {
commitInstant.add(hoodieInstant);
} else {
nonCommitInstants.add(hoodieInstant);
}
}
List<String> nonCommitInstantFiles = mapInstantToFile(nonCommitInstants);
List<String> commitInstantFiles = mapInstantToFile(commitInstant);
context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants");
Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false);

// Suppose an application crashes while archiving; it may leave behind some instant files that should have been deleted.
// If the commit file is deleted but the in-flight file is left,
// then when the application restarts, it will scan for the last pending instant and throw an exception.
// So we need to delete the non-commit instant files first.
Map<String, Boolean> resultDeleteInstantFiles = new HashMap<>();
resultDeleteInstantFiles.putAll(deleteFilesParallelize(metaClient, nonCommitInstantFiles, context, false));
resultDeleteInstantFiles.putAll(deleteFilesParallelize(metaClient, commitInstantFiles, context, false));
for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
success &= result.getValue();
@@ -537,6 +549,12 @@ private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
return success;
}

private List<String> mapInstantToFile(List<HoodieInstant> instants) {
return instants.stream().map(instant ->
new Path(metaClient.getMetaPath(), instant.getFileName())
).map(Path::toString).collect(Collectors.toList());
}

/**
* Remove older instants from auxiliary meta folder.
*