Skip to content

Conversation

@khakhlyuk
Copy link
Contributor

@khakhlyuk khakhlyuk commented Oct 14, 2025

What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-53917

Problem description

LocalRelation is a Catalyst logical operator used to represent a dataset of rows inline as part of the LogicalPlan. LocalRelations represent dataframes created directly from Python and Scala objects, e.g., Python and Scala lists, pandas dataframes, csv files loaded in memory, etc.
In Spark Connect, local relations are transferred over gRPC using LocalRelation (for relations under 64MB) and CachedLocalRelation (larger relations over 64MB) messages.
CachedLocalRelations currently have a hard size limit of 2GB, which means that spark users can’t execute queries with local client data, pandas dataframes, csv files of over 2GB.

Design

In Spark Connect, the client needs to serialize the local relation before transferring it to the server. It serializes data via an Arrow IPC stream as a single record batch and schema as a json string. It then embeds data and schema as LocalRelation{schema,data} proto message.
Small local relations (under 64MB) are sent directly as part of the ExecutePlanRequest.

image

Larger local relations are first sent to the server via addArtifact and stored in memory or on disk via BlockManager. Then an ExecutePlanRequest is sent containing CachedLocalRelation{hash}, where hash is the artifact hash. The server retrieves the cached LocalRelation from the BlockManager via the hash, deserializes it, adds it to the LogicalPlan and then executes it.

image

The server reads the data from the BlockManager as a stream and tries to create proto.LocalRelation via

proto.Relation
.newBuilder()
.getLocalRelation
.getParserForType
.parseFrom(blockData.toInputStream())

This fails, because java protobuf library has a 2GB limit on deserializing protobuf messages from a string.

org.sparkproject.connect.com.google.protobuf.InvalidProtocolBufferException) CodedInputStream encountered an embedded string or message which claimed to have negative size.
image

To fix this, I propose avoiding the protobuf layer during the serialization on the client and deserialization on the server. Instead of caching the full protobuf LocalRelation message, we cache the data and schema as separate artifacts, send two hashes {data_hash, schema_hash} to the server, load them both from BlockManager directly and create a LocalRelation on the server based on the unpacked data and schema.

image

After creating a prototype with the new proto message, I discovered that there are additional limits for CachedLocalRelations. Both the Scala Client and the Server store the data in a single Java Array[Byte], which has a 2GB size limit in Java. To avoid this limit, I propose transferring data in chunks. The Python and Scala clients will split data into multiple Arrow batches and upload them separately to the server. Each batch will be uploaded and stored a separate artifact. The Server will then load and process each batch separately. We will keep batch sizes around 16MB (TBD), well below the 2GB limit. This way we will avoid 2GB limits on both clients and on the server.

image

The final proto message looks like this:

message ChunkedCachedLocalRelation {
  repeated string dataHashes = 1;
  optional string schemaHash = 2;
}

Why are the changes needed?

LocalRelations currently have a hard size limit of 2GB, which means that spark users can’t execute queries with local client data, pandas dataframes, csv files of over 2GB.

Does this PR introduce any user-facing change?

No

How was this patch tested?

New python and scala tests for large local relations.

Was this patch authored or co-authored using generative AI tooling?

No

@khakhlyuk khakhlyuk changed the title [WIP][CONNECT] Support large local relations [SPARK-53917][CONNECT] Support large local relations Oct 15, 2025
@khakhlyuk khakhlyuk changed the title [SPARK-53917][CONNECT] Support large local relations [WIP][SPARK-53917][CONNECT] Support large local relations Oct 15, 2025
@pan3793
Copy link
Member

pan3793 commented Oct 17, 2025

constructing large local relations in driver sounds dangerous, is it possible to offload the data from the driver to the executor side (an RDD)?

@khakhlyuk
Copy link
Contributor Author

khakhlyuk commented Oct 17, 2025

constructing large local relations in driver sounds dangerous, is it possible to offload the data from the driver to the executor side (an RDD)?

Hey @pan3793!
Thanks for the feedback, you are totally correct. Spark already materializes the LocalRelations fully on the driver today (both in the classic and connect mode), so this PR is a net-positive improvement over the existing behaviour. My changes remove the hard limit of 2GB, the new limit can be controlled via the spark.sql.session.localRelationSizeLimit conf and which is set to 3GB by default.
Offloading the data materialization from the driver to the executor would be the next important improvement and I believe it should be done, but it's out of scope of this PR and outside of my expertise (I'm mainly familiar with spark connect). I may be able to work on the executor changes in several months. If someone else can pick up the executor work, I'm happy with that. Also happy to create a jira ticket for tracking the follow-up.

