[SPARK-44146][CONNECT] Isolate Spark Connect Session jars and classfiles #41701
Conversation
The PR is ready to review (the failing tests are flaky; I've submitted a rerun request).

@HyukjinKwon @hvanhovell Could you have a look? Thanks!
Looks fine from a cursory look, but I think we should have @hvanhovell sign off.
```scala
private lazy val pythonExec =
  sys.env.getOrElse("PYSPARK_PYTHON", sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python3"))
```
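The fallback chain in the snippet above can be mirrored in Python; a minimal sketch (the environment-variable names are the real PySpark ones, `resolve_python_exec` is an illustrative helper, not a Spark API):

```python
import os

def resolve_python_exec(env=os.environ):
    # Prefer PYSPARK_PYTHON, then PYSPARK_DRIVER_PYTHON, defaulting to
    # "python3" -- the same fallback chain as the Scala snippet.
    return env.get("PYSPARK_PYTHON", env.get("PYSPARK_DRIVER_PYTHON", "python3"))
```

With neither variable set, the helper falls back to `python3`.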
```scala
// SparkConnectPlanner is used per request.
```
We could put this in the session holder, right?
```scala
}
val oldArtifactUri = currentArtifactRootUri
currentArtifactRootUri = SparkEnv.get.rpcEnv.fileServer
  .addDirectoryIfAbsent(ARTIFACT_DIRECTORY_PREFIX, artifactRootPath.toFile)
```
Can we use addDirectory instead? The if-absent bit is pretty well protected by this object.
```scala
 * @param f
 * @tparam T
 */
def withContext[T](f: => T): T = {
```
This name is a bit too vague for my liking. How about withContextClassLoader?
```scala
 */
def withSessionBasedPythonPaths[T](f: => T): T = {
  try {
    session.conf.set(
```
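The `try` above is the first half of a set-then-restore (loan) pattern. A minimal Python sketch of that pattern, with a hypothetical `Conf` stand-in for `session.conf` (not the real Spark API):

```python
class Conf:
    """Hypothetical stand-in for session.conf; not the real Spark API."""
    def __init__(self):
        self._m = {}
    def set(self, key, value):
        self._m[key] = value
    def get(self, key, default=None):
        return self._m.get(key, default)
    def unset(self, key):
        self._m.pop(key, None)

def with_session_conf(conf, key, value, f):
    # Set a session-scoped value for the duration of f, then restore the
    # previous state in a finally block so the setting never leaks out.
    old = conf.get(key)
    conf.set(key, value)
    try:
        return f()
    finally:
        if old is None:
            conf.unset(key)
        else:
            conf.set(key, old)
```

The `finally` is what guarantees the conf is restored even if `f` throws, which is the point of the review discussion below about unsetting the value.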
qq, we don't really need to unset this right? Or is exposing it to the client a bad idea? cc @HyukjinKwon
yeah I planned to remove this and @vicennial removed it in #41789 (comment). This was just a hack I added to avoid additional refactoring.
```scala
addHelloClass(holder1)

val classLoader1 = holder1.classloader
val instance1 = classLoader1
```
You could add another session here where you can also load Hello. In that case, the classes for the different sessions should not be equal.
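The suggested assertion can be illustrated with a small Python analogy (not the actual Scala test): loading the same class definition through two independent "loaders" (here, fresh namespaces) yields distinct class objects, which is what per-session classloader isolation should guarantee.

```python
SOURCE = "class Hello:\n    def msg(self):\n        return 'hi'\n"

def load_isolated(src):
    # Each call uses a fresh namespace, analogous to a fresh per-session
    # classloader: the same source produces a distinct class object.
    namespace = {}
    exec(src, namespace)
    return namespace["Hello"]

HelloA = load_isolated(SOURCE)
HelloB = load_isolated(SOURCE)
assert HelloA is not HelloB                      # distinct classes per "session"
assert HelloA().msg() == HelloB().msg() == "hi"  # but identical behaviour
```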
hvanhovell left a comment:
LGTM
I am merging this to unblock a couple of follow-ups. Please address my comments in a small follow-up.
…nnect Jar/Classfile Isolation

### What changes were proposed in this pull request?

This PR is a follow-up of #41701 and addresses the comments mentioned [here](#41701 (comment)). The summary is:

- `pythonIncludes` are fetched directly from the `ArtifactManager` via `SessionHolder` instead of being propagated through the Spark conf.
- `SessionHolder#withContext` is renamed to `SessionHolder#withContextClassLoader` to decrease ambiguity.
- Generally increased test coverage for isolated classloading (a new unit test in `ArtifactManagerSuite` and a new suite, `ClassLoaderIsolationSuite`).

### Why are the changes needed?

General follow-ups from [here.](#41701 (comment))

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New test suite and unit tests.

Closes #41789 from vicennial/SPARK-44246.

Authored-by: vicennial <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
### Previous behaviour

Previously, we kept `JobArtifactSet` and leveraged a thread local for each client.

1. The execution block is wrapped with `SessionHolder.withSession` [here](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala#L53).
2. `SessionHolder.withContextClassLoader` is then called [here](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala#L130), which in turn calls `JobArtifactSet.withActive` [here](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala#L118) and sets the active set to `SessionHolder.connectJobArtifactSet`.
3. The actual `JobArtifactSet` that is used is built up [here](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala#L157) in `SparkConnectArtifactManager.jobArtifactSet`.

Since each client has their own `JobArtifactSet` made `active` when executing an operation, the `TaskDescription` would have artifacts specific to that client and, subsequently, so would `IsolatedSessionState` in the Executor. Therefore, we were able to keep the Spark Connect specific logic separated within the Spark Connect module.

### Problem

Mostly this worked well; however, the problem is that we don't call `SparkContext.addFile` or `SparkContext.addJar` — we pass the artifacts directly to the scheduler (to `TaskDescription`). This is fine in general, but not calling `SparkContext.addFile` directly exposes several problems:

- `SparkContext.postEnvironmentUpdate` is not invoked as it would be in `SparkContext.addFile`, which matters for, e.g., recording events for the History Server.
- Specifically for archives, `Utils.unpack(source, dest)` is not invoked as it would be in `SparkContext.addFile`, so archives are not untarred properly on the Driver.

Therefore, we would have to duplicate that logic on the Spark Connect server side, which is not ideal. In addition, we already added the isolation logic into the Executor, and Driver and Executor are the symmetric pair (not Spark Connect Server <> Executor). So this matters for code readability and for the expectations about their roles.

### Solution in this PR

This PR proposes to support session-based files and archives in Spark Connect. It leverages the groundwork from #41701 and #41625 (for jars in the Spark Connect Scala client). The changed logic is as follows:

- Keep the session UUID and Spark Connect Server specific information, such as the REPL class path, within a thread local.
- Add the session ID when we add files or archives. `SparkContext` keeps them in a map `Map(session -> Map(file and timestamp))` in order to reuse the existing logic and address the problem mentioned above.

After that, on the executor side:

- Executors create an additional directory, named by the session UUID, on top of the default directory (that is, the current working directory; see `SparkFiles.getRootDirectory`).
- When we execute Python workers, the current working directory is set to the one created above.
- End users access these files via the current working directory, e.g., `./blahblah.txt` in their Python UDF, and the code is therefore compatible with and without Spark Connect.

Note that:

- Python workers are created per session because we set the session UUID as an environment variable, and we create new Python workers if environment variables differ; see also `SparkEnv.createPythonWorker`.
- The daemon and Python workers are already killed if they are not used for a while.

### TODOs and limitations

The executor also maintains the file list, but with a cache so it can evict entries. However, this has a problem. It works as follows:

- A new `IsolatedSessionState` is created.
- A task is executed once, and `IsolatedSessionState` holds the file list.
- Later, `IsolatedSessionState` is evicted at https://github.com/apache/spark/pull/41625/files#diff-d7a989c491f3cb77cca02c701496a9e2a3443f70af73b0d1ab0899239f3a789dR187
- The executor will create a new `IsolatedSessionState` with empty file lists.
- The executor will attempt to redownload and overwrite the files (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L1058-L1064).
- `spark.files.overwrite` is `false` by default, so the task will suddenly fail at this point.

Possible solutions are:

- For 1., we should maintain a cache with a TTL and remove entries accordingly.
- For 2., we should have a dedicated directory (which this PR does) and remove the directory when the cache is evicted, so the overwrite does not happen.

### Why are the changes needed?

In order to allow session-based artifact control and multi-tenancy.

### Does this PR introduce _any_ user-facing change?

Yes, this PR now allows multiple sessions to have their own space. For example, session A and session B can each add a file with the same name. Previously this was not possible.

### How was this patch tested?

Unit tests were added.

Closes #41495 from HyukjinKwon/session-base-exec-dir.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
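The session-scoped bookkeeping described above can be sketched in Python (names like `add_session_file` and `session_root` are illustrative helpers, not Spark APIs): the driver-side map keys files by session, and the executor resolves relative paths inside a per-session directory.

```python
import posixpath

# Driver side: session -> {file name -> timestamp}, mirroring the
# Map(session -> Map(file and timestamp)) kept in SparkContext.
session_files = {}

def add_session_file(session_uuid, name, timestamp):
    session_files.setdefault(session_uuid, {})[name] = timestamp

def session_root(root_dir, session_uuid):
    # Executor side: a per-session directory under the default root
    # directory; Python workers set their working directory here, so user
    # code keeps using relative paths like "./blahblah.txt".
    return posixpath.join(root_dir, session_uuid)

# Two sessions can add a file with the same name without clashing.
add_session_file("sess-a", "data.txt", 100)
add_session_file("sess-b", "data.txt", 200)
```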
What changes were proposed in this pull request?
This PR follows up on #41625 to utilize the classloader/resource isolation in Spark to support multi-user Spark Connect sessions which are isolated from each other (currently, classfiles and jars) and thus, enables multi-user REPLs and UDFs.
Rather than a single `SparkArtifactManager` handling all the artifact movement, each instance is now responsible for a single `sessionHolder` (i.e. a Spark Connect session), which it requires in its constructor.

This removes `sparkConnectArtifactDirectory`, which was initialised in `SparkContext`. Moving forward, all artifacts are instead separated based on the underlying `SparkSession` (using its `sessionUUID`) they belong to, in the format `ROOT_ARTIFACT_DIR/<sessionUUID>/jars/...`.

`SparkConnectArtifactManager` also builds a `JobArtifactSet`, which is eventually propagated to the executors, where the classloader isolation mechanism uses the `uuid` parameter.

Why are the changes needed?
To enable support for multi-user sessions coexisting on a singular Spark cluster. For example, multi-user Scala REPLs/UDFs will be supported with this PR.
Does this PR introduce any user-facing change?
Yes, multiple Spark Connect REPLs may use a single Spark cluster at once and execute their own UDFs without interfering with each other.
How was this patch tested?
New unit tests in `ArtifactManagerSuite` + existing tests.
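The per-session layout `ROOT_ARTIFACT_DIR/<sessionUUID>/jars/...` described above can be sketched with a small path helper (a hypothetical illustration, not the actual `SparkConnectArtifactManager` API):

```python
import posixpath

def artifact_path(root_artifact_dir, session_uuid, kind, name):
    # Artifacts are grouped first by the owning SparkSession's sessionUUID,
    # then by artifact kind (e.g. "jars", "classes"), so two sessions can
    # hold artifacts with identical names without colliding.
    return posixpath.join(root_artifact_dir, session_uuid, kind, name)
```

For instance, `artifact_path("/artifacts", "abc-123", "jars", "udf.jar")` yields `/artifacts/abc-123/jars/udf.jar`.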