Skip processing snapshots of type Overwrite during readStream #3517
@@ -77,6 +77,7 @@ public class SparkMicroBatchStream implements MicroBatchStream {
   private final boolean localityPreferred;
   private final StreamingOffset initialOffset;
   private final boolean skipDelete;
+  private final boolean skipOverwrite;
   private final Long fromTimestamp;

   SparkMicroBatchStream(JavaSparkContext sparkContext, Table table, SparkReadConf readConf, boolean caseSensitive,
@@ -95,6 +96,7 @@ public class SparkMicroBatchStream implements MicroBatchStream {
     this.initialOffset = initialOffsetStore.initialOffset();

     this.skipDelete = readConf.streamingSkipDeleteSnapshots();
+    this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots();
   }

   @Override
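For context, the new readConf.streamingSkipOverwriteSnapshots() accessor presumably mirrors the existing streamingSkipDeleteSnapshots() one in SparkReadConf. A minimal sketch, assuming the same confParser builder pattern and an analogous default constant in SparkReadOptions (neither is confirmed by this diff):

```java
// Hypothetical sketch of the SparkReadConf accessor, mirroring the existing
// streamingSkipDeleteSnapshots(); the builder API and the default constant
// are assumptions based on that pattern, not shown in this PR.
public boolean streamingSkipOverwriteSnapshots() {
  return confParser.booleanConf()
      .option(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS)
      .defaultValue(SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS_DEFAULT)
      .parse();
}
```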
@@ -200,13 +202,25 @@ private List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffse

   private boolean shouldProcess(Snapshot snapshot) {
     String op = snapshot.operation();
-    Preconditions.checkState(!op.equals(DataOperations.DELETE) || skipDelete,
-        "Cannot process delete snapshot: %s, to ignore deletes, set %s=true.",
-        snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
-    Preconditions.checkState(
-        op.equals(DataOperations.DELETE) || op.equals(DataOperations.APPEND) || op.equals(DataOperations.REPLACE),
-        "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId());
-    return op.equals(DataOperations.APPEND);
+    switch (op) {
+      case DataOperations.APPEND:
+        return true;
+      case DataOperations.REPLACE:
+        return false;
+      case DataOperations.DELETE:
+        Preconditions.checkState(skipDelete,
+            "Cannot process delete snapshot : %s. Set read option %s to allow skipping snapshots of type delete",
+            snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
+        return false;
+      case DataOperations.OVERWRITE:

Contributor
I don't think that this should use the same configuration to skip deletes and overwrites. Overwrites are different, and I think that we should at a minimum have a different property. I would also prefer some additional clarity on how we plan to eventually handle this. We could skip overwrites, but what about use cases where they are probably upserts? What about when they're created by copy-on-write MERGE operations?

Contributor
+1. I would prefer they be two separate configs, but also that we have a plan for the longer term to handle sending out these row deltas. I'd be ok with getting a PR in to ignore overwrites for now. Personally, I would consider using a schema similar to the delta.io change capture feed, which has a dataframe with the before image / after image (row before and after update) and the type of operation for each row (insert, delete, update_before, update_after).
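To make the change-feed idea above concrete, here is a hypothetical sketch of the kind of dataframe schema the commenter describes. The column names are illustrative assumptions modeled loosely on the delta.io change data feed; they are not an Iceberg or Delta API:

```java
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical CDC-style schema: each row carries the table columns plus
// metadata describing the kind of change it represents. Names are illustrative.
StructType changeFeedSchema = new StructType()
    .add("id", DataTypes.LongType)              // table column (before/after value)
    .add("data", DataTypes.StringType)          // table column (before/after value)
    .add("_change_type", DataTypes.StringType)  // insert | delete | update_before | update_after
    .add("_commit_snapshot_id", DataTypes.LongType); // snapshot that produced the change
```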
Contributor
I connected with @SreeramGarlapati to contribute on this PR.

Contributor
I have added a separate config to skip overwrites. I will discuss with @SreeramGarlapati and update this thread on the plan to eventually handle upserts.

Collaborator (Author)
All in all, there are 2 options for reading upserts:
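For reference, a sketch of how a user would opt in to the new behavior on a streaming read; the option keys are assumed to be the string values behind the SparkReadOptions constants referenced in the diff:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hedged sketch: skipping delete and overwrite snapshots when streaming from
// an Iceberg table. The option keys are assumed to match
// SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS / STREAMING_SKIP_OVERWRITE_SNAPSHOTS.
SparkSession spark = SparkSession.builder().getOrCreate();
Dataset<Row> stream = spark.readStream()
    .format("iceberg")
    .option("streaming-skip-delete-snapshots", "true")
    .option("streaming-skip-overwrite-snapshots", "true")
    .load("db.table");
```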

+        Preconditions.checkState(skipOverwrite,
+            "Cannot process overwrite snapshot : %s. Set read option %s to allow skipping snapshots of type overwrite",
+            snapshot.snapshotId(), SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
+        return false;
+      default:

Member
Should add a test for this new conf option.

Contributor
I have added the Unit Test.
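The added test itself is not part of this hunk. A minimal sketch of what verifying the new option might look like, assuming a table fixture, Iceberg's AssertHelpers test utility, and a hypothetical processMicroBatch(options) helper that runs one micro-batch with the given read options:

```java
// Hypothetical test sketch; 'table', 'dataFile', and 'processMicroBatch' are
// illustrative fixtures, not the PR's actual test code.
@Test
public void testSkipOverwriteSnapshots() {
  // Commit an overwrite snapshot to the table under test.
  table.newOverwrite()
      .addFile(dataFile)
      .commit();

  // Without the read option, the stream should refuse to process the snapshot.
  AssertHelpers.assertThrows("Should refuse overwrite snapshots by default",
      IllegalStateException.class, "Cannot process overwrite snapshot",
      () -> processMicroBatch(ImmutableMap.of()));

  // With the option set, the overwrite snapshot is skipped and no rows are emitted.
  processMicroBatch(ImmutableMap.of(
      SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS, "true"));
}
```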

+        throw new IllegalStateException(String.format(
+            "Cannot process %s snapshot: %s", op.toLowerCase(Locale.ROOT), snapshot.snapshotId()));
+    }
   }

   private static StreamingOffset determineStartingOffset(Table table, Long fromTimestamp) {