@@ -50,4 +50,7 @@ private SparkReadOptions() {

// Set ID that is used to fetch file scan tasks
public static final String FILE_SCAN_TASK_SET_ID = "file-scan-task-set-id";

// skip snapshots of type delete while reading stream out of iceberg table
public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = "streaming-skip-delete-snapshots";
}
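
For reference, this is roughly how a consumer would opt into the new behavior. The session setup, table identifier, and checkpoint path below are illustrative placeholders; only the option name comes from this change.

import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;

public class SkipDeleteSnapshotsExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("skip-delete-snapshots-example")
        .getOrCreate();

    // Stream the table; snapshots produced by delete operations are skipped
    // instead of failing the query.
    Dataset<Row> df = spark.readStream()
        .format("iceberg")
        .option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true")
        .load("db.table");                                              // placeholder identifier

    StreamingQuery query = df.writeStream()
        .format("console")
        .option("checkpointLocation", "/tmp/checkpoints/db_table")      // placeholder path
        .outputMode(OutputMode.Append())
        .start();

    query.awaitTermination();
  }
}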
spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java (13 additions, 0 deletions)
Expand Up @@ -562,6 +562,19 @@ public static Integer propertyAsInt(CaseInsensitiveStringMap options, String pro
return null;
}

public static Boolean propertyAsBoolean(CaseInsensitiveStringMap options, String property, Boolean defaultValue) {
if (defaultValue != null) {
return options.getBoolean(property, defaultValue);
}

String value = options.get(property);
if (value != null) {
return Boolean.parseBoolean(value);
}

return null;
}

public static class DescribeSchemaVisitor extends TypeUtil.SchemaVisitor<String> {
private static final Joiner COMMA = Joiner.on(',');
private static final DescribeSchemaVisitor INSTANCE = new DescribeSchemaVisitor();
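A quick sketch of the helper's three-way behavior (the option map below is made up for illustration): when the key is present its value is parsed, when it is absent a non-null default is returned as-is, and a null default yields null so callers can distinguish "unset" from false.

import java.util.Collections;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class PropertyAsBooleanSketch {
  public static void main(String[] args) {
    CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(
        Collections.singletonMap("streaming-skip-delete-snapshots", "true"));

    // Key present: the value is parsed, the default is ignored.
    System.out.println(Spark3Util.propertyAsBoolean(options, "streaming-skip-delete-snapshots", false)); // true

    // Key absent, non-null default: the default is returned.
    System.out.println(Spark3Util.propertyAsBoolean(options, "missing-key", true));  // true

    // Key absent, null default: null is returned, so "unset" is distinguishable from false.
    System.out.println(Spark3Util.propertyAsBoolean(options, "missing-key", null));  // null
  }
}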
@@ -27,6 +27,7 @@
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
@@ -59,6 +60,8 @@
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK;
import static org.apache.iceberg.TableProperties.SPLIT_LOOKBACK_DEFAULT;
@@ -69,6 +72,7 @@

public class SparkMicroBatchStream implements MicroBatchStream {
private static final Joiner SLASH = Joiner.on("/");
private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);

private final Table table;
private final boolean caseSensitive;
@@ -79,6 +83,7 @@ public class SparkMicroBatchStream {
private final Long splitOpenFileCost;
private final boolean localityPreferred;
private final StreamingOffset initialOffset;
private final boolean skipDelete;

SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, boolean caseSensitive,
Schema expectedSchema, CaseInsensitiveStringMap options, String checkpointLocation) {
@@ -100,6 +105,8 @@ public class SparkMicroBatchStream {

InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, checkpointLocation);
this.initialOffset = initialOffsetStore.initialOffset();

this.skipDelete = Spark3Util.propertyAsBoolean(options, SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, false);
}

@Override
@@ -169,35 +176,44 @@ public void stop() {

private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
List<FileScanTask> fileScanTasks = Lists.newArrayList();
MicroBatch latestMicroBatch = null;
StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ?
new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false) :
startOffset;

StreamingOffset currentOffset = null;

do {
StreamingOffset currentOffset =
latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ?
new StreamingOffset(snapshotAfter(latestMicroBatch.snapshotId()), 0L, false) :
batchStartOffset;
if (currentOffset == null) {
currentOffset = batchStartOffset;
} else {
Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
}

latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
if (!shouldProcess(table.snapshot(currentOffset.snapshotId()))) {
LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
continue;
}

MicroBatch latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
.caseSensitive(caseSensitive)
.specsById(table.specs())
.generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());

fileScanTasks.addAll(latestMicroBatch.tasks());
} while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
} while (currentOffset.snapshotId() != endOffset.snapshotId());

return fileScanTasks;
}

private long snapshotAfter(long snapshotId) {
Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, snapshotId);

Preconditions.checkState(snapshotAfter.operation().equals(DataOperations.APPEND),
"Invalid Snapshot operation: %s, only APPEND is allowed.", snapshotAfter.operation());

return snapshotAfter.snapshotId();
private boolean shouldProcess(Snapshot snapshot) {
String op = snapshot.operation();
Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
"Cannot process delete snapshot: %s", snapshot.snapshotId());
Preconditions.checkState(
op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) || op.equals(DataOperations.REPLACE),
"Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
return op.equals(DataOperations.APPEND);
}

private static class InitialOffsetStore {
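Restating the snapshot filter above as a standalone sketch may help when skimming the interleaved diff. The wrapper class and static signature are purely illustrative; the decision itself mirrors the new shouldProcess method: append snapshots are processed, replace snapshots are skipped silently, delete snapshots are skipped only when streaming-skip-delete-snapshots is set, and anything else (for example an overwrite) fails the query.

import java.util.Locale;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class SnapshotFilterSketch {
  // Mirrors SparkMicroBatchStream.shouldProcess:
  //   APPEND  -> true, the snapshot's files are planned
  //   REPLACE -> false, skipped silently (e.g. compaction rewrites)
  //   DELETE  -> false when skipDelete is true, otherwise the first check fails
  //   other   -> the second check fails (e.g. overwrite)
  static boolean shouldProcess(Snapshot snapshot, boolean skipDelete) {
    String op = snapshot.operation();
    Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
        "Cannot process delete snapshot: %s", snapshot.snapshotId());
    Preconditions.checkState(
        op.equals(DataOperations.APPEND) || op.equals(DataOperations.REPLACE) || op.equals(DataOperations.DELETE),
        "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
    return op.equals(DataOperations.APPEND);
  }
}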
@@ -42,6 +42,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -282,17 +283,17 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception
AssertHelpers.assertThrowsCause(
"Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
IllegalStateException.class,
"Invalid Snapshot operation",
"Cannot process overwrite snapshot",
() -> streamingQuery.processAllAvailable()
);
}

@SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeReplaceErrorsOut() throws Exception {
public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {
// fill table with some data
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(expected, tableIdentifier);

table.refresh();

@@ -307,27 +308,18 @@ public void testReadStreamWithSnapshotTypeReplaceErrorsOut() throws Exception {
Dataset<Row> df = spark.readStream()
.format("iceberg")
.load(tableIdentifier);
StreamingQuery streamingQuery = df.writeStream()
.format("memory")
.queryName("testtablewithreplace")
.outputMode(OutputMode.Append())
.start();

AssertHelpers.assertThrowsCause(
"Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
IllegalStateException.class,
"Invalid Snapshot operation",
() -> streamingQuery.processAllAvailable()
);
List<SimpleRecord> actual = processAvailable(df);
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}

@SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {
table.updateSpec()
.removeField("id_bucket")
.addField(ref("id"))
.commit();
.removeField("id_bucket")
.addField(ref("id"))
.commit();

table.refresh();

@@ -358,11 +350,44 @@ public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {
AssertHelpers.assertThrowsCause(
"Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
IllegalStateException.class,
"Invalid Snapshot operation",
"Cannot process delete snapshot",
() -> streamingQuery.processAllAvailable()
);
}

@SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exception {
table.updateSpec()
.removeField("id_bucket")
.addField(ref("id"))
.commit();

table.refresh();

// fill table with some data
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);

table.refresh();

// this should create a snapshot with type delete.
table.newDelete()
.deleteFromRowFilter(Expressions.equal("id", 4))
.commit();

// check pre-condition - that the above delete operation on table resulted in Snapshot of Type DELETE.
table.refresh();
Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation());

Dataset<Row> df = spark.readStream()
.format("iceberg")
.option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true")
.load(tableIdentifier);
Assertions.assertThat(processAvailable(df))
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
}

private static List<SimpleRecord> processMicroBatch(DataStreamWriter<Row> singleBatchWriter, String viewName)
throws TimeoutException, StreamingQueryException {
StreamingQuery streamingQuery = singleBatchWriter.start();