Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 90 additions & 5 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -53,6 +54,7 @@
abstract class ManifestFilterManager<F extends ContentFile<F>> {
private static final Logger LOG = LoggerFactory.getLogger(ManifestFilterManager.class);
private static final Joiner COMMA = Joiner.on(",");
private static final AtomicLong MIN_SEQ_CONST = new AtomicLong(Long.MIN_VALUE);

protected static class DeleteException extends ValidationException {
private final String partition;
Expand All @@ -78,13 +80,16 @@ public String partition() {
private boolean failMissingDeletePaths = false;
private int duplicateDeleteCount = 0;
private boolean caseSensitive = true;
private boolean dropPartitionDeleteEnabled = false;

// cache filtered manifests to avoid extra work when commits fail.
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();

// tracking where files were deleted to validate retries quickly
private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
Maps.newConcurrentMap();
private Map<Pair<Integer, StructLike>, AtomicLong> minSequenceNumberByPartition =
Maps.newConcurrentMap();

private final Supplier<ExecutorService> workerPoolSupplier;

Expand All @@ -110,6 +115,15 @@ protected void failMissingDeletePaths() {
this.failMissingDeletePaths = true;
}

/**
 * Turns the per-partition delete file cleanup on or off for this manager.
 *
 * @param dropPartitionDelete true to enable the per-partition checks guarded by this flag
 */
protected void setDropPartitionDelete(boolean dropPartitionDelete) {
  dropPartitionDeleteEnabled = dropPartitionDelete;
}

/**
 * Replaces the map used to track the minimum data sequence number recorded per partition.
 *
 * <p>The map is stored by reference, not copied, so the same instance can be handed to several
 * filter managers to share the recorded values between them.
 *
 * @param minSequenceNumberByPartition map keyed by (spec ID, partition) pairs; must not be null
 * @throws IllegalArgumentException if the given map is null
 */
public void setMinSequenceNumberByPartition(
    Map<Pair<Integer, StructLike>, AtomicLong> minSequenceNumberByPartition) {
  // fail fast here instead of with an NPE on the first lookup during manifest filtering
  Preconditions.checkArgument(
      minSequenceNumberByPartition != null, "Invalid min sequence number map: null");
  this.minSequenceNumberByPartition = minSequenceNumberByPartition;
}

/**
* Add a filter to match files to delete. A file will be deleted if all of the rows it contains
* match this or any other filter passed to this method.
Expand Down Expand Up @@ -145,6 +159,24 @@ protected void dropDeleteFilesOlderThan(long sequenceNumber) {
this.minSequenceNumber = sequenceNumber;
}

/**
 * Records a data sequence number for a partition, keeping the smallest value recorded so far.
 *
 * @param specId ID of the partition spec the partition belongs to
 * @param partition partition tuple; combined with the spec ID to form the map key
 * @param sequenceNumber data sequence number to record; must be non-negative
 * @throws IllegalArgumentException if the sequence number is negative
 */
protected void recordPartitionMinDataSequenceNumber(
    int specId, StructLike partition, long sequenceNumber) {
  Preconditions.checkArgument(
      sequenceNumber >= 0, "Invalid minimum data sequence number: %s", sequenceNumber);

  // NOTE(review): the partition struct is used directly as part of the map key; confirm that it
  // has stable equals/hashCode and is not a container reused by the manifest reader.
  Pair<Integer, StructLike> partitionKey = Pair.of(specId, partition);

  // first sighting seeds the counter with this value; afterwards the counter only moves down
  AtomicLong partitionMin =
      minSequenceNumberByPartition.computeIfAbsent(
          partitionKey, key -> new AtomicLong(sequenceNumber));
  partitionMin.accumulateAndGet(sequenceNumber, Math::min);
}

/**
 * Sets the case sensitivity flag kept by this manager.
 *
 * @param newCaseSensitive the new value of the flag
 */
void caseSensitive(boolean newCaseSensitive) {
  caseSensitive = newCaseSensitive;
}
Expand Down Expand Up @@ -289,13 +321,38 @@ private void invalidateFilteredCache() {
cleanUncommitted(SnapshotProducer.EMPTY_SET);
}

private void recordPartitionMinDataSequenceNumber(ManifestFile manifest) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing approach only looks at manifest metadata to understand the min data sequence number across all partitions. This is really cheap as we don't have to open manifests (which can be a really expensive operation). That leads to the problem that if one partition is significantly behind, it prevents garbage collection of delete files in other partitions. We have solved that for position deletes via the rewritePositionDeletes action, but it still remains open for equality deletes.

I am not convinced opening these manifests during commits is a good idea. Can we explore the option of leveraging the partition stats spec added recently? We are still building an action to generate those stats, but let's think through whether it can help us. One option can be to check if the partition stats file is present and use it to populate the min data sequence numbers, opening just a single Parquet file vs potentially tons of manifests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we explore the option of leveraging the partition stats spec added

Wow. That sounds like a nice usecase 👍
For each partition, we do keep the snapshot id that last updated that partition. Using the snapshot id we can extract the data sequence numbers from the snapshot.

@zinking: The current status of the partition stats project can be tracked from this: #8450

Another alternative approach is to convert equality deletes to position deletes (this work is pending), so that we can reuse rewritePositionDeletes. But it is a long route.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with above. I think there were some other attempts to do this before too, but the concern here is that you don't want to do a lot of things in the commit critical path (here potentially opening an unlimited number of manifest files). Yeah, if it's something cheaper to do, like reading one partition stats file, it may be better. Also, yes, the plan has always been to implement converting eq-deletes to pos-deletes (which can then be cleaned up by rewritePositionDeletes), though I'm not sure if any progress is being made there.

Copy link
Contributor Author

@zinking zinking Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree that adding this on the general write path is too heavy, so that's why I prefer it enabled during rewrite, or probably just some of the rewrites.

on the other hand, it sounds reasonable to track this on partition metadata, but there has to be somewhere to calculate it anyways.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what Szehon meant is to call the cleanup action from the finally block of rewrite action. Not expose as a new action for cleanup.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll need to think about this a bit more but I do like the idea of using partition stats in one or another way. I'll get back next week.

Copy link
Member

@szehon-ho szehon-ho Jan 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, for me @zinking raised a good question about using partition stats. It's optional, so if the user hasn't analyzed the table, the behavior will differ as to whether dangling deletes are removed or not, which may not be so obvious to users.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I think we all agree that we should use partition stats if they are available and read manifests otherwise. We may think about extending our regular writes to check if there is a partition stats file available and drop the delete files per partition rather than globally, like it is done today. We shouldn't open manifests during writes. We can only do that in a distributed fashion, meaning it has to be part of an action. There we either can add a new action or integrate this logic into the existing action.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szehon-ho and I talked a little bit about this offline. I think we can try extending the action for rewriting data files to also attempt to remove dangling deletes in partitions that were successfully compacted. Separately, we may integrate the cleanup using partition stats during regular commits under a flag (off by default).

if (dropPartitionDeleteEnabled && manifest.content() != ManifestContent.DELETES) {
try (ManifestReader<F> reader = newManifestReader(manifest)) {
reader
.entries()
.forEach(
entry -> {
F dataFile = entry.file();
long dataFileSequence = dataFile.dataSequenceNumber();
if (entry.status() == ManifestEntry.Status.DELETED
|| dataFileSequence == ManifestWriter.UNASSIGNED_SEQ) {
// ignore data files already deleted
return;
}
recordPartitionMinDataSequenceNumber(
dataFile.specId(), dataFile.partition(), dataFileSequence);
});
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest);
}
}
}

/** @return a ManifestFile that is a filtered version of the input manifest. */
private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
ManifestFile cached = filteredManifests.get(manifest);
if (cached != null) {
return cached;
}

recordPartitionMinDataSequenceNumber(manifest);

boolean hasLiveFiles = manifest.hasAddedFiles() || manifest.hasExistingFiles();
if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
filteredManifests.put(manifest, manifest);
Expand Down Expand Up @@ -360,14 +417,30 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
return canContainExpressionDeletes
|| canContainDroppedPartitions
|| canContainDroppedFiles
|| canContainDropBySeq;
|| canContainDropBySeq
|| (dropPartitionDeleteEnabled && canContainDropByPartitionSeq(manifest));
}

