Conversation

@khakhlyuk
Contributor

What changes were proposed in this pull request?

This PR adds several fixes and improvements over #52613, which added support for 2GB+ local relations in Spark Connect.

Uploading batches of chunks

Currently, before caching the local relation on the server via ChunkedCachedLocalRelation, the client materializes all chunks in memory (1 schema chunk and N data chunks). This can lead to high memory pressure on the client when uploading very large local relations.

In this PR, I'm changing how the client uploads the local relation. Instead of materializing all chunks in memory, the client materializes a batch of chunks in memory, uploads that batch to the server, and then proceeds to collect the next batch. The size of a batch of chunks is controlled via spark.sql.session.localRelationBatchOfChunksSizeBytes (1GB by default). This way, the upload mechanism consumes at most 1GB of memory at any point in time. Alternatives to this approach are:
a) Uploading each chunk separately, which would require one pair of ArtifactStatuses and AddArtifactsRequest RPC calls per chunk and is inefficient.
b) (current implementation) Materializing all chunks in memory and uploading them via a single pair of ArtifactStatuses and AddArtifactsRequest RPC calls, which can lead to high memory pressure on the client.
Uploading batches of chunks is a middle-ground solution.

The changes are implemented for both the Python and Scala clients.
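
To make the batching behaviour concrete, here is a minimal sketch of the batching loop. It is not the actual client code: the Chunk type, the uploadBatch callback, and the way chunks are produced are illustrative assumptions; only the idea of flushing a batch before it exceeds the configured byte limit reflects the description above.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a serialized schema/data chunk.
final case class Chunk(bytes: Array[Byte])

// Collect chunks into batches of at most `maxBatchSizeBytes` bytes and hand
// each full batch to `uploadBatch` (one ArtifactStatuses + AddArtifactsRequest
// round trip per batch), so only one batch is held in memory at a time.
def uploadInBatches(
    chunks: Iterator[Chunk],
    maxBatchSizeBytes: Long, // spark.sql.session.localRelationBatchOfChunksSizeBytes
    uploadBatch: Seq[Chunk] => Unit): Unit = {
  val batch = ArrayBuffer.empty[Chunk]
  var batchSize = 0L
  for (chunk <- chunks) {
    if (batch.nonEmpty && batchSize + chunk.bytes.length > maxBatchSizeBytes) {
      uploadBatch(batch.toSeq) // flush before exceeding the limit
      batch.clear()
      batchSize = 0L
    }
    batch += chunk
    batchSize += chunk.bytes.length
  }
  if (batch.nonEmpty) uploadBatch(batch.toSeq)
}
```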

Minor fixes and improvements

  • Replace ArraySeq.unsafeWrapArray(data.map(_.copy()).toArray) with data.map(_.copy()).toArray.toImmutableArraySeq in SparkConnectPlanner.scala (see the sketch after this list). The latter is compatible with both Scala 2.13 and Scala 2.12 and is consistent with how arrays are converted to sequences elsewhere in the code base.
  • Improved asserts and tests in the Python client.
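
For reference, a minimal sketch of the conversion pattern mentioned above. It assumes Spark's org.apache.spark.util.ArrayImplicits helper, which provides toImmutableArraySeq and is internal to Spark, so this only applies inside the Spark code base; the value being converted is purely illustrative.

```scala
// Inside the Spark code base (ArrayImplicits is Spark-internal):
import org.apache.spark.util.ArrayImplicits._

// Illustrative only: convert an Array to an immutable ArraySeq via the helper
// extension instead of calling ArraySeq.unsafeWrapArray directly.
val data = Array(1, 2, 3)
val asSeq = data.toImmutableArraySeq
```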

Why are the changes needed?

Reduce memory pressure in the Spark Connect Python and Scala clients when uploading very large local relations to the server.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing tests.

Was this patch authored or co-authored using generative AI tooling?

No

"materialize all chunks in memory.")
.version("4.1.0")
.longConf
.checkValue(_ > 0, "The batch size in bytes must be positive")
Contributor

We should check whether this value is greater than the chunk size value as an initial step.

Contributor

In the case of conflicts, this conf should be respected and the operation should error out, as we wouldn't want to bypass an explicitly set max materialisation size (to avoid system failures).
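
A minimal sketch of what such a guard might look like, assuming the check happens on the client before batching starts; the helper name, parameter names, and error message are all hypothetical.

```scala
// Hypothetical guard: error out instead of silently exceeding the configured
// batch size when a single chunk is already larger than it.
def validateBatchSize(batchSizeBytes: Long, chunkSizeBytes: Long): Unit = {
  require(
    batchSizeBytes >= chunkSizeBytes,
    s"spark.sql.session.localRelationBatchOfChunksSizeBytes ($batchSizeBytes) must be at " +
      s"least as large as the configured chunk size ($chunkSizeBytes).")
}
```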

@@ -6003,6 +6003,19 @@ object SQLConf {
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("3GB")

val LOCAL_RELATION_BATCH_OF_CHUNKS_SIZE_BYTES =
buildConf(SqlApiConfHelper.LOCAL_RELATION_BATCH_OF_CHUNKS_SIZE_BYTES_KEY)
Contributor

We could update the name here to be a bit more explicit, in the sense that this pertains to the maximum number of bytes that we will materialise in memory for the specific local relation.

Contributor

(specific because multi-threading can result in multiple artifacts being materialised at once)
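
Piecing the two diff fragments above together, the new SQLConf entry presumably looks roughly like the following. This is a sketch only; the doc string and the default-value expression are assumptions based on the PR description rather than the exact source.

```scala
val LOCAL_RELATION_BATCH_OF_CHUNKS_SIZE_BYTES =
  buildConf(SqlApiConfHelper.LOCAL_RELATION_BATCH_OF_CHUNKS_SIZE_BYTES_KEY)
    .doc("Maximum number of bytes of local relation chunks the client batches " +
      "in memory before uploading them, so that it does not have to " +
      "materialize all chunks in memory.")
    .version("4.1.0")
    .longConf
    .checkValue(_ > 0, "The batch size in bytes must be positive")
    .createWithDefault(1024L * 1024L * 1024L) // 1GB, per the PR description
```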

@khakhlyuk force-pushed the largelocalrelations-followup branch from 3ceed3c to 84a54fe on November 12, 2025 at 11:35
@hvanhovell
Contributor

merging to master/4.1

asf-gitbox-commits pushed a commit that referenced this pull request Nov 12, 2025

Closes #52973 from khakhlyuk/largelocalrelations-followup.

Authored-by: Alex Khakhlyuk <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit 40ba971)
Signed-off-by: Herman van Hovell <[email protected]>
@dongjoon-hyun
Member

Thank you, @khakhlyuk and @hvanhovell .

@pan3793
Member

pan3793 commented Nov 14, 2025

@khakhlyuk, I hit a NoSuchElementException while developing other Connect modules; it seems that the default value of the config spark.sql.session.localRelationBatchOfChunksSizeBytes is not handled properly.

java.util.NoSuchElementException: spark.sql.session.localRelationBatchOfChunksSizeBytes
	at org.apache.spark.sql.connect.RuntimeConfig.$anonfun$get$1(RuntimeConfig.scala:51)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.connect.RuntimeConfig.get(RuntimeConfig.scala:51)
	at org.apache.spark.sql.connect.SparkSession.$anonfun$createDataset$1(SparkSession.scala:124)
	at org.apache.spark.sql.connect.SparkSession.$anonfun$createDataset$1$adapted(SparkSession.scala:118)
	at org.apache.spark.sql.connect.SparkSession.newDataset(SparkSession.scala:488)
	at org.apache.spark.sql.connect.SparkSession.newDataset(SparkSession.scala:453)
	at org.apache.spark.sql.connect.SparkSession.createDataset(SparkSession.scala:118)
	at org.apache.spark.sql.connect.SparkSession.createDataset(SparkSession.scala:214)
	at org.apache.spark.sql.connect.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:30)
	at org.apache.spark.sql.connect.client.jdbc.SparkConnectDatabaseMetaData.$anonfun$getColumns$2(SparkConnectDatabaseMetaData.scala:543)
	at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:936)
	at org.apache.spark.sql.connect.client.jdbc.SparkConnectDatabaseMetaData.getColumns(SparkConnectDatabaseMetaData.scala:523)
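
For what it's worth, the failure mode is the client asking the server for a conf key the older server does not define. A hedged illustration of the difference follows, assuming `spark` is a Connect SparkSession and using a purely illustrative 1GB fallback string.

```scala
val key = "spark.sql.session.localRelationBatchOfChunksSizeBytes"

// Against a 4.1.0-preview3 server the key is unknown, so reading it without a
// default throws the NoSuchElementException shown in the trace above:
// spark.conf.get(key)

// Supplying a client-side default avoids the exception:
val batchSize = spark.conf.get(key, (1024L * 1024L * 1024L).toString).toLong
```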

@khakhlyuk
Contributor Author

@pan3793 I have made additional changes in a follow-up PR: #52973
Is it possible that you have updated the client-side code, but not the server-side code? If that is not the case, could you share a minimal repro please?

@pan3793
Member

pan3793 commented Nov 15, 2025

@khakhlyuk, yes, I am using an older 4.1.0-preview3 Connect server for testing. Do we have a compatibility policy for the Connect client/server? If it requires that the client version must be <= the server version, I think we can ignore the issue.

@khakhlyuk
Contributor Author

@pan3793 Yes, exactly: the policy is to maintain forward compatibility, meaning new servers have to support old clients. So client version <= server version.

zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025

Closes apache#52973 from khakhlyuk/largelocalrelations-followup.

Authored-by: Alex Khakhlyuk <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025

Closes apache#52973 from khakhlyuk/largelocalrelations-followup.

Authored-by: Alex Khakhlyuk <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>