
Conversation

@HyukjinKwon (Member) commented May 24, 2023

### What changes were proposed in this pull request?

This PR proposes to add support for archives (`.zip`, `.jar`, `.tar.gz`, `.tgz`, or `.tar` files) in `SparkSession.addArtifacts` so that Python dependency management can be supported in Python Spark Connect.
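
For illustration, a minimal usage sketch (it assumes a Spark Connect session `spark` and archives that exist locally; the file names are hypothetical):

```python
# Any supported archive format can be added; the optional '#<name>' fragment
# is the directory the archive is unpacked into on executors.
spark.addArtifact("deps.zip#deps", archive=True)  # hypothetical .zip archive
spark.addArtifact("pyspark_conda_env.tar.gz#environment", archive=True)
```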

### Why are the changes needed?

To allow end users to add dependencies and archive files from the Python Spark Connect client.

This PR enables the Python dependency management (https://www.databricks.com/blog/2020/12/22/how-to-manage-python-dependencies-in-pyspark.html) use case in Spark Connect.

See below for how to do this with the Spark Connect Python client:

#### Precondition

Assume that we have a Spark Connect server already running, e.g., by:

```bash
./sbin/start-connect-server.sh --jars `ls connector/connect/server/target/**/spark-connect*SNAPSHOT.jar` --master "local-cluster[2,2,1024]"
```

and assume that you already have a dev env:

```bash
# Notice that you should install `conda-pack`.
conda create -y -n pyspark_conda_env -c conda-forge conda-pack python=3.9
conda activate pyspark_conda_env
pip install --upgrade -r dev/requirements.txt
```

#### Dependency management

```bash
./bin/pyspark --remote "sc://localhost:15002"
```

```python
import conda_pack
import os

# Pack the current environment ('pyspark_conda_env') to 'pyspark_conda_env.tar.gz'.
# Or you can run 'conda pack' in your shell.
conda_pack.pack()

spark.addArtifact(f"{os.environ.get('CONDA_DEFAULT_ENV')}.tar.gz#environment", archive=True)
spark.conf.set("spark.sql.execution.pyspark.python", "environment/bin/python")
# From now on, Python workers on executors use `pyspark_conda_env` Conda environment.
```

Run your Python UDFs

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def plug_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.range(10).select(plug_one("id")).show()
```

### Does this PR introduce _any_ user-facing change?

Yes, it adds support for archives (`.zip`, `.jar`, `.tar.gz`, `.tgz`, or `.tar` files) in `SparkSession.addArtifacts`.

### How was this patch tested?

Manually tested as described above, and added a unit test.

Also manually tested in `local-cluster` mode, and verified via:

```python
import sys
from pyspark.sql.functions import udf

spark.range(1).select(udf(lambda x: sys.executable)("id")).show(truncate=False)
```
```
+----------------------------------------------------------------+
|<lambda>(id)                                                    |
+----------------------------------------------------------------+
|/.../spark/work/app-20230524132024-0000/1/environment/bin/python|
+----------------------------------------------------------------+
```

```python
# This has a limitation - hash(#) in the file name would not work.
if "#" in local_path:
    raise ValueError("'#' in the path is not supported for adding an archive.")
name = f"{name}#{parsed.fragment}"
```
@HyukjinKwon (Member, Author) commented:
This is actually a pretty ugly workaround to support a fragment in the URI (but I believe it is the minimal change). Maybe we should pass a URI instead of a file path in Artifacts (?), but I would like to avoid touching the whole implementation in this PR. cc @hvanhovell @vicennial
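
For context, a self-contained sketch of how the fragment could be split out with `urllib.parse`; the helper name is hypothetical and this is not the exact code added in this PR:

```python
from urllib.parse import urlparse


def split_archive_path(path: str):
    """Split 'pyspark_conda_env.tar.gz#environment' into the local file path
    and an artifact name carrying the fragment, mirroring the snippet above."""
    parsed = urlparse(path)
    local_path, fragment = parsed.path, parsed.fragment
    # Limitation: a literal '#' in the file name itself cannot be distinguished
    # from the fragment separator, so it is rejected.
    if "#" in local_path:
        raise ValueError("'#' in the path is not supported for adding an archive.")
    name = local_path.split("/")[-1]
    if fragment:
        name = f"{name}#{fragment}"
    return local_path, name


# ('pyspark_conda_env.tar.gz', 'pyspark_conda_env.tar.gz#environment')
print(split_archive_path("pyspark_conda_env.tar.gz#environment"))
```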

@HyukjinKwon (Member, Author) commented:

Merged to master.

czxm pushed a commit to czxm/spark that referenced this pull request Jun 12, 2023
…n Python Spark Connect

Closes apache#41292 from HyukjinKwon/python-addArchive.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon added a commit that referenced this pull request Aug 2, 2023
…isolated session cache is evicted

### What changes were proposed in this pull request?

This PR proposes to remove session-based directories when the isolated session is evicted from the cache.

### Why are the changes needed?

SPARK-44078 added the cache for isolated sessions, and SPARK-44348 added the session-based directory for isolation.
When the isolated session cache is evicted, we should remove the session-based directory so that it does not fail when the same session is used again; see also #41625 (comment).
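
Conceptually, the cleanup hook looks roughly like the following self-contained Python sketch; all names are illustrative and the actual change lives in the Connect server's session cache handling:

```python
import shutil
import time


class SessionDirCache:
    """Illustrative only: a TTL cache of per-session artifact directories that
    removes the session-based directory when an entry is evicted."""

    def __init__(self, ttl_seconds: float):
        self.ttl = ttl_seconds
        self._entries = {}  # session_id -> (directory, last_access_time)

    def touch(self, session_id: str, directory: str) -> None:
        self._evict_expired()
        self._entries[session_id] = (directory, time.monotonic())

    def _evict_expired(self) -> None:
        now = time.monotonic()
        for session_id, (directory, last_access) in list(self._entries.items()):
            if now - last_access > self.ttl:
                # On eviction, remove the session-based directory so that a later
                # session with the same id starts from a clean state.
                shutil.rmtree(directory, ignore_errors=True)
                del self._entries[session_id]
```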

### Does this PR introduce _any_ user-facing change?

No to end users since the feature has not been released yet.

### How was this patch tested?

I manually tested as described in #41292. In particular, I reduced the TTL to a few minutes and ran the following at the last step:

```python
spark.range(10).select(plug_one("id")).show()
spark.range(10).select(plug_one("id")).show()
# Wait a few minutes
spark.range(10).select(plug_one("id")).show()
```

By reading the executor's stderr in the Spark UI, I verified that the same session can be added back to the cache and that it creates the directory with the same name.

Closes #42289 from HyukjinKwon/SPARK-44631.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon added a commit that referenced this pull request Aug 2, 2023
…isolated session cache is evicted

Closes #42289 from HyukjinKwon/SPARK-44631.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 35d4765)
Signed-off-by: Hyukjin Kwon <[email protected]>
@HyukjinKwon HyukjinKwon deleted the python-addArchive branch January 15, 2024 00:48