diff --git a/api/src/main/java/org/apache/iceberg/RewriteFiles.java b/api/src/main/java/org/apache/iceberg/RewriteFiles.java index f09008a15d8f..62ba58e828d1 100644 --- a/api/src/main/java/org/apache/iceberg/RewriteFiles.java +++ b/api/src/main/java/org/apache/iceberg/RewriteFiles.java @@ -54,12 +54,23 @@ default RewriteFiles rewriteFiles(Set filesToDelete, Set fil /** * Add a rewrite that replaces one set of files with another set that contains the same data. * - * @param dataFilesToDelete data files that will be replaced (deleted). - * @param deleteFilesToDelete delete files that will be replaced (deleted). + * @param dataFilesToReplace data files that will be replaced (deleted). + * @param deleteFilesToReplace delete files that will be replaced (deleted). * @param dataFilesToAdd data files that will be added. * @param deleteFilesToAdd delete files that will be added. * @return this for method chaining. */ - RewriteFiles rewriteFiles(Set dataFilesToDelete, Set deleteFilesToDelete, + RewriteFiles rewriteFiles(Set dataFilesToReplace, Set deleteFilesToReplace, Set dataFilesToAdd, Set deleteFilesToAdd); + + /** + * Set the snapshot ID used in any reads for this operation. + *

+ * Validations will check changes after this snapshot ID. If this is not called, all ancestor snapshots through the + * table's initial snapshot are validated. + * + * @param snapshotId a snapshot ID + * @return this for method chaining + */ + RewriteFiles validateFromSnapshot(long snapshotId); } diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java index c7732ef8a5a9..f94bcd157d0c 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java @@ -21,8 +21,12 @@ import java.util.Set; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; class BaseRewriteFiles extends MergingSnapshotProducer implements RewriteFiles { + private final Set replacedDataFiles = Sets.newHashSet(); + private Long startingSnapshotId = null; + BaseRewriteFiles(String tableName, TableOperations ops) { super(tableName, ops); @@ -63,15 +67,16 @@ private void verifyInputAndOutputFiles(Set dataFilesToDelete, Set dataFilesToDelete, Set deleteFilesToDelete, + public RewriteFiles rewriteFiles(Set dataFilesToReplace, Set deleteFilesToReplace, Set dataFilesToAdd, Set deleteFilesToAdd) { - verifyInputAndOutputFiles(dataFilesToDelete, deleteFilesToDelete, dataFilesToAdd, deleteFilesToAdd); + verifyInputAndOutputFiles(dataFilesToReplace, deleteFilesToReplace, dataFilesToAdd, deleteFilesToAdd); + replacedDataFiles.addAll(dataFilesToReplace); - for (DataFile dataFile : dataFilesToDelete) { + for (DataFile dataFile : dataFilesToReplace) { delete(dataFile); } - for (DeleteFile deleteFile : deleteFilesToDelete) { + for (DeleteFile deleteFile : deleteFilesToReplace) { delete(deleteFile); } @@ -85,4 +90,18 @@ public RewriteFiles rewriteFiles(Set dataFilesToDelete, Set 0) { + // if there are replaced data files, there cannot be any new row-level deletes for those data files + validateNoNewDeletesForDataFiles(base, startingSnapshotId, replacedDataFiles); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java index 485609f8a46c..8a1371311b30 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRowDelta.java +++ b/core/src/main/java/org/apache/iceberg/BaseRowDelta.java @@ -94,6 +94,7 @@ protected void validate(TableMetadata base) { validateDataFilesExist(base, startingSnapshotId, referencedDataFiles, !validateDeletes); } + // TODO: does this need to check new delete files? if (conflictDetectionFilter != null) { validateAddedDataFiles(base, startingSnapshotId, conflictDetectionFilter, caseSensitive); } diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java index 7605e1c3548b..1c3ce122bedd 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java @@ -312,6 +312,7 @@ static Builder builderFor(FileIO io, Iterable deleteManifests) { static class Builder { private final FileIO io; private final Set deleteManifests; + private long minSequenceNumber = 0L; private Map specsById = null; private Expression dataFilter = Expressions.alwaysTrue(); private Expression partitionFilter = Expressions.alwaysTrue(); @@ -323,6 +324,11 @@ static class Builder { this.deleteManifests = Sets.newHashSet(deleteManifests); } + Builder afterSequenceNumber(long seq) { + this.minSequenceNumber = seq; + return this; + } + Builder specsById(Map newSpecsById) { this.specsById = newSpecsById; return this; @@ -357,8 +363,10 @@ DeleteFileIndex build() { .run(deleteFile -> { try (CloseableIterable> reader = deleteFile) { for (ManifestEntry entry : reader) { - // copy with stats for better filtering against data file stats - deleteEntries.add(entry.copy()); + if (entry.sequenceNumber() > minSequenceNumber) { + // copy with stats for better filtering against data file stats + deleteEntries.add(entry.copy()); + } } } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close"); diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index f5c61f550afd..f26412b27bbd 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -41,6 +41,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +63,9 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.REPLACE, DataOperations.DELETE); private static final Set VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS = ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.REPLACE); + // delete files can be added in "overwrite" or "delete" operations + private static final Set VALIDATE_REPLACED_DATA_FILES_OPERATIONS = + ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE); private final String tableName; private final TableOperations ops; @@ -253,28 +257,10 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI return; } - List manifests = Lists.newArrayList(); - Set newSnapshots = Sets.newHashSet(); - - Long currentSnapshotId = base.currentSnapshot().snapshotId(); - while (currentSnapshotId != null && !currentSnapshotId.equals(startingSnapshotId)) { - Snapshot currentSnapshot = ops.current().snapshot(currentSnapshotId); - - ValidationException.check(currentSnapshot != null, - "Cannot determine history between starting snapshot %s and current %s", - startingSnapshotId, currentSnapshotId); - - if (VALIDATE_ADDED_FILES_OPERATIONS.contains(currentSnapshot.operation())) { - newSnapshots.add(currentSnapshotId); - for (ManifestFile manifest : currentSnapshot.dataManifests()) { - if (manifest.snapshotId() == (long) currentSnapshotId) { - manifests.add(manifest); - } - } - } - - currentSnapshotId = currentSnapshot.parentId(); - } + Pair, Set> history = + validationHistory(base, startingSnapshotId, VALIDATE_ADDED_FILES_OPERATIONS, ManifestContent.DATA); + List manifests = history.first(); + Set newSnapshots = history.second(); ManifestGroup conflictGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of()) .caseSensitive(caseSensitive) @@ -297,6 +283,39 @@ protected void validateAddedDataFiles(TableMetadata base, Long startingSnapshotI } } + /** + * Validates that no new delete files that must be applied to the given data files have been added to the table since + * a starting snapshot. + * + * @param base table metadata to validate + * @param startingSnapshotId id of the snapshot current at the start of the operation + * @param dataFiles data files to validate have no new row deletes + */ + protected void validateNoNewDeletesForDataFiles(TableMetadata base, Long startingSnapshotId, + Iterable dataFiles) { + // if there is no current table state, no files have been added + if (base.currentSnapshot() == null) { + return; + } + + Pair, Set> history = + validationHistory(base, startingSnapshotId, VALIDATE_REPLACED_DATA_FILES_OPERATIONS, ManifestContent.DELETES); + List deleteManifests = history.first(); + + long startingSequenceNumber = startingSnapshotId == null ? 0 : base.snapshot(startingSnapshotId).sequenceNumber(); + DeleteFileIndex deletes = DeleteFileIndex.builderFor(ops.io(), deleteManifests) + .afterSequenceNumber(startingSequenceNumber) + .specsById(ops.current().specsById()) + .build(); + + for (DataFile dataFile : dataFiles) { + // if any delete is found that applies to files written in or before the starting snapshot, fail + if (deletes.forDataFile(startingSequenceNumber, dataFile).length > 0) { + throw new ValidationException("Cannot commit, found new delete for replaced data file: %s", dataFile); + } + } + } + @SuppressWarnings("CollectionUndefinedEquality") protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotId, CharSequenceSet requiredDataFiles, boolean skipDeletes) { @@ -309,6 +328,31 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI VALIDATE_DATA_FILES_EXIST_SKIP_DELETE_OPERATIONS : VALIDATE_DATA_FILES_EXIST_OPERATIONS; + Pair, Set> history = + validationHistory(base, startingSnapshotId, matchingOperations, ManifestContent.DATA); + List manifests = history.first(); + Set newSnapshots = history.second(); + + ManifestGroup matchingDeletesGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of()) + .filterManifestEntries(entry -> entry.status() != ManifestEntry.Status.ADDED && + newSnapshots.contains(entry.snapshotId()) && requiredDataFiles.contains(entry.file().path())) + .specsById(base.specsById()) + .ignoreExisting(); + + try (CloseableIterator> deletes = matchingDeletesGroup.entries().iterator()) { + if (deletes.hasNext()) { + throw new ValidationException("Cannot commit, missing data files: %s", + Iterators.toString(Iterators.transform(deletes, entry -> entry.file().path().toString()))); + } + + } catch (IOException e) { + throw new UncheckedIOException("Failed to validate required files exist", e); + } + } + + private Pair, Set> validationHistory(TableMetadata base, Long startingSnapshotId, + Set matchingOperations, + ManifestContent content) { List manifests = Lists.newArrayList(); Set newSnapshots = Sets.newHashSet(); @@ -322,9 +366,17 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI if (matchingOperations.contains(currentSnapshot.operation())) { newSnapshots.add(currentSnapshotId); - for (ManifestFile manifest : currentSnapshot.dataManifests()) { - if (manifest.snapshotId() == (long) currentSnapshotId) { - manifests.add(manifest); + if (content == ManifestContent.DATA) { + for (ManifestFile manifest : currentSnapshot.dataManifests()) { + if (manifest.snapshotId() == (long) currentSnapshotId) { + manifests.add(manifest); + } + } + } else { + for (ManifestFile manifest : currentSnapshot.deleteManifests()) { + if (manifest.snapshotId() == (long) currentSnapshotId) { + manifests.add(manifest); + } } } } @@ -332,21 +384,7 @@ protected void validateDataFilesExist(TableMetadata base, Long startingSnapshotI currentSnapshotId = currentSnapshot.parentId(); } - ManifestGroup matchingDeletesGroup = new ManifestGroup(ops.io(), manifests, ImmutableList.of()) - .filterManifestEntries(entry -> entry.status() != ManifestEntry.Status.ADDED && - newSnapshots.contains(entry.snapshotId()) && requiredDataFiles.contains(entry.file().path())) - .specsById(base.specsById()) - .ignoreExisting(); - - try (CloseableIterator> deletes = matchingDeletesGroup.entries().iterator()) { - if (deletes.hasNext()) { - throw new ValidationException("Cannot commit, missing data files: %s", - Iterators.toString(Iterators.transform(deletes, entry -> entry.file().path().toString()))); - } - - } catch (IOException e) { - throw new UncheckedIOException("Failed to validate required files exist", e); - } + return Pair.of(manifests, newSnapshots); } @Override diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java index 44efadd6d899..5c0cf7a6b1bf 100644 --- a/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java +++ b/core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java @@ -197,8 +197,14 @@ public BaseRewriteDataFilesAction filter(Expression expr) { @Override public RewriteDataFilesActionResult execute() { CloseableIterable fileScanTasks = null; + if (table.currentSnapshot() == null) { + return RewriteDataFilesActionResult.empty(); + } + + long startingSnapshotId = table.currentSnapshot().snapshotId(); try { fileScanTasks = table.newScan() + .useSnapshot(startingSnapshotId) .caseSensitive(caseSensitive) .ignoreResiduals() .filter(filter) @@ -241,7 +247,7 @@ public RewriteDataFilesActionResult execute() { List currentDataFiles = combinedScanTasks.stream() .flatMap(tasks -> tasks.files().stream().map(FileScanTask::file)) .collect(Collectors.toList()); - replaceDataFiles(currentDataFiles, addedDataFiles); + replaceDataFiles(currentDataFiles, addedDataFiles, startingSnapshotId); return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles); } @@ -262,10 +268,12 @@ private Map> groupTasksByPartition( return tasksGroupedByPartition.asMap(); } - private void replaceDataFiles(Iterable deletedDataFiles, Iterable addedDataFiles) { + private void replaceDataFiles(Iterable deletedDataFiles, Iterable addedDataFiles, + long startingSnapshotId) { try { - RewriteFiles rewriteFiles = table.newRewrite(); - rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles)); + RewriteFiles rewriteFiles = table.newRewrite() + .validateFromSnapshot(startingSnapshotId) + .rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles)); commit(rewriteFiles); } catch (Exception e) { Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString())) diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java index 28c3748ac2e5..05bfddc64c5e 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.iceberg.DataFile; +import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Table; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -46,9 +47,16 @@ public class RewriteDataFilesCommitManager { private static final Logger LOG = LoggerFactory.getLogger(RewriteDataFilesCommitManager.class); private final Table table; + private final long startingSnapshotId; + // constructor used for testing public RewriteDataFilesCommitManager(Table table) { + this(table, table.currentSnapshot().snapshotId()); + } + + public RewriteDataFilesCommitManager(Table table, long startingSnapshotId) { this.table = table; + this.startingSnapshotId = startingSnapshotId; } /** @@ -64,9 +72,10 @@ public void commitFileGroups(Set fileGroups) { addedDataFiles = Sets.union(addedDataFiles, group.addedFiles()); } - table.newRewrite() - .rewriteFiles(rewrittenDataFiles, addedDataFiles) - .commit(); + RewriteFiles rewrite = table.newRewrite() + .validateFromSnapshot(startingSnapshotId) + .rewriteFiles(rewrittenDataFiles, addedDataFiles); + rewrite.commit(); } /** diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java index 2bdc9f2c4940..19b97fad91e3 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteFiles.java @@ -238,6 +238,7 @@ public void testRewriteDataAndDeleteFiles() { // Rewrite the files. Snapshot pending = table.newRewrite() + .validateFromSnapshot(table.currentSnapshot().snapshotId()) .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES), ImmutableSet.of(FILE_D), ImmutableSet.of()) .apply(); @@ -314,6 +315,7 @@ public void testFailureWhenRewriteBothDataAndDeleteFiles() { table.ops().failCommits(5); RewriteFiles rewrite = table.newRewrite() + .validateFromSnapshot(table.currentSnapshot().snapshotId()) .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES, FILE_B_DELETES), ImmutableSet.of(FILE_D), ImmutableSet.of()); Snapshot pending = rewrite.apply(); @@ -399,6 +401,7 @@ public void testRecoverWhenRewriteBothDataAndDeleteFiles() { table.ops().failCommits(3); RewriteFiles rewrite = table.newRewrite() + .validateFromSnapshot(table.currentSnapshot().snapshotId()) .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES, FILE_B_DELETES), ImmutableSet.of(FILE_D), ImmutableSet.of()); Snapshot pending = rewrite.apply(); @@ -505,10 +508,12 @@ public void testRemoveAllDeletes() { .commit(); // Apply and commit the rewrite transaction. - RewriteFiles rewrite = table.newRewrite().rewriteFiles( - ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES), - ImmutableSet.of(), ImmutableSet.of() - ); + RewriteFiles rewrite = table.newRewrite() + .validateFromSnapshot(table.currentSnapshot().snapshotId()) + .rewriteFiles( + ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES), + ImmutableSet.of(), ImmutableSet.of() + ); Snapshot pending = rewrite.apply(); Assert.assertEquals("Should produce 2 manifests", 2, pending.allManifests().size()); @@ -606,4 +611,34 @@ public void testAlreadyDeletedFile() { Assert.assertEquals("Only 3 manifests should exist", 3, listManifestFiles().size()); } + + @Test + public void testNewDeleteFile() { + Assume.assumeTrue("Delete files are only supported in v2", formatVersion > 1); + + table.newAppend() + .appendFile(FILE_A) + .commit(); + + long snapshotBeforeDeletes = table.currentSnapshot().snapshotId(); + + table.newRowDelta() + .addDeletes(FILE_A_DELETES) + .commit(); + + long snapshotAfterDeletes = table.currentSnapshot().snapshotId(); + + AssertHelpers.assertThrows("Should fail because deletes were added after the starting snapshot", + ValidationException.class, "Cannot commit, found new delete for replaced data file", + () -> table.newRewrite() + .validateFromSnapshot(snapshotBeforeDeletes) + .rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_A2)) + .apply()); + + // the rewrite should be valid when validating from the snapshot after the deletes + table.newRewrite() + .validateFromSnapshot(snapshotAfterDeletes) + .rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_A2)) + .apply(); + } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java index a86c8de1e730..c49ac28274a5 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java +++ b/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java @@ -114,10 +114,16 @@ public RewriteDataFiles filter(Expression expression) { @Override public RewriteDataFiles.Result execute() { + if (table.currentSnapshot() == null) { + return new BaseRewriteDataFilesResult(ImmutableList.of()); + } + + long startingSnapshotId = table.currentSnapshot().snapshotId(); + validateAndInitOptions(); strategy = strategy.options(options()); - Map>> fileGroupsByPartition = planFileGroups(); + Map>> fileGroupsByPartition = planFileGroups(startingSnapshotId); RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition); Stream groupStream = toGroupStream(ctx, fileGroupsByPartition); @@ -126,15 +132,17 @@ public RewriteDataFiles.Result execute() { return new BaseRewriteDataFilesResult(Collections.emptyList()); } + RewriteDataFilesCommitManager commitManager = commitManager(startingSnapshotId); if (partialProgressEnabled) { - return doExecuteWithPartialProgress(ctx, groupStream); + return doExecuteWithPartialProgress(ctx, groupStream, commitManager); } else { - return doExecute(ctx, groupStream); + return doExecute(ctx, groupStream, commitManager); } } - private Map>> planFileGroups() { + private Map>> planFileGroups(long startingSnapshotId) { CloseableIterable fileScanTasks = table.newScan() + .useSnapshot(startingSnapshotId) .filter(filter) .ignoreResiduals() .planFiles(); @@ -186,13 +194,13 @@ private ExecutorService rewriteService() { } @VisibleForTesting - RewriteDataFilesCommitManager commitManager() { - return new RewriteDataFilesCommitManager(table); + RewriteDataFilesCommitManager commitManager(long startingSnapshotId) { + return new RewriteDataFilesCommitManager(table, startingSnapshotId); } - private Result doExecute(RewriteExecutionContext ctx, Stream groupStream) { + private Result doExecute(RewriteExecutionContext ctx, Stream groupStream, + RewriteDataFilesCommitManager commitManager) { ExecutorService rewriteService = rewriteService(); - RewriteDataFilesCommitManager commitManager = commitManager(); ConcurrentLinkedQueue rewrittenGroups = Queues.newConcurrentLinkedQueue(); @@ -244,12 +252,13 @@ private Result doExecute(RewriteExecutionContext ctx, Stream g return new BaseRewriteDataFilesResult(rewriteResults); } - private Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream groupStream) { + private Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream groupStream, + RewriteDataFilesCommitManager commitManager) { ExecutorService rewriteService = rewriteService(); // Start Commit Service int groupsPerCommit = IntMath.divide(ctx.totalGroupCount(), maxCommits, RoundingMode.CEILING); - RewriteDataFilesCommitManager.CommitService commitService = commitManager().service(groupsPerCommit); + RewriteDataFilesCommitManager.CommitService commitService = commitManager.service(groupsPerCommit); commitService.start(); // Start rewrite tasks diff --git a/spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java b/spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java index 0631d037e243..85000659c682 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java +++ b/spark/src/test/java/org/apache/iceberg/spark/actions/TestNewRewriteDataFilesAction.java @@ -395,7 +395,7 @@ public void testSingleCommitWithCommitFailure() { doReturn(util) .when(spyRewrite) - .commitManager(); + .commitManager(table.currentSnapshot().snapshotId()); AssertHelpers.assertThrows("Should fail entire rewrite if commit fails", RuntimeException.class, () -> spyRewrite.execute()); @@ -548,7 +548,7 @@ public void testParallelPartialProgressWithCommitFailure() { doReturn(util) .when(spyRewrite) - .commitManager(); + .commitManager(table.currentSnapshot().snapshotId()); RewriteDataFiles.Result result = spyRewrite.execute(); @@ -608,7 +608,7 @@ public void testCommitStateUnknownException() { doReturn(util) .when(spyAction) - .commitManager(); + .commitManager(table.currentSnapshot().snapshotId()); AssertHelpers.assertThrows("Should propagate CommitStateUnknown Exception", CommitStateUnknownException.class, () -> spyAction.execute());