Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -506,13 +506,7 @@ private Stream<HoodieInstant> getInstantsToArchive() {
List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(),
HoodieInstant.getComparableAction(hoodieInstant.getAction())));
if (instantsToStream != null) {
// sorts the instants in natural order to make sure the metadata files be removed
// in HoodieInstant.State sequence: requested -> inflight -> completed,
// this is important because when a COMPLETED metadata file is removed first,
// other monitors on the timeline(such as the compaction or clustering services) would
// mistakenly recognize the pending file as a pending operation,
// then all kinds of weird bugs occur.
return instantsToStream.stream().sorted();
return instantsToStream.stream();
} else {
// if a concurrent writer archived the instant
return Stream.empty();
Expand All @@ -522,19 +516,29 @@ private Stream<HoodieInstant> getInstantsToArchive() {

private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
LOG.info("Deleting instants " + archivedInstants);
boolean success = true;
List<String> instantFiles = archivedInstants.stream().map(archivedInstant ->
new Path(metaClient.getMetaPath(), archivedInstant.getFileName())
).map(Path::toString).collect(Collectors.toList());

context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false);
List<String> pendingInstantFiles = new ArrayList<>();
List<String> completedInstantFiles = new ArrayList<>();

for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
success &= result.getValue();
for (HoodieInstant instant : archivedInstants) {
String filePath = new Path(metaClient.getMetaPath(), instant.getFileName()).toString();
if (instant.isCompleted()) {
completedInstantFiles.add(filePath);
} else {
pendingInstantFiles.add(filePath);
}
}

context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
// Delete the metadata files
// in HoodieInstant.State sequence: requested -> inflight -> completed,
// this is important because when a COMPLETED metadata file is removed first,
// other monitors on the timeline(such as the compaction or clustering services) would
// mistakenly recognize the pending file as a pending operation,
// then all kinds of weird bugs occur.
boolean success = deleteArchivedInstantFiles(context, true, pendingInstantFiles);
success &= deleteArchivedInstantFiles(context, success, completedInstantFiles);

// Remove older meta-data from auxiliary path too
Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION)
|| (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp)));
Expand All @@ -545,6 +549,16 @@ private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, Hoo
return success;
}

private boolean deleteArchivedInstantFiles(HoodieEngineContext context, boolean success, List<String> files) {
Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, files, context, false);

for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
success &= result.getValue();
}
return success;
}

/**
* Remove older instants from auxiliary meta folder.
*
Expand Down