@khakhlyuk khakhlyuk force-pushed the largelocalrelations branch from c34471f to 6cbb246 Compare October 17, 2025 11:15
Copy link
Contributor

@hvanhovell hvanhovell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@hvanhovell hvanhovell changed the title [WIP][SPARK-53917][CONNECT] Support large local relations [SPARK-53917][CONNECT] Support large local relations Oct 22, 2025
dongjoon-hyun added a commit to apache/spark-connect-swift that referenced this pull request Oct 27, 2025
…th `4.1.0-preview3` RC1

### What changes were proposed in this pull request?

This PR aims to update Spark Connect-generated Swift source code with Apache Spark `4.1.0-preview3` RC1.

### Why are the changes needed?

There are many changes between Apache Spark 4.1.0-preview2 and preview3.

- apache/spark#52685
- apache/spark#52613
- apache/spark#52553
- apache/spark#52532
- apache/spark#52517
- apache/spark#52514
- apache/spark#52487
- apache/spark#52328
- apache/spark#52200
- apache/spark#52154
- apache/spark#51344

To use the latest bug fixes and new messages to develop for new features of `4.1.0-preview3`.

```
$ git clone -b v4.1.0-preview3 https://github.com/apache/spark.git
$ cd spark/sql/connect/common/src/main/protobuf/
$ protoc --swift_out=. spark/connect/*.proto
$ protoc --grpc-swift_out=. spark/connect/*.proto

// Remove empty GRPC files
$ cd spark/connect
$ grep 'This file contained no services' * | awk -F: '{print $1}' | xargs rm
```

### Does this PR introduce _any_ user-facing change?

Pass the CIs.

### How was this patch tested?

Pass the CIs. I manually tested with `Apache Spark 4.1.0-preview3` (with the two SDP ignored tests).

```
$ swift test --no-parallel
...
✔ Test run with 203 tests in 21 suites passed after 19.088 seconds.
```
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #252 from dongjoon-hyun/SPARK-54043.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
asf-gitbox-commits pushed a commit that referenced this pull request Nov 12, 2025
### What changes were proposed in this pull request?

This PR adds several fixes and improvements over #52613 which added support for 2GB+ local relations in Spark Connect.

#### Uploading batches of chunks
Currently, before caching the local relation on the server via `ChunkedCachedLocalRelation`, the client materializes all chunks in memory (1 schema chunk and N data chunks). This can lead to high memory pressure on the client when uploading very large local relations.

In this PR, I'm changing how the client uploads the local relation. Instead of materializing all chunks in memory, the client will materialize a batch of chunks in memory, upload the batch of chunks to the server, and proceed to collecting the next batch of chunks. The size of the batch of chunks is controlled via `spark.sql.session.localRelationBatchOfChunksSizeBytes` (1GB by default). This way, the uploading mechanism only consumes 1GB of memory at each point in time. Alternatives to this approach are:
a) uploading each chunk separately - would require one pair of `ArtifactStatuses` and `AddArtifactsRequest` RPC calls for each chunk, which is inefficient.
b) (current implementation) materializing all chunks in memory and uploading them via a single pair of `ArtifactStatuses` and `AddArtifactsRequest` RPC calls. This can lead to high memory pressure on the client.
Uploading batches of chunks is a middle-ground solution.

Changes are implemented both for the python and scala clients.

#### Minor fixes and improvements
- Replace `ArraySeq.unsafeWrapArray(data.map(_.copy()).toArray))` with `data.map(_.copy()).toArray.toImmutableArraySeq` in `SparkConnectPlanner.scala`. The latter is compatible with both scala 2.13 and scala 2.12 and is consistent with how arrays are converted to sequences in other places in the code base.
- Improved asserts and tests in the python client.

### Why are the changes needed?

Reduce memory pressure in the spark connect python and scala clients when uploading very large local relations to the server.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52973 from khakhlyuk/largelocalrelations-followup.

Authored-by: Alex Khakhlyuk <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
asf-gitbox-commits pushed a commit that referenced this pull request Nov 12, 2025
### What changes were proposed in this pull request?

This PR adds several fixes and improvements over #52613 which added support for 2GB+ local relations in Spark Connect.

#### Uploading batches of chunks
Currently, before caching the local relation on the server via `ChunkedCachedLocalRelation`, the client materializes all chunks in memory (1 schema chunk and N data chunks). This can lead to high memory pressure on the client when uploading very large local relations.

In this PR, I'm changing how the client uploads the local relation. Instead of materializing all chunks in memory, the client will materialize a batch of chunks in memory, upload the batch of chunks to the server, and proceed to collecting the next batch of chunks. The size of the batch of chunks is controlled via `spark.sql.session.localRelationBatchOfChunksSizeBytes` (1GB by default). This way, the uploading mechanism only consumes 1GB of memory at each point in time. Alternatives to this approach are:
a) uploading each chunk separately - would require one pair of `ArtifactStatuses` and `AddArtifactsRequest` RPC calls for each chunk, which is inefficient.
b) (current implementation) materializing all chunks in memory and uploading them via a single pair of `ArtifactStatuses` and `AddArtifactsRequest` RPC calls. This can lead to high memory pressure on the client.
Uploading batches of chunks is a middle-ground solution.

Changes are implemented both for the python and scala clients.

#### Minor fixes and improvements
- Replace `ArraySeq.unsafeWrapArray(data.map(_.copy()).toArray))` with `data.map(_.copy()).toArray.toImmutableArraySeq` in `SparkConnectPlanner.scala`. The latter is compatible with both scala 2.13 and scala 2.12 and is consistent with how arrays are converted to sequences in other places in the code base.
- Improved asserts and tests in the python client.

### Why are the changes needed?

Reduce memory pressure in the spark connect python and scala clients when uploading very large local relations to the server.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52973 from khakhlyuk/largelocalrelations-followup.

Authored-by: Alex Khakhlyuk <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit 40ba971)
Signed-off-by: Herman van Hovell <[email protected]>
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
### What changes were proposed in this pull request?

This PR adds several fixes and improvements over apache#52613 which added support for 2GB+ local relations in Spark Connect.

#### Uploading batches of chunks
Currently, before caching the local relation on the server via `ChunkedCachedLocalRelation`, the client materializes all chunks in memory (1 schema chunk and N data chunks). This can lead to high memory pressure on the client when uploading very large local relations.

In this PR, I'm changing how the client uploads the local relation. Instead of materializing all chunks in memory, the client will materialize a batch of chunks in memory, upload the batch of chunks to the server, and proceed to collecting the next batch of chunks. The size of the batch of chunks is controlled via `spark.sql.session.localRelationBatchOfChunksSizeBytes` (1GB by default). This way, the uploading mechanism only consumes 1GB of memory at each point in time. Alternatives to this approach are:
a) uploading each chunk separately - would require one pair of `ArtifactStatuses` and `AddArtifactsRequest` RPC calls for each chunk, which is inefficient.
b) (current implementation) materializing all chunks in memory and uploading them via a single pair of `ArtifactStatuses` and `AddArtifactsRequest` RPC calls. This can lead to high memory pressure on the client.
Uploading batches of chunks is a middle-ground solution.

Changes are implemented both for the python and scala clients.

#### Minor fixes and improvements
- Replace `ArraySeq.unsafeWrapArray(data.map(_.copy()).toArray))` with `data.map(_.copy()).toArray.toImmutableArraySeq` in `SparkConnectPlanner.scala`. The latter is compatible with both scala 2.13 and scala 2.12 and is consistent with how arrays are converted to sequences in other places in the code base.
- Improved asserts and tests in the python client.

### Why are the changes needed?

Reduce memory pressure in the spark connect python and scala clients when uploading very large local relations to the server.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52973 from khakhlyuk/largelocalrelations-followup.

Authored-by: Alex Khakhlyuk <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
### What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-53917

#### Problem description
LocalRelation is a Catalyst logical operator used to represent a dataset of rows inline as part of the LogicalPlan. LocalRelations represent dataframes created directly from Python and Scala objects, e.g., Python and Scala lists, pandas dataframes, csv files loaded in memory, etc.
In Spark Connect, local relations are transferred over gRPC using LocalRelation (for relations under 64MB) and CachedLocalRelation (larger relations over 64MB) messages.
CachedLocalRelations currently have a hard size limit of 2GB, which means that spark users can’t execute queries with local client data, pandas dataframes, csv files of over 2GB.

#### Design
In Spark Connect, the client needs to serialize the local relation before transferring it to the server. It serializes data via an Arrow IPC stream as a single record batch and schema as a json string. It then embeds data and schema as LocalRelation{schema,data} proto message.
Small local relations (under 64MB) are sent directly as part of the ExecutePlanRequest.

<img width="1398" height="550" alt="image" src="https://github.com/user-attachments/assets/c176f4cd-1a8f-4d72-8217-5a3bc221ace9" />

