diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java index fddb1a161637..adc5e1b7d3c2 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java @@ -68,9 +68,9 @@ public String partition() { private final Map specsById; private final PartitionSet deleteFilePartitions; + private final Set deleteFiles = newFileSet(); private final PartitionSet dropPartitions; private final CharSequenceSet deletePaths = CharSequenceSet.empty(); - private final Set deleteFiles = newFileSet(); private Expression deleteExpression = Expressions.alwaysFalse(); private long minSequenceNumber = 0; private boolean failAnyDelete = false; diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 2209b348227d..ab55f86ebf6f 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.events.CreateSnapshotEvent; import org.apache.iceberg.exceptions.ValidationException; @@ -42,7 +41,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Predicate; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; @@ -82,11 +80,9 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { private final ManifestFilterManager deleteFilterManager; // update data - private final Map> newDataFilesBySpec = Maps.newHashMap(); - private final DataFileSet newDataFiles = DataFileSet.create(); - private final DeleteFileSet newDeleteFiles = DeleteFileSet.create(); + private final Map newDataFilesBySpec = Maps.newHashMap(); private Long newDataFilesDataSequenceNumber; - private final Map> newDeleteFilesBySpec = Maps.newHashMap(); + private final Map newDeleteFilesBySpec = Maps.newHashMap(); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder(); @@ -161,12 +157,9 @@ protected Expression rowFilter() { } protected List addedDataFiles() { - return ImmutableList.copyOf( - newDataFilesBySpec.values().stream().flatMap(List::stream).collect(Collectors.toList())); - } - - protected Map> addedDataFilesBySpec() { - return ImmutableMap.copyOf(newDataFilesBySpec); + return newDataFilesBySpec.values().stream() + .flatMap(Set::stream) + .collect(ImmutableList.toImmutableList()); } protected void failAnyDelete() { @@ -236,43 +229,49 @@ protected boolean addsDeleteFiles() { /** Add a data file to the new snapshot. */ protected void add(DataFile file) { Preconditions.checkNotNull(file, "Invalid data file: null"); - if (newDataFiles.add(file)) { - PartitionSpec fileSpec = ops.current().spec(file.specId()); - Preconditions.checkArgument( - fileSpec != null, - "Cannot find partition spec %s for data file: %s", - file.specId(), - file.path()); - - addedFilesSummary.addedFile(fileSpec, file); + PartitionSpec spec = spec(file.specId()); + Preconditions.checkArgument( + spec != null, + "Cannot find partition spec %s for data file: %s", + file.specId(), + file.location()); + + DataFileSet dataFiles = + newDataFilesBySpec.computeIfAbsent(spec, ignored -> DataFileSet.create()); + if (dataFiles.add(file)) { + addedFilesSummary.addedFile(spec, file); hasNewDataFiles = true; - List dataFiles = - newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList()); - dataFiles.add(file); } } + private PartitionSpec spec(int specId) { + return ops.current().spec(specId); + } + /** Add a delete file to the new snapshot. */ protected void add(DeleteFile file) { Preconditions.checkNotNull(file, "Invalid delete file: null"); - add(new DeleteFileHolder(file)); + add(new PendingDeleteFile(file)); } /** Add a delete file to the new snapshot. */ protected void add(DeleteFile file, long dataSequenceNumber) { Preconditions.checkNotNull(file, "Invalid delete file: null"); - add(new DeleteFileHolder(file, dataSequenceNumber)); + add(new PendingDeleteFile(file, dataSequenceNumber)); } - private void add(DeleteFileHolder fileHolder) { - int specId = fileHolder.deleteFile().specId(); - PartitionSpec fileSpec = ops.current().spec(specId); - List deleteFiles = - newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList()); - - if (newDeleteFiles.add(fileHolder.deleteFile())) { - deleteFiles.add(fileHolder); - addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile()); + private void add(PendingDeleteFile file) { + PartitionSpec spec = spec(file.specId()); + Preconditions.checkArgument( + spec != null, + "Cannot find partition spec %s for delete file: %s", + file.specId(), + file.location()); + + DeleteFileSet deleteFiles = + newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DeleteFileSet.create()); + if (deleteFiles.add(file)) { + addedFilesSummary.addedFile(spec, file); hasNewDeleteFiles = true; } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 27724f787dd2..33114baa641d 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -35,6 +35,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import java.io.IOException; import java.math.RoundingMode; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -595,20 +596,22 @@ private List writeDataFileGroup( } protected List writeDeleteManifests( - Collection files, PartitionSpec spec) { + Collection files, PartitionSpec spec) { return writeManifests(files, group -> writeDeleteFileGroup(group, spec)); } private List writeDeleteFileGroup( - Collection files, PartitionSpec spec) { + Collection files, PartitionSpec spec) { RollingManifestWriter writer = newRollingDeleteManifestWriter(spec); try (RollingManifestWriter closableWriter = writer) { - for (DeleteFileHolder file : files) { + for (DeleteFile file : files) { + Preconditions.checkArgument( + file instanceof PendingDeleteFile, "Invalid delete file: must be PendingDeleteFile"); if (file.dataSequenceNumber() != null) { - closableWriter.add(file.deleteFile(), file.dataSequenceNumber()); + closableWriter.add(file, file.dataSequenceNumber()); } else { - closableWriter.add(file.deleteFile()); + closableWriter.add(file); } } } catch (IOException e) { @@ -752,7 +755,7 @@ private static void updateTotal( } } - protected static class DeleteFileHolder { + protected static class PendingDeleteFile implements DeleteFile { private final DeleteFile deleteFile; private final Long dataSequenceNumber; @@ -762,7 +765,7 @@ protected static class DeleteFileHolder { * @param deleteFile delete file * @param dataSequenceNumber data sequence number to apply */ - DeleteFileHolder(DeleteFile deleteFile, long dataSequenceNumber) { + PendingDeleteFile(DeleteFile deleteFile, long dataSequenceNumber) { this.deleteFile = deleteFile; this.dataSequenceNumber = dataSequenceNumber; } @@ -772,17 +775,147 @@ protected static class DeleteFileHolder { * * @param deleteFile delete file */ - DeleteFileHolder(DeleteFile deleteFile) { + PendingDeleteFile(DeleteFile deleteFile) { this.deleteFile = deleteFile; this.dataSequenceNumber = null; } - public DeleteFile deleteFile() { - return deleteFile; + private PendingDeleteFile wrap(DeleteFile file) { + if (null != dataSequenceNumber) { + return new PendingDeleteFile(file, dataSequenceNumber); + } + + return new PendingDeleteFile(file); } + @Override public Long dataSequenceNumber() { return dataSequenceNumber; } + + @Override + public Long fileSequenceNumber() { + return deleteFile.fileSequenceNumber(); + } + + @Override + public DeleteFile copy() { + return wrap(deleteFile.copy()); + } + + @Override + public DeleteFile copyWithoutStats() { + return wrap(deleteFile.copyWithoutStats()); + } + + @Override + public DeleteFile copyWithStats(Set requestedColumnIds) { + return wrap(deleteFile.copyWithStats(requestedColumnIds)); + } + + @Override + public DeleteFile copy(boolean withStats) { + return wrap(deleteFile.copy(withStats)); + } + + @Override + public String manifestLocation() { + return deleteFile.manifestLocation(); + } + + @Override + public Long pos() { + return deleteFile.pos(); + } + + @Override + public int specId() { + return deleteFile.specId(); + } + + @Override + public FileContent content() { + return deleteFile.content(); + } + + @Override + public CharSequence path() { + return deleteFile.path(); + } + + @Override + public String location() { + return deleteFile.location(); + } + + @Override + public FileFormat format() { + return deleteFile.format(); + } + + @Override + public StructLike partition() { + return deleteFile.partition(); + } + + @Override + public long recordCount() { + return deleteFile.recordCount(); + } + + @Override + public long fileSizeInBytes() { + return deleteFile.fileSizeInBytes(); + } + + @Override + public Map columnSizes() { + return deleteFile.columnSizes(); + } + + @Override + public Map valueCounts() { + return deleteFile.valueCounts(); + } + + @Override + public Map nullValueCounts() { + return deleteFile.nullValueCounts(); + } + + @Override + public Map nanValueCounts() { + return deleteFile.nanValueCounts(); + } + + @Override + public Map lowerBounds() { + return deleteFile.lowerBounds(); + } + + @Override + public Map upperBounds() { + return deleteFile.upperBounds(); + } + + @Override + public ByteBuffer keyMetadata() { + return deleteFile.keyMetadata(); + } + + @Override + public List splitOffsets() { + return deleteFile.splitOffsets(); + } + + @Override + public List equalityFieldIds() { + return deleteFile.equalityFieldIds(); + } + + @Override + public Integer sortOrderId() { + return deleteFile.sortOrderId(); + } } }