@@ -104,6 +104,13 @@ public boolean streamingSkipDeleteSnapshots() {
.parse();
}

public boolean streamingSkipOverwriteSnapshots() {
return confParser.booleanConf()
.option(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS)
.defaultValue(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS_DEFAULT)
.parse();
}

public boolean parquetVectorizationEnabled() {
return confParser.booleanConf()
.option(SparkReadOptions.VECTORIZATION_ENABLED)
@@ -61,6 +61,10 @@ private SparkReadOptions() {
public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = "streaming-skip-delete-snapshots";
public static final boolean STREAMING_SKIP_DELETE_SNAPSHOTS_DEFAULT = false;

// skip snapshots of type overwrite while reading a stream out of an Iceberg table
public static final String STREAMING_SKIP_OVERWRITE_SNAPSHOTS = "streaming-skip-overwrite-snapshots";
public static final boolean STREAMING_SKIP_OVERWRITE_SNAPSHOTS_DEFAULT = false;

// Controls whether to allow reading timestamps without zone info
public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE = "handle-timestamp-without-timezone";

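Editorial note, not part of the diff: a minimal sketch of how a streaming read would opt into the new option. The table identifier and checkpoint path are placeholders, and the snippet assumes the Iceberg Spark runtime is on the classpath.

import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SkipOverwriteStreamExample {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("skip-overwrite-snapshots-example")
        .getOrCreate();

    // With this option set to true, overwrite snapshots encountered by the
    // micro-batch stream are skipped instead of failing the query.
    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .option(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, "true")
        .load("db.table");  // placeholder table identifier

    stream.writeStream()
        .format("console")
        .option("checkpointLocation", "/tmp/checkpoints/skip-overwrite")  // placeholder path
        .start()
        .awaitTermination();
  }
}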
@@ -47,6 +47,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.source.SparkBatchScan.ReadTask;
import org.apache.iceberg.spark.source.SparkBatchScan.ReaderFactory;
import org.apache.iceberg.util.SnapshotUtil;
@@ -76,6 +77,7 @@ public class SparkMicroBatchStream implements MicroBatchStream {
private final boolean localityPreferred;
private final StreamingOffset initialOffset;
private final boolean skipDelete;
private final boolean skipOverwrite;
private final Long fromTimestamp;

SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, SparkReadConf readConf, boolean caseSensitive,
@@ -94,6 +96,7 @@ public class SparkMicroBatchStream implements MicroBatchStream {
this.initialOffset = initialOffsetStore.initialOffset();

this.skipDelete = readConf.streamingSkipDeleteSnapshots();
this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots();
}

@Override
@@ -199,12 +202,26 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse

private boolean shouldProcess(Snapshot snapshot) {
String op = snapshot.operation();
Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
"Cannot process delete snapshot: %s", snapshot.snapshotId());
Preconditions.checkState(
op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) || op.equals(DataOperations.REPLACE),
"Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
return op.equals(DataOperations.APPEND);
switch (op) {
case DataOperations.APPEND:
return true;
case DataOperations.REPLACE:
return false;
case DataOperations.DELETE:
Preconditions.checkState(skipDelete,
"Cannot process delete snapshot: %s, to ignore deletes, set %s=true",
snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
return false;
case DataOperations.OVERWRITE:
Preconditions.checkState(skipOverwrite,
"Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=true",
snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
return false;
default:
throw new IllegalStateException(String.format(
"Cannot process unknown snapshot operation: %s (snapshot id %s)",
op.toLowerCase(Locale.ROOT), snapshot.snapshotId()));
}
}

private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) {
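Editorial note, not part of the diff: a job that wants to decide up front whether to enable the new flag could first scan the table history for overwrite snapshots. A hedged sketch using the core Iceberg Table API follows; the class and method names are illustrative only.

import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public final class SnapshotOperationCheck {
  private SnapshotOperationCheck() {
  }

  // Illustrative helper: returns true if any snapshot in the table history is an
  // overwrite, i.e. a snapshot the micro-batch stream would reject unless
  // streaming-skip-overwrite-snapshots is set to true.
  public static boolean hasOverwriteSnapshots(Table table) {
    for (Snapshot snapshot : table.snapshots()) {
      if (DataOperations.OVERWRITE.equals(snapshot.operation())) {
        return true;
      }
    }
    return false;
  }
}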
@@ -481,6 +481,30 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exc
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
}

@Test
public void testReadStreamWithSnapshotTypeOverwriteAndSkipOverwriteOption() throws Exception {
table.updateSpec()
.removeField("id_bucket")
.addField(ref("id"))
.commit();

// fill table with some data
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(dataAcrossSnapshots);

// this should create a snapshot with type overwrite.
table.newOverwrite()
.overwriteByRowFilter(Expressions.greaterThan("id", 4))
.commit();

// check pre-condition - that the above overwrite operation on the table resulted in a snapshot of type OVERWRITE.
Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation());

StreamingQuery query = startStream(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, "true");
Assertions.assertThat(rowsAvailable(query))
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
}

/**
* appends each list as a Snapshot on the iceberg table at the given location.
* accepts a list of lists - each list representing data per snapshot.
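Editorial note, not part of the diff: the startStream(key, value) helper used by the new test is defined elsewhere in the test class and is not shown here. A hedged sketch of what such a helper typically looks like follows; the sink name, table location parameter, and checkpoint directory are placeholder assumptions and may differ from the real helper.

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public final class StreamingTestSupport {
  private StreamingTestSupport() {
  }

  // Illustrative only: starts a memory-sink streaming query over an Iceberg table
  // with a single read option applied, mirroring how the test passes
  // streaming-skip-overwrite-snapshots=true.
  static StreamingQuery startStream(SparkSession spark, String tableLocation,
                                    String checkpointDir, String key, String value) throws Exception {
    return spark.readStream()
        .format("iceberg")
        .option(key, value)
        .load(tableLocation)
        .writeStream()
        .format("memory")
        .queryName("stream_output")  // placeholder in-memory sink name
        .option("checkpointLocation", checkpointDir)
        .start();
  }
}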