/**
 * Checks whether a manifest could hold delete files that are droppable by per-partition
 * sequence number.
 *
 * <p>A delete file is only dropped when its sequence number is lower than the minimum data
 * sequence number recorded for its partition. So if the lowest sequence number in the manifest
 * is already above the largest of those per-partition minimums, nothing in the manifest can
 * qualify.
 *
 * @param manifest the manifest to check
 * @return true if the feature is enabled, the manifest is a delete manifest, and its minimum
 *     sequence number is not above the largest recorded per-partition minimum
 */
private boolean canContainDropByPartitionSeq(ManifestFile manifest) {
  if (!dropPartitionDeleteEnabled) {
    return false;
  }

  if (manifest.content() != ManifestContent.DELETES) {
    return false;
  }

  long manifestMinSeq = manifest.minSequenceNumber();

  // with nothing recorded the bound stays at Long.MIN_VALUE, which no manifest can be below
  long largestPartitionMin = Long.MIN_VALUE;
  for (AtomicLong partitionMin : this.minSequenceNumberByPartition.values()) {
    largestPartitionMin = Math.max(largestPartitionMin, partitionMin.get());
  }

  return manifestMinSeq <= largestPartitionMin;
}

@SuppressWarnings({"CollectionUndefinedEquality", "checkstyle:CyclomaticComplexity"})
private boolean manifestHasDeletedFiles(
PartitionAndMetricsEvaluator evaluator, ManifestReader<F> reader) {
boolean isDelete = reader.isDeleteManifestReader();

for (ManifestEntry<F> entry : reader.liveEntries()) {
F file = entry.file();
boolean markedForDelete =
Expand All @@ -376,7 +449,13 @@ private boolean manifestHasDeletedFiles(
|| (isDelete
&& entry.isLive()
&& entry.dataSequenceNumber() > 0
&& entry.dataSequenceNumber() < minSequenceNumber);
&& (entry.dataSequenceNumber() < minSequenceNumber
|| (dropPartitionDeleteEnabled
&& entry.dataSequenceNumber()
< this.minSequenceNumberByPartition
.getOrDefault(
Pair.of(file.specId(), file.partition()), MIN_SEQ_CONST)
.get())));

if (markedForDelete || evaluator.rowsMightMatch(file)) {
boolean allRowsMatch = markedForDelete || evaluator.rowsMustMatch(file);
Expand Down Expand Up @@ -409,7 +488,6 @@ private ManifestFile filterManifestWithDeletedFiles(
// manifest. produce a copy of the manifest with all deleted files removed.
List<F> deletedFiles = Lists.newArrayList();
Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();

try {
ManifestWriter<F> writer = newManifestWriter(reader.spec());
try {
Expand All @@ -424,7 +502,14 @@ private ManifestFile filterManifestWithDeletedFiles(
|| (isDelete
&& entry.isLive()
&& entry.dataSequenceNumber() > 0
&& entry.dataSequenceNumber() < minSequenceNumber);
&& (entry.dataSequenceNumber() < minSequenceNumber
|| (dropPartitionDeleteEnabled
&& entry.dataSequenceNumber()
< minSequenceNumberByPartition
.getOrDefault(
Pair.of(file.specId(), file.partition()),
MIN_SEQ_CONST)
.get())));
if (entry.status() != ManifestEntry.Status.DELETED) {
if (markedForDelete || evaluator.rowsMightMatch(file)) {
boolean allRowsMatch = markedForDelete || evaluator.rowsMustMatch(file);
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg;

import static org.apache.iceberg.TableProperties.DROP_PARTITION_DELETE_ENABLED;
import static org.apache.iceberg.TableProperties.DROP_PARTITION_DELETE_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT;
import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES;
Expand All @@ -30,6 +32,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.iceberg.events.CreateSnapshotEvent;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
Expand Down Expand Up @@ -77,6 +80,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
private final ManifestFilterManager<DataFile> filterManager;
private final ManifestMergeManager<DeleteFile> deleteMergeManager;
private final ManifestFilterManager<DeleteFile> deleteFilterManager;
private final boolean dropPartitionDeleteEnabled;

// update data
private final List<DataFile> newDataFiles = Lists.newArrayList();
Expand Down Expand Up @@ -119,6 +123,15 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
this.deleteMergeManager =
new DeleteFileMergeManager(targetSizeBytes, minCountToMerge, mergeEnabled);
this.deleteFilterManager = new DeleteFileFilterManager();
this.dropPartitionDeleteEnabled =
ops.current()
.propertyAsBoolean(
DROP_PARTITION_DELETE_ENABLED, DROP_PARTITION_DELETE_ENABLED_DEFAULT);
this.deleteFilterManager.setDropPartitionDelete(dropPartitionDeleteEnabled);
this.filterManager.setDropPartitionDelete(dropPartitionDeleteEnabled);
Map<Pair<Integer, StructLike>, AtomicLong> seqByPartMap = Maps.newConcurrentMap();
this.deleteFilterManager.setMinSequenceNumberByPartition(seqByPartMap);
this.filterManager.setMinSequenceNumberByPartition(seqByPartMap);
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ private TableProperties() {}

public static final String MANIFEST_MERGE_ENABLED = "commit.manifest-merge.enabled";
public static final boolean MANIFEST_MERGE_ENABLED_DEFAULT = true;
// Controls whether commits also drop delete files whose data sequence number is lower than the
// minimum data sequence number recorded for their partition.
public static final String DROP_PARTITION_DELETE_ENABLED = "commit.drop-partition-delete.enabled";

// Off by default: when enabled, the filter manager opens data manifests during the commit to
// collect per-partition sequence numbers, so the extra work must be opted into explicitly.
public static final boolean DROP_PARTITION_DELETE_ENABLED_DEFAULT = false;

public static final String DEFAULT_FILE_FORMAT = "write.format.default";
public static final String DELETE_DEFAULT_FILE_FORMAT = "write.delete.format.default";
Expand Down
Loading