[SPARK-26520][SQL] data source v2 API refactor (micro-batch read) #23430
Conversation
Test build #100655 has finished for PR 23430 at commit
It's cleaner to separate the Scan and the Builder.
I don't have a strong preference. Currently none of the streaming sources support operator pushdown, so it's easier to implement both of them together.
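For concreteness, "implement both of them together" can look roughly like the sketch below: one class serving as both the ScanBuilder and the Scan. Class and package names here follow the interfaces as they eventually landed in Spark 3.x (org.apache.spark.sql.connector.read), not necessarily this exact commit.

```java
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;

// Sketch: the ScanBuilder and the Scan are the same object, since there is no
// pushdown state to carry from the builder into the scan.
class SimpleStreamingScan implements ScanBuilder, Scan {
  private final StructType schema;

  SimpleStreamingScan(StructType schema) {
    this.schema = schema;
  }

  @Override
  public Scan build() {
    return this;  // nothing to fold in: the builder is the scan
  }

  @Override
  public StructType readSchema() {
    return schema;
  }

  @Override
  public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
    // Offset tracking and partition planning omitted in this sketch.
    throw new UnsupportedOperationException("not implemented in this sketch");
  }
}
```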
IMO, a name like Batch (defined above) does not convey much and is inconsistent with MicroBatchStream. If these represent the physical scans (as the comments indicate), maybe rename them to BatchScan/MicroBatchScan and so on.
E.g.
public interface Scan {
    // ...
    BatchScan toBatch();
    MicroBatchScan toMicroBatch();
    ContinuousScan toContinuous();
}
And maybe then these scans could extend a marker interface like PhysicalScan to differentiate them from the logical Scan.
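For concreteness, the marker-interface idea could look like the sketch below; all of these names are hypothetical and are not part of the PR.

```java
// Hypothetical naming sketch: the physical scans share a marker parent,
// distinct from the logical Scan.
interface PhysicalScan { }

interface BatchScan extends PhysicalScan { /* batch partition planning */ }
interface MicroBatchScan extends PhysicalScan { /* offsets + per-micro-batch partition planning */ }
interface ContinuousScan extends PhysicalScan { /* continuous partition readers */ }
```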
Another idea is to merge Scan and Batch/Stream; see alternative 1 in the doc.
In "alternative1" there is no equivalent Logical Scan? I was thinking we need the Scan (the logical scan) separate from physical scans.
Also if they don't inherit a common parent can it be passed to the DatasourceV2ScanExec ?
Anyways better to relook and rename as appropriate to keep the different ones (batch/micro-batch/continuous) have common pre/suffixes and denote what they mean.
Why do we need an additional level? Can't this be part of the micro-batch physical scan (MicroBatchStream)?
This is for ContinuousStream, which will be added later. Please refer to the doc for more details.
Makes sense. A Stream (a stream of events) and a Source typically imply different things. In this case the SparkDataStream looks to be more of a source-specific thing than a stream.
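For context, SparkDataStream is the execution-mode-agnostic parent that MicroBatchStream (and later ContinuousStream) extend; it covers offset management and lifecycle. The sketch below uses the method set and package from the API as it eventually stabilized, which may differ slightly from this commit.

```java
import org.apache.spark.sql.connector.read.streaming.Offset;

// Approximate shape of SparkDataStream: source-level offset management and
// lifecycle, shared by both micro-batch and continuous execution.
public interface SparkDataStream {
  Offset initialOffset();                 // starting offset when there is no checkpoint
  Offset deserializeOffset(String json);  // restore an offset from its checkpointed JSON form
  void commit(Offset end);                // data up to `end` is processed and can be discarded
  void stop();                            // release any resources held by the source
}
```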
Instead of just being marker interfaces, could these be mixin interfaces for the Scan (that also define the respective methods)? Like:
public interface SupportsBatchRead extends Scan {
    Batch toBatch();
}
and so on. And if a Table supports read, one could query its Scan object to figure out the type if required.
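Under that proposal, the engine would inspect the Scan's runtime type to decide how to execute it. A hypothetical sketch (Scan, Batch and SupportsBatchRead here refer to the interfaces sketched in this comment, not the API the PR adopts):

```java
final class ScanModes {
  // Hypothetical dispatch on the scan's mixin type rather than on table-level marker interfaces.
  static Batch requireBatch(Scan scan) {
    if (scan instanceof SupportsBatchRead) {
      return ((SupportsBatchRead) scan).toBatch();
    }
    throw new UnsupportedOperationException("scan does not support batch reads");
  }
}
```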
We need to know the capability at the table level. It's too late to do it at the scan level, as creating a scan may be expensive.
The only thing is that it doesn't enforce anything. A method like Table.supportedTypes() might also work.
@arunmahadevan, that's similar to the capabilities that we plan to add. Spark will query specific capabilities for a table to make determinations like this to cut down on the number of empty interfaces.
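For reference, a capability query could look like the sketch below; the names are illustrative (Spark later shipped a similar idea as a TableCapability enum queried from the table), not something defined in this PR.

```java
import java.util.Set;
import org.apache.spark.sql.types.StructType;

// Sketch: the table advertises what it supports, and the engine checks the set
// before building a scan, avoiding empty marker interfaces.
enum ReadCapability {
  BATCH_READ,
  MICRO_BATCH_READ,
  CONTINUOUS_READ
}

interface Table {
  String name();
  StructType schema();
  Set<ReadCapability> capabilities();
}
```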
This is moved to a new file: https://github.com/apache/spark/pull/23430/files#diff-c223080834806efd92efc860656b60cdR34
Test build #100699 has finished for PR 23430 at commit
Test build #100724 has finished for PR 23430 at commit
retest this please
Test build #100729 has finished for PR 23430 at commit
Test build #100867 has finished for PR 23430 at commit
retest this please
Test build #100879 has finished for PR 23430 at commit
retest this please
Test build #100890 has finished for PR 23430 at commit
Force-pushed from 0866233 to 5a4047e
Test build #101389 has finished for PR 23430 at commit
jose-torres left a comment
LGTM. Note that I haven't really thought about the naming, just tried to confirm that it follows the community consensus in the doc.
StreamingExecutionRelation(readSupport, output)(sparkSession)
logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]")
val dsOptions = new DataSourceOptions(options.asJava)
// TODO: operator pushdown.
Is this an urgent TODO, or does the existing v2 interface already not handle pushdown? (I probably should know this, but it's been a while since the original implementation.)
It's not urgent, as no built-in streaming source supports pushdown yet. The DS v2 API can handle pushdown, and our batch sources do support it.
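For context, pushdown in the DS v2 API is opt-in via mixins on the scan-building side. A minimal sketch of a builder that accepts filter pushdown is below, using the interface names from the API as it eventually stabilized (org.apache.spark.sql.connector.read in Spark 3.x); the package differed at the time of this PR.

```java
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
import org.apache.spark.sql.sources.Filter;

// Sketch: a ScanBuilder that accepts filter pushdown. pushFilters() returns the
// filters Spark should still evaluate after the scan; pushedFilters() reports
// what the source will try to apply itself.
class PushdownScanBuilder implements SupportsPushDownFilters {
  private Filter[] pushed = new Filter[0];

  @Override
  public Filter[] pushFilters(Filter[] filters) {
    this.pushed = filters;  // remember what we will try to apply at the source
    return filters;         // let Spark re-evaluate them as well, which is always safe
  }

  @Override
  public Filter[] pushedFilters() {
    return pushed;
  }

  @Override
  public Scan build() {
    // Scan construction omitted in this sketch.
    throw new UnsupportedOperationException("not implemented in this sketch");
  }
}
```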
Please open JIRAs for all these TODO items. Thanks!
retest this please
Test build #101489 has finished for PR 23430 at commit
LGTM Thanks! Merged to master.
Follow-up commit (continuous read):
## What changes were proposed in this pull request?
Following apache#23430, this PR does the API refactor for continuous read, w.r.t. the [doc](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)
The major changes:
1. rename `XXXContinuousReadSupport` to `XXXContinuousStream`
2. at the beginning of continuous streaming execution, convert `StreamingRelationV2` to `StreamingDataSourceV2Relation` directly, instead of `StreamingExecutionRelation`.
3. remove all the hacks as we have finished all the read side API refactor
## How was this patch tested?
existing tests
Closes apache#23619 from cloud-fan/continuous.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
What changes were proposed in this pull request?
Following #23086, this PR does the API refactor for micro-batch read, w.r.t. the [doc](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing)
The major changes:
1. rename `XXXMicroBatchReadSupport` to `XXXMicroBatchStream`
2. implement `TableProvider`, `Table`, `ScanBuilder` and `Scan` for streaming sources (see the layering sketch after this list)
3. at the beginning of micro-batch streaming execution, convert `StreamingRelationV2` to `StreamingDataSourceV2Relation` directly, instead of `StreamingExecutionRelation`.

Follow-up:
support operator pushdown for stream sources
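To make the layering concrete, a micro-batch read roughly flows through the objects below. This is only a sketch: method and package names follow the API as it eventually stabilized in Spark 3.x, which differs in package layout (and some details) from this commit.

```java
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Layering sketch: table -> scan builder -> logical scan -> micro-batch stream.
final class MicroBatchReadSketch {
  static void planOneMicroBatch(Table table, CaseInsensitiveStringMap options, String checkpointLocation) {
    ScanBuilder builder = ((SupportsRead) table).newScanBuilder(options);   // pushdown would be applied here
    Scan scan = builder.build();                                            // logical scan
    MicroBatchStream stream = scan.toMicroBatchStream(checkpointLocation);  // physical streaming scan

    // The micro-batch engine then drives the stream:
    Offset start = stream.initialOffset();
    Offset end = stream.latestOffset();
    InputPartition[] partitions = stream.planInputPartitions(start, end);
    PartitionReaderFactory readerFactory = stream.createReaderFactory();
    // partitions + readerFactory are handed to the physical scan node for execution
  }
}
```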
How was this patch tested?
existing tests