Skip to content

Conversation

@xi-db
Copy link
Contributor

@xi-db xi-db commented Sep 8, 2025

What changes were proposed in this pull request?

Currently, we enforce gRPC message limits on both the client and the server. These limits are largely meant to protect both sides from potential OOMs by rejecting abnormally large messages. However, there are cases in which the server incorrectly sends oversized messages that exceed these limits and cause execution failures.

Specifically, the large message issue from the server to the client we’re solving here, comes from the Arrow batch data in ExecutePlanResponse being too large. It’s caused by a single arrow row exceeding the 128MB message limit, and Arrow cannot partition further and it has to return the single large row in one gRPC message.

To improve Spark Connect stability, this PR implements chunking large Arrow batches when returning query results from the server to the client, ensuring each ExecutePlanResponse chunk remains within the size limit, and the chunks from a batch will be reassembled on the client when parsing as an arrow batch.

(Scala client changes are being implemented in a follow-up PR.)

To reproduce the existing issue we are solving here, run this code on Spark Connect:

repeat_num_per_mb = 1024 * 1024 // len('Apache Spark ')
res = spark.sql(f"select repeat('Apache Spark ', {repeat_num_per_mb * 300}) as huge_col from range(1)").collect()
print(len(res))

It fails with StatusCode.RESOURCE_EXHAUSTED error with message Received message larger than max (314570608 vs. 134217728), because the server is trying to send an ExecutePlanResponse of ~300MB to the client.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

Why are the changes needed?

It improves Spark Connect stability when returning large rows.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New tests on both the server side and the client side.

Was this patch authored or co-authored using generative AI tooling?

No.

@xi-db xi-db changed the title [SPARK-53525] Spark Connect ArrowBatch Result Chunking [SPARK-53525][CONNECT] Spark Connect ArrowBatch Result Chunking Sep 8, 2025
bool reattachable = 1;
}

message ResultChunkingOptions {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the client be able to set chunk size?

Copy link
Contributor

@hvanhovell hvanhovell Sep 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, can we name this ResultOptions? I can also see us setting a max arrow batch size here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I introduced a client-side option preferred_arrow_chunk_size (this commit).

.build()
response.setArrowBatch(batch)
responseObserver.onNext(response.build())
for (i <- 0 until numChunks) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't use for comprehensions... Either use an actual loop, or the functional equivalent this translates into.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, right, updated to an explicit loop with foreach.

val to = math.min(from + maxChunkSize, bytes.length)
val length = to - from

val response = proto.ExecutePlanResponse
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can reuse the builder.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, updated.

" While spark.connect.grpc.arrow.maxBatchSize determines the max size of a result batch," +
" maxChunkSize defines the max size of each individual chunk that is part of the batch" +
" that will be sent in a response. This allows the server to send large rows to clients." +
" However, excessively large plans remain unsupported due to Spark internals and JVM" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove these two lines. They are not related to the conf.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, removed.


// Execute plan.
@volatile var done = false
val responses = mutable.Buffer.empty[proto.ExecutePlanResponse]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If done need volatile, then this should be synchronized... Unless you are relying on some happens-before cleverness with the volatile variable (if so, then you need to document this)...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done here doesn't need volatile. I added volatile because other test cases have it. Removed as it is not needed in our case.

}

// Reassemble the chunks into a single Arrow batch and validate its content.
val batchData: ByteString =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you build this for scala, please use a Concatenating InputStream or something like that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's a good point, updated the test as well.

@hvanhovell
Copy link
Contributor

@xi-db are you adding scala support in a follow-up?

@xi-db
Copy link
Contributor Author

xi-db commented Sep 9, 2025

@xi-db are you adding scala support in a follow-up?

Yes, I'm implementing the scala support as a follow-up.

