
Conversation

@dengziming (Member) commented Nov 14, 2022

What changes were proposed in this pull request?

This PR adds support for local data in LocalRelation. We decided to use the Arrow IPC batch format to transfer data; the schema is embedded in the binary records, so we can remove the `attributes` field from `LocalRelation`.
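For illustration, a minimal PyArrow sketch (with made-up column names, not code from this PR) of why the IPC stream format lets us drop the out-of-band `attributes` field: the schema travels inside the bytes.

```python
import io

import pyarrow as pa

# Build a small record batch; "id" and "name" are illustrative columns.
batch = pa.record_batch(
    [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])], names=["id", "name"])

# Serialize: the IPC stream writer prepends a schema message to the batches.
sink = io.BytesIO()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)
data = sink.getvalue()  # opaque bytes, suitable for a proto `bytes` field

# Deserialize: schema and rows are both recovered from the bytes alone.
reader = pa.ipc.open_stream(data)
print(reader.schema)                     # embedded schema, no separate field
print(sum(b.num_rows for b in reader))   # 3
```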

Why are the changes needed?

Local data is needed for unit testing and validation.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests.

@amaliujia (Contributor)

Question: can we re-use the Arrow collection work we have already done here?

cc @zhengruifeng

@amaliujia (Contributor) commented Nov 14, 2022

also cc @hvanhovell @amaliujia

@grundprinzip (Contributor) commented Nov 14, 2022

Thanks for the contribution! The overall approach seems good.

The original idea was for the local data to be sent as Arrow IPC batches as well, since that follows the same direction as the return path.

In addition, we have the benefit that the Arrow IPC message actually has a schema embedded, so we can do nice validation on the receive path.

It would be great to figure out if we can get a Python e2e test in there as well, just to make sure we cover the whole scenario. The easiest way might be to convert the Pandas DF into Arrow and then serialize this to the server, as sketched below.

https://arrow.apache.org/docs/python/pandas.html
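One possible shape of that test path (a hedged sketch using pandas and PyArrow; the DataFrame and variable names are made up, not the PR's actual test code):

```python
import io

import pandas as pd
import pyarrow as pa

# A toy DataFrame standing in for the test data.
pdf = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})

# Convert to Arrow and serialize as an IPC stream, schema included.
table = pa.Table.from_pandas(pdf, preserve_index=False)
sink = io.BytesIO()
with pa.ipc.new_stream(sink, table.schema) as writer:
    writer.write_table(table)
payload = sink.getvalue()  # bytes the client could send to the server
```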

@zhengruifeng (Contributor)

@dengziming thanks for the contributions!

I think we'd better use Arrow batches instead of structs in this proto message.

You may refer to #38468 for how to update the proto message, and to the implementation of `fromBatchIterator` for how to convert Arrow batches into internal rows.

@dengziming (Member Author)

Thank you all for your reviews @zhengruifeng @amaliujia @grundprinzip; there may be some delay since I need some time to get familiar with Arrow. 🤝

@AmplabJenkins

Can one of the admins verify this patch?

@amaliujia (Contributor)

@dengziming thanks!

BTW you can convert this PR to a draft and then re-open it when you think it is ready for review again.

@dengziming dengziming marked this pull request as draft November 16, 2022 02:15
@dengziming dengziming marked this pull request as ready for review November 16, 2022 06:17
@dengziming (Member Author)

I used the Arrow format without a schema here since we already define `attributes` in `LocalRelation`. WDYT? @amaliujia @zhengruifeng @grundprinzip

@grundprinzip (Contributor) left a comment

Very much looking forward to this change!

Contributor:

I'm not sure this needs to be `repeated bytes` here, because `bytes` itself is binary data of "arbitrary" length.

Contributor:

+1

Member Author:

Thank you. I used `repeated bytes` in case the batch size is larger than `maxRecordsPerBatch`, but I think `bytes` is enough here since `LocalRelation` is mostly used in debugging cases.
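For illustration, a single `bytes` field can still carry more rows than one batch holds, since an Arrow IPC stream may contain many record batches (a PyArrow sketch; the chunking below is illustrative, not Spark's actual `maxRecordsPerBatch` handling):

```python
import io

import pyarrow as pa

schema = pa.schema([("id", pa.int64())])

sink = io.BytesIO()
with pa.ipc.new_stream(sink, schema) as writer:
    # Write 10 batches of 1,000 rows each into one stream.
    for start in range(0, 10_000, 1_000):
        writer.write_batch(pa.record_batch(
            [pa.array(range(start, start + 1_000))], schema=schema))
blob = sink.getvalue()  # still a single binary value

reader = pa.ipc.open_stream(blob)
print(sum(b.num_rows for b in reader))  # 10000
```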

Contributor:

If `data` is a regular byte array, it becomes a `ByteString` that you can simply extract here.

Contributor:

`literals` -> `data`?

Contributor:

What about removing this field and reading the schema from the Arrow batch instead? @grundprinzip

Contributor:

In the short term it should work. We probably only need the name and type for the server side to construct such attributes for the local relation.

In the longer term I am not sure; it depends on whether there is other extra information that `LocalRelation` needs from such attributes.

Contributor:

Each `arrow_batch` in `collect` starts with the schema; it will be consistent if we also do this in `createDataFrame`.

Contributor:

Sure, I am not against using the Arrow schema for now.

Member Author:

I found we lack a `fromBatchWithSchemaIterator` method corresponding to `toBatchWithSchemaIterator`, so I will implement one.
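For intuition, the read direction looks roughly like this in Python (a hypothetical sketch; the name and signature are illustrative, not the actual Scala method's):

```python
from typing import Iterator

import pyarrow as pa


def from_batch_with_schema_iterator(data: bytes) -> Iterator[dict]:
    """Consume an IPC stream whose schema is embedded and yield plain rows."""
    reader = pa.ipc.open_stream(data)
    for batch in reader:
        # Each row comes back as a dict keyed by the embedded schema's names.
        yield from batch.to_pylist()
```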

@zhengruifeng (Contributor)

You may reformat the Scala code with:
./build/mvn -Pscala-2.12 scalafmt:format -Dscalafmt.skip=false -Dscalafmt.validateOnly=false -Dscalafmt.changedOnly=false -pl connector/connect

@amaliujia (Contributor)

You can also run the Scala linter locally: ./dev/lint-scala

@dengziming (Member Author)

I resolved the comments and moved the schema into the Arrow batch. There are still some TODOs left, which I will fix once we all agree on this plan. @amaliujia @grundprinzip @zhengruifeng

Member:

Let's keep these newlines. I think the Scala linter would complain about this.

Member Author:

Thank you, I have reverted these changes.

@grundprinzip (Contributor) left a comment

Conceptually the approach looks good. I think we need to improve the testing and fix some of the unnecessary format changes. The connect module uses auto-formatting, so it should be really easy.

One thing I've seen is that in the from-batch-with-schema iterator approach we don't check the schema integrity across batch boundaries. This might be OK in this case.

In addition, I'm wondering if it's not better to just use the from-buffer-with-schema method, because it has the invariant of having exactly one schema, and the message type only has one buffer anyway.

Otherwise I'm good with the approach. Looks very promising.

Contributor:

Why these changes?

Member Author:

Those were made by the IDE format plugin; I have reverted them.

Contributor:

The test here is kind of bare-bones. Before we fully approve the PR, we need to extend the test coverage a bit.

@dengziming dengziming force-pushed the SPARK-41114 branch 2 times, most recently from 5aa4bad to 8e49fd1 on November 21, 2022 16:22
@dengziming (Member Author) left a comment

Thank you for your reviews @grundprinzip; I have fixed most of them. I also changed the test case to be more convincing and added two other cases covering empty data and illegal data.
Do you think we need more test cases here?

Member Author:

Thank you, I have reverted these changes.

Member Author:

Those were made by the IDE format plugin; I have reverted them.

Member:

Suggested change: `Seq())` → `Seq.empty)`

Member:

I think this is too much to have as a common util in the core module. It's only used twice.

Member:

Sorry for the late review. Can we dedup the logic, as `ArrowBatchWithSchemaIterator` does?

Member:

Should we use the same protobuf message you added, @zhengruifeng?

Contributor:

We may want to do this the other way around, right? The row count in the current `ArrowBatch` message is not needed; that information is already encoded inside the Arrow IPC stream.

Contributor:

I'm fine with removing the row count; it was supported in `collect` just because it was in the initial proto message.

Contributor:

If the information is already in the Arrow IPC stream, +1 to removing the row count.

I also don't think the row count was used properly in the initial implementation (e.g., it probably was not used in the CSV version).
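To illustrate the point (a PyArrow sketch, not this PR's code), the count is always recomputable from the stream itself, so a separate proto field adds nothing:

```python
import io

import pyarrow as pa

batch = pa.record_batch([pa.array([10, 20, 30])], names=["v"])

sink = io.BytesIO()
with pa.ipc.new_stream(sink, batch.schema) as writer:
    writer.write_batch(batch)

# Reading the whole stream back yields the row count directly.
table = pa.ipc.open_stream(sink.getvalue()).read_all()
print(table.num_rows)  # 3, recomputed from the stream; no extra field needed
```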

@grundprinzip (Contributor) left a comment

Generally I'm happy with the PR. I just have minor nits on comments, etc. I will approve, but we will need @hvanhovell and/or @cloud-fan to approve as well.

@dengziming (Member Author)

Thank you @grundprinzip for your review. I fixed the comments; let's wait for @hvanhovell and @cloud-fan. 🤝

@amaliujia (Contributor) left a comment

LGTM! Thanks!

@HyukjinKwon (Member)

Merged to master.

@HyukjinKwon (Member)

@dengziming this is really awesome. Thanks for addressing all comments and landing this feature. Since you implemented this, are you also interested in supporting the `spark.createDataFrame(pandasDF)` case too? Pandas is arguably more common than plain `spark.createDataFrame(others)`. It shouldn't be super complicated to implement.

@dengziming (Member Author)

@HyukjinKwon Thank you, I'm glad to give it a try, but I'm new to Python, so it will take me some time to get familiar with it.

@grundprinzip (Contributor)

@dengziming If you don't mind, I would create a quick PR that allows reading the data from a Pandas DF, because that's very quick and helps us get to a useful state quickest.

If you're still interested in doing the Python-side work, maybe you can have a look at `createDataFrame` without Pandas, based on a schema and rows.

@dengziming (Member Author)

@grundprinzip Thank you, I would like to review your code.

@grundprinzip (Contributor)

@dengziming please have a look at #38803

beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 15, 2022
### What changes were proposed in this pull request?
This PR adds support for local data in LocalRelation. We decided to use the Arrow IPC batch format to transfer data; the schema is embedded in the binary records, so we can remove the `attributes` field from `LocalRelation`.

### Why are the changes needed?
Local data is needed for unit testing and validation.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit tests.

Closes apache#38659 from dengziming/SPARK-41114.

Authored-by: dengziming <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022