diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index bce0bf4e8bb5..2fd522acd1c3 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -50,4 +50,7 @@ private SparkReadOptions() {
 
   // Set ID that is used to fetch file scan tasks
   public static final String FILE_SCAN_TASK_SET_ID = "file-scan-task-set-id";
+
+  // skip snapshots of type delete while reading stream out of iceberg table
+  public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = "streaming-skip-delete-snapshots";
 }
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index 9a24b3e5ffb6..7b52ba118890 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -562,6 +562,19 @@ public static Integer propertyAsInt(CaseInsensitiveStringMap options, String pro
     return null;
   }
 
+  public static Boolean propertyAsBoolean(CaseInsensitiveStringMap options, String property, Boolean defaultValue) {
+    if (defaultValue != null) {
+      return options.getBoolean(property, defaultValue);
+    }
+
+    String value = options.get(property);
+    if (value != null) {
+      return Boolean.parseBoolean(value);
+    }
+
+    return null;
+  }
+
   public static class DescribeSchemaVisitor extends TypeUtil.SchemaVisitor<String> {
     private static final Joiner COMMA = Joiner.on(',');
     private static final DescribeSchemaVisitor INSTANCE = new DescribeSchemaVisitor();
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 1a7f217af7f9..956ce0ce783a 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -27,6 +27,7 @@
 import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Locale;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.FileScanTask;
@@ -59,6 +60,8 @@
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.connector.read.streaming.Offset;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
 import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
@@ -69,6 +72,7 @@
 
 public class SparkMicroBatchStream implements MicroBatchStream {
   private static final Joiner SLASH = Joiner.on("/");
+  private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);
 
   private final Table table;
   private final boolean caseSensitive;
@@ -79,6 +83,7 @@ public class SparkMicroBatchStream implements MicroBatchStream {
   private final Long splitOpenFileCost;
   private final boolean localityPreferred;
   private final StreamingOffset initialOffset;
+  private final boolean skipDelete;
 
   SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, boolean caseSensitive,
                         Schema expectedSchema, CaseInsensitiveStringMap options, String checkpointLocation) {
@@ -100,6 +105,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
 
     InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, checkpointLocation);
     this.initialOffset = initialOffsetStore.initialOffset();
+
+    this.skipDelete = Spark3Util.propertyAsBoolean(options, SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, false);
   }
 
   @Override
@@ -169,35 +176,44 @@ public void stop() {
 
   private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
     List<FileScanTask> fileScanTasks = Lists.newArrayList();
-    MicroBatch latestMicroBatch = null;
     StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ?
         new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false) :
         startOffset;
 
+    StreamingOffset currentOffset = null;
+
     do {
-      StreamingOffset currentOffset =
-          latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ?
-              new StreamingOffset(snapshotAfter(latestMicroBatch.snapshotId()), 0L, false) :
-              batchStartOffset;
+      if (currentOffset == null) {
+        currentOffset = batchStartOffset;
+      } else {
+        Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
+        currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
+      }
 
-      latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+      if (!shouldProcess(table.snapshot(currentOffset.snapshotId()))) {
+        LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
+        continue;
+      }
+
+      MicroBatch latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
           .caseSensitive(caseSensitive)
           .specsById(table.specs())
          .generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
 
       fileScanTasks.addAll(latestMicroBatch.tasks());
-    } while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
+    } while (currentOffset.snapshotId() != endOffset.snapshotId());
 
     return fileScanTasks;
   }
 
-  private long snapshotAfter(long snapshotId) {
-    Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, snapshotId);
-
-    Preconditions.checkState(snapshotAfter.operation().equals(DataOperations.APPEND),
-        "Invalid Snapshot operation: %s, only APPEND is allowed.", snapshotAfter.operation());
-
-    return snapshotAfter.snapshotId();
+  private boolean shouldProcess(Snapshot snapshot) {
+    String op = snapshot.operation();
+    Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
+        "Cannot process delete snapshot: %s", snapshot.snapshotId());
+    Preconditions.checkState(
+        op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) || op.equals(DataOperations.REPLACE),
+        "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
+    return op.equals(DataOperations.APPEND);
   }
 
   private static class InitialOffsetStore {
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 07f3df4ea4aa..4e50cf2a3e21 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -42,6 +42,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
@@ -282,17 +283,17 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception
     AssertHelpers.assertThrowsCause(
         "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
         IllegalStateException.class,
-        "Invalid Snapshot operation",
+        "Cannot process overwrite snapshot",
         () -> streamingQuery.processAllAvailable()
     );
   }
 
   @SuppressWarnings("unchecked")
   @Test
-  public void testReadStreamWithSnapshotTypeReplaceErrorsOut() throws Exception {
+  public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {
     // fill table with some data
-    List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected, tableIdentifier);
 
     table.refresh();
 
@@ -307,27 +308,18 @@ public void testReadStreamWithSnapshotTypeReplaceErrorsOut() throws Exception {
     Dataset<Row> df = spark.readStream()
         .format("iceberg")
         .load(tableIdentifier);
-    StreamingQuery streamingQuery = df.writeStream()
-        .format("memory")
-        .queryName("testtablewithreplace")
-        .outputMode(OutputMode.Append())
-        .start();
 
-    AssertHelpers.assertThrowsCause(
-        "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
-        IllegalStateException.class,
-        "Invalid Snapshot operation",
-        () -> streamingQuery.processAllAvailable()
-    );
+    List<SimpleRecord> actual = processAvailable(df);
+    Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
   @SuppressWarnings("unchecked")
   @Test
   public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {
     table.updateSpec()
-            .removeField("id_bucket")
-            .addField(ref("id"))
-            .commit();
+        .removeField("id_bucket")
+        .addField(ref("id"))
+        .commit();
 
     table.refresh();
 
@@ -358,11 +350,44 @@ public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {
     AssertHelpers.assertThrowsCause(
         "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
         IllegalStateException.class,
-        "Invalid Snapshot operation",
+        "Cannot process delete snapshot",
         () -> streamingQuery.processAllAvailable()
     );
   }
 
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exception {
+    table.updateSpec()
+        .removeField("id_bucket")
+        .addField(ref("id"))
+        .commit();
+
+    table.refresh();
+
+    // fill table with some data
+    List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+
+    table.refresh();
+
+    // this should create a snapshot with type delete.
+    table.newDelete()
+        .deleteFromRowFilter(Expressions.equal("id", 4))
+        .commit();
+
+    // check pre-condition - that the above delete operation on table resulted in Snapshot of Type DELETE.
+    table.refresh();
+    Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation());
+
+    Dataset<Row> df = spark.readStream()
+        .format("iceberg")
+        .option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true")
+        .load(tableIdentifier);
+    Assertions.assertThat(processAvailable(df))
+        .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
+  }
+
   private static List<SimpleRecord> processMicroBatch(DataStreamWriter<Row> singleBatchWriter, String viewName)
       throws TimeoutException, StreamingQueryException {
     StreamingQuery streamingQuery = singleBatchWriter.start();
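Usage sketch (not part of the patch): the snippet below shows how a consumer might enable the new streaming-skip-delete-snapshots read option, assuming a SparkSession that is already configured with an Iceberg catalog; the table name db.events, the checkpoint path, and the console sink are illustrative placeholders, not names from this change.

import java.util.concurrent.TimeoutException;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class SkipDeleteSnapshotsExample {
  public static void main(String[] args) throws TimeoutException, StreamingQueryException {
    // Assumes the session already has an Iceberg catalog configured (spark.sql.catalog.* properties).
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-streaming-skip-delete-snapshots")
        .getOrCreate();

    // With the option set, snapshots of type delete are logged and skipped instead of failing the stream.
    Dataset<Row> df = spark.readStream()
        .format("iceberg")
        .option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true")
        .load("db.events");

    StreamingQuery query = df.writeStream()
        .format("console")
        .option("checkpointLocation", "/tmp/checkpoints/db.events")
        .outputMode(OutputMode.Append())
        .start();

    query.awaitTermination();
  }
}

Per the new shouldProcess method, only append snapshots produce records either way: replace snapshots are skipped with a debug log, overwrite snapshots still fail the stream, and delete snapshots fail the stream unless streaming-skip-delete-snapshots is set to true.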