Conversation

@vicennial (Contributor) commented Sep 2, 2025

What changes were proposed in this pull request?

Adds a new experimental/developer RPC CloneSession to the SparkConnectService.

✅ CLONED (from SparkSession)

  • SessionState - SQL configs, temp views, UDFs, catalog metadata
  • ArtifactManager - JARs, files, classes added to session
  • ManagedJobTags - Job group tags for tracking
  • SharedState (reference) - Metastore, global temp views
  • SparkContext (reference) - Core Spark engine

❌ NOT CLONED (SessionHolder - Spark Connect layer)

  • planCache - (Partially analyzed) Logical plans for query optimization
  • operationIds - Currently executing operations
  • errorIdToError - Recent errors for debugging
  • eventManager - Session lifecycle events
  • dataFrameCache - DataFrames for foreachBatch callbacks
  • mlCache - ML models and pipelines
  • listenerCache - Streaming query listeners
  • pipelineExecutions - Active pipeline contexts
  • dataflowGraphRegistry - Registered dataflow graphs
  • streamingForeachBatchRunnerCleanerCache - Python streaming workers
  • pythonAccumulator - Python metrics collection
  • Session timings - Start time, last access, custom timeout

The clone preserves all SQL/catalog state but creates a fresh runtime environment. An analogy is cloning a database schema/config but not the active connections, caches, or running jobs.
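
The copy-versus-fresh split above can be modeled with a toy sketch (class and field names here are illustrative stand-ins, not the actual SparkSession/SessionHolder internals):

```python
from copy import deepcopy
from dataclasses import dataclass, field

@dataclass
class ToySession:
    # Declarative state that a clone copies (SQL configs, temp views, ...)
    confs: dict = field(default_factory=dict)
    temp_views: dict = field(default_factory=dict)
    # Runtime state that a clone starts fresh (plan cache, running operations, ...)
    plan_cache: dict = field(default_factory=dict)
    active_operations: set = field(default_factory=set)

    def clone(self) -> "ToySession":
        # Deep-copy the declarative state; leave all runtime state empty.
        return ToySession(confs=deepcopy(self.confs),
                          temp_views=deepcopy(self.temp_views))

src = ToySession()
src.confs["my.custom.config"] = "value"
src.temp_views["my_view"] = "SELECT 1 AS id"
src.plan_cache["q1"] = "<analyzed plan>"

clone = src.clone()
assert clone.confs == src.confs   # SQL/catalog state carried over
assert clone.plan_cache == {}     # caches and runtime state start empty
clone.temp_views.pop("my_view")   # mutations stay local to the clone
assert "my_view" in src.temp_views
```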

Why are the changes needed?

Spark Connect introduced the concept of resource isolation (via ArtifactManager, which has since been ported to classic Spark); as a result, JARs, Python files, and other artifacts added to one session are isolated from other sessions.

A rough edge remains: if a user wishes to fork the state of a session while keeping it independent, the only option today is to create a new session and re-upload/re-initialize all base JARs, artifacts, Python files, and so on.

Support for cloning through the API addresses this rough edge while maintaining all the benefits of session resource isolation.

Does this PR introduce any user-facing change?

Yes

```python
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
spark.conf.set("my.custom.config", "value")
spark.addArtifact("/path/to/my.jar")
spark.sql("CREATE TEMP VIEW my_view AS SELECT 1 AS id")

# Clone the session
cloned_spark = spark.cloneSession()

# The cloned session has all the same state
assert cloned_spark.conf.get("my.custom.config") == "value"
assert cloned_spark.sql("SELECT * FROM my_view").collect() == [Row(id=1)]

# But operations are isolated between sessions
cloned_spark.sql("DROP VIEW my_view")  # Only affects the cloned session
spark.sql("SELECT * FROM my_view").collect()  # Original still works
```

How was this patch tested?

New individual unit tests along with new test suites.

Was this patch authored or co-authored using generative AI tooling?

Co-authored with assistance from Claude Code.

@vicennial changed the title from "[WIP[SPARK-53455][CONNECT] Add CloneSession RPC" to "[WIP][SPARK-53455][CONNECT] Add CloneSession RPC" Sep 2, 2025
optional string client_type = 3;

// (Optional)
// The session_id for the new cloned session. If not provided, a new UUID will be generated.
Contributor

Is the server allowed to return a different session id if you provide this id?

Contributor Author (@vicennial, Sep 16, 2025)

No, it isn't. The supplied ID must be one that doesn't already exist and isn't that of an already-closed session.
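
In other words, the server either uses the exact ID the caller supplied or rejects the request; it never substitutes a different one. A minimal sketch of that contract, using a hypothetical in-memory registry (the real service keys sessions by user and session ID, not like this):

```python
import uuid

# Hypothetical in-memory stand-ins for the server's session bookkeeping.
sessions = {}       # session_id -> session state
closed_ids = set()  # IDs of closed sessions, which may not be reused

def clone_session(source_id, new_id=None):
    if source_id not in sessions:
        raise KeyError("Session not found: " + source_id)
    if new_id is None:
        # No ID supplied: the server generates a fresh UUID.
        new_id = str(uuid.uuid4())
    elif new_id in sessions or new_id in closed_ids:
        # A supplied ID must not collide with a live or closed session.
        raise ValueError("Session ID already exists or was closed: " + new_id)
    sessions[new_id] = dict(sessions[source_id])  # copy of the source state
    return new_id  # always the requested ID, never a substituted one
```

A caller that supplies a valid, unused UUID gets exactly that ID back; collisions fail loudly rather than being silently remapped.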

* Create a clone of this Spark Connect session on the server side with a custom session ID.
* The server-side session is cloned with all its current state (SQL configurations, temporary
* views, registered functions, catalog state) copied over to a new independent session with
* the specified session ID. The returned cloned session will remain isolated from this session.
Contributor

'remain' is a bit weird since the session is new.

Contributor Author

ack

// FetchErrorDetails retrieves the matched exception with details based on a provided error id.
rpc FetchErrorDetails(FetchErrorDetailsRequest) returns (FetchErrorDetailsResponse) {}

// Clone a session. Creates a new session that shares the same configuration and state
Contributor

Can we make this as descriptive as the Scala API doc?

Contributor Author

Sure

* @note This creates a new server-side session with the specified session ID while preserving
* the current session's configuration and state.
*/
def cloneSession(sessionId: String): SparkSession = {
Contributor

Should we make this part of the public API?
Please annotate this with @DeveloperAPI.

Contributor Author

Developer API makes sense here, annotating 👍

Contributor

Please add this method to the org.apache.spark.sql.SparkSession class as well.


// Next ID: 5
message CloneSessionResponse {
// Session id of the original session that was cloned.
Contributor

Do we need the old ids?

Contributor Author

Why not? All of our responses contain the ID of the calling session; I don't see a reason to leave it out here.

// configuration, catalog, session state, temporary views, and registered functions
val clonedSparkSession = sourceSessionHolder.session.cloneSession()

val newHolder = SessionHolder(newKey.userId, newKey.sessionId, clonedSparkSession)
Contributor

Shouldn't we clone more state here? For example MLCache, or DataFrameCache?

Contributor Author

Intentionally did not clone caches, added some more info to the PR description.

The clone preserves all SQL/catalog state but creates a fresh runtime environment. An analogy is cloning a database schema/config but not the active connections, caches, or running jobs.

assert(ex.getMessage.contains("Session not found"))
}

test("successful clone session creates new session") {
Contributor

Should we test session independence?

Contributor Author

added

},
"TARGET_SESSION_ID_FORMAT" : {
"message" : [
"Target session ID <targetSessionId> for clone operation must be an UUID string of the format '00112233-4455-6677-8899-aabbccddeeff'."
Contributor

Isn't there a different error for this as well?

Contributor Author

Yes, but this error makes it clear that the format of the target session ID itself is invalid, so the exception returned by the server isn't confusing.
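
For illustration, a canonical-format check on the target ID can be done with a round-trip through Python's `uuid.UUID` (a sketch with a hypothetical helper name, not the PR's actual validation code, which lives on the Scala side):

```python
import uuid

def is_canonical_uuid(target):
    """True only for hyphenated UUID strings like
    '00112233-4455-6677-8899-aabbccddeeff'."""
    try:
        # uuid.UUID also accepts un-hyphenated and braced forms, so
        # round-trip and compare to enforce the canonical layout.
        return str(uuid.UUID(target)) == target.lower()
    except (ValueError, AttributeError, TypeError):
        return False

assert is_canonical_uuid("00112233-4455-6677-8899-aabbccddeeff")
assert not is_canonical_uuid("00112233445566778899aabbccddeeff")
assert not is_canonical_uuid("not-a-uuid")
```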

@vicennial vicennial marked this pull request as ready for review September 23, 2025 14:20
@vicennial changed the title from "[WIP][SPARK-53455][CONNECT] Add CloneSession RPC" to "[SPARK-53455][CONNECT] Add CloneSession RPC" Sep 23, 2025
"removeProgressHandler",
}
- expected_missing_classic_methods = set()
+ expected_missing_classic_methods = {"cloneSession"}
Contributor

It should also be added to classic.

assert(spark.sessionId !== clonedSession.sessionId)
}

test("invalid session ID format throws exception") {
Contributor

Also test when people try to provide an ID that is already taken?

Contributor

I suppose it is covered by server side tests.

@hvanhovell (Contributor) left a comment

LGTM

@github-actions github-actions bot added the BUILD label Oct 8, 2025
@vicennial (Contributor Author)

The CI is green and ready for merge @hvanhovell

@hvanhovell (Contributor)

Thanks, merging to master!

dongjoon-hyun added a commit to apache/spark-connect-swift that referenced this pull request Oct 27, 2025
…th `4.1.0-preview3` RC1

### What changes were proposed in this pull request?

This PR aims to update Spark Connect-generated Swift source code with Apache Spark `4.1.0-preview3` RC1.

### Why are the changes needed?

There are many changes between Apache Spark 4.1.0-preview2 and preview3.

- apache/spark#52685
- apache/spark#52613
- apache/spark#52553
- apache/spark#52532
- apache/spark#52517
- apache/spark#52514
- apache/spark#52487
- apache/spark#52328
- apache/spark#52200
- apache/spark#52154
- apache/spark#51344

To use the latest bug fixes and new messages to develop for new features of `4.1.0-preview3`.

```
$ git clone -b v4.1.0-preview3 https://github.com/apache/spark.git
$ cd spark/sql/connect/common/src/main/protobuf/
$ protoc --swift_out=. spark/connect/*.proto
$ protoc --grpc-swift_out=. spark/connect/*.proto

# Remove empty GRPC files
$ cd spark/connect
$ grep 'This file contained no services' * | awk -F: '{print $1}' | xargs rm
```

### Does this PR introduce _any_ user-facing change?

Pass the CIs.

### How was this patch tested?

Pass the CIs. I manually tested with `Apache Spark 4.1.0-preview3` (with the two SDP ignored tests).

```
$ swift test --no-parallel
...
✔ Test run with 203 tests in 21 suites passed after 19.088 seconds.
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #252 from dongjoon-hyun/SPARK-54043.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
Closes apache#52200 from vicennial/cloneAPI.

Authored-by: vicennial <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