@xi-db xi-db force-pushed the arrow-batch-chunking branch from fb5689a to b68461f Compare September 10, 2025 12:09
sessionHolder.session.conf.get(CONNECT_SESSION_RESULT_CHUNKING_MAX_CHUNK_SIZE) > 0 &&
request.getRequestOptionsList.asScala.exists { option =>
option.hasResultChunkingOptions &&
option.getResultChunkingOptions.getAllowArrowBatchChunking == true
Copy link
Contributor

@heyihong heyihong Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: option.getResultChunkingOptions.getAllowArrowBatchChunking should be sufficient, since the default value is false. option.getResultChunkingOptions will return a default message even if it is not set. In proto3, you won’t get a null pointer when accessing an unset field.

request.getRequestOptionsList.asScala.iterator.collectFirst {
case option
if option.hasResultChunkingOptions &&
option.getResultChunkingOptions.hasPreferredArrowChunkSize =>
Copy link
Contributor

@heyihong heyihong Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: option.getResultChunkingOptions.hasPreferredArrowChunkSize should be sufficient

Copy link
Contributor

@hvanhovell hvanhovell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@hvanhovell
Copy link
Contributor

Merging to master. You can fix the NITS in a follow-up.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM. Thank you, @xi-db and @hvanhovell .

dongjoon-hyun added a commit to apache/spark-connect-swift that referenced this pull request Oct 1, 2025
…th `4.1.0-preview2`

### What changes were proposed in this pull request?

This PR aims to update Spark Connect-generated Swift source code with Apache Spark `4.1.0-preview2`.

### Why are the changes needed?

There are many changes from Apache Spark 4.1.0.

- apache/spark#52342
- apache/spark#52256
- apache/spark#52271
- apache/spark#52242
- apache/spark#51473
- apache/spark#51653
- apache/spark#52072
- apache/spark#51561
- apache/spark#51563
- apache/spark#51489
- apache/spark#51507
- apache/spark#51462
- apache/spark#51464
- apache/spark#51442

To use the latest bug fixes and new messages to develop for new features of `4.1.0-preview2`.

```
$ git clone -b v4.1.0-preview2 https://github.com/apache/spark.git
$ cd spark/sql/connect/common/src/main/protobuf/
$ protoc --swift_out=. spark/connect/*.proto
$ protoc --grpc-swift_out=. spark/connect/*.proto

// Remove empty GRPC files
$ cd spark/connect

$ grep 'This file contained no services' *
catalog.grpc.swift:// This file contained no services.
commands.grpc.swift:// This file contained no services.
common.grpc.swift:// This file contained no services.
example_plugins.grpc.swift:// This file contained no services.
expressions.grpc.swift:// This file contained no services.
ml_common.grpc.swift:// This file contained no services.
ml.grpc.swift:// This file contained no services.
pipelines.grpc.swift:// This file contained no services.
relations.grpc.swift:// This file contained no services.
types.grpc.swift:// This file contained no services.

$ rm catalog.grpc.swift commands.grpc.swift common.grpc.swift example_plugins.grpc.swift expressions.grpc.swift ml_common.grpc.swift ml.grpc.swift pipelines.grpc.swift relations.grpc.swift types.grpc.swift
```

### Does this PR introduce _any_ user-facing change?

Pass the CIs.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #250 from dongjoon-hyun/SPARK-53777.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
asf-gitbox-commits pushed a commit that referenced this pull request Nov 3, 2025
…king - Scala Client

### What changes were proposed in this pull request?

In the previous PR #52271 of Spark Connect ArrowBatch Result Chunking, both Server-side and PySpark client changes were implemented.

In this PR, the corresponding Scala client changes are implemented, so large Arrow rows are now supported on the Scala client as well.

To reproduce the existing issue we are solving here, run this code on Spark Connect Scala client:

```
val res = spark.sql("select repeat('a', 1024*1024*300)").collect()
println(res(0).getString(0).length)
```

It fails with `RESOURCE_EXHAUSTED` error with message `gRPC message exceeds maximum size 134217728: 314573320`, because the server is trying to send an ExecutePlanResponse of ~300MB to the client.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when returning large rows.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52496 from xi-db/arrow-batch-chuking-scala-client.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
asf-gitbox-commits pushed a commit that referenced this pull request Nov 3, 2025
…king - Scala Client

### What changes were proposed in this pull request?

In the previous PR #52271 of Spark Connect ArrowBatch Result Chunking, both Server-side and PySpark client changes were implemented.

In this PR, the corresponding Scala client changes are implemented, so large Arrow rows are now supported on the Scala client as well.

To reproduce the existing issue we are solving here, run this code on Spark Connect Scala client:

```
val res = spark.sql("select repeat('a', 1024*1024*300)").collect()
println(res(0).getString(0).length)
```

It fails with `RESOURCE_EXHAUSTED` error with message `gRPC message exceeds maximum size 134217728: 314573320`, because the server is trying to send an ExecutePlanResponse of ~300MB to the client.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when returning large rows.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52496 from xi-db/arrow-batch-chuking-scala-client.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit daa83fc)
Signed-off-by: Herman van Hovell <[email protected]>
xi-db added a commit to xi-db/spark that referenced this pull request Nov 8, 2025
…king - Scala Client

### What changes were proposed in this pull request?

In the previous PR apache#52271 of Spark Connect ArrowBatch Result Chunking, both Server-side and PySpark client changes were implemented.

In this PR, the corresponding Scala client changes are implemented, so large Arrow rows are now supported on the Scala client as well.

To reproduce the existing issue we are solving here, run this code on Spark Connect Scala client:

```
val res = spark.sql("select repeat('a', 1024*1024*300)").collect()
println(res(0).getString(0).length)
```

It fails with `RESOURCE_EXHAUSTED` error with message `gRPC message exceeds maximum size 134217728: 314573320`, because the server is trying to send an ExecutePlanResponse of ~300MB to the client.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when returning large rows.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52496 from xi-db/arrow-batch-chuking-scala-client.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit daa83fc)
dongjoon-hyun pushed a commit that referenced this pull request Nov 8, 2025
… Chunking - Scala Client

### What changes were proposed in this pull request?

(This PR is a backporting PR containing #52496 and the test fix #52941.)

In the previous PR #52271 of Spark Connect ArrowBatch Result Chunking, both Server-side and PySpark client changes were implemented.

In this PR, the corresponding Scala client changes are implemented, so large Arrow rows are now supported on the Scala client as well.

To reproduce the existing issue we are solving here, run this code on Spark Connect Scala client:

```
val res = spark.sql("select repeat('a', 1024*1024*300)").collect()
println(res(0).getString(0).length)
```

It fails with `RESOURCE_EXHAUSTED` error with message `gRPC message exceeds maximum size 134217728: 314573320`, because the server is trying to send an ExecutePlanResponse of ~300MB to the client.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when returning large rows.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52953 from xi-db/[email protected].

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
asf-gitbox-commits pushed a commit that referenced this pull request Nov 11, 2025
### What changes were proposed in this pull request?

Currently, Spark Connect enforce gRPC message limits on both the client and the server. These limits are largely meant to protect the server from potential OOMs by rejecting abnormally large messages. However, there are several cases where genuine messages exceed the limit and cause execution failures.

To improve Spark Connect stability, this PR implements compressing unresolved proto plans to mitigate the issue of oversized messages from the client to the server. The compression applies to ExecutePlan and AnalyzePlan - the only two methods that might hit the message limit. The other issue of message limit from the server to the client is a different issue, and it’s out of the scope (that one is already fixed in #52271).

In the implementation,

* Zstandard is leveraged to compress proto plan as it has consistent high performance in our benchmark and achieves a good balance between compression ratio and performance.
* The config `spark.connect.maxPlanSize` is introduced to control the maximum size of a (decompressed) proto plan that can be executed in Spark Connect. It is mainly used to avoid decompression bomb attacks.

(Scala client changes are being implemented in a follow-up PR.)

To reproduce the existing issue we are solving here, run this code on Spark Connect:

```
import random
import string

def random_letters(length: int) -> str:
    return ''.join(random.choices(string.ascii_letters, k=length))

num_unique_small_relations = 5
size_per_small_relation = 512 * 1024
small_dfs = [spark.createDataFrame([(random_letters(size_per_small_relation),)],) for _ in range(num_unique_small_relations)]
result_df = small_dfs[0]
for _ in range(512):
    result_df = result_df.unionByName(small_dfs[random.randint(0, len(small_dfs) - 1)])
result_df.collect()
```

It fails with `StatusCode.RESOURCE_EXHAUSTED` error with message`Sent message larger than max (269178955 vs. 134217728)`, because the client was trying to send a too large message to the server.

Note: repeated small local relations is just one way causing a large plan, the size of the plan can also be contributed by repeated subtrees of plan transformations, serialized UDFs, captured external variables by UDFs, etc.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when executing and analyzing large plans.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests on both the server side and the client side.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52894 from xi-db/plan-compression.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
asf-gitbox-commits pushed a commit that referenced this pull request Nov 11, 2025
### What changes were proposed in this pull request?

Currently, Spark Connect enforce gRPC message limits on both the client and the server. These limits are largely meant to protect the server from potential OOMs by rejecting abnormally large messages. However, there are several cases where genuine messages exceed the limit and cause execution failures.

To improve Spark Connect stability, this PR implements compressing unresolved proto plans to mitigate the issue of oversized messages from the client to the server. The compression applies to ExecutePlan and AnalyzePlan - the only two methods that might hit the message limit. The other issue of message limit from the server to the client is a different issue, and it’s out of the scope (that one is already fixed in #52271).

In the implementation,

* Zstandard is leveraged to compress proto plan as it has consistent high performance in our benchmark and achieves a good balance between compression ratio and performance.
* The config `spark.connect.maxPlanSize` is introduced to control the maximum size of a (decompressed) proto plan that can be executed in Spark Connect. It is mainly used to avoid decompression bomb attacks.

(Scala client changes are being implemented in a follow-up PR.)

To reproduce the existing issue we are solving here, run this code on Spark Connect:

```
import random
import string

def random_letters(length: int) -> str:
    return ''.join(random.choices(string.ascii_letters, k=length))

num_unique_small_relations = 5
size_per_small_relation = 512 * 1024
small_dfs = [spark.createDataFrame([(random_letters(size_per_small_relation),)],) for _ in range(num_unique_small_relations)]
result_df = small_dfs[0]
for _ in range(512):
    result_df = result_df.unionByName(small_dfs[random.randint(0, len(small_dfs) - 1)])
result_df.collect()
```

It fails with `StatusCode.RESOURCE_EXHAUSTED` error with message`Sent message larger than max (269178955 vs. 134217728)`, because the client was trying to send a too large message to the server.

Note: repeated small local relations is just one way causing a large plan, the size of the plan can also be contributed by repeated subtrees of plan transformations, serialized UDFs, captured external variables by UDFs, etc.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when executing and analyzing large plans.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests on both the server side and the client side.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52894 from xi-db/plan-compression.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit 0ccaacf)
Signed-off-by: Herman van Hovell <[email protected]>
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
### What changes were proposed in this pull request?

Currently, Spark Connect enforce gRPC message limits on both the client and the server. These limits are largely meant to protect the server from potential OOMs by rejecting abnormally large messages. However, there are several cases where genuine messages exceed the limit and cause execution failures.

To improve Spark Connect stability, this PR implements compressing unresolved proto plans to mitigate the issue of oversized messages from the client to the server. The compression applies to ExecutePlan and AnalyzePlan - the only two methods that might hit the message limit. The other issue of message limit from the server to the client is a different issue, and it’s out of the scope (that one is already fixed in apache#52271).

In the implementation,

* Zstandard is leveraged to compress proto plan as it has consistent high performance in our benchmark and achieves a good balance between compression ratio and performance.
* The config `spark.connect.maxPlanSize` is introduced to control the maximum size of a (decompressed) proto plan that can be executed in Spark Connect. It is mainly used to avoid decompression bomb attacks.

(Scala client changes are being implemented in a follow-up PR.)

To reproduce the existing issue we are solving here, run this code on Spark Connect:

```
import random
import string

def random_letters(length: int) -> str:
    return ''.join(random.choices(string.ascii_letters, k=length))

num_unique_small_relations = 5
size_per_small_relation = 512 * 1024
small_dfs = [spark.createDataFrame([(random_letters(size_per_small_relation),)],) for _ in range(num_unique_small_relations)]
result_df = small_dfs[0]
for _ in range(512):
    result_df = result_df.unionByName(small_dfs[random.randint(0, len(small_dfs) - 1)])
result_df.collect()
```

It fails with `StatusCode.RESOURCE_EXHAUSTED` error with message`Sent message larger than max (269178955 vs. 134217728)`, because the client was trying to send a too large message to the server.

Note: repeated small local relations is just one way causing a large plan, the size of the plan can also be contributed by repeated subtrees of plan transformations, serialized UDFs, captured external variables by UDFs, etc.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when executing and analyzing large plans.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests on both the server side and the client side.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52894 from xi-db/plan-compression.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
### What changes were proposed in this pull request?

Currently, we enforce gRPC message limits on both the client and the server. These limits are largely meant to protect both sides from potential OOMs by rejecting abnormally large messages. However, there are cases in which the server incorrectly sends oversized messages that exceed these limits and cause execution failures.

Specifically, the large message issue from the server to the client we’re solving here, comes from the Arrow batch data in ExecutePlanResponse being too large. It’s caused by a single arrow row exceeding the 128MB message limit, and Arrow cannot partition further and it has to return the single large row in one gRPC message.

To improve Spark Connect stability, this PR implements chunking large Arrow batches when returning query results from the server to the client, ensuring each ExecutePlanResponse chunk remains within the size limit, and the chunks from a batch will be reassembled on the client when parsing as an arrow batch.

(Scala client changes are being implemented in a follow-up PR.)

To reproduce the existing issue we are solving here, run this code on Spark Connect:
```
repeat_num_per_mb = 1024 * 1024 // len('Apache Spark ')
res = spark.sql(f"select repeat('Apache Spark ', {repeat_num_per_mb * 300}) as huge_col from range(1)").collect()
print(len(res))
```
It fails with `StatusCode.RESOURCE_EXHAUSTED` error with message `Received message larger than max (314570608 vs. 134217728)`, because the server is trying to send an ExecutePlanResponse of ~300MB to the client.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when returning large rows.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests on both the server side and the client side.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#52271 from xi-db/arrow-batch-chunking.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
…king - Scala Client

### What changes were proposed in this pull request?

In the previous PR apache#52271 of Spark Connect ArrowBatch Result Chunking, both Server-side and PySpark client changes were implemented.

In this PR, the corresponding Scala client changes are implemented, so large Arrow rows are now supported on the Scala client as well.

To reproduce the existing issue we are solving here, run this code on Spark Connect Scala client:

```
val res = spark.sql("select repeat('a', 1024*1024*300)").collect()
println(res(0).getString(0).length)
```

It fails with `RESOURCE_EXHAUSTED` error with message `gRPC message exceeds maximum size 134217728: 314573320`, because the server is trying to send an ExecutePlanResponse of ~300MB to the client.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when returning large rows.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52496 from xi-db/arrow-batch-chuking-scala-client.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
### What changes were proposed in this pull request?

Currently, Spark Connect enforce gRPC message limits on both the client and the server. These limits are largely meant to protect the server from potential OOMs by rejecting abnormally large messages. However, there are several cases where genuine messages exceed the limit and cause execution failures.

To improve Spark Connect stability, this PR implements compressing unresolved proto plans to mitigate the issue of oversized messages from the client to the server. The compression applies to ExecutePlan and AnalyzePlan - the only two methods that might hit the message limit. The other issue of message limit from the server to the client is a different issue, and it’s out of the scope (that one is already fixed in apache#52271).

In the implementation,

* Zstandard is leveraged to compress proto plan as it has consistent high performance in our benchmark and achieves a good balance between compression ratio and performance.
* The config `spark.connect.maxPlanSize` is introduced to control the maximum size of a (decompressed) proto plan that can be executed in Spark Connect. It is mainly used to avoid decompression bomb attacks.

(Scala client changes are being implemented in a follow-up PR.)

To reproduce the existing issue we are solving here, run this code on Spark Connect:

```
import random
import string

def random_letters(length: int) -> str:
    return ''.join(random.choices(string.ascii_letters, k=length))

num_unique_small_relations = 5
size_per_small_relation = 512 * 1024
small_dfs = [spark.createDataFrame([(random_letters(size_per_small_relation),)],) for _ in range(num_unique_small_relations)]
result_df = small_dfs[0]
for _ in range(512):
    result_df = result_df.unionByName(small_dfs[random.randint(0, len(small_dfs) - 1)])
result_df.collect()
```

It fails with `StatusCode.RESOURCE_EXHAUSTED` error with message`Sent message larger than max (269178955 vs. 134217728)`, because the client was trying to send a too large message to the server.

Note: repeated small local relations is just one way causing a large plan, the size of the plan can also be contributed by repeated subtrees of plan transformations, serialized UDFs, captured external variables by UDFs, etc.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when executing and analyzing large plans.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests on both the server side and the client side.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52894 from xi-db/plan-compression.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants