diff --git a/api/src/main/java/org/apache/iceberg/RewriteFiles.java b/api/src/main/java/org/apache/iceberg/RewriteFiles.java index c392c7118d8a..46545b8c027e 100644 --- a/api/src/main/java/org/apache/iceberg/RewriteFiles.java +++ b/api/src/main/java/org/apache/iceberg/RewriteFiles.java @@ -32,15 +32,93 @@ * will be resolved by applying the changes to the new latest snapshot and reattempting the commit. * If any of the deleted files are no longer in the latest snapshot when reattempting, the commit * will throw a {@link ValidationException}. + * + *

Note that the new state of the table after each rewrite must be logically equivalent to the + * original table state. */ public interface RewriteFiles extends SnapshotUpdate { + /** + * Remove a data file from the current table state. + * + *

This rewrite operation may change the size or layout of the data files. When applicable, it + * is also recommended to discard already deleted records while rewriting data files. However, the + * set of live data records must never change. + * + * @param dataFile a rewritten data file + * @return this for method chaining + */ + default RewriteFiles deleteFile(DataFile dataFile) { + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement deleteFile"); + } + + /** + * Remove a delete file from the table state. + * + *

This rewrite operation may change the size or layout of the delete files. When applicable, + * it is also recommended to discard delete records for files that are no longer part of the table + * state. However, the set of applicable delete records must never change. + * + * @param deleteFile a rewritten delete file + * @return this for method chaining + */ + default RewriteFiles deleteFile(DeleteFile deleteFile) { + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement deleteFile"); + } + + /** + * Add a new data file. + * + *

This rewrite operation may change the size or layout of the data files. When applicable, it + * is also recommended to discard already deleted records while rewriting data files. However, the + * set of live data records must never change. + * + * @param dataFile a new data file + * @return this for method chaining + */ + default RewriteFiles addFile(DataFile dataFile) { + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement addFile"); + } + + /** + * Add a new delete file. + * + *

This rewrite operation may change the size or layout of the delete files. When applicable, + * it is also recommended to discard delete records for files that are no longer part of the table + * state. However, the set of applicable delete records must never change. + * + * @param deleteFile a new delete file + * @return this for method chaining + */ + default RewriteFiles addFile(DeleteFile deleteFile) { + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement addFile"); + } + + /** + * Configure the data sequence number for this rewrite operation. This data sequence number will + * be used for all new data files that are added in this rewrite. This method is helpful to avoid + * commit conflicts between data compaction and adding equality deletes. + * + * @param sequenceNumber a data sequence number + * @return this for method chaining + */ + default RewriteFiles dataSequenceNumber(long sequenceNumber) { + throw new UnsupportedOperationException( + this.getClass().getName() + " does not implement dataSequenceNumber"); + } + /** * Add a rewrite that replaces one set of data files with another set that contains the same data. * * @param filesToDelete files that will be replaced (deleted), cannot be null or empty. * @param filesToAdd files that will be added, cannot be null or empty. * @return this for method chaining + * @deprecated since 1.3.0, will be removed in 2.0.0 */ + @Deprecated default RewriteFiles rewriteFiles(Set filesToDelete, Set filesToAdd) { return rewriteFiles(filesToDelete, ImmutableSet.of(), filesToAdd, ImmutableSet.of()); } @@ -53,7 +131,9 @@ default RewriteFiles rewriteFiles(Set filesToDelete, Set fil * @param filesToAdd files that will be added, cannot be null or empty. * @param sequenceNumber sequence number to use for all data files added * @return this for method chaining + * @deprecated since 1.3.0, will be removed in 2.0.0 */ + @Deprecated RewriteFiles rewriteFiles( Set filesToDelete, Set filesToAdd, long sequenceNumber); @@ -65,7 +145,9 @@ RewriteFiles rewriteFiles( * @param dataFilesToAdd data files that will be added. * @param deleteFilesToAdd delete files that will be added. * @return this for method chaining. + * @deprecated since 1.3.0, will be removed in 2.0.0 */ + @Deprecated RewriteFiles rewriteFiles( Set dataFilesToReplace, Set deleteFilesToReplace, diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java index 54996e831117..177e9d0b3fdf 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java @@ -44,33 +44,41 @@ protected String operation() { return DataOperations.REPLACE; } - private void verifyInputAndOutputFiles( - Set dataFilesToDelete, - Set deleteFilesToDelete, - Set dataFilesToAdd, - Set deleteFilesToAdd) { - Preconditions.checkNotNull(dataFilesToDelete, "Data files to delete can not be null"); - Preconditions.checkNotNull(deleteFilesToDelete, "Delete files to delete can not be null"); - Preconditions.checkNotNull(dataFilesToAdd, "Data files to add can not be null"); - Preconditions.checkNotNull(deleteFilesToAdd, "Delete files to add can not be null"); + @Override + public RewriteFiles deleteFile(DataFile dataFile) { + replacedDataFiles.add(dataFile); + delete(dataFile); + return self(); + } - int filesToDelete = 0; - filesToDelete += dataFilesToDelete.size(); - filesToDelete += deleteFilesToDelete.size(); + @Override + public RewriteFiles deleteFile(DeleteFile deleteFile) { + delete(deleteFile); + return self(); + } - Preconditions.checkArgument(filesToDelete > 0, "Files to delete cannot be null or empty"); + @Override + public RewriteFiles addFile(DataFile dataFile) { + add(dataFile); + return self(); + } - if (deleteFilesToDelete.isEmpty()) { - Preconditions.checkArgument( - deleteFilesToAdd.isEmpty(), - "Delete files to add must be empty because there's no delete file to be rewritten"); - } + @Override + public RewriteFiles addFile(DeleteFile deleteFile) { + add(deleteFile); + return self(); + } + + @Override + public RewriteFiles dataSequenceNumber(long sequenceNumber) { + setNewDataFilesDataSequenceNumber(sequenceNumber); + return self(); } @Override public RewriteFiles rewriteFiles( Set filesToDelete, Set filesToAdd, long sequenceNumber) { - setNewFilesSequenceNumber(sequenceNumber); + setNewDataFilesDataSequenceNumber(sequenceNumber); return rewriteFiles(filesToDelete, ImmutableSet.of(), filesToAdd, ImmutableSet.of()); } @@ -80,24 +88,26 @@ public RewriteFiles rewriteFiles( Set deleteFilesToReplace, Set dataFilesToAdd, Set deleteFilesToAdd) { - verifyInputAndOutputFiles( - dataFilesToReplace, deleteFilesToReplace, dataFilesToAdd, deleteFilesToAdd); - replacedDataFiles.addAll(dataFilesToReplace); + + Preconditions.checkNotNull(dataFilesToReplace, "Replaced data files can't be null"); + Preconditions.checkNotNull(deleteFilesToReplace, "Replaced delete files can't be null"); + Preconditions.checkNotNull(dataFilesToAdd, "Added data files can't be null"); + Preconditions.checkNotNull(deleteFilesToAdd, "Added delete files can't be null"); for (DataFile dataFile : dataFilesToReplace) { - delete(dataFile); + deleteFile(dataFile); } for (DeleteFile deleteFile : deleteFilesToReplace) { - delete(deleteFile); + deleteFile(deleteFile); } for (DataFile dataFile : dataFilesToAdd) { - add(dataFile); + addFile(dataFile); } for (DeleteFile deleteFile : deleteFilesToAdd) { - add(deleteFile); + addFile(deleteFile); } return this; @@ -117,10 +127,24 @@ public BaseRewriteFiles toBranch(String branch) { @Override protected void validate(TableMetadata base, Snapshot parent) { + validateReplacedAndAddedFiles(); if (replacedDataFiles.size() > 0) { // if there are replaced data files, there cannot be any new row-level deletes for those data // files validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles, parent); } } + + private void validateReplacedAndAddedFiles() { + Preconditions.checkArgument( + deletesDataFiles() || deletesDeleteFiles(), "Files to delete cannot be empty"); + + Preconditions.checkArgument( + deletesDataFiles() || !addsDataFiles(), + "Data files to add must be empty because there's no data file to be rewritten"); + + Preconditions.checkArgument( + deletesDeleteFiles() || !addsDeleteFiles(), + "Delete files to add must be empty because there's no delete file to be rewritten"); + } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java index 36fa222ae034..9a50c0cecb13 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java @@ -165,6 +165,12 @@ void delete(CharSequence path) { deletePaths.add(path); } + boolean containsDeletes() { + return deletePaths.size() > 0 + || deleteExpression != Expressions.alwaysFalse() + || dropPartitions.size() > 0; + } + /** * Filter deleted files out of a list of manifests. * diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 490ec2dedc59..00e182781887 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -84,7 +84,7 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { // update data private final List newFiles = Lists.newArrayList(); - private Long newFilesSequenceNumber; + private Long newDataFilesDataSequenceNumber; private final Map> newDeleteFilesBySpec = Maps.newHashMap(); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); @@ -209,6 +209,22 @@ protected void delete(CharSequence path) { filterManager.delete(path); } + protected boolean deletesDataFiles() { + return filterManager.containsDeletes(); + } + + protected boolean deletesDeleteFiles() { + return deleteFilterManager.containsDeletes(); + } + + protected boolean addsDataFiles() { + return newFiles.size() > 0; + } + + protected boolean addsDeleteFiles() { + return newDeleteFilesBySpec.size() > 0; + } + /** Add a data file to the new snapshot. */ protected void add(DataFile file) { Preconditions.checkNotNull(file, "Invalid data file: null"); @@ -392,7 +408,7 @@ private CloseableIterable> addedDataFiles( protected void validateNoNewDeletesForDataFiles( TableMetadata base, Long startingSnapshotId, Iterable dataFiles, Snapshot parent) { validateNoNewDeletesForDataFiles( - base, startingSnapshotId, null, dataFiles, newFilesSequenceNumber != null, parent); + base, startingSnapshotId, null, dataFiles, newDataFilesDataSequenceNumber != null, parent); } /** @@ -651,8 +667,20 @@ private CloseableIterable> deletedDataFiles( return manifestGroup.entries(); } + /** + * Sets a data sequence number for new data files. + * + * @param sequenceNumber a data sequence number + * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link + * #setNewDataFilesDataSequenceNumber(long)}; + */ + @Deprecated protected void setNewFilesSequenceNumber(long sequenceNumber) { - this.newFilesSequenceNumber = sequenceNumber; + setNewDataFilesDataSequenceNumber(sequenceNumber); + } + + protected void setNewDataFilesDataSequenceNumber(long sequenceNumber) { + this.newDataFilesDataSequenceNumber = sequenceNumber; } private long startingSequenceNumber(TableMetadata metadata, Long staringSnapshotId) { @@ -926,10 +954,10 @@ private ManifestFile newFilesAsManifest() { try { ManifestWriter writer = newManifestWriter(dataSpec()); try { - if (newFilesSequenceNumber == null) { + if (newDataFilesDataSequenceNumber == null) { writer.addAll(newFiles); } else { - newFiles.forEach(f -> writer.add(f, newFilesSequenceNumber)); + newFiles.forEach(f -> writer.add(f, newDataFilesDataSequenceNumber)); } } finally { writer.close(); diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java index dae057d36b46..5f229be579b7 100644 --- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java +++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java @@ -44,7 +44,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Multimaps; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.StructLikeWrapper; @@ -325,12 +324,18 @@ void doReplace( Iterable addedDataFiles, long startingSnapshotId) { RewriteFiles rewriteFiles = table.newRewrite().validateFromSnapshot(startingSnapshotId); + + for (DataFile dataFile : deletedDataFiles) { + rewriteFiles.deleteFile(dataFile); + } + + for (DataFile dataFile : addedDataFiles) { + rewriteFiles.addFile(dataFile); + } + if (useStartingSequenceNumber) { long sequenceNumber = table.snapshot(startingSnapshotId).sequenceNumber(); - rewriteFiles.rewriteFiles( - Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles), sequenceNumber); - } else { - rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles)); + rewriteFiles.dataSequenceNumber(sequenceNumber); } commit(rewriteFiles); diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java index c60764430ba7..32b6e0101ad5 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesCommitManager.java @@ -21,12 +21,11 @@ import java.util.Set; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Table; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; -import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,18 +52,19 @@ public RewritePositionDeletesCommitManager(Table table) { * @param fileGroups file groups to commit */ public void commit(Set fileGroups) { - Set rewrittenDeleteFiles = Sets.newHashSet(); - Set addedDeleteFiles = Sets.newHashSet(); + RewriteFiles rewriteFiles = table.newRewrite().validateFromSnapshot(startingSnapshotId); + for (RewritePositionDeletesGroup group : fileGroups) { - rewrittenDeleteFiles.addAll(group.rewrittenDeleteFiles()); - addedDeleteFiles.addAll(group.addedDeleteFiles()); + for (DeleteFile file : group.rewrittenDeleteFiles()) { + rewriteFiles.deleteFile(file); + } + + for (DeleteFile file : group.addedDeleteFiles()) { + rewriteFiles.addFile(file); + } } - table - .newRewrite() - .validateFromSnapshot(startingSnapshotId) - .rewriteFiles(ImmutableSet.of(), rewrittenDeleteFiles, ImmutableSet.of(), addedDeleteFiles) - .commit(); + rewriteFiles.commit(); } /** diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java index d1259eda2f05..256c0c94cced 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java @@ -86,7 +86,7 @@ public void testEmptyTable() { .rewriteFiles( ImmutableSet.of(), ImmutableSet.of(FILE_A_DELETES), - ImmutableSet.of(FILE_A), + ImmutableSet.of(), ImmutableSet.of(FILE_B_DELETES)), branch)); } @@ -142,7 +142,7 @@ public void testDeleteOnly() { AssertHelpers.assertThrows( "Expected an exception", IllegalArgumentException.class, - "Files to delete cannot be null or empty", + "Files to delete cannot be empty", () -> apply( table.newRewrite().rewriteFiles(Collections.emptySet(), Sets.newSet(FILE_A)), @@ -151,7 +151,7 @@ public void testDeleteOnly() { AssertHelpers.assertThrows( "Expected an exception", IllegalArgumentException.class, - "Files to delete cannot be null or empty", + "Files to delete cannot be empty", () -> apply( table @@ -166,7 +166,7 @@ public void testDeleteOnly() { AssertHelpers.assertThrows( "Expected an exception", IllegalArgumentException.class, - "Files to delete cannot be null or empty", + "Files to delete cannot be empty", () -> apply( table