Conversation

@amaliujia
Contributor

@amaliujia amaliujia commented Oct 3, 2022

What changes were proposed in this pull request?

Add an initial Read API for Spark Connect that allows setting the schema, format, options, and path, and then reading files into a DataFrame.
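
For illustration, a minimal sketch of the reader surface this adds, using the standard PySpark DataFrameReader calls (the path and schema string below are illustrative assumptions):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Set schema, format, an option, and a path, then read the files into a DataFrame.
df = (
    spark.read
    .format("csv")
    .schema("name STRING, age INT")  # DDL-formatted schema string
    .option("header", "true")
    .load("/tmp/people.csv")         # illustrative path
)
df.show()
```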

Why are the changes needed?

PySpark readwriter API parity for Spark Connect

Does this PR introduce any user-facing change?

No

How was this patch tested?

UT

@amaliujia
Contributor Author

@amaliujia amaliujia changed the title [SPARK-40539][CONNECT] PySpark readwriter API parity for Spark Connect [SPARK-40539][CONNECT] Initial DataFrame Read API parity for Spark Connect Oct 3, 2022
Contributor

@grundprinzip grundprinzip left a comment

I left some suggestions on some of the pieces. Nothing major.

@AmplabJenkins

Can one of the admins verify this patch?

Contributor

path should end up with options.

Contributor

format is required

Contributor

maybe simply map<string, string>?

Contributor Author

Oh yes, I forgot that proto supports MAP. Will update this.
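
For illustration, a small sketch of how the Python reader accumulates options as string key/value pairs, which is exactly the shape a proto map<string, string> captures (the format, option names, and path below are assumptions):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

reader = spark.read.format("json")
reader = reader.option("multiLine", "true")                   # one string key/value pair
reader = reader.options(mode="PERMISSIVE", encoding="UTF-8")  # several at once
# Every option ultimately travels as a string key mapped to a string value,
# i.e. the shape of a proto map<string, string>.
df = reader.load("/tmp/data.json")
```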

@amaliujia
Contributor Author

@HyukjinKwon @zhengruifeng @cloud-fan

This PR has finally caught up after the blockers were resolved. It is ready for another review now.

Contributor

@grundprinzip grundprinzip left a comment

Please have a look at my main comment in SparkConnectPlanner with regard to calling load() during planning.

Contributor

Suggested change
// Required. Supported formats include: parquet, orc, text, json, parquet, csv, avro.
// Required. Supported formats may include: parquet, orc, text, json, parquet, csv, avro.

The reason is that data source resolution happens on the server side and depends on which DS classes are available on the classpath.

Contributor Author

I believe these formats are called built-in formats, and we can trust that Spark will always support them by default.

Contributor

jdbc is also a built-in format. I think it's OK to just give some examples here.
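
For instance, a sketch of reading through the built-in jdbc format (the connection URL, table, and credentials are assumptions, and a matching JDBC driver has to be available on the classpath):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/testdb")  # assumed URL
    .option("dbtable", "public.people")                        # assumed table
    .option("user", "test")                                    # assumed credentials
    .option("password", "secret")
    .load()
)
```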

Contributor Author

@amaliujia amaliujia Oct 19, 2022

I really like how Apache Beam document their proto and I want to match it in connect once the proto becomes stable: https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto

So this part will be revised and expanded anyway (e.g., include the full list, document case sensitivity, document applicable options for each format if there are any, etc.).

Contributor

So my question here is whether the execution has already happened or not. What I mean is: is load() a blocking operation or a logical one? If you call load() and then return the analyzed plan, what happens when you call collect on that plan? Does the load happen again?

Can you verify this?

Contributor

load() just builds the logical plan; it's not an action.

Contributor

Thanks, I was able to verify this for myself as well with some experiments.
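
For reference, a sketch of one such experiment (assuming a local session and a JSON file at /tmp/people.json): load() returns immediately with a plan, and the scan only runs once an action executes.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = (
    spark.read
    .format("json")
    .schema("name STRING, age INT")  # explicit schema avoids an inference scan
    .load("/tmp/people.json")        # logical only: no job runs here
)

df.explain()         # prints the plan; still nothing has executed
rows = df.collect()  # first action: the files are scanned now
rows = df.collect()  # second action: the scan runs again (no implicit caching)
```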

@amaliujia amaliujia force-pushed the SPARK-40539 branch 2 times, most recently from 81bbfc3 to cb1395f on October 18, 2022 at 18:27
Contributor

I assume these APIs are just copied from pyspark.

Contributor Author

Yes, these are to match the DataFrame API.

Contributor

Is it generated by proto?

Contributor Author

It is not. This class is more or less the same as the DSL that we introduced for Scala: the core idea is to provide a toProto conversion.
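
To make the idea concrete, a hypothetical sketch (not the actual PR code; the class name, fields, and to_proto shape are invented for illustration) of a small client-side node whose only job is the toProto conversion:

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

@dataclass
class Read:
    """Hypothetical client-side plan node mirroring a proto Read message."""
    data_format: str
    options: Dict[str, str] = field(default_factory=dict)
    schema: Optional[str] = None
    paths: List[str] = field(default_factory=list)

    def to_proto(self) -> dict:
        # The real DSL would populate generated protobuf classes; a plain
        # dict stands in here to keep the sketch self-contained.
        return {
            "format": self.data_format,
            "options": dict(self.options),
            "schema": self.schema,
            "paths": list(self.paths),
        }

# Usage: build the node, then serialize it for the server.
plan = Read("csv", {"header": "true"}, "name STRING, age INT", ["/tmp/people.csv"])
print(plan.to_proto())
```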

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 01c7a46 Oct 20, 2022
Member

@HyukjinKwon HyukjinKwon left a comment

late LGTM2

SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
Closes apache#38086 from amaliujia/SPARK-40539.

Authored-by: Rui Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>