Larger local relations are first sent to the server via addArtifact and stored in memory or on disk via BlockManager. Then an ExecutePlanRequest is sent containing CachedLocalRelation{hash}, where hash is the artifact hash. The server retrieves the cached LocalRelation from the BlockManager via the hash, deserializes it, adds it to the LogicalPlan and then executes it.

<img width="1401" height="518" alt="image" src="https://github.com/user-attachments/assets/51352194-5439-4559-9d43-fc19dfe81437" />

The server reads the data from the BlockManager as a stream and tries to create proto.LocalRelation via
```
proto.Relation
.newBuilder()
.getLocalRelation
.getParserForType
.parseFrom(blockData.toInputStream())
```
This fails, because java protobuf library has a 2GB limit on deserializing protobuf messages from a string.
```
org.sparkproject.connect.com.google.protobuf.InvalidProtocolBufferException) CodedInputStream encountered an embedded string or message which claimed to have negative size.
```
<img width="1396" height="503" alt="image" src="https://github.com/user-attachments/assets/60da9441-f4cc-45d5-b028-57573a0175c2" />

To fix this, I propose avoiding the protobuf layer during the serialization on the client and deserialization on the server. Instead of caching the full protobuf LocalRelation message, we cache the data and schema as separate artifacts, send two hashes {data_hash, schema_hash} to the server, load them both from BlockManager directly and create a LocalRelation on the server based on the unpacked data and schema.

<img width="1397" height="515" alt="image" src="https://github.com/user-attachments/assets/e44558de-df64-43b0-8813-d03de6689810" />

After creating a prototype with the new proto message, I discovered that there are additional limits for CachedLocalRelations. Both the Scala Client and the Server store the data in a single Java Array[Byte], which has a 2GB size limit in Java. To avoid this limit, I propose transferring data in chunks. The Python and Scala clients will split data into multiple Arrow batches and upload them separately to the server. Each batch will be uploaded and stored a separate artifact. The Server will then load and process each batch separately. We will keep batch sizes around 16MB (TBD), well below the 2GB limit. This way we will avoid 2GB limits on both clients and on the server.

<img width="1395" height="569" alt="image" src="https://github.com/user-attachments/assets/16fac7b2-d247-42a6-9ac3-decb48df023d" />

The final proto message looks like this:
```
message ChunkedCachedLocalRelation {
  repeated string dataHashes = 1;
  optional string schemaHash = 2;
}
```

### Why are the changes needed?

LocalRelations currently have a hard size limit of 2GB, which means that spark users can’t execute queries with local client data, pandas dataframes, csv files of over 2GB.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New python and scala tests for large local relations.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52613 from khakhlyuk/largelocalrelations.

Authored-by: Alex Khakhlyuk <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
### What changes were proposed in this pull request?

This PR adds several fixes and improvements over apache#52613 which added support for 2GB+ local relations in Spark Connect.

#### Uploading batches of chunks
Currently, before caching the local relation on the server via `ChunkedCachedLocalRelation`, the client materializes all chunks in memory (1 schema chunk and N data chunks). This can lead to high memory pressure on the client when uploading very large local relations.

In this PR, I'm changing how the client uploads the local relation. Instead of materializing all chunks in memory, the client will materialize a batch of chunks in memory, upload the batch of chunks to the server, and proceed to collecting the next batch of chunks. The size of the batch of chunks is controlled via `spark.sql.session.localRelationBatchOfChunksSizeBytes` (1GB by default). This way, the uploading mechanism only consumes 1GB of memory at each point in time. Alternatives to this approach are:
a) uploading each chunk separately - would require one pair of `ArtifactStatuses` and `AddArtifactsRequest` RPC calls for each chunk, which is inefficient.
b) (current implementation) materializing all chunks in memory and uploading them via a single pair of `ArtifactStatuses` and `AddArtifactsRequest` RPC calls. This can lead to high memory pressure on the client.
Uploading batches of chunks is a middle-ground solution.

Changes are implemented both for the python and scala clients.

#### Minor fixes and improvements
- Replace `ArraySeq.unsafeWrapArray(data.map(_.copy()).toArray))` with `data.map(_.copy()).toArray.toImmutableArraySeq` in `SparkConnectPlanner.scala`. The latter is compatible with both scala 2.13 and scala 2.12 and is consistent with how arrays are converted to sequences in other places in the code base.
- Improved asserts and tests in the python client.

### Why are the changes needed?

Reduce memory pressure in the spark connect python and scala clients when uploading very large local relations to the server.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#52973 from khakhlyuk/largelocalrelations-followup.

Authored-by: Alex Khakhlyuk <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants