[SPARK-53525][CONNECT][FOLLOWUP] Spark Connect ArrowBatch Result Chunking - Scala Client #52496
Conversation
vicennial left a comment:
cc @hvanhovell
```scala
throw new IllegalStateException(
  s"Expected arrow batch to start at row offset $numRecords in results, " +
  s"but received arrow batch starting at offset $expectedStartOffset.")

  s"Expected chunk index ${arrowBatchChunksToAssemble.size} of the " +
  s"arrow batch but got ${arrowBatch.getChunkIndex}.")
}
```
Since these are user facing exceptions, should we be using a structured error state/code here?
I didn't introduce new error classes, and the `IllegalStateException`s were already in SparkResults. Yeah, switching to a structured error class can be a follow-up.
```scala
.addAllTags(tags.get.toSeq.asJava)

// Add request option to allow result chunking.
val chunkingOptionsBuilder = proto.ResultChunkingOptions
```
Do we need to set this if the chunking is disabled?
You're right, it is not needed. I've updated the logic so we only set the chunking option when `configuration.allowArrowBatchChunking` is enabled.
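The fix amounts to attaching the option only when the client configuration enables chunking. A minimal sketch of that control flow, using hypothetical case-class stand-ins for the generated proto messages (the real client builds a `proto.ResultChunkingOptions` onto the `ExecutePlanRequest` builder):

```scala
// Hypothetical stand-ins for the generated proto messages, for illustration only.
final case class ResultChunkingOptions(allowArrowBatchChunking: Boolean)
final case class ExecutePlanRequest(chunkingOptions: Option[ResultChunkingOptions])

object RequestSketch {
  // Attach the chunking option only when the configuration enables it,
  // so a client with chunking disabled sends no option at all.
  def buildRequest(allowArrowBatchChunking: Boolean): ExecutePlanRequest = {
    val options =
      if (allowArrowBatchChunking) {
        Some(ResultChunkingOptions(allowArrowBatchChunking = true))
      } else {
        None
      }
    ExecutePlanRequest(options)
  }
}
```

With proto builders the same effect is achieved by simply not calling the setter, so the field stays unset on the wire for clients that have chunking disabled.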
```scala
private[this] var arrowSchema: pojo.Schema = _
private[this] var nextResultIndex: Int = 0
private val resultMap = mutable.Map.empty[Int, (Long, Seq[ArrowMessage])]
private val arrowBatchChunksToAssemble = mutable.Buffer.empty[ByteString]
```
Should this be local to `processResponses`? AFAICT it should not return unless we have a complete arrow batch.
Good point, it's now updated to be local to processResponses.
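For illustration, a self-contained sketch of why the chunk buffer can live inside the processing method: chunks arrive in order, and the method only produces a value once the final chunk lands, so no state needs to outlive the call. The `Chunk` shape and `assembleBatch` helper are hypothetical simplifications; the real client accumulates `ByteString` payloads from `ExecutePlanResponse` messages.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of one chunk of a chunked arrow batch.
final case class Chunk(chunkIndex: Int, numChunks: Int, data: Array[Byte])

object ChunkSketch {
  // Reassemble one complete arrow batch from an in-order stream of chunks.
  // The buffer is local to the call: the method cannot return a batch
  // until every chunk has arrived.
  def assembleBatch(chunks: Iterator[Chunk]): Array[Byte] = {
    val pending = mutable.Buffer.empty[Array[Byte]]
    while (chunks.hasNext) {
      val chunk = chunks.next()
      if (chunk.chunkIndex != pending.size) {
        throw new IllegalStateException(
          s"Expected chunk index ${pending.size} but got ${chunk.chunkIndex}.")
      }
      pending += chunk.data
      if (pending.size == chunk.numChunks) {
        return pending.flatten.toArray
      }
    }
    throw new IllegalStateException("Stream ended before the arrow batch was complete.")
  }
}
```

An out-of-order chunk index trips the `IllegalStateException`, mirroring the chunk-index check quoted from the diff above.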
…-scala-client

# Conflicts:
#	sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
Merging to master/4.1. Thanks!
…king - Scala Client

### What changes were proposed in this pull request?

In the previous PR #52271 of Spark Connect ArrowBatch Result Chunking, both server-side and PySpark client changes were implemented. In this PR, the corresponding Scala client changes are implemented, so large Arrow rows are now supported on the Scala client as well.

To reproduce the existing issue we are solving here, run this code on the Spark Connect Scala client:

```
val res = spark.sql("select repeat('a', 1024*1024*300)").collect()
println(res(0).getString(0).length)
```

It fails with a `RESOURCE_EXHAUSTED` error with message `gRPC message exceeds maximum size 134217728: 314573320`, because the server is trying to send an ExecutePlanResponse of ~300MB to the client. With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when returning large rows.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52496 from xi-db/arrow-batch-chuking-scala-client.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit daa83fc)
Signed-off-by: Herman van Hovell <[email protected]>
@hvanhovell @xi-db, unfortunately, the daily Maven test starts to fail after this patch. After a closer look, I think this should be a test-only issue related to the Maven classpath and won't cause problems on real deployments. For reference, there was a similar issue #41622, but I'm afraid the solution is not applicable to this PR. Also cc @LuciferYang @dongjoon-hyun
@xi-db Do you have time to fix this problem?
Thank you for pinging me, @pan3793 and @LuciferYang.
To @xi-db and @hvanhovell: I agree with @pan3793's analysis that it will be only classpath issues due to the difference between Maven and SBT. However, I hope this patch didn't hide any other regression for the last 2 days. Inevitably, let me revert this follow-up commit. To @pan3793 and @LuciferYang, I re-triggered the daily Maven test.
Is there any progress on this? If it's difficult to fix, can we ignore these two cases for now?
Hi @LuciferYang, I'm now looking into it.
@xi-db, this can be reproduced by:
Hi @pan3793 @LuciferYang, I opened a PR to fix the issue: #52941. I reproduced the issue with the commands you shared, and now the Maven tests succeed with the fix. PTAL, thanks!
Thank you for the follow-up, @xi-db and all.
…ct testing

### What changes were proposed in this pull request?

In PR #52496, tests were implemented using `io.grpc.ClientInterceptor` to verify gRPC messages. However, it failed the Maven tests ([comment](#52496 (comment))) because the related gRPC classes are missing in the testing SparkConnectService in Maven tests. In this PR, gRPC classes for testing purposes are added as artifacts, like other existing classes from `scalatest` and `spark-catalyst`, to also allow `io.grpc` classes in tests.

### Why are the changes needed?

To fix the broken daily Maven tests ([comment](#52496 (comment))).

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Maven tests with the following commands passed.

```
$ build/mvn -Phive clean install -DskipTests
$ build/mvn -Phive -pl sql/connect/client/jvm test -Dtest=none -DwildcardSuites=org.apache.spark.sql.connect.ClientE2ETestSuite
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52941 from xi-db/arrow-batch-chunking-scala-client-fix-maven.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
