Spark3 structured streaming micro_batch read support #2611
Conversation
return new SparkBatchQueryScan(
    spark, table, caseSensitive, schemaWithMetadataColumns(), filterExpressions, options);
// TODO: understand how to differentiate that this is a spark streaming microbatch scan.
if (false) {
@aokolnychyi / @RussellSpitzer / @holdenk
Spark3 gives ScanBuilder as the abstraction for defining all types of Scans (Batch, MicroBatch & Continuous), but the current implementation / class modelling has SparkBatchScan as the Scan implementation.
Looking at some of the concerns of BatchScan - from the state maintenance of a single snapshot id to read from, to asOfTimestamp, to features like vectorized reads - none of these seem relevant to streaming scans.
So I feel we need to split streaming scans out into a different class. Does this thought process make sense?
If we go this route, do you folks know how to pass different Scan objects to Spark for batch vs streaming?
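A rough sketch of the Spark 3 DSv2 hooks under discussion (illustrative only, not this PR's code; the class name and placeholder bodies are assumptions): a single ScanBuilder can return a Scan whose toBatch() and toMicroBatchStream() methods hand back separate batch and streaming implementations, which is one way to split the two concerns into different classes.

```java
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;

// Sketch only: mirrors the shape of a Spark DSv2 ScanBuilder, not Iceberg's real one.
class SparkScanBuilderSketch implements ScanBuilder {
  @Override
  public Scan build() {
    // Spark calls toBatch() for batch queries and toMicroBatchStream() for
    // structured-streaming queries on the same Scan object, so each call can
    // delegate to its own implementation class.
    return new Scan() {
      @Override
      public StructType readSchema() {
        return new StructType();  // placeholder schema
      }

      @Override
      public Batch toBatch() {
        // would return something like the existing SparkBatchQueryScan
        throw new UnsupportedOperationException("batch path not sketched");
      }

      @Override
      public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
        // would return a streaming-specific scan implementation
        throw new UnsupportedOperationException("streaming path not sketched");
      }
    };
  }
}
```

Spark itself decides which of the two methods to invoke depending on whether the query is a batch read or a structured-streaming read, so the split would not need an extra flag on the builder.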
public void stop() {
}

private String getOffsetLogLocation(String checkpointLocation) {
@aokolnychyi @RussellSpitzer @rdblue @holdenk - do you folks know of a better way to read the last checkpointed offset?
I need this hack for cases where the Spark cluster goes down and has to restart the stream from an old checkpoint. I definitely do NOT plan to keep this hack; I am trying to understand a better way to do this. I would truly appreciate any help here.
Started a conversation in the Iceberg Slack channel.
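For what it's worth, a hedged sketch of the DSv2 streaming offset contract (not this PR's code; `parse` and `planBetween` below are hypothetical helpers): on a restart, Spark's micro-batch engine reads its own offset log under the checkpoint and feeds the recovered offset back through deserializeOffset(...) and planInputPartitions(start, end), which is the usual way a source resumes without parsing the offset log itself.

```java
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;

// Sketch of the offset-related half of the MicroBatchStream contract.
abstract class MicroBatchOffsetSketch implements MicroBatchStream {

  @Override
  public Offset deserializeOffset(String json) {
    // Spark hands back the JSON it recovered from the checkpoint's offset log;
    // parse it into a source-specific Offset (e.g. one wrapping a snapshot id).
    return parse(json);
  }

  @Override
  public InputPartition[] planInputPartitions(Offset start, Offset end) {
    // 'start' is the recovered/committed offset after a restart; 'end' is the
    // value most recently returned by latestOffset().
    return planBetween(start, end);
  }

  // Hypothetical helpers standing in for the source-specific logic.
  abstract Offset parse(String json);
  abstract InputPartition[] planBetween(Offset start, Offset end);
}
```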
Snapshot previousSnapshot = table.snapshot(microBatchStartOffset.snapshotId());
Snapshot pointer = table.currentSnapshot();
while (pointer != null && previousSnapshot.snapshotId() != pointer.parentId()) {
  Preconditions.checkState(pointer.operation().equals(DataOperations.APPEND),
TODO: add unit test coverage for the overwrite operation.
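To make the loop in the diff above a bit more concrete, here is a minimal, hedged sketch of the same parent-pointer walk (not this PR's code; `appendsSince` is an illustrative helper, and plain Guava Preconditions is used here where Iceberg would use its relocated copy): it collects the snapshots committed after a given start snapshot, oldest first, and for now refuses anything that is not a plain append, which is exactly where overwrite handling would plug in.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import com.google.common.base.Preconditions;

class SnapshotWalkSketch {
  // Collect the append snapshots committed after startSnapshotId, oldest first.
  static Deque<Snapshot> appendsSince(Table table, long startSnapshotId) {
    Deque<Snapshot> pending = new ArrayDeque<>();
    Snapshot pointer = table.currentSnapshot();
    // Walk back from the current snapshot until we reach the start snapshot.
    while (pointer != null && pointer.snapshotId() != startSnapshotId) {
      Preconditions.checkState(DataOperations.APPEND.equals(pointer.operation()),
          "Cannot process operation: %s", pointer.operation());
      pending.addFirst(pointer);  // prepend so the oldest snapshot ends up first
      pointer = pointer.parentId() == null ? null : table.snapshot(pointer.parentId());
    }
    return pending;
  }
}
```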
Ok this is great work, I'm going to have to get back up to speed on the streaming stuff. Perhaps https://github.com/viirya has some thoughts here as well.
@holdenk - thanks for your first take on the PR. I would be happy to hear https://github.com/viirya's thoughts as well; I am unable to tag https://github.com/viirya, so please see if you can. The GDPR point in the PR description is about deciding whether the stream should ignore DELETE snapshots or fail on them. The reasoning is that DELETEs are common for GDPR, so the stream needs some way to tolerate them.
Did this make sense!? Happy to discuss.
Hi @holdenk - after giving it a bit more thought, I actually think otherwise. Overall, I agree with you: ignoring deletes from tables might spook some of the consumers of this data. For now, I removed the IgnoreDeletes part from the PR and updated the description to reflect the same here: #2660
PS: I was playing around with this branch to squash-merge my 28 commits into one (as the history was very chatty), and in the process git closed this PR and is not letting me reopen it, so I had to create a brand new PR.
This work is an extension of the idea in issue #179 and of the Spark2 work done in PR #2272, except that this is for Spark3.
In the current implementation:
- Snapshots of type DELETE or REPLACE are skipped. Rationale: DELETEs are common for GDPR and REPLACE is common during table maintenance / compaction related rewrites.
- OVERWRITEs are not handled. Something for the future.

cc: @aokolnychyi & @RussellSpitzer & @holdenk @rdblue @rdsr
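For context, a hypothetical end-to-end usage of the kind of read this PR enables (the table identifier, checkpoint path, and console sink below are placeholders, not something this PR prescribes): reading an Iceberg table as a micro-batch stream, where each trigger plans only the snapshots appended since the last checkpointed offset.

```java
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class StreamingReadExample {
  public static void main(String[] args) throws StreamingQueryException, TimeoutException {
    SparkSession spark = SparkSession.builder().appName("iceberg-stream-read").getOrCreate();

    // Read the Iceberg table as an unbounded stream of appended rows.
    Dataset<Row> stream = spark.readStream()
        .format("iceberg")
        .load("db.events");  // placeholder table identifier

    // Write each micro-batch to the console; the checkpoint holds the offsets
    // (snapshot positions) used to resume after a restart.
    StreamingQuery query = stream.writeStream()
        .format("console")
        .option("checkpointLocation", "/tmp/checkpoints/iceberg-stream")  // placeholder path
        .start();

    query.awaitTermination();
  }
}
```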