Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected String operation() {

@Override
public ReplacePartitions addFile(DataFile file) {
dropPartition(file.partition());
dropPartition(file.specId(), file.partition());
add(file);
return this;
}
Expand Down
60 changes: 38 additions & 22 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,44 +40,60 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.ListMultimap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeWrapper;
import org.apache.iceberg.util.Tasks;

/**
* An index of {@link DeleteFile delete files} by sequence number.
* <p>
* Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(int, long, DataFile)} or
* {@link #forEntry(int, ManifestEntry)} to get the the delete files to apply to a given data file.
* Use {@link #builderFor(FileIO, Iterable)} to construct an index, and {@link #forDataFile(long, DataFile)} or
* {@link #forEntry(ManifestEntry)} to get the the delete files to apply to a given data file.
*/
class DeleteFileIndex {
private static final DeleteFile[] NO_DELETE_FILES = new DeleteFile[0];

private final Map<Integer, Types.StructType> partitionTypeById;
private final Map<Integer, ThreadLocal<StructLikeWrapper>> wrapperById;
private final long[] globalSeqs;
private final DeleteFile[] globalDeletes;
private final Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> sortedDeletesByPartition;
private final ThreadLocal<StructLikeWrapper> lookupWrapper = ThreadLocal.withInitial(
() -> StructLikeWrapper.wrap(null));

DeleteFileIndex(long[] globalSeqs, DeleteFile[] globalDeletes,
DeleteFileIndex(Map<Integer, PartitionSpec> specsById, long[] globalSeqs, DeleteFile[] globalDeletes,
Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> sortedDeletesByPartition) {
ImmutableMap.Builder<Integer, Types.StructType> builder = ImmutableMap.builder();
specsById.forEach((specId, spec) -> builder.put(specId, spec.partitionType()));
this.partitionTypeById = builder.build();
this.wrapperById = Maps.newHashMap();
this.globalSeqs = globalSeqs;
this.globalDeletes = globalDeletes;
this.sortedDeletesByPartition = sortedDeletesByPartition;
}

DeleteFile[] forEntry(int specId, ManifestEntry<DataFile> entry) {
return forDataFile(specId, entry.sequenceNumber(), entry.file());
private StructLikeWrapper newWrapper(int specId) {
return StructLikeWrapper.forType(partitionTypeById.get(specId));
}

DeleteFile[] forDataFile(int specId, long sequenceNumber, DataFile file) {
Pair<Integer, StructLikeWrapper> partition = Pair.of(specId, lookupWrapper.get().set(file.partition()));
private Pair<Integer, StructLikeWrapper> partition(int specId, StructLike struct) {
ThreadLocal<StructLikeWrapper> wrapper = wrapperById.computeIfAbsent(specId,
id -> ThreadLocal.withInitial(() -> newWrapper(id)));
return Pair.of(specId, wrapper.get().set(struct));
}

DeleteFile[] forEntry(ManifestEntry<DataFile> entry) {
return forDataFile(entry.sequenceNumber(), entry.file());
}

DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), file.partition());
Pair<long[], DeleteFile[]> partitionDeletes = sortedDeletesByPartition.get(partition);

if (partitionDeletes == null) {
Expand Down Expand Up @@ -164,15 +180,15 @@ Builder planWith(ExecutorService newExecutorService) {

DeleteFileIndex build() {
// read all of the matching delete manifests in parallel and accumulate the matching files in a queue
Queue<Pair<Integer, ManifestEntry<DeleteFile>>> deleteEntries = new ConcurrentLinkedQueue<>();
Queue<ManifestEntry<DeleteFile>> deleteEntries = new ConcurrentLinkedQueue<>();
Tasks.foreach(deleteManifestReaders())
.stopOnFailure().throwFailureWhenFinished()
.executeWith(executorService)
.run(specIdAndReader -> {
try (CloseableIterable<ManifestEntry<DeleteFile>> reader = specIdAndReader.second()) {
.run(deleteFile -> {
try (CloseableIterable<ManifestEntry<DeleteFile>> reader = deleteFile) {
for (ManifestEntry<DeleteFile> entry : reader) {
// copy with stats for better filtering against data file stats
deleteEntries.add(Pair.of(specIdAndReader.first(), entry.copy()));
deleteEntries.add(entry.copy());
}
} catch (IOException e) {
throw new RuntimeIOException("Failed to close", e);
Expand All @@ -182,10 +198,11 @@ DeleteFileIndex build() {
// build a map from (specId, partition) to delete file entries
ListMultimap<Pair<Integer, StructLikeWrapper>, ManifestEntry<DeleteFile>> deleteFilesByPartition =
Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
for (Pair<Integer, ManifestEntry<DeleteFile>> specIdAndEntry : deleteEntries) {
int specId = specIdAndEntry.first();
ManifestEntry<DeleteFile> entry = specIdAndEntry.second();
deleteFilesByPartition.put(Pair.of(specId, StructLikeWrapper.wrap(entry.file().partition())), entry);
for (ManifestEntry<DeleteFile> entry : deleteEntries) {
int specId = entry.file().specId();
StructLikeWrapper wrapper = StructLikeWrapper.forType(specsById.get(specId).partitionType())
.set(entry.file().partition());
deleteFilesByPartition.put(Pair.of(specId, wrapper), entry);
}

// sort the entries in each map value by sequence number and split into sequence numbers and delete files lists
Expand Down Expand Up @@ -237,10 +254,10 @@ DeleteFileIndex build() {
}
}

return new DeleteFileIndex(globalApplySeqs, globalDeletes, sortedDeletesByPartition);
return new DeleteFileIndex(specsById, globalApplySeqs, globalDeletes, sortedDeletesByPartition);
}

private Iterable<Pair<Integer, CloseableIterable<ManifestEntry<DeleteFile>>>> deleteManifestReaders() {
private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestReaders() {
LoadingCache<Integer, ManifestEvaluator> evalCache = specsById == null ? null :
Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
Expand All @@ -257,13 +274,12 @@ private Iterable<Pair<Integer, CloseableIterable<ManifestEntry<DeleteFile>>>> de

return Iterables.transform(
matchingManifests,
manifest -> Pair.of(
manifest.partitionSpecId(),
manifest ->
ManifestFiles.readDeleteManifest(manifest, io, specsById)
.filterRows(dataFilter)
.filterPartitions(partitionFilter)
.caseSensitive(caseSensitive)
.liveEntries())
.liveEntries()
);
}
}
Expand Down
50 changes: 23 additions & 27 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.CharSequenceWrapper;
import org.apache.iceberg.util.ManifestFileUtil;
import org.apache.iceberg.util.StructLikeWrapper;
import org.apache.iceberg.util.PartitionSet;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
Expand All @@ -65,9 +65,10 @@ public String partition() {
}
}

private final Map<Integer, PartitionSpec> specsById;
private final PartitionSet deleteFilePartitions;
private final PartitionSet dropPartitions;
private final Set<CharSequence> deletePaths = CharSequenceSet.empty();
private final Set<StructLikeWrapper> deleteFilePartitions = Sets.newHashSet();
private final Set<StructLikeWrapper> dropPartitions = Sets.newHashSet();
private Expression deleteExpression = Expressions.alwaysFalse();
private long minSequenceNumber = 0;
private boolean hasPathOnlyDeletes = false;
Expand All @@ -82,7 +83,12 @@ public String partition() {
private final Map<ManifestFile, Iterable<F>> filteredManifestToDeletedFiles =
Maps.newConcurrentMap();

protected abstract PartitionSpec spec(int specId);
protected ManifestFilterManager(Map<Integer, PartitionSpec> specsById) {
this.specsById = specsById;
this.deleteFilePartitions = PartitionSet.create(specsById);
this.dropPartitions = PartitionSet.create(specsById);
}

protected abstract void deleteFile(String location);
protected abstract ManifestWriter<F> newManifestWriter(PartitionSpec spec);
protected abstract ManifestReader<F> newManifestReader(ManifestFile manifest);
Expand Down Expand Up @@ -110,10 +116,10 @@ protected void deleteByRowFilter(Expression expr) {
/**
* Add a partition tuple to drop from the table during the delete phase.
*/
protected void dropPartition(StructLike partition) {
protected void dropPartition(int specId, StructLike partition) {
Preconditions.checkNotNull(partition, "Cannot delete files in invalid partition: null");
invalidateFilteredCache();
dropPartitions.add(StructLikeWrapper.wrap(partition));
dropPartitions.add(specId, partition);
}

/**
Expand All @@ -138,7 +144,7 @@ void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
invalidateFilteredCache();
deletePaths.add(file.path());
deleteFilePartitions.add(StructLikeWrapper.wrap(file.partition()));
deleteFilePartitions.add(file.specId(), file.partition());
}

/**
Expand Down Expand Up @@ -191,7 +197,7 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();

for (ManifestFile manifest : manifests) {
PartitionSpec manifestSpec = spec(manifest.partitionSpecId());
PartitionSpec manifestSpec = specsById.get(manifest.partitionSpecId());
Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
if (manifestDeletes != null) {
for (F file : manifestDeletes) {
Expand Down Expand Up @@ -282,19 +288,16 @@ private ManifestFile filterManifest(StrictMetricsEvaluator metricsEvaluator, Man
}

try (ManifestReader<F> reader = newManifestReader(manifest)) {
// reused to compare file partitions with the drop set
StructLikeWrapper partitionWrapper = StructLikeWrapper.wrap(null);

// this assumes that the manifest doesn't have files to remove and streams through the
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
boolean hasDeletedFiles = manifestHasDeletedFiles(metricsEvaluator, reader, partitionWrapper);
boolean hasDeletedFiles = manifestHasDeletedFiles(metricsEvaluator, reader);
if (!hasDeletedFiles) {
filteredManifests.put(manifest, manifest);
return manifest;
}

return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader, partitionWrapper);
return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader);

} catch (IOException e) {
throw new RuntimeIOException("Failed to close manifest: " + manifest, e);
Expand All @@ -305,18 +308,15 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
boolean canContainExpressionDeletes;
if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) {
ManifestEvaluator manifestEvaluator =
ManifestEvaluator.forRowFilter(deleteExpression, spec(manifest.partitionSpecId()), true);
ManifestEvaluator.forRowFilter(deleteExpression, specsById.get(manifest.partitionSpecId()), true);
canContainExpressionDeletes = manifestEvaluator.eval(manifest);
} else {
canContainExpressionDeletes = false;
}

boolean canContainDroppedPartitions;
if (dropPartitions.size() > 0) {
canContainDroppedPartitions = ManifestFileUtil.canContainAny(
manifest,
Iterables.transform(dropPartitions, StructLikeWrapper::get),
this::spec);
canContainDroppedPartitions = ManifestFileUtil.canContainAny(manifest, dropPartitions, specsById);
} else {
canContainDroppedPartitions = false;
}
Expand All @@ -326,10 +326,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
canContainDroppedFiles = true;
} else if (deletePaths.size() > 0) {
// because there were no path-only deletes, the set of deleted file partitions is valid
canContainDroppedFiles = ManifestFileUtil.canContainAny(
manifest,
Iterables.transform(deleteFilePartitions, StructLikeWrapper::get),
this::spec);
canContainDroppedFiles = ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
} else {
canContainDroppedFiles = false;
}
Expand All @@ -341,15 +338,15 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
}

private boolean manifestHasDeletedFiles(
StrictMetricsEvaluator metricsEvaluator, ManifestReader<F> reader, StructLikeWrapper partitionWrapper) {
StrictMetricsEvaluator metricsEvaluator, ManifestReader<F> reader) {
boolean isDelete = reader.isDeleteManifestReader();
Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
Evaluator strict = strictDeleteEvaluator(reader.spec());
boolean hasDeletedFiles = false;
for (ManifestEntry<F> entry : reader.entries()) {
F file = entry.file();
boolean fileDelete = deletePaths.contains(file.path()) ||
dropPartitions.contains(partitionWrapper.set(file.partition())) ||
dropPartitions.contains(file.specId(), file.partition()) ||
(isDelete && entry.sequenceNumber() > 0 && entry.sequenceNumber() < minSequenceNumber);
if (fileDelete || inclusive.eval(file.partition())) {
ValidationException.check(
Expand All @@ -368,8 +365,7 @@ private boolean manifestHasDeletedFiles(
}

private ManifestFile filterManifestWithDeletedFiles(
StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader<F> reader,
StructLikeWrapper partitionWrapper) {
StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader<F> reader) {
boolean isDelete = reader.isDeleteManifestReader();
Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
Evaluator strict = strictDeleteEvaluator(reader.spec());
Expand All @@ -384,7 +380,7 @@ private ManifestFile filterManifestWithDeletedFiles(
reader.entries().forEach(entry -> {
F file = entry.file();
boolean fileDelete = deletePaths.contains(file.path()) ||
dropPartitions.contains(partitionWrapper.set(file.partition())) ||
dropPartitions.contains(file.specId(), file.partition()) ||
(isDelete && entry.sequenceNumber() > 0 && entry.sequenceNumber() < minSequenceNumber);
if (entry.status() != ManifestEntry.Status.DELETED) {
if (fileDelete || inclusive.eval(file.partition())) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,10 @@ public CloseableIterable<FileScanTask> planFiles() {
ResidualEvaluator residuals = residualCache.get(specId);
if (dropStats) {
return CloseableIterable.transform(entries, e -> new BaseFileScanTask(
e.file().copyWithoutStats(), deleteFiles.forEntry(specId, e), schemaString, specString, residuals));
e.file().copyWithoutStats(), deleteFiles.forEntry(e), schemaString, specString, residuals));
} else {
return CloseableIterable.transform(entries, e -> new BaseFileScanTask(
e.file().copy(), deleteFiles.forEntry(specId, e), schemaString, specString, residuals));
e.file().copy(), deleteFiles.forEntry(e), schemaString, specString, residuals));
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ protected void deleteByRowFilter(Expression expr) {
/**
* Add a partition tuple to drop from the table during the delete phase.
*/
protected void dropPartition(StructLike partition) {
protected void dropPartition(int specId, StructLike partition) {
// dropping the data in a partition also drops all deletes in the partition
filterManager.dropPartition(partition);
deleteFilterManager.dropPartition(partition);
filterManager.dropPartition(specId, partition);
deleteFilterManager.dropPartition(specId, partition);
}

/**
Expand Down Expand Up @@ -368,9 +368,8 @@ private ManifestFile newDeleteFilesAsManifest() {
}

private class DataFileFilterManager extends ManifestFilterManager<DataFile> {
@Override
protected PartitionSpec spec(int specId) {
return ops.current().spec(specId);
private DataFileFilterManager() {
super(ops.current().specsById());
}

@Override
Expand Down Expand Up @@ -421,9 +420,8 @@ protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {
}

private class DeleteFileFilterManager extends ManifestFilterManager<DeleteFile> {
@Override
protected PartitionSpec spec(int specId) {
return ops.current().spec(specId);
private DeleteFileFilterManager() {
super(ops.current().specsById());
}

@Override
Expand Down
Loading