[SPARK-41005][CONNECT][PYTHON] Arrow-based collect #38468
Conversation
- Wouldn't it be better to do the heavy lifting on the executors? IMO it is better to convert to Arrow directly; `Dataset.toArrowBatchRdd` seems to be a good start.
- It would also be nice if we could avoid materializing the entire result on the driver. We should be able to forward a partition's batches as soon as we receive them.
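A rough sketch of this suggestion, assuming access to the `private[sql]` `Dataset.toArrowBatchRdd` (which yields per-partition Arrow IPC payloads as an `RDD[Array[Byte]]`); the `send` callback standing in for the gRPC response stream is hypothetical:

```scala
import org.apache.spark.sql.DataFrame

// Sketch: convert to Arrow on the executors, then forward each partition's
// payloads from the driver as soon as its task result arrives, instead of
// materializing the whole result first.
def processAsArrowBatches(df: DataFrame, send: Array[Byte] => Unit): Unit = {
  val batches = df.toArrowBatchRdd // heavy lifting happens on the executors
  df.sparkSession.sparkContext.runJob(
    batches,
    (iter: Iterator[Array[Byte]]) => iter.toArray,
    // result handler, invoked on the driver once per finished partition
    (partitionId: Int, payloads: Array[Array[Byte]]) => payloads.foreach(send)
  )
}
```

Note that the result handler fires in task-completion order, not partition order, which is the ordering question discussed further below.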
1. Agree, will update.
2. I think we can use `toLocalIterator`, but it may trigger multiple jobs: #38300 (comment)
Review comment on `python/pyspark/sql/connect/client.py`:
Made this change, otherwise some tests would fail: those tests only generate a single JSON batch, so they happened to work with JSON.
@zhengruifeng can you please add test cases for things like
Why not use the main thread for this? It is not entirely cheap to spin up a thread pool, and you have the main thread twiddling its thumbs anyway. It also makes completing the stream simpler.
You can also use 2 fields in the proto: partition_id & batch_id
> Why not use the main thread for this?

OK, will follow #38468 (comment).

> You can also use 2 fields in the proto: partition_id & batch_id

Done.
If I understand correctly, the TaskResultGetter runs in a separate thread pool and there is no back-pressure mechanism, so the current approach may still cause memory pressure on the driver if the client consumes results slowly.
I feel this comes with a bigger problem: we should define the client-side incremental collect protocol first. The client consumes data, but does not consume it all at once. How does the client collect incrementally? And, combined with that, how does the server side control the rate?
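One generic way to couple the two rates is a bounded hand-off between whatever produces results on the driver and the thread writing to the client. This is only an illustrative sketch; none of these names appear in the PR, and blocking inside Spark's own scheduler threads would need care:

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Illustrative back pressure: `put` blocks once the queue holds 16 payloads,
// so a slow consumer (the client) throttles how fast results pile up.
class BoundedRelay(send: Array[Byte] => Unit) {
  private val queue = new ArrayBlockingQueue[Array[Byte]](16)
  @volatile private var finished = false

  def offer(payload: Array[Byte]): Unit = queue.put(payload) // blocks when full
  def finish(): Unit = finished = true

  // Runs on the sending thread: drain payloads to the client until done.
  def drainLoop(): Unit =
    while (!finished || !queue.isEmpty) {
      val payload = queue.poll(100, TimeUnit.MILLISECONDS)
      if (payload != null) send(payload)
    }
}
```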
Things get worse if we decide to keep strict partition ordering.
Actually, Spark implemented IndirectTaskResult to support transferring task results from executors to the driver through the block manager, but currently it eagerly fetches and deserializes blocks into DirectTaskResult on the task-result-getter thread pool. What if we defer that to JobWaiter#taskSucceeded?
You could do that, yes. If we get the format right, we could just serve the block directly to the client (to avoid deserialization).
Alternatively, you could write the data to some persistent storage on the executors and just pass the file paths to the client. That would be the most efficient from the driver's point of view, but it requires some garbage collection in place to clean up results.
To be clear, my suggestion is to defer the deserialization and block releasing to the JobWaiter#taskSucceeded phase.
The second way you suggested has many performance benefits, but it requires the client to communicate with the external storage service, which brings other burdens for the client: result cleanup (as you mentioned), the fairly large S3/HDFS client libs, network policy, auth.
The IndirectTaskResult way leverages existing code and Spark's built-in block mechanism to transfer data; we get the benefit with a small code modification, and we don't need to worry about result cleanup.
I don't think we should make the client responsible for ordering the results. That would be a burden for all clients.
OK, I will make sure the batches are sent in order from the server side. Then we don't need partition_id and batch_id anymore.
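A simplified reconstruction of how the server can restore partition order while still collecting asynchronously, based on the `signal`/`partitions` fragments visible further below (the `send` callback is hypothetical): the result handler parks each partition's batches in a shared map, and the main thread streams partitions 0, 1, 2, … as each becomes available.

```scala
import scala.collection.mutable

val signal = new Object
val partitions = mutable.Map.empty[Int, Array[Array[Byte]]]

// Called (possibly out of order) as each partition's task result arrives.
def resultHandler(partitionId: Int, batches: Array[Array[Byte]]): Unit =
  signal.synchronized {
    partitions(partitionId) = batches
    signal.notify()
  }

// Runs on the main/handler thread: emit batches strictly in partition order.
def streamInOrder(numPartitions: Int, send: Array[Byte] => Unit): Unit = {
  var next = 0
  while (next < numPartitions) {
    val batches = signal.synchronized {
      while (!partitions.contains(next)) signal.wait()
      partitions.remove(next).get
    }
    batches.foreach(send)
    next += 1
  }
}
```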
Logic-wise, makes sense.
Thanks for the most recent updates. I find this will help with other problems: for example, we now send at least one batch with the schema even when all partitions are empty, so clients won't need to worry about that case.
Is there a way we could track this as a Spark metric for the query? Fine to do in a follow-up if we create a JIRA.
hvanhovell left a comment:
LGTM
```scala
val signal = new Object
val partitions = collection.mutable.Map.empty[Int, Array[Batch]]
```
Do we really need a map? We know the number of partitions, and we can just create an array.
Then we don't need a lock.
Never mind, this is a kind of async collect.
Just changed from array to map ... see #38468 (comment)
We should probably add some comments at the beginning to explain the overall workflow.
Can we apply the same idea to JSON batches?
> Can we apply the same idea to JSON batches?

I think so, let's optimize it later.
```scala
private[sql] def toArrowBatchIterator(
```
Can we simply change `toBatchIterator` to return the row count as well? The perf overhead is very small, but the maintenance overhead is much larger if we have two similar methods.
`toArrowBatchIterator` also writes the schema before each record batch, while `toBatchIterator` just outputs the record batch.
Also, I am going to update `toArrowBatchIterator` in a follow-up to keep each batch size under 4MB, as per the suggestions #38468 (comment) #38468 (comment).
I think we can deduplicate the code then.
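For context on that difference, a hedged sketch of what "schema before each record batch" means with the Arrow Java IPC API (assuming a populated `VectorSchemaRoot`; this is not the PR's code):

```scala
import java.io.ByteArrayOutputStream
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.ArrowStreamWriter

// Each payload becomes a self-contained IPC stream: start() writes the schema
// message first, then writeBatch() appends the record batch, so a client can
// decode every payload independently of the others.
def batchWithSchema(root: VectorSchemaRoot): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  val writer = new ArrowStreamWriter(root, null, out)
  writer.start()      // schema message
  writer.writeBatch() // record batch
  writer.end()        // end-of-stream marker
  writer.close()
  out.toByteArray
}
```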
I see. We probably need better naming then, as it's hard to tell the difference between `toBatchIterator` and `toArrowBatchIterator`.
I would like to close the discussion on ordered vs. unordered results.
Having this discussion without appropriate evidence to support either of the two modes is not supporting the efficient evolution of this feature. Thank you.
Merged into master; will have a follow-up PR to update.
```scala
spark.sparkContext.runJob(batches, processPartition, resultHandler)
```
Seems like it should have been the (async) submitJob instead of the (sync) runJob (#38468 (comment)). In fact, I figured out a simpler way to avoid synchronization. PTAL: #38613
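For reference, a minimal sketch of the async variant mentioned here, reusing the hypothetical `batches`/`resultHandler` names from the sketches above (`SparkContext.submitJob` returns a future instead of blocking):

```scala
// Sketch: submitJob returns a SimpleFutureAction, so the handler thread is
// free to stream batches while resultHandler fires per finished partition.
val futureAction = spark.sparkContext.submitJob(
  batches,
  (iter: Iterator[Array[Byte]]) => iter.toArray,
  0 until batches.getNumPartitions,                   // all partitions
  (partitionId: Int, payloads: Array[Array[Byte]]) => // per-partition callback
    resultHandler(partitionId, payloads),
  ()                                                  // resultFunc: nothing to combine
)
```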
### What changes were proposed in this pull request?
Control the max size of the Arrow batch.

### Why are the changes needed?
As per the suggestion #38468 (comment).

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #38612 from zhengruifeng/connect_arrow_batchsize.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
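The general shape of such a cap, as an illustrative helper (hypothetical names; the real implementation lives in `ArrowConverters`): cut the current batch once either a row-count or a byte-size limit is hit.

```scala
// Hypothetical predicate: start a new Arrow batch once either limit is hit.
// A limit of <= 0 means "unlimited" for that dimension.
def batchIsFull(rowCount: Long, estimatedBytes: Long,
                maxRecordsPerBatch: Long, maxBatchBytes: Long): Boolean =
  (maxRecordsPerBatch > 0 && rowCount >= maxRecordsPerBatch) ||
    (maxBatchBytes > 0 && estimatedBytes >= maxBatchBytes)
```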
Made another PR to refactor and deduplicate the Arrow code, PTAL: #38618
…rters codes

### What changes were proposed in this pull request?
This PR is a followup of both #38468 and #38612 that proposes to deduplicate code in `ArrowConverters` by creating two classes, `ArrowBatchIterator` and `ArrowBatchWithSchemaIterator`. In addition, we reuse `ArrowBatchWithSchemaIterator` when creating an empty Arrow batch in `createEmptyArrowBatch`.

While I am here:
- I addressed my own comment at #38612 (comment)
- Kept the support of both max rows and size. The max row size check was removed in #38612

### Why are the changes needed?
For better readability and maintenance.

### Does this PR introduce _any_ user-facing change?
No, both code changes are unrelated to user-facing behavior.

### How was this patch tested?
This is refactoring, so the existing CI should cover it.

Closes #38618 from HyukjinKwon/SPARK-41108-followup-1.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
Implement Arrow-based collect.

### Why are the changes needed?
More performant collect.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Set the default encoding to Arrow; now all tests run with Arrow.