Merged
@@ -116,6 +116,13 @@ public boolean streamingSkipDeleteSnapshots() {
.parse();
}

public boolean streamingSkipOverwriteSnapshots() {
return confParser.booleanConf()
.option(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS)
.defaultValue(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS_DEFAULT)
.parse();
}

public boolean parquetVectorizationEnabled() {
return confParser.booleanConf()
.option(SparkReadOptions.VECTORIZATION_ENABLED)
@@ -61,6 +61,10 @@ private SparkReadOptions() {
public static final String STREAMING_SKIP_DELETE_SNAPSHOTS = "streaming-skip-delete-snapshots";
public static final boolean STREAMING_SKIP_DELETE_SNAPSHOTS_DEFAULT = false;

// skip snapshots of type overwrite while reading a stream out of an Iceberg table
public static final String STREAMING_SKIP_OVERWRITE_SNAPSHOTS = "streaming-skip-overwrite-snapshots";
public static final boolean STREAMING_SKIP_OVERWRITE_SNAPSHOTS_DEFAULT = false;

// Controls whether to allow reading timestamps without zone info
public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE = "handle-timestamp-without-timezone";
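
Editor's illustration (not part of this diff): a minimal sketch of how a consumer might pass these read options to a Spark structured streaming read. Here "spark" is assumed to be an existing SparkSession and the table identifier is illustrative.

import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Skip delete and overwrite snapshots instead of failing the micro-batch stream on them.
// 'spark' is an existing SparkSession; the table identifier below is illustrative.
Dataset<Row> stream = spark.readStream()
    .format("iceberg")
    .option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true")
    .option(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, "true")
    .load("db.events");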

@@ -77,6 +77,7 @@ public class SparkMicroBatchStream implements MicroBatchStream {
private final boolean localityPreferred;
private final StreamingOffset initialOffset;
private final boolean skipDelete;
private final boolean skipOverwrite;
private final Long fromTimestamp;

SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, SparkReadConf readConf, boolean caseSensitive,
@@ -95,6 +96,7 @@ public class SparkMicroBatchStream implements MicroBatchStream {
this.initialOffset = initialOffsetStore.initialOffset();

this.skipDelete = readConf.streamingSkipDeleteSnapshots();
this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots();
}

@Override
@@ -200,13 +202,26 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse

private boolean shouldProcess(Snapshot snapshot) {
String op = snapshot.operation();
Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
"Cannot process delete snapshot: %s, to ignore deletes, set %s=true.",
snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
Preconditions.checkState(
op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) || op.equals(DataOperations.REPLACE),
"Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
return op.equals(DataOperations.APPEND);
switch (op) {
case DataOperations.APPEND:
return true;
case DataOperations.REPLACE:
return false;
case DataOperations.DELETE:
Preconditions.checkState(skipDelete,
"Cannot process delete snapshot: %s, to ignore deletes, set %s=true",
snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
return false;
case DataOperations.OVERWRITE:

[Review thread]

Contributor:
I don't think that this should use the same configuration to skip deletes and overwrites. Overwrites are different and I think that we should at a minimum have a different property. I would also prefer to have some additional clarity on how we plan to eventually handle this. We could skip overwrites, but what about use cases where they are probably upserts? What about when they're created by copy-on-write MERGE operations?

Contributor:
+1. I would prefer they be two separate configs, but also that we have a plan for the longer term to handle sending out these row deltas.

I'd be ok with getting a PR in to ignore OVERWRITE, but this isn't something we should ignore in the longer term (or even really the near-to-medium term) as others have mentioned.

Personally I would consider using a schema similar to the delta.io change capture feed that has a dataframe with the before image / after image (row before and after update) and then the type of operation for each row (insert, delete, update_before, update_after).
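
(Editor's illustration, not part of this PR or the comment above: a hedged sketch of what such a change-feed schema could look like in Spark. The column names _change_type and _commit_snapshot_id are assumptions for illustration, not an API defined anywhere in this change.)

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// One output row per changed table row, tagged with the kind of change it
// represents, in the spirit of delta.io's change data feed.
// Column names below are illustrative, not an existing Iceberg or Spark API.
StructType changeFeedSchema = new StructType()
    .add("id", DataTypes.IntegerType)              // example table column
    .add("data", DataTypes.StringType)             // example table column
    .add("_change_type", DataTypes.StringType)     // insert | delete | update_before | update_after
    .add("_commit_snapshot_id", DataTypes.LongType);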

Contributor:
I connected with @SreeramGarlapati to contribute on this PR.

Contributor:
I have added a separate config to skip overwrites. I will discuss with @SreeramGarlapati and will update on the plan to eventually handle upserts.

@SreeramGarlapati (Collaborator, Author), Jan 5, 2022:
All in all, there are two options for reading upserts:

  1. For updates written with copy-on-write, a new data file is created that contains both the old rows and the newly updated rows. In this case we can take a Spark option from the user as consent that they are okay with data replay.
  2. For updates written with merge-on-read, we will expose an option to read a change data feed, which will include a metadata column indicating whether a record is an INSERT vs a DELETE.

Did this make sense, @rdblue & @kbendick?
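
(Editor's sketch only: a guess at what option 2 could look like from the consumer side. The option name streaming-read-change-feed and the column name _change_type are hypothetical, not something this PR adds; "spark" is assumed to be an existing SparkSession.)

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Hypothetical change-data-feed read: each emitted row carries a metadata
// column telling the consumer whether it is an INSERT or a DELETE.
Dataset<Row> changes = spark.readStream()
    .format("iceberg")
    .option("streaming-read-change-feed", "true")   // hypothetical option name
    .load("db.events");                             // illustrative table identifier

Dataset<Row> inserts = changes.filter("_change_type = 'INSERT'");  // hypothetical metadata column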

Preconditions.checkState(skipOverwrite,
"Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=true",
snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
return false;
default:

[Review thread]

Member:
Should add a test for this new conf option.

Contributor:
I have added a unit test.

throw new IllegalStateException(String.format(
"Cannot process unknown snapshot operation: %s (snapshot id %s)",
op.toLowerCase(Locale.ROOT), snapshot.snapshotId()));
}
}

private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) {
@@ -481,6 +481,31 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exc
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
}

@Test
public void testReadStreamWithSnapshotTypeOverwriteAndSkipOverwriteOption() throws Exception {
table.updateSpec()
.removeField("id_bucket")
.addField(ref("id"))
.commit();

// fill table with some data
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(dataAcrossSnapshots);

// this should create a snapshot with type overwrite.
table.newOverwrite()
.overwriteByRowFilter(Expressions.greaterThan("id", 4))
.commit();

// check pre-condition - that the above overwrite operation on the table resulted in a snapshot of type OVERWRITE.
table.refresh();
Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation());

StreamingQuery query = startStream(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, "true");
Assertions.assertThat(rowsAvailable(query))
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
}

/**
* appends each list as a Snapshot on the iceberg table at the given location.
* accepts a list of lists - each list representing data per snapshot.