diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index a4748df75af9..c7c01758c3ee 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -116,6 +116,13 @@ public boolean streamingSkipDeleteSnapshots() {
         .parse();
   }
 
+  public boolean streamingSkipOverwriteSnapshots() {
+    return confParser.booleanConf()
+        .option(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS)
+        .defaultValue(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS_DEFAULT)
+        .parse();
+  }
+
   public boolean parquetVectorizationEnabled() {
     return confParser.booleanConf()
         .option(SparkReadOptions.VECTORIZATION_ENABLED)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 2d1c8cd178ff..edcc2300344a 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -61,6 +61,10 @@ private SparkReadOptions() {
   public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = "streaming-skip-delete-snapshots";
   public static final boolean STREAMING_SKIP_DELETE_SNAPSHOTS_DEFAULT = false;
 
+  // skip snapshots of type overwrite while reading a stream out of an Iceberg table
+  public static final String STREAMING_SKIP_OVERWRITE_SNAPSHOTS = "streaming-skip-overwrite-snapshots";
+  public static final boolean STREAMING_SKIP_OVERWRITE_SNAPSHOTS_DEFAULT = false;
+
   // Controls whether to allow reading timestamps without zone info
   public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE =
       "handle-timestamp-without-timezone";
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 6311c48f1c78..dd68022c6b8d 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -77,6 +77,7 @@ public class SparkMicroBatchStream implements MicroBatchStream {
   private final boolean localityPreferred;
   private final StreamingOffset initialOffset;
   private final boolean skipDelete;
+  private final boolean skipOverwrite;
   private final Long fromTimestamp;
 
   SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, SparkReadConf readConf, boolean caseSensitive,
@@ -95,6 +96,7 @@ public class SparkMicroBatchStream implements MicroBatchStream {
 
     this.initialOffset = initialOffsetStore.initialOffset();
     this.skipDelete = readConf.streamingSkipDeleteSnapshots();
+    this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots();
   }
 
   @Override
@@ -200,13 +202,26 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse
 
   private boolean shouldProcess(Snapshot snapshot) {
     String op = snapshot.operation();
-    Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
-        "Cannot process delete snapshot: %s, to ignore deletes, set %s=true.",
-        snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
-    Preconditions.checkState(
-        op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) || op.equals(DataOperations.REPLACE),
-        "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
-    return op.equals(DataOperations.APPEND);
+    switch (op) {
+      case DataOperations.APPEND:
+        return true;
+      case DataOperations.REPLACE:
+        return false;
+      case DataOperations.DELETE:
+        Preconditions.checkState(skipDelete,
+            "Cannot process delete snapshot: %s, to ignore deletes, set %s=true",
+            snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
+        return false;
+      case DataOperations.OVERWRITE:
+        Preconditions.checkState(skipOverwrite,
+            "Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=true",
+            snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
+        return false;
+      default:
+        throw new IllegalStateException(String.format(
+            "Cannot process unknown snapshot operation: %s (snapshot id %s)",
+            op.toLowerCase(Locale.ROOT), snapshot.snapshotId()));
+    }
   }
 
   private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) {
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 145cf78fba86..fb0b9031a872 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -481,6 +481,31 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exc
         .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
   }
 
+  @Test
+  public void testReadStreamWithSnapshotTypeOverwriteAndSkipOverwriteOption() throws Exception {
+    table.updateSpec()
+        .removeField("id_bucket")
+        .addField(ref("id"))
+        .commit();
+
+    // fill table with some data
+    List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots);
+
+    // this should create a snapshot with type overwrite
+    table.newOverwrite()
+        .overwriteByRowFilter(Expressions.greaterThan("id", 4))
+        .commit();
+
+    // check the pre-condition: the overwrite operation above resulted in a snapshot of type OVERWRITE
+    table.refresh();
+    Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation());
+
+    StreamingQuery query = startStream(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, "true");
+    Assertions.assertThat(rowsAvailable(query))
+        .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
+  }
+
   /**
    * appends each list as a Snapshot on the iceberg table at the given location.
    * accepts a list of lists - each list representing data per snapshot.
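
Usage note for reviewers (not part of the diff): a minimal sketch of how a streaming read could enable the new option alongside the existing delete-skip option once this change lands. The SparkSession setup and the table identifier "db.events" are placeholders, not taken from this change.

import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch only: with both options set, delete and overwrite snapshots are skipped
// instead of failing the query; append snapshots are still streamed as data.
SparkSession spark = SparkSession.builder().appName("iceberg-stream").getOrCreate();
Dataset<Row> stream = spark.readStream()
    .format("iceberg")
    .option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true")
    .option(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, "true")
    .load("db.events");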