Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 63 additions & 5 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
Expand All @@ -75,6 +76,7 @@ class DeleteFileIndex {
private final DeleteFileGroup globalDeletes;
private final Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> deletesByPartition;
private final boolean isEmpty;
private final boolean useColumnStatsFiltering;

/** @deprecated since 1.4.0, will be removed in 1.5.0. */
@Deprecated
Expand All @@ -83,20 +85,22 @@ class DeleteFileIndex {
long[] globalSeqs,
DeleteFile[] globalDeletes,
Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> deletesByPartition) {
this(specs, index(specs, globalSeqs, globalDeletes), index(specs, deletesByPartition));
this(specs, index(specs, globalSeqs, globalDeletes), index(specs, deletesByPartition), true);
}

private DeleteFileIndex(
Map<Integer, PartitionSpec> specs,
DeleteFileGroup globalDeletes,
Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> deletesByPartition) {
Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> deletesByPartition,
boolean useColumnStatsFiltering) {
ImmutableMap.Builder<Integer, Types.StructType> builder = ImmutableMap.builder();
specs.forEach((specId, spec) -> builder.put(specId, spec.partitionType()));
this.partitionTypeById = builder.build();
this.wrapperById = wrappers(specs);
this.globalDeletes = globalDeletes;
this.deletesByPartition = deletesByPartition;
this.isEmpty = globalDeletes == null && deletesByPartition.isEmpty();
this.useColumnStatsFiltering = useColumnStatsFiltering;
}

public boolean isEmpty() {
Expand Down Expand Up @@ -148,7 +152,16 @@ DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {

if (globalDeletes == null && partitionDeletes == null) {
return NO_DELETES;
} else if (useColumnStatsFiltering) {
return limitWithColumnStatsFiltering(sequenceNumber, file, partitionDeletes);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about filterWithColumnStats, filterWithoutColumnStats ?

Slightly shorter, and not sure if word 'limit' has any different significance than filter worth calling out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kind of wanted to indicate that we both limit by sequence number as well as filter using column stats.

} else {
return limitWithoutColumnStatsFiltering(sequenceNumber, partitionDeletes);
}
}

// limits deletes using sequence numbers and checks whether columns stats overlap
private DeleteFile[] limitWithColumnStatsFiltering(
long sequenceNumber, DataFile file, DeleteFileGroup partitionDeletes) {

Stream<IndexedDeleteFile> matchingDeletes;
if (partitionDeletes == null) {
Expand All @@ -167,6 +180,21 @@ DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
.toArray(DeleteFile[]::new);
}

// limits deletes using sequence numbers but skips expensive column stats filtering
private DeleteFile[] limitWithoutColumnStatsFiltering(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See below why a separate method is added.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice find, maybe worth a comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

long sequenceNumber, DeleteFileGroup partitionDeletes) {

if (partitionDeletes == null) {
Copy link
Contributor Author

@aokolnychyi aokolnychyi Aug 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only invoked if global or partition deletes are not null. If both are null, we return earlier.

return globalDeletes.filter(sequenceNumber);
} else if (globalDeletes == null) {
return partitionDeletes.filter(sequenceNumber);
} else {
DeleteFile[] matchingGlobalDeletes = globalDeletes.filter(sequenceNumber);
DeleteFile[] matchingPartitionDeletes = partitionDeletes.filter(sequenceNumber);
return ObjectArrays.concat(matchingGlobalDeletes, matchingPartitionDeletes, DeleteFile.class);
}
}

private static boolean canContainDeletesForFile(DataFile dataFile, IndexedDeleteFile deleteFile) {
switch (deleteFile.content()) {
case POSITION_DELETES:
Expand Down Expand Up @@ -483,6 +511,8 @@ private Collection<DeleteFile> loadDeleteFiles() {
DeleteFileIndex build() {
Iterable<DeleteFile> files = deleteFiles != null ? filterDeleteFiles() : loadDeleteFiles();

boolean useColumnStatsFiltering = false;

// build a map from (specId, partition) to delete file entries
Map<Integer, StructLikeWrapper> wrappersBySpecId = Maps.newHashMap();
ListMultimap<Pair<Integer, StructLikeWrapper>, IndexedDeleteFile> deleteFilesByPartition =
Expand All @@ -494,7 +524,13 @@ DeleteFileIndex build() {
wrappersBySpecId
.computeIfAbsent(specId, id -> StructLikeWrapper.forType(spec.partitionType()))
.copyFor(file.partition());
deleteFilesByPartition.put(Pair.of(specId, wrapper), new IndexedDeleteFile(spec, file));
IndexedDeleteFile indexedFile = new IndexedDeleteFile(spec, file);
deleteFilesByPartition.put(Pair.of(specId, wrapper), indexedFile);

if (!useColumnStatsFiltering) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

useColumnStatsFiltering |= indexedFile.hasLowerAndUpperBounds() also works, but optional as it's just a style preference

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am avoiding an extra call to hasLowerAndUpperBounds if we have already seen a file with stats. This allows us to avoid loading and converting the underlying boundaries if present. It is not guaranteed we would have to check those boundaries as the delete file could be filtered using sequence numbers.

Copy link
Member

@szehon-ho szehon-ho Aug 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea my thought was it translates to:

useColumnStatsWithFiltering = useColumnStatsWithFiltering || indexFile.hasLowerAndUpperBound, which shouldn't evaluate the second one in theory if I understand correctly.

So was thinking JVM would do the optimization for us, but not a big deal either way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order in which the operands are evaluated is not guaranteed. In a lot of cases, the JVM would start evaluating both at the same time. I've seen this while profiling.

useColumnStatsFiltering = indexedFile.hasLowerAndUpperBounds();
}

ScanMetricsUtil.indexedDeleteFile(scanMetrics, file);
}

Expand Down Expand Up @@ -535,7 +571,8 @@ DeleteFileIndex build() {
}
}

return new DeleteFileIndex(specsById, globalDeletes, sortedDeletesByPartition);
return new DeleteFileIndex(
specsById, globalDeletes, sortedDeletesByPartition, useColumnStatsFiltering);
}

private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestReaders() {
Expand Down Expand Up @@ -597,7 +634,28 @@ private static class DeleteFileGroup {
this.files = files;
}

public DeleteFile[] filter(long seq) {
Copy link
Contributor Author

@aokolnychyi aokolnychyi Aug 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to use the Stream API (which has overhead) if column stats filtering is disabled. Take a look at the flamegraph below that uses streams without column stats filtering.

image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the stream also be costly when useColumnStatsFiltering is enabled? Should we prefer a for-loop there as well? It seems we have heavy stream usage.

Copy link
Contributor Author

@aokolnychyi aokolnychyi Aug 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the Stream API makes the implementation easier to read in case we need to iterate through the elements and check if each of them matches. We would need to try this separately and see how much more complicated it would be and what kind of benefits we get.

int start = findStartIndex(seq);

if (start >= files.length) {
return NO_DELETES;
}

DeleteFile[] matchingFiles = new DeleteFile[files.length - start];

for (int index = start; index < files.length; index++) {
matchingFiles[index - start] = files[index].wrapped();
}

return matchingFiles;
}

public Stream<IndexedDeleteFile> limit(long seq) {
int start = findStartIndex(seq);
return Arrays.stream(files, start, files.length);
}

private int findStartIndex(long seq) {
int pos = Arrays.binarySearch(seqs, seq);
int start;
if (pos < 0) {
Expand All @@ -612,7 +670,7 @@ public Stream<IndexedDeleteFile> limit(long seq) {
}
}

return Arrays.stream(files, start, files.length);
return start;
}

public Iterable<DeleteFile> referencedDeleteFiles() {
Expand Down
Loading