@@ -212,12 +212,22 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset endOffset) {

  private boolean shouldProcess(Snapshot snapshot) {
    String op = snapshot.operation();
-   Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
-       "Cannot process delete snapshot: %s", snapshot.snapshotId());
-   Preconditions.checkState(
-       op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) || op.equals(DataOperations.REPLACE),
-       "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
Comment on lines -217 to -219
@kbendick (Contributor, Author) commented on Oct 9, 2021:
Currently, the code handles skipDelete via the first Preconditions.checkState assertion. Then, if that passes, it checks again that the operation is one of the insert-only operations OR delete again (presumably because, if the snapshot operation were of type DataOperations.DELETE, the flag skipDelete must have been true to get past the first assertion).

This was a little confusing to me at first, seeing the operation checked twice: once in the negative and once in the positive.

The second assertion also arguably serves as a way of detecting potentially unhandled DataOperation types. Given that, I chose to use a switch statement to be more explicit about what is and is not allowed, with the added benefit of a default case that catches any unhandled DataOperations (should new ones be added later, for example).

If we want to go back to using Preconditions, I'm more than happy to do that. Since this code is arguably on the hot path for the driver during scheduling, we could also collapse the two calls into one (they guard the same set of operations anyway) and reorder the checks to cut down on string comparisons: test the boolean skipDelete flag first, then DataOperations.APPEND, which should be the most common case. A rough sketch of that collapsed version follows.
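For illustration only, not part of this PR: a minimal sketch of that single-assertion alternative, assuming the same skipDelete field, the same Preconditions/DataOperations imports, and the pre-change semantics shown above.

    // Hypothetical collapsed version of the two original checkState calls.
    // The boolean skipDelete test short-circuits before any string comparison,
    // and DELETE is only permitted when deletes are being skipped, which is
    // exactly the combined effect of the two original assertions.
    private boolean shouldProcess(Snapshot snapshot) {
      String op = snapshot.operation();
      Preconditions.checkState(
          (skipDelete && op.equals(DataOperations.DELETE))
              || op.equals(DataOperations.APPEND)
              || op.equals(DataOperations.REPLACE),
          "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
      return op.equals(DataOperations.APPEND);
    }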

Let me know if anybody has an issue with the use of the switch statement.

-   return op.equals(DataOperations.APPEND);
+   switch (op) {
+     case DataOperations.APPEND:
+       return true;
+     case DataOperations.REPLACE:
+       return false;
+     case DataOperations.DELETE:
+     case DataOperations.OVERWRITE:
+       Preconditions.checkState(skipDelete,
+           "Cannot process %s snapshot when read option %s is %s: %s", op.toLowerCase(Locale.ROOT),
+           SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, skipDelete, snapshot.snapshotId());
+       return false;
+     default:
+       String exceptionMsg = String.format("Encountered unhandled snapshot operation %s for snapshot: %s", op,
+           snapshot.snapshotId());
+       throw new RuntimeException(exceptionMsg);
+   }
}

private static class InitialOffsetStore {
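An aside on why the String switch above compiles: Java requires switch case labels to be compile-time constants, and the DataOperations values qualify because they are constant String fields. A sketch of the upstream class follows; the exact literals are stated from memory and should be treated as an assumption.

    // Sketch of org.apache.iceberg.DataOperations: each operation name is a
    // static final String constant, which is what allows DataOperations.APPEND
    // and friends to appear as case labels in a switch over a String.
    public class DataOperations {
      private DataOperations() {
      }

      public static final String APPEND = "append";
      public static final String REPLACE = "replace";
      public static final String OVERWRITE = "overwrite";
      public static final String DELETE = "delete";
    }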
@@ -288,6 +288,42 @@ public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception
);
}

+ @Test
+ public void testReadStreamWithSnapshotTypeOverwriteAndSkipDeleteOption() throws Exception {
+   // upgrade the table to format version 2 so that a snapshot of type OVERWRITE can be created
+   TableOperations ops = ((BaseTable) table).operations();
+   TableMetadata meta = ops.current();
+   ops.commit(meta, meta.upgradeToFormatVersion(2));
+
+   // fill the table with some initial data
+   List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
+   appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+
+   Schema deleteRowSchema = table.schema().select("data");
+   Record dataDelete = GenericRecord.create(deleteRowSchema);
+   List<Record> dataDeletes = Lists.newArrayList(
+       dataDelete.copy("data", "one") // id = 1
+   );
+
+   DeleteFile eqDeletes = FileHelpers.writeDeleteFile(
+       table, Files.localOutput(temp.newFile()), TestHelpers.Row.of(0), dataDeletes, deleteRowSchema);
+
+   table.newRowDelta()
+       .addDeletes(eqDeletes)
+       .commit();
+
+   // pre-condition: the equality-delete commit above must have produced a snapshot of type OVERWRITE
+   Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation());
+
+   Dataset<Row> df = spark.readStream()
+       .format("iceberg")
+       .option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true")
+       .load(tableIdentifier);
+
+   Assertions.assertThat(processAvailable(df))
+       .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
+ }
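The processAvailable helper used above is defined elsewhere in this test class. A plausible sketch, assuming a Spark memory sink and a made-up query name "test_sink"; the upstream implementation may differ:

    // Plausible sketch: start the streaming read into an in-memory sink, drain
    // everything currently available, and return the rows as SimpleRecord beans.
    private List<SimpleRecord> processAvailable(Dataset<Row> df) throws TimeoutException {
      StreamingQuery query = df.writeStream()
          .format("memory")
          .queryName("test_sink")
          .outputMode(OutputMode.Append())
          .start();
      try {
        query.processAllAvailable();
        return spark.table("test_sink")
            .as(Encoders.bean(SimpleRecord.class))
            .collectAsList();
      } finally {
        query.stop();
      }
    }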

@SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {