From d648ea5d2b5907240c60ca8a7249ed12c9f189c4 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Fri, 4 Oct 2024 11:13:25 +0200 Subject: [PATCH 1/3] Core: Rename DeleteFileHolder to PendingDeleteFile --- .../iceberg/MergingSnapshotProducer.java | 18 +-- .../org/apache/iceberg/SnapshotProducer.java | 147 ++++++++++++++++-- 2 files changed, 144 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 2209b348227d..8f3864a97914 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -86,7 +86,7 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { private final DataFileSet newDataFiles = DataFileSet.create(); private final DeleteFileSet newDeleteFiles = DeleteFileSet.create(); private Long newDataFilesDataSequenceNumber; - private final Map> newDeleteFilesBySpec = Maps.newHashMap(); + private final Map> newDeleteFilesBySpec = Maps.newHashMap(); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder(); @@ -255,24 +255,24 @@ protected void add(DataFile file) { /** Add a delete file to the new snapshot. */ protected void add(DeleteFile file) { Preconditions.checkNotNull(file, "Invalid delete file: null"); - add(new DeleteFileHolder(file)); + add(new PendingDeleteFile(file)); } /** Add a delete file to the new snapshot. */ protected void add(DeleteFile file, long dataSequenceNumber) { Preconditions.checkNotNull(file, "Invalid delete file: null"); - add(new DeleteFileHolder(file, dataSequenceNumber)); + add(new PendingDeleteFile(file, dataSequenceNumber)); } - private void add(DeleteFileHolder fileHolder) { - int specId = fileHolder.deleteFile().specId(); + private void add(PendingDeleteFile deleteFile) { + int specId = deleteFile.specId(); PartitionSpec fileSpec = ops.current().spec(specId); - List deleteFiles = + List deleteFiles = newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList()); - if (newDeleteFiles.add(fileHolder.deleteFile())) { - deleteFiles.add(fileHolder); - addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile()); + if (newDeleteFiles.add(deleteFile)) { + deleteFiles.add(deleteFile); + addedFilesSummary.addedFile(fileSpec, deleteFile); hasNewDeleteFiles = true; } } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 27724f787dd2..dabd570ea1b6 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -35,6 +35,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache; import java.io.IOException; import java.math.RoundingMode; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -595,20 +596,20 @@ private List writeDataFileGroup( } protected List writeDeleteManifests( - Collection files, PartitionSpec spec) { + Collection files, PartitionSpec spec) { return writeManifests(files, group -> writeDeleteFileGroup(group, spec)); } private List writeDeleteFileGroup( - Collection files, PartitionSpec spec) { + Collection files, PartitionSpec spec) { RollingManifestWriter writer = newRollingDeleteManifestWriter(spec); try (RollingManifestWriter closableWriter = writer) { - for (DeleteFileHolder file : files) { + for (PendingDeleteFile file : files) { if (file.dataSequenceNumber() != null) { - closableWriter.add(file.deleteFile(), file.dataSequenceNumber()); + closableWriter.add(file, file.dataSequenceNumber()); } else { - closableWriter.add(file.deleteFile()); + closableWriter.add(file); } } } catch (IOException e) { @@ -752,7 +753,7 @@ private static void updateTotal( } } - protected static class DeleteFileHolder { + protected static class PendingDeleteFile implements DeleteFile { private final DeleteFile deleteFile; private final Long dataSequenceNumber; @@ -762,7 +763,7 @@ protected static class DeleteFileHolder { * @param deleteFile delete file * @param dataSequenceNumber data sequence number to apply */ - DeleteFileHolder(DeleteFile deleteFile, long dataSequenceNumber) { + PendingDeleteFile(DeleteFile deleteFile, long dataSequenceNumber) { this.deleteFile = deleteFile; this.dataSequenceNumber = dataSequenceNumber; } @@ -772,17 +773,139 @@ protected static class DeleteFileHolder { * * @param deleteFile delete file */ - DeleteFileHolder(DeleteFile deleteFile) { + PendingDeleteFile(DeleteFile deleteFile) { this.deleteFile = deleteFile; this.dataSequenceNumber = null; } - public DeleteFile deleteFile() { - return deleteFile; - } - + @Override public Long dataSequenceNumber() { return dataSequenceNumber; } + + @Override + public Long fileSequenceNumber() { + return deleteFile.fileSequenceNumber(); + } + + @Override + public DeleteFile copy() { + return deleteFile.copy(); + } + + @Override + public DeleteFile copyWithoutStats() { + return deleteFile.copyWithoutStats(); + } + + @Override + public DeleteFile copyWithStats(Set requestedColumnIds) { + return deleteFile.copyWithStats(requestedColumnIds); + } + + @Override + public DeleteFile copy(boolean withStats) { + return deleteFile.copy(withStats); + } + + @Override + public String manifestLocation() { + return deleteFile.manifestLocation(); + } + + @Override + public Long pos() { + return deleteFile.pos(); + } + + @Override + public int specId() { + return deleteFile.specId(); + } + + @Override + public FileContent content() { + return deleteFile.content(); + } + + @Override + public CharSequence path() { + return deleteFile.path(); + } + + @Override + public String location() { + return deleteFile.location(); + } + + @Override + public FileFormat format() { + return deleteFile.format(); + } + + @Override + public StructLike partition() { + return deleteFile.partition(); + } + + @Override + public long recordCount() { + return deleteFile.recordCount(); + } + + @Override + public long fileSizeInBytes() { + return deleteFile.fileSizeInBytes(); + } + + @Override + public Map columnSizes() { + return deleteFile.columnSizes(); + } + + @Override + public Map valueCounts() { + return deleteFile.valueCounts(); + } + + @Override + public Map nullValueCounts() { + return deleteFile.nullValueCounts(); + } + + @Override + public Map nanValueCounts() { + return deleteFile.nanValueCounts(); + } + + @Override + public Map lowerBounds() { + return deleteFile.lowerBounds(); + } + + @Override + public Map upperBounds() { + return deleteFile.upperBounds(); + } + + @Override + public ByteBuffer keyMetadata() { + return deleteFile.keyMetadata(); + } + + @Override + public List splitOffsets() { + return deleteFile.splitOffsets(); + } + + @Override + public List equalityFieldIds() { + return deleteFile.equalityFieldIds(); + } + + @Override + public Integer sortOrderId() { + return deleteFile.sortOrderId(); + } } } From 6d264553658e8fb5d40291614b0dd95e955b6a48 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Fri, 4 Oct 2024 11:36:24 +0200 Subject: [PATCH 2/3] Core: Optimize duplicate data/delete file detection --- .../iceberg/MergingSnapshotProducer.java | 46 +++++++++---------- .../org/apache/iceberg/SnapshotProducer.java | 6 +-- 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 8f3864a97914..9f241052f702 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -82,11 +82,9 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { private final ManifestFilterManager deleteFilterManager; // update data - private final Map> newDataFilesBySpec = Maps.newHashMap(); - private final DataFileSet newDataFiles = DataFileSet.create(); - private final DeleteFileSet newDeleteFiles = DeleteFileSet.create(); + private final Map newDataFilesBySpec = Maps.newHashMap(); private Long newDataFilesDataSequenceNumber; - private final Map> newDeleteFilesBySpec = Maps.newHashMap(); + private final Map newDeleteFilesBySpec = Maps.newHashMap(); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); private final SnapshotSummary.Builder addedFilesSummary = SnapshotSummary.builder(); @@ -162,10 +160,10 @@ protected Expression rowFilter() { protected List addedDataFiles() { return ImmutableList.copyOf( - newDataFilesBySpec.values().stream().flatMap(List::stream).collect(Collectors.toList())); + newDataFilesBySpec.values().stream().flatMap(Set::stream).collect(Collectors.toList())); } - protected Map> addedDataFilesBySpec() { + protected Map addedDataFilesBySpec() { return ImmutableMap.copyOf(newDataFilesBySpec); } @@ -236,19 +234,18 @@ protected boolean addsDeleteFiles() { /** Add a data file to the new snapshot. */ protected void add(DataFile file) { Preconditions.checkNotNull(file, "Invalid data file: null"); - if (newDataFiles.add(file)) { - PartitionSpec fileSpec = ops.current().spec(file.specId()); - Preconditions.checkArgument( - fileSpec != null, - "Cannot find partition spec %s for data file: %s", - file.specId(), - file.path()); - + PartitionSpec fileSpec = ops.current().spec(file.specId()); + Preconditions.checkArgument( + fileSpec != null, + "Cannot find partition spec %s for data file: %s", + file.specId(), + file.location()); + + DataFileSet dataFiles = + newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> DataFileSet.create()); + if (dataFiles.add(file)) { addedFilesSummary.addedFile(fileSpec, file); hasNewDataFiles = true; - List dataFiles = - newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList()); - dataFiles.add(file); } } @@ -266,12 +263,11 @@ protected void add(DeleteFile file, long dataSequenceNumber) { private void add(PendingDeleteFile deleteFile) { int specId = deleteFile.specId(); - PartitionSpec fileSpec = ops.current().spec(specId); - List deleteFiles = - newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList()); + DeleteFileSet deleteFiles = + newDeleteFilesBySpec.computeIfAbsent(specId, s -> DeleteFileSet.create()); - if (newDeleteFiles.add(deleteFile)) { - deleteFiles.add(deleteFile); + if (deleteFiles.add(deleteFile)) { + PartitionSpec fileSpec = ops.current().spec(specId); addedFilesSummary.addedFile(fileSpec, deleteFile); hasNewDeleteFiles = true; } @@ -974,7 +970,8 @@ private List newDataFilesAsManifests() { newDataFilesBySpec.forEach( (dataSpec, dataFiles) -> { List newDataManifests = - writeDataManifests(dataFiles, newDataFilesDataSequenceNumber, dataSpec); + writeDataManifests( + Lists.newArrayList(dataFiles), newDataFilesDataSequenceNumber, dataSpec); cachedNewDataManifests.addAll(newDataManifests); }); this.hasNewDataFiles = false; @@ -1005,7 +1002,8 @@ private List newDeleteFilesAsManifests() { newDeleteFilesBySpec.forEach( (specId, deleteFiles) -> { PartitionSpec spec = ops.current().spec(specId); - List newDeleteManifests = writeDeleteManifests(deleteFiles, spec); + List newDeleteManifests = + writeDeleteManifests(Lists.newArrayList(deleteFiles), spec); cachedNewDeleteManifests.addAll(newDeleteManifests); }); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index dabd570ea1b6..961e20f7bc20 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -596,16 +596,16 @@ private List writeDataFileGroup( } protected List writeDeleteManifests( - Collection files, PartitionSpec spec) { + Collection files, PartitionSpec spec) { return writeManifests(files, group -> writeDeleteFileGroup(group, spec)); } private List writeDeleteFileGroup( - Collection files, PartitionSpec spec) { + Collection files, PartitionSpec spec) { RollingManifestWriter writer = newRollingDeleteManifestWriter(spec); try (RollingManifestWriter closableWriter = writer) { - for (PendingDeleteFile file : files) { + for (DeleteFile file : files) { if (file.dataSequenceNumber() != null) { closableWriter.add(file, file.dataSequenceNumber()); } else { From 371e472ad89cfa67ec78a0f2b808baf292ccfdf6 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Tue, 15 Oct 2024 09:09:05 +0200 Subject: [PATCH 3/3] updates --- .../apache/iceberg/ManifestFilterManager.java | 2 +- .../iceberg/MergingSnapshotProducer.java | 47 ++++++++++--------- .../org/apache/iceberg/SnapshotProducer.java | 18 +++++-- 3 files changed, 39 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java index fddb1a161637..adc5e1b7d3c2 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java @@ -68,9 +68,9 @@ public String partition() { private final Map specsById; private final PartitionSet deleteFilePartitions; + private final Set deleteFiles = newFileSet(); private final PartitionSet dropPartitions; private final CharSequenceSet deletePaths = CharSequenceSet.empty(); - private final Set deleteFiles = newFileSet(); private Expression deleteExpression = Expressions.alwaysFalse(); private long minSequenceNumber = 0; private boolean failAnyDelete = false; diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 9f241052f702..ab55f86ebf6f 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.events.CreateSnapshotEvent; import org.apache.iceberg.exceptions.ValidationException; @@ -42,7 +41,6 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.base.Predicate; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; @@ -159,12 +157,9 @@ protected Expression rowFilter() { } protected List addedDataFiles() { - return ImmutableList.copyOf( - newDataFilesBySpec.values().stream().flatMap(Set::stream).collect(Collectors.toList())); - } - - protected Map addedDataFilesBySpec() { - return ImmutableMap.copyOf(newDataFilesBySpec); + return newDataFilesBySpec.values().stream() + .flatMap(Set::stream) + .collect(ImmutableList.toImmutableList()); } protected void failAnyDelete() { @@ -234,21 +229,25 @@ protected boolean addsDeleteFiles() { /** Add a data file to the new snapshot. */ protected void add(DataFile file) { Preconditions.checkNotNull(file, "Invalid data file: null"); - PartitionSpec fileSpec = ops.current().spec(file.specId()); + PartitionSpec spec = spec(file.specId()); Preconditions.checkArgument( - fileSpec != null, + spec != null, "Cannot find partition spec %s for data file: %s", file.specId(), file.location()); DataFileSet dataFiles = - newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> DataFileSet.create()); + newDataFilesBySpec.computeIfAbsent(spec, ignored -> DataFileSet.create()); if (dataFiles.add(file)) { - addedFilesSummary.addedFile(fileSpec, file); + addedFilesSummary.addedFile(spec, file); hasNewDataFiles = true; } } + private PartitionSpec spec(int specId) { + return ops.current().spec(specId); + } + /** Add a delete file to the new snapshot. */ protected void add(DeleteFile file) { Preconditions.checkNotNull(file, "Invalid delete file: null"); @@ -261,14 +260,18 @@ protected void add(DeleteFile file, long dataSequenceNumber) { add(new PendingDeleteFile(file, dataSequenceNumber)); } - private void add(PendingDeleteFile deleteFile) { - int specId = deleteFile.specId(); - DeleteFileSet deleteFiles = - newDeleteFilesBySpec.computeIfAbsent(specId, s -> DeleteFileSet.create()); + private void add(PendingDeleteFile file) { + PartitionSpec spec = spec(file.specId()); + Preconditions.checkArgument( + spec != null, + "Cannot find partition spec %s for delete file: %s", + file.specId(), + file.location()); - if (deleteFiles.add(deleteFile)) { - PartitionSpec fileSpec = ops.current().spec(specId); - addedFilesSummary.addedFile(fileSpec, deleteFile); + DeleteFileSet deleteFiles = + newDeleteFilesBySpec.computeIfAbsent(spec.specId(), ignored -> DeleteFileSet.create()); + if (deleteFiles.add(file)) { + addedFilesSummary.addedFile(spec, file); hasNewDeleteFiles = true; } } @@ -970,8 +973,7 @@ private List newDataFilesAsManifests() { newDataFilesBySpec.forEach( (dataSpec, dataFiles) -> { List newDataManifests = - writeDataManifests( - Lists.newArrayList(dataFiles), newDataFilesDataSequenceNumber, dataSpec); + writeDataManifests(dataFiles, newDataFilesDataSequenceNumber, dataSpec); cachedNewDataManifests.addAll(newDataManifests); }); this.hasNewDataFiles = false; @@ -1002,8 +1004,7 @@ private List newDeleteFilesAsManifests() { newDeleteFilesBySpec.forEach( (specId, deleteFiles) -> { PartitionSpec spec = ops.current().spec(specId); - List newDeleteManifests = - writeDeleteManifests(Lists.newArrayList(deleteFiles), spec); + List newDeleteManifests = writeDeleteManifests(deleteFiles, spec); cachedNewDeleteManifests.addAll(newDeleteManifests); }); diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 961e20f7bc20..33114baa641d 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -606,6 +606,8 @@ private List writeDeleteFileGroup( try (RollingManifestWriter closableWriter = writer) { for (DeleteFile file : files) { + Preconditions.checkArgument( + file instanceof PendingDeleteFile, "Invalid delete file: must be PendingDeleteFile"); if (file.dataSequenceNumber() != null) { closableWriter.add(file, file.dataSequenceNumber()); } else { @@ -778,6 +780,14 @@ protected static class PendingDeleteFile implements DeleteFile { this.dataSequenceNumber = null; } + private PendingDeleteFile wrap(DeleteFile file) { + if (null != dataSequenceNumber) { + return new PendingDeleteFile(file, dataSequenceNumber); + } + + return new PendingDeleteFile(file); + } + @Override public Long dataSequenceNumber() { return dataSequenceNumber; @@ -790,22 +800,22 @@ public Long fileSequenceNumber() { @Override public DeleteFile copy() { - return deleteFile.copy(); + return wrap(deleteFile.copy()); } @Override public DeleteFile copyWithoutStats() { - return deleteFile.copyWithoutStats(); + return wrap(deleteFile.copyWithoutStats()); } @Override public DeleteFile copyWithStats(Set requestedColumnIds) { - return deleteFile.copyWithStats(requestedColumnIds); + return wrap(deleteFile.copyWithStats(requestedColumnIds)); } @Override public DeleteFile copy(boolean withStats) { - return deleteFile.copy(withStats); + return wrap(deleteFile.copy(withStats)); } @Override