[SPARK-40539][CONNECT] Initial DataFrame Read API parity for Spark Connect #38086
Server side (`SparkConnectPlanner`):

```diff
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.{logical, FullOuter, Inner, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sample, SubqueryAlias}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.types._

 final case class InvalidPlanInput(
```

```diff
@@ -112,7 +113,19 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) {
       } else {
         child
       }
-    case _ => throw InvalidPlanInput()
+    case proto.Read.ReadTypeCase.DATA_SOURCE =>
+      if (rel.getDataSource.getFormat == "") {
+        throw InvalidPlanInput("DataSource requires a format")
+      }
+      val localMap = CaseInsensitiveMap[String](rel.getDataSource.getOptionsMap.asScala.toMap)
+      val reader = session.read
+      reader.format(rel.getDataSource.getFormat)
+      localMap.foreach { case (key, value) => reader.option(key, value) }
+      if (rel.getDataSource.getSchema != null) {
+        reader.schema(rel.getDataSource.getSchema)
+      }
+      reader.load().queryExecution.analyzed
+    case _ => throw InvalidPlanInput("Does not support " + rel.getReadTypeCase.name())
   }
   baseRelation
 }
```
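For orientation, here is a minimal sketch (not part of the diff) of the client-side message this new `DATA_SOURCE` branch consumes. The field names come from the Python plan code below; the concrete values, and the use of a `path` option (an assumption, since `load()` is called without arguments on the server), are illustrative only:

```python
import pyspark.sql.connect.proto as proto

rel = proto.Relation()
rel.read.data_source.format = "csv"  # must be non-empty, or the planner raises InvalidPlanInput
rel.read.data_source.schema = "id INT, name STRING"  # optional schema string
rel.read.data_source.options["path"] = "/tmp/people.csv"  # assumed: the path travels as an option
rel.read.data_source.options["header"] = "true"  # option keys are matched case-insensitively server-side
```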
Client side (Python `pyspark.sql.connect` plan module):

```diff
@@ -23,6 +23,7 @@
     Union,
     cast,
     TYPE_CHECKING,
+    Mapping,
 )

 import pyspark.sql.connect.proto as proto
```

```diff
@@ -111,6 +112,46 @@ def _child_repr_(self) -> str:
         return self._child._repr_html_() if self._child is not None else ""


+class DataSource(LogicalPlan):
+    """A datasource with a format and optionally a schema from which Spark reads data"""
+
+    def __init__(
+        self,
+        format: str = "",
+        schema: Optional[str] = None,
+        options: Optional[Mapping[str, str]] = None,
+    ) -> None:
+        super().__init__(None)
+        self.format = format
+        self.schema = schema
+        self.options = options
+
+    def plan(self, session: Optional["RemoteSparkSession"]) -> proto.Relation:
+        plan = proto.Relation()
+        if self.format is not None:
+            plan.read.data_source.format = self.format
+        if self.schema is not None:
+            plan.read.data_source.schema = self.schema
+        if self.options is not None:
+            for k in self.options.keys():
+                v = self.options.get(k)
+                if v is not None:
+                    plan.read.data_source.options[k] = v
+        return plan
+
+    def _repr_html_(self) -> str:
+        return f"""
+        <ul>
+           <li>
+              <b>DataSource</b><br />
+              format: {self.format}
+              schema: {self.schema}
+              options: {self.options}
+           </li>
+        </ul>
+        """
+
+
 class Read(LogicalPlan):
     def __init__(self, table_name: str) -> None:
         super().__init__(None)
```
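A hypothetical usage sketch of the new plan node (values made up; `plan()` does not touch the session in this implementation, so `None` suffices here):

```python
ds = DataSource(
    format="json",
    schema="id INT, name STRING",
    options={"multiLine": "true"},
)
rel = ds.plan(None)
# rel.read.data_source now carries the format, schema, and options that the
# server-side planner replays through session.read before calling load().
```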
Review discussion:
The reason is that the resolution of the data source is happening on the server side and depends on which DS classes are available in the classpath.
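To make that concrete: the client can construct a plan for any format string, and only the server decides whether it resolves (hypothetical format name below):

```python
# Succeeds on the client regardless of what the server has on its classpath:
plan = DataSource(format="com.example.fancysource")
# The failure, if any, happens when the server executes reader.load() and
# cannot find a DataSource implementation named "com.example.fancysource".
```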
I believe these formats are called `built-in` formats, and we can trust that Spark will always support those by default.
`jdbc` is also a built-in format. I think it's OK to just give some examples here.
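For illustration, the `DATA_SOURCE` branch replays the classic reader chain, so a built-in format like `jdbc` corresponds to the familiar call below (connection values are hypothetical; `url` and `dbtable` are standard Spark JDBC options):

```python
df = (
    spark.read.format("jdbc")  # assumes an active SparkSession named `spark`
    .option("url", "jdbc:postgresql://localhost/mydb")
    .option("dbtable", "people")
    .load()
)
```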
I really like how Apache Beam documents their proto, and I want to match that in Connect once the proto becomes stable: https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto

So this part will be revised and expanded anyway (e.g. include the full list, document case sensitivity, document applicable options for each format if there are any, etc.).