[SPARK-39375][CONNECT][FOLLOW-UP] Refactor Read to UnresolvedRelation #38193
R: @cloud-fan
  RelationCommon common = 1;
  oneof rel_type {
-   Read read = 2;
+   UnresolvedRelation unresolved_relation = 2;
why?
Per the review suggestions on #38086, we plan to decouple unresolved relation and data source (e.g. into different messages). cc @cloud-fan
A data source needs its own plan with its own fields. In that case, the current Read will only represent an unresolved relation.
For Spark, I don't think Relation and DataSource are ever the same concept. A relation has an identifier, and the analyzer needs to resolve it. A data source has a format, path, schema, and other configurations. IIRC, relation and data source do not share the same fields.
I found that the refactoring in #38086 made that PR too complicated, so I split this part out as a separate PR.
I don't agree with the change. A read is a sink the same way you use it in the from statement.
What's wrong with modeling the different read types below Read?
Conceptually, the hierarchical modeling of Read -> Table and Read -> data source makes a lot of sense.
I left a comment in #38193 (comment) but let me also reply here.
Yes, we want the query plan proto definition to follow the basics of relational algebra / the SQL language. However, data source reading is not really SQL; it is more like "configuration". To read a data source in SQL, we need to do
CREATE TABLE t USING my_source OPTIONS (...)
SELECT * FROM t
We must register the data source as a table and then read it. We can't read a data source directly in SQL.
The DataFrame API provides a way to read a data source directly: spark.read.format("my_source").option(...).load(). This means we can't say a read is a sink the same way you use it in the from statement. Reading a data source directly needs a dedicated query plan.
I am not sure I follow why we need to bifurcate into different types of reads in the proto. The whole point of using parsed plans is to have a high-level (unambiguous) description of the operation, not an implementation; the latter is for Spark to figure out. We just need to describe whatever is needed to read the data. If it is a table scan, that is easy: you only need a name. Otherwise you may have to add a bit more information.
BTW, nitpicking on naming: operations should almost always have a verb as their name (scan, project, filter, aggregate, generate, ...). If one does not, you should probably rethink the name. UNRESOLVED RELATION is the thing you want to scan, not the operation.
@hvanhovell, yes, we are talking about how to describe the scan operation. IIUC there are two options on the table:
- Have a single Read operation with many fields: tableName, userSpecifiedSchema, dataSourceName, options. If we are scanning a table, only tableName is set; if we are scanning a data source, all fields except tableName are set.
- Have an UnresolvedRelation and an UnresolvedDataSource (we can pick better names, this is just to match Catalyst). UnresolvedRelation only has a tableName field; UnresolvedDataSource has many fields: userSpecifiedSchema, dataSourceName, options.
I'm not familiar with the protobuf design philosophy. Does it prefer option 1 or 2?
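To make the two options concrete, here is a minimal Python sketch using plain dataclasses. It is not the Spark Connect proto or any generated code. The class name FlatRead and the helper validate_flat_read are hypothetical, and the field names are the ones listed above rendered in snake_case.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


# Option 1 (hypothetical): one flat message. Which fields are meaningful
# depends on what is being scanned, so validity has to be checked by hand.
@dataclass
class FlatRead:
    table_name: Optional[str] = None
    user_specified_schema: Optional[str] = None
    data_source_name: Optional[str] = None
    options: Dict[str, str] = field(default_factory=dict)


def validate_flat_read(read: FlatRead) -> str:
    """Return "table" or "data_source"; raise if the fields are mixed or empty."""
    has_table = read.table_name is not None
    has_source = (
        read.data_source_name is not None
        or read.user_specified_schema is not None
        or bool(read.options)
    )
    if has_table == has_source:
        raise ValueError("set either the table field or the data source fields")
    return "table" if has_table else "data_source"


# Option 2 (hypothetical): one type per kind of scan. Each type carries
# only the fields that make sense for it, so no cross-field check is needed.
@dataclass
class UnresolvedRelation:
    table_name: str


@dataclass
class UnresolvedDataSource:
    data_source_name: str
    user_specified_schema: Optional[str] = None
    options: Dict[str, str] = field(default_factory=dict)
```

With option 1, a value such as FlatRead(table_name="t", data_source_name="my_source") can be constructed and is only rejected by validate_flat_read at runtime. With option 2, that combination cannot be expressed at all.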
Maybe we're getting closer to it. Yes, proto has proper Union type support. So what the Read relation should look like is the following:
message Read {
  oneof read_type {
    UnresolvedRelation unresolved_relation = 1;
    UnresolvedDataSource unresolved_data_source = 2;
  }
  message UnresolvedRelation {
  }
  message UnresolvedDataSource {
  }
}
The generated code has the right behavior.
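As a rough illustration of what "the right behavior" means for a oneof: at most one member is set at a time, setting a member clears the previously set one, and the active member can be queried by name (generated Python classes expose this as WhichOneof). The class below is a hand-written stand-in, not protobuf-generated code. ReadSketch and its methods are hypothetical names, and unset members read as None here as a simplification.

```python
from typing import Any, Optional


class ReadSketch:
    """Hand-written stand-in for a message with `oneof read_type`."""

    _MEMBERS = ("unresolved_relation", "unresolved_data_source")

    def __init__(self) -> None:
        self._which: Optional[str] = None
        self._value: Any = None

    def set(self, member: str, value: Any) -> None:
        # Setting one member replaces whichever member was set before.
        if member not in self._MEMBERS:
            raise KeyError(member)
        self._which, self._value = member, value

    def which_oneof(self) -> Optional[str]:
        # Name of the member currently set, or None if nothing is set.
        return self._which

    def get(self, member: str) -> Any:
        # Only the active member has a value; the others read as unset.
        return self._value if member == self._which else None


read = ReadSketch()
read.set("unresolved_relation", {"table_name": "t"})
read.set("unresolved_data_source", {"format": "my_source"})
print(read.which_oneof())
```

The consumer never has to guess which combination of fields is valid; it asks which member is set and handles that one.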
I was thinking something similar:
- It is a wrong pattern for a message to contain many fields where one subset of the fields belongs to one type and the rest belong to other types. This is option 1 that Wenchen mentioned.
- It is good to keep all relevant fields in one message. If there is more than one message but we want to co-locate them, then there is a type to differentiate them. This is Wenchen's option 2 and Martin's example, if I am not misreading it.
I think we more or less have consensus on the rule for when to split messages.
However, the rule for when to co-locate messages is still not clear. For example, why don't we choose to define
message ExecuteWithSingleInput {
  oneof node_type {
    Project project = 1;
    Filter filter = 2;
  }
  message Project {
  }
  message Filter {
  }
}
Basically, the rule for splitting messages is clear, but the rule for co-locating messages is not. We could co-locate Project and Filter, as they both have a single relation input, but we have chosen not to in the proto. The naming in my example is bad, and I am pretty sure we can find a better verb to describe it.
To me, unresolved_relation and data_source share even less semantics than project and filter do. Apache Calcite, for example, can merge a project and a filter into a single operator called Calc.
I think this change is unnecessary.
  string query = 1;
}

// Relation that reads from a file / table or other data source. Does not have additional
@grundprinzip It's hard to unify this. Spark has 2 different code paths to resolve a table via name, and resolve a data source via format name, options, etc. This is kind of merging two query plans into one when they have no common fields. It's better to split.
// inputs.
message Read {
  oneof read_type {
    NamedTable named_table = 1;
@cloud-fan, please see here: Read already has a union type, and it just needs to be extended with an additional DataSource type.
I see, yea we can add a new DataSource type here, in addition to the NamedTable. This seems like we give them a parent class Read. Any benefits of doing so compared to just flattening it? Please bear with me as I'm not very familiar with protobuf...
The benefit of not flattening is about intent. We're defining an intent for "Read"ing data using different alternatives. To a certain degree, it's kind of like polymorphism :)
Personally, I prefer the hierarchy because it makes it clear how this should be consumed from an API perspective.
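The polymorphism analogy can be sketched in Python as follows. This is an illustration only: Read, NamedTable and DataSource borrow their names from the thread, but the fields (name, format, options) and the describe helper are hypothetical and do not reflect the actual proto or Spark's resolution code.

```python
from dataclasses import dataclass, field
from typing import Dict, Union


@dataclass
class NamedTable:
    name: str


@dataclass
class DataSource:
    format: str
    options: Dict[str, str] = field(default_factory=dict)


@dataclass
class Read:
    # Exactly one alternative is held, mirroring `oneof read_type`.
    read_type: Union[NamedTable, DataSource]


def describe(read: Read) -> str:
    """Pick a resolution path based on which alternative is present."""
    target = read.read_type
    if isinstance(target, NamedTable):
        return f"resolve table by name: {target.name}"
    if isinstance(target, DataSource):
        return f"resolve data source by format: {target.format}"
    raise TypeError(f"unknown read type: {type(target).__name__}")


print(describe(Read(NamedTable("t"))))
print(describe(Read(DataSource("my_source", {"path": "/tmp/data"}))))
```

The parent Read states the intent ("read some data"), while the two alternatives can still be resolved through separate code paths, which is the concern raised earlier in the thread.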
Can one of the admins verify this patch?
What changes were proposed in this pull request?
Refactor proto from Read to UnresolvedRelation.
Why are the changes needed?
UnresolvedRelation has its own proto. In the future we will add a proto for DataSource. We can decouple these two plans in connect.
Does this PR introduce any user-facing change?
No
How was this patch tested?
UT