@@ -50,4 +50,10 @@ private SparkReadOptions() {

// Set ID that is used to fetch file scan tasks
public static final String FILE_SCAN_TASK_SET_ID = "file-scan-task-set-id";

// Skip snapshots of type delete while reading a stream out of an Iceberg table
public static final String READ_STREAM_SKIP_DELETE = "read-stream-skip-delete";

// Skip snapshots of type replace while reading a stream out of an Iceberg table
public static final String READ_STREAM_SKIP_REPLACE = "read-stream-skip-replace";
}
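For orientation, a minimal sketch of how a streaming read would opt into these settings, mirroring the pattern used in the tests later in this diff; the spark session variable and the "db.table" identifier below are placeholders.

Dataset<Row> df = spark.readStream()
    .format("iceberg")
    .option(SparkReadOptions.READ_STREAM_SKIP_DELETE, "true")
    .option(SparkReadOptions.READ_STREAM_SKIP_REPLACE, "true")
    .load("db.table");  // placeholder identifier for the Iceberg table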
spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java (13 additions, 0 deletions)
@@ -562,6 +562,19 @@ public static Integer propertyAsInt(CaseInsensitiveStringMap options, String pro
return null;
}

public static Boolean propertyAsBoolean(CaseInsensitiveStringMap options, String property, Boolean defaultValue) {
if (defaultValue != null) {
return options.getBoolean(property, defaultValue);
}

String value = options.get(property);
if (value != null) {
return Boolean.parseBoolean(value);
}

return null;
}

public static class DescribeSchemaVisitor extends TypeUtil.SchemaVisitor<String> {
private static final Joiner COMMA = Joiner.on(',');
private static final DescribeSchemaVisitor INSTANCE = new DescribeSchemaVisitor();
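As a rough illustration of the three branches in propertyAsBoolean above (the example class and option map below are hypothetical): a non-null default delegates to CaseInsensitiveStringMap.getBoolean, an explicitly set key is parsed with Boolean.parseBoolean, and a missing key with a null default yields null.

import java.util.Collections;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

class PropertyAsBooleanExample {
  static void demo() {
    // Hypothetical option map with only the skip-delete key set
    CaseInsensitiveStringMap opts = new CaseInsensitiveStringMap(
        Collections.singletonMap("read-stream-skip-delete", "true"));

    Boolean a = Spark3Util.propertyAsBoolean(opts, "read-stream-skip-delete", false);  // true, read from the map
    Boolean b = Spark3Util.propertyAsBoolean(opts, "read-stream-skip-replace", false); // false, default applies
    Boolean c = Spark3Util.propertyAsBoolean(opts, "read-stream-skip-replace", null);  // null, key absent and no default
  }
}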
@@ -79,6 +79,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
private final Long splitOpenFileCost;
private final boolean localityPreferred;
private final StreamingOffset initialOffset;
private final boolean skipDelete;
private final boolean skipReplace;

SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, boolean caseSensitive,
Schema expectedSchema, CaseInsensitiveStringMap options, String checkpointLocation) {
@@ -100,6 +102,9 @@ public class SparkMicroBatchStream implements MicroBatchStream {

InitialOffsetStore initialOffsetStore = new InitialOffsetStore(table, checkpointLocation);
this.initialOffset = initialOffsetStore.initialOffset();

this.skipDelete = Spark3Util.propertyAsBoolean(options, SparkReadOptions.READ_STREAM_SKIP_DELETE, false);
this.skipReplace = Spark3Util.propertyAsBoolean(options, SparkReadOptions.READ_STREAM_SKIP_REPLACE, false);
}

@Override
@@ -169,35 +174,64 @@ public void stop() {

private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {
List<FileScanTask> fileScanTasks = Lists.newArrayList();
MicroBatch latestMicroBatch = null;
StreamingOffset batchStartOffset = StreamingOffset.START_OFFSET.equals(startOffset) ?
new StreamingOffset(SnapshotUtil.oldestSnapshot(table).snapshotId(), 0, false) :
startOffset;

StreamingOffset currentOffset = null;

do {
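// On the first pass, plan from batchStartOffset; on later passes, advance to the snapshot
// after currentOffset and, when that snapshot is a DELETE or REPLACE that the read options
// allow skipping, move past it without planning its files.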
StreamingOffset currentOffset =
latestMicroBatch != null && latestMicroBatch.lastIndexOfSnapshot() ?
new StreamingOffset(snapshotAfter(latestMicroBatch.snapshotId()), 0L, false) :
batchStartOffset;
if (currentOffset == null) {
currentOffset = batchStartOffset;
} else {
Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
boolean shouldSkip = shouldSkip(snapshotAfter);

currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);

if (shouldSkip) {
continue;
}
}

latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
MicroBatch latestMicroBatch = MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
.caseSensitive(caseSensitive)
.specsById(table.specs())
.generate(currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());

fileScanTasks.addAll(latestMicroBatch.tasks());
} while (latestMicroBatch.snapshotId() != endOffset.snapshotId());
} while (currentOffset.snapshotId() != endOffset.snapshotId());

return fileScanTasks;
}

private long snapshotAfter(long snapshotId) {
Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, snapshotId);
private boolean shouldSkip(Snapshot snapshot) {
if (snapshot.operation().equals(DataOperations.DELETE)) {
return shouldSkipDelete(snapshot);
} else if (snapshot.operation().equals(DataOperations.REPLACE)) {
return shouldSkipReplace(snapshot);
}

Preconditions.checkState(
snapshot.operation().equals(DataOperations.APPEND),
"Invalid Snapshot operation: %s, only APPEND is allowed.", snapshot.operation());
return false;
}

Preconditions.checkState(snapshotAfter.operation().equals(DataOperations.APPEND),
"Invalid Snapshot operation: %s, only APPEND is allowed.", snapshotAfter.operation());
private boolean shouldSkipDelete(Snapshot snapshot) {
Preconditions.checkState(skipDelete,
"Invalid Snapshot operation: %s, only APPEND is allowed. To skip delete, set Spark Option %s",
snapshot.operation(),
SparkReadOptions.READ_STREAM_SKIP_DELETE);
return true;
}

return snapshotAfter.snapshotId();
private boolean shouldSkipReplace(Snapshot snapshot) {
Preconditions.checkState(skipReplace,
"Invalid Snapshot operation: %s, only APPEND is allowed. To skip replace, set Spark Option %s",
snapshot.operation(),
SparkReadOptions.READ_STREAM_SKIP_REPLACE);
return true;
}

private static class InitialOffsetStore {
@@ -42,6 +42,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
@@ -321,6 +322,32 @@ public void testReadStreamWithSnapshotTypeReplaceErrorsOut() throws Exception {
);
}

@SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeReplaceAndSkipReplaceOption() throws Exception {
// fill table with some data
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);

table.refresh();

// This should create a snapshot of type REPLACE.
table.rewriteManifests()
.clusterBy(f -> 1)
.commit();

// check pre-condition
Assert.assertEquals(DataOperations.REPLACE, table.currentSnapshot().operation());

Dataset<Row> df = spark.readStream()
.format("iceberg")
.option(SparkReadOptions.READ_STREAM_SKIP_REPLACE, "true")
.load(tableIdentifier);

Assertions.assertThat(processAvailable(df))
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
}

@SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {
@@ -363,6 +390,45 @@ public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {
);
}

@SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exception {
table.updateSpec()
.removeField("id_bucket")
.addField(ref("id"))
.commit();

table.refresh();

// fill table with some data
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);

table.refresh();

// This should create a snapshot of type DELETE.
table.newDelete()
.deleteFromRowFilter(Expressions.equal("id", 4))
.commit();

// Check the pre-condition: the delete operation above resulted in a snapshot of type DELETE.
table.refresh();
Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation());

Dataset<Row> df = spark.readStream()
.format("iceberg")
.option(SparkReadOptions.READ_STREAM_SKIP_DELETE, "true")
.load(tableIdentifier);

Assertions.assertThat(processAvailable(df))
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
}
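// A plausible sketch of the processAvailable(...) helper referenced by the tests above; its
// real implementation is not shown in this diff, so the memory-sink name and the exact
// draining strategy here are assumptions.
private List<SimpleRecord> processAvailable(Dataset<Row> df) throws TimeoutException {
  StreamingQuery query = df.writeStream()
      .format("memory")
      .queryName("test_sink")                 // hypothetical memory-sink name
      .outputMode(OutputMode.Append())
      .start();
  try {
    query.processAllAvailable();              // drain all currently available micro-batches
    return spark.sql("SELECT * FROM test_sink")
        .as(Encoders.bean(SimpleRecord.class))
        .collectAsList();
  } finally {
    query.stop();
  }
}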

private static List<SimpleRecord> processMicroBatch(DataStreamWriter<Row> singleBatchWriter, String viewName)
throws TimeoutException, StreamingQueryException {
StreamingQuery streamingQuery = singleBatchWriter.start();