Conversation

@HyukjinKwon HyukjinKwon commented Jun 7, 2023

Previous behaviour

Previously, we kept a JobArtifactSet per client and made it active via a thread local.

  1. The execution block is wrapped with SessionHolder.withSession here.
  2. SessionHolder.withContextClassLoader is then called here which in turn calls JobArtifactSet.withActive here and sets the active set to SessionHolder.connectJobArtifactSet
  3. The actual JobArtifactSet that is used is built up here in SparkConnectArtifactManager.jobArtifactSet

Since each client has its own JobArtifactSet made active while executing an operation, the TaskDescription carries artifacts specific to that client, and subsequently so does IsolatedSessionState in the Executor. This allowed us to keep the Spark Connect specific logic inside the Spark Connect module.
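The thread-local pattern described in steps 1-3 can be sketched as follows (a minimal Python sketch; the names mirror `JobArtifactSet.withActive` but everything here is illustrative, not Spark's actual code):

```python
import threading
from contextlib import contextmanager

_active = threading.local()

class JobArtifactSet:
    """Illustrative stand-in for the Scala JobArtifactSet."""
    def __init__(self, session_uuid, files):
        self.session_uuid = session_uuid
        self.files = files  # artifacts added by this Spark Connect client

@contextmanager
def with_active(artifact_set):
    """Make `artifact_set` the active set for the current thread only."""
    previous = getattr(_active, "value", None)
    _active.value = artifact_set
    try:
        yield
    finally:
        _active.value = previous

def get_active():
    """Return the artifact set active on the current thread, if any."""
    return getattr(_active, "value", None)
```

Because the active set lives in a thread local, concurrent clients executing on different threads each see only their own artifacts.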

Problem

For the most part this worked well; however, we never call SparkContext.addFile or SparkContext.addJar, and instead pass the artifacts directly to the scheduler (into TaskDescription). This is fine in general, but skipping SparkContext.addFile causes several problems:

  • SparkContext.postEnvironmentUpdate, normally invoked from SparkContext.addFile, is never triggered. This matters, for example, for recording events for the History Server.
  • Specifically for archives, Utils.unpack(source, dest), normally invoked from SparkContext.addFile, is never triggered, so archives are not untarred properly on the Driver.

Therefore, we would have to duplicate that logic on the Spark Connect server side, which is not ideal.

In addition, we already added the isolation logic to the Executor. The Driver and the Executor are the symmetric pair (not Spark Connect Server <> Executor), so keeping the logic there matters for code readability and for the expected roles of each component.
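For illustration, the two SparkContext.addFile side effects listed above can be sketched as follows (a hedged Python sketch; `add_file`, `env_listeners`, and the unpack step are assumptions standing in for `postEnvironmentUpdate` and `Utils.unpack`, not Spark's real API):

```python
import os
import tarfile

def add_file(path, env_listeners, root_dir, is_archive=False):
    """Illustrative sketch of the two side effects skipped by the
    scheduler-only path: unpacking archives on the driver and posting
    an environment update to listeners (e.g., the event log used by
    the History Server)."""
    added = {path: os.path.getmtime(path)}
    if is_archive:
        # Archives must be unpacked on the driver (Utils.unpack in Spark).
        dest = os.path.join(root_dir, os.path.basename(path))
        with tarfile.open(path) as tar:
            tar.extractall(dest)
    # postEnvironmentUpdate: notify listeners about the new file.
    for listener in env_listeners:
        listener(added)
    return added
```

Passing artifacts straight to TaskDescription bypasses both steps, which is why the server would otherwise have to re-implement them.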

Solution in this PR

This PR proposes to support session-based files and archives in Spark Connect. It builds on the groundwork in #41701 and #41625 (for jars in the Spark Connect Scala client).

The changed logic is as follows:

  • Keep the session UUID and Spark Connect Server specific information, such as the REPL class path, in a thread local.
  • Add the session ID when we add files or archives. SparkContext keeps them in a map, Map(session -> Map(file -> timestamp)), so we can reuse the existing logic and address the problems mentioned above.
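A minimal sketch of the `Map(session -> Map(file -> timestamp))` bookkeeping described above (illustrative Python; `SessionFileRegistry` is a hypothetical name, not a Spark class):

```python
import time
from collections import defaultdict

class SessionFileRegistry:
    """Hypothetical sketch of session-scoped file bookkeeping:
    session UUID -> {file path: timestamp added}."""
    def __init__(self):
        self._files = defaultdict(dict)

    def add_file(self, session_uuid, path):
        # Recording a timestamp per file mirrors the existing
        # SparkContext.addedFiles bookkeeping.
        self._files[session_uuid][path] = time.time()

    def files_for(self, session_uuid):
        # A task scheduled for a session sees only that session's files.
        return dict(self._files.get(session_uuid, {}))
```

Keying the existing file map by session lets sessions add files with the same name without clobbering each other.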

After that, on the executor side:

  • Executors create an additional directory, named after the session UUID, under the default directory (the current working directory; see SparkFiles.getRootDirectory).
  • When we launch Python workers, the current working directory is set to the directory created above.
  • End users access these files via the current working directory, e.g., ./blahblah.txt in their Python UDFs. This is therefore compatible both with and without Spark Connect.
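The executor-side layout above can be sketched as follows (assumed layout in Python; `session_root_dir` is a hypothetical helper, not Spark's code):

```python
import os

def session_root_dir(root_dir, session_uuid=None):
    """Return the working directory for a session's Python workers.

    Without a session (classic mode), this is the default root directory
    (what SparkFiles.getRootDirectory returns); with a Spark Connect
    session, it is a per-session subdirectory named after the UUID.
    """
    if session_uuid is None:
        return root_dir
    path = os.path.join(root_dir, session_uuid)
    os.makedirs(path, exist_ok=True)
    return path
```

A worker launched for that session would then chdir into `session_root_dir(root, uuid)`, so a UDF opening `./blahblah.txt` resolves to the session's own copy.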

Note that:

  • This creates Python workers per session, because we set the session UUID as an environment variable, and new Python workers are created when the environment variables differ; see also SparkEnv.createPythonWorker.
  • The daemon and Python workers are already killed when they are idle for a while.
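Why a distinct session UUID environment variable yields distinct Python workers can be sketched as follows (illustrative Python; the cache key shape and the `SPARK_CONNECT_SESSION_UUID` variable name are assumptions here; see `SparkEnv.createPythonWorker` for the real logic):

```python
# Hypothetical worker cache keyed by the environment: a different
# environment (including the session UUID variable) means a new worker.
_worker_cache = {}

def get_python_worker(python_exec, env):
    """Return a cached worker for (executable, environment), creating
    one if the exact combination has not been seen before."""
    key = (python_exec, tuple(sorted(env.items())))
    if key not in _worker_cache:
        _worker_cache[key] = object()  # stand-in for spawning a worker/daemon
    return _worker_cache[key]
```

Two sessions thus never share a worker process, while repeated calls within one session reuse the same worker.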

TODOs and limitations

The Executor also maintains the file list, but in a cache so entries can be evicted. However, this has a problem.

Possible solutions are:

  • For 1., we should maintain a cache with a TTL and remove expired entries.
  • For 2., we should use a dedicated directory (which this PR does) and remove that directory when the cache entry is evicted, so the overwrite does not happen.
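A sketch of the proposed fix for 2. (illustrative Python; a TTL cache whose eviction deletes the session's dedicated directory, so stale files are never overwritten in place):

```python
import shutil
import time

class SessionDirCache:
    """Hypothetical TTL cache of session directories: when a session's
    entry expires, its dedicated directory is deleted outright instead
    of individual files being overwritten later."""
    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self._entries = {}  # session uuid -> (dir path, last-used time)

    def touch(self, session_uuid, dir_path):
        self._entries[session_uuid] = (dir_path, time.monotonic())

    def evict_expired(self):
        now = time.monotonic()
        for uuid, (path, last_used) in list(self._entries.items()):
            if now - last_used > self.ttl:
                # Remove the whole per-session directory on eviction.
                shutil.rmtree(path, ignore_errors=True)
                del self._entries[uuid]
```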

Why are the changes needed?

To allow session-based artifact control and multi-tenancy.

Does this PR introduce any user-facing change?

Yes, this PR now allows multiple sessions to have their own space. For example, session A and session B can each add a file with the same name. Previously this was not possible.

How was this patch tested?

Unit tests were added.

@HyukjinKwon HyukjinKwon marked this pull request as draft June 7, 2023 11:25
@HyukjinKwon HyukjinKwon force-pushed the session-base-exec-dir branch 2 times, most recently from b62e592 to 2e3ecc2 Compare June 7, 2023 12:35
@HyukjinKwon HyukjinKwon changed the title [WIP] Session-based files and archives in Spark Connect [WIP][CONNECT] Session-based files and archives in Spark Connect Jun 7, 2023
@zhengruifeng
Contributor

qq: If we make the existing addXXX methods support a destination directory, can we keep those mappings outside of Spark core (in the Connect server)?

@HyukjinKwon
Member Author

Yeah, let me try to decouple this.

@HyukjinKwon HyukjinKwon force-pushed the session-base-exec-dir branch 5 times, most recently from f92d487 to 4e0a3a9 Compare June 8, 2023 11:13
hvanhovell pushed a commit that referenced this pull request Jun 16, 2023
### What changes were proposed in this pull request?

This passes SessionHolder rather than just SparkSession to `SparkConnectPlanner`, to allow access to session-specific state at the Connect server level. Note that this is Spark Connect specific session state and is not stored with SparkSession.

E.g.
  * Mapping from _dataframe reference id_ to actual dataframe in  #41580
  * Files and archives stored with the session in #41495

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
 - Existing unit tests.

Closes #41618 from rangadi/session-holder.

Authored-by: Raghu Angadi <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
czxm pushed a commit to czxm/spark that referenced this pull request Jun 19, 2023
@HyukjinKwon HyukjinKwon force-pushed the session-base-exec-dir branch 3 times, most recently from 961f5cb to df475ec Compare July 5, 2023 02:16
@HyukjinKwon HyukjinKwon changed the title [WIP][CONNECT] Session-based files and archives in Spark Connect [WIP][SPARK-44290][CONNECT] Session-based files and archives in Spark Connect Jul 5, 2023
@HyukjinKwon HyukjinKwon force-pushed the session-base-exec-dir branch 3 times, most recently from 9a191f0 to 61c2c9c Compare July 5, 2023 07:00
@github-actions github-actions bot added the MESOS label Jul 5, 2023
@HyukjinKwon HyukjinKwon force-pushed the session-base-exec-dir branch 3 times, most recently from d2877f4 to 36f9be5 Compare July 6, 2023 03:17
@HyukjinKwon HyukjinKwon marked this pull request as ready for review July 6, 2023 05:34
@HyukjinKwon HyukjinKwon force-pushed the session-base-exec-dir branch 2 times, most recently from d422aef to cc464a2 Compare July 6, 2023 06:28
@HyukjinKwon
Member Author

Actually, I already hit this limitation here when the test takes longer than 5 minutes. I increased it as a workaround for now. We should fix this before the release.

@HyukjinKwon HyukjinKwon force-pushed the session-base-exec-dir branch from 96938d4 to f476025 Compare July 7, 2023 05:06
@HyukjinKwon
Member Author

Yeah, I think with the helper instance, it makes much more sense now.

@HyukjinKwon HyukjinKwon force-pushed the session-base-exec-dir branch 5 times, most recently from 87eda2b to be78ab7 Compare July 7, 2023 10:04
@HyukjinKwon HyukjinKwon force-pushed the session-base-exec-dir branch from be78ab7 to c161ac9 Compare July 7, 2023 10:52
@HyukjinKwon
Member Author

Both tests were flaky. I manually checked them locally.

Merged to master.

HyukjinKwon added a commit that referenced this pull request Jul 10, 2023
… session UUID together

### What changes were proposed in this pull request?

This PR is a followup of #41495 that skips a couple of flaky tests. In addition, this PR also fixes a typo.

### Why are the changes needed?

To keep the tests green. Re-enabling the tests requires other fixes that may refactor the whole test suite, which will take a while. I will follow up and fix them in SPARK-44348.

### Does this PR introduce _any_ user-facing change?

No, the feature is not released to end users yet.

### How was this patch tested?

The unit tests are skipped for now.

Closes #41913 from HyukjinKwon/SPARK-44290-followup.

Lead-authored-by: Hyukjin Kwon <[email protected]>
Co-authored-by: Kent Yao <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon added a commit that referenced this pull request Jul 12, 2023
…vant changes

### What changes were proposed in this pull request?

This PR is a followup of #41495. It contains several changes to make the tests work:

- Always uses `JobArtifactSet.getCurrentJobArtifactState` to get the current UUID in the current thread.
- Specify the current state (from `JobArtifactSet.getCurrentJobArtifactState`) when adding the artifacts (so we can get the state in `SparkContext`).
- Creates a dedicated directory on the Driver side too. Spark Connect Server is provided as a service; the server creates a session-dedicated directory and puts the added files there.
  - Notice that we do not support `SparkFiles.getRootDirectory` in Spark Connect, so this should be fine. This dedicated directory will also be used to execute Python processes on the Driver side (for dependency management, e.g., foreachBatch in Structured Streaming with Spark Connect).
- Get the current UUID on the Driver side for Python UDF execution. Previously, we tried to get it on the executor side, which resulted in `None`.
- Rename `sessionUUID` (or similar) to `jobArtifactUUID`. In Core code context, it's a job artifact state.
- Fix the Spark Connect Python client local debug mode (e.g., `local` or `local-cluster`) so that jars are sent when `local-cluster` mode is specified; otherwise it throws an exception that `SparkConnectPlugin` cannot be found.
- Refactor and fix the tests to verify archive, file and pyfiles in both `local` and `local-cluster` modes.
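The driver-side capture of the job artifact state described above can be sketched as follows (illustrative Python; names such as `get_current_job_artifact_state` mirror the Scala helper but this is not Spark's actual code):

```python
import threading

# Hypothetical thread-local holding the current job artifact state,
# set on the driver while a Spark Connect operation executes.
_state = threading.local()

def set_job_artifact_state(uuid):
    _state.uuid = uuid

def get_current_job_artifact_state():
    return getattr(_state, "uuid", None)

def build_udf_task():
    """Capture the UUID on the driver at task-construction time and ship
    it with the task, instead of reading the (absent) thread local on
    the executor."""
    return {"job_artifact_uuid": get_current_job_artifact_state()}
```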

### Why are the changes needed?

To make session-based artifact management work.

### Does this PR introduce _any_ user-facing change?

No, this feature has not been released yet.

### How was this patch tested?

Unit tests added.

Closes #41942 from HyukjinKwon/SPARK-44348.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
@HyukjinKwon HyukjinKwon deleted the session-base-exec-dir branch January 15, 2024 00:52