diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 700793271967..94883b9e1396 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -212,12 +212,20 @@ private List planFiles(StreamingOffset startOffset, StreamingOffse
 
   private boolean shouldProcess(Snapshot snapshot) {
     String op = snapshot.operation();
-    Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
-        "Cannot process delete snapshot: %s", snapshot.snapshotId());
-    Preconditions.checkState(
-        op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) || op.equals(DataOperations.REPLACE),
-        "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
-    return op.equals(DataOperations.APPEND);
+    switch (op) {
+      case DataOperations.APPEND:
+        return true;
+      case DataOperations.REPLACE:
+        return false;
+      case DataOperations.DELETE:
+        Preconditions.checkState(skipDelete,
+            "Cannot process delete snapshot : %s. Set read option %s to allow skipping snapshots of type delete",
+            snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
+        return false;
+      default:
+        throw new IllegalStateException(String.format(
+            "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId()));
+    }
   }
 
   private static class InitialOffsetStore {
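
For context, a minimal usage sketch (outside the diff) of how a streaming query would supply the read option checked in the DELETE branch above; the app name, the table identifier "db.table", and the example class are illustrative assumptions, not part of this change:

import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StreamingSkipDeletesExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-streaming-skip-deletes")  // assumed app name
        .getOrCreate();

    // With the option set to true, shouldProcess(...) returns false for delete snapshots,
    // so they are skipped; left at the default, the Preconditions check fails the stream.
    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true")
        .load("db.table");  // hypothetical table identifier

    // The stream can then be consumed with Spark's writeStream API as usual.
  }
}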