diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 700793271967..0c8fa4028ab4 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -212,12 +212,22 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
 
   private boolean shouldProcess(Snapshot snapshot) {
     String op = snapshot.operation();
-    Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
-        "Cannot process delete snapshot: %s", snapshot.snapshotId());
-    Preconditions.checkState(
-        op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) || op.equals(DataOperations.REPLACE),
-        "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
-    return op.equals(DataOperations.APPEND);
+    switch (op) {
+      case DataOperations.APPEND:
+        return true;
+      case DataOperations.REPLACE:
+        return false;
+      case DataOperations.DELETE:
+      case DataOperations.OVERWRITE:
+        Preconditions.checkState(skipDelete,
+            "Cannot process %s snapshot when read option %s is %s: %s", op.toLowerCase(Locale.ROOT),
+            SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, skipDelete, snapshot.snapshotId());
+        return false;
+      default:
+        String exceptionMsg = String.format("Encountered unhandled snapshot operation %s for snapshot: %s", op,
+            snapshot.snapshotId());
+        throw new RuntimeException(exceptionMsg);
+    }
   }
 
   private static class InitialOffsetStore {
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 4e50cf2a3e21..e3b28319b53d 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -288,6 +288,42 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception
     );
   }
 
+  @Test
+  public void testReadStreamWithSnapshotTypeOverwriteAndSkipDeleteOption() throws Exception {
+    // upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE.
+    TableOperations ops = ((BaseTable) table).operations();
+    TableMetadata meta = ops.current();
+    ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+    // fill table with some initial data
+    List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+
+    Schema deleteRowSchema = table.schema().select("data");
+    Record dataDelete = GenericRecord.create(deleteRowSchema);
+    List<Record> dataDeletes = Lists.newArrayList(
+        dataDelete.copy("data", "one") // id = 1
+    );
+
+    DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+        table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema);
+
+    table.newRowDelta()
+        .addDeletes(eqDeletes)
+        .commit();
+
+    // check pre-condition - that the above Delete file write - actually resulted in snapshot of type OVERWRITE
+    Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation());
+
+    Dataset<Row> df = spark.readStream()
+        .format("iceberg")
+        .option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true")
+        .load(tableIdentifier);
+
+    Assertions.assertThat(processAvailable(df))
+        .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {
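
For reference, a consumer-side sketch of the behavior this patch enables (a minimal example using the standard Spark structured streaming API; the table identifier and checkpoint path are hypothetical placeholders, not part of this patch):

import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

// With the option set, DELETE and OVERWRITE snapshots are skipped
// instead of failing the streaming query mid-run.
Dataset<Row> stream = spark.readStream()
    .format("iceberg")
    .option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true")
    .load("db.events"); // hypothetical table identifier

StreamingQuery query = stream.writeStream()
    .format("console")
    .option("checkpointLocation", "/tmp/events-checkpoint") // hypothetical path
    .start();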