2 changes: 1 addition & 1 deletion bin/pyspark
@@ -50,7 +50,7 @@ export PYSPARK_DRIVER_PYTHON_OPTS

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.4-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.3-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.4-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.3-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -423,7 +423,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
-<version>0.10.9.4</version>
+<version>0.10.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}

private[spark] object PythonUtils {
-val PY4J_ZIP_NAME = "py4j-0.10.9.4-src.zip"
+val PY4J_ZIP_NAME = "py4j-0.10.9.3-src.zip"

/** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
def sparkPythonPath: String = {
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -233,7 +233,7 @@ parquet-hadoop/1.12.2//parquet-hadoop-1.12.2.jar
parquet-jackson/1.12.2//parquet-jackson-1.12.2.jar
pickle/1.2//pickle-1.2.jar
protobuf-java/2.5.0//protobuf-java-2.5.0.jar
-py4j/0.10.9.4//py4j-0.10.9.4.jar
+py4j/0.10.9.3//py4j-0.10.9.3.jar
remotetea-oncrpc/1.1.2//remotetea-oncrpc-1.1.2.jar
rocksdbjni/6.20.3//rocksdbjni-6.20.3.jar
scala-collection-compat_2.12/2.1.1//scala-collection-compat_2.12-2.1.1.jar
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -221,7 +221,7 @@ parquet-hadoop/1.12.2//parquet-hadoop-1.12.2.jar
parquet-jackson/1.12.2//parquet-jackson-1.12.2.jar
pickle/1.2//pickle-1.2.jar
protobuf-java/2.5.0//protobuf-java-2.5.0.jar
-py4j/0.10.9.4//py4j-0.10.9.4.jar
+py4j/0.10.9.3//py4j-0.10.9.3.jar
remotetea-oncrpc/1.1.2//remotetea-oncrpc-1.1.2.jar
rocksdbjni/6.20.3//rocksdbjni-6.20.3.jar
scala-collection-compat_2.12/2.1.1//scala-collection-compat_2.12-2.1.1.jar
2 changes: 1 addition & 1 deletion docs/job-scheduling.md
@@ -304,5 +304,5 @@ via `sc.setJobGroup` in a separate PVM thread, which also disallows to cancel th
later.

`pyspark.InheritableThread` is recommended for a PVM thread to inherit the inheritable attributes
-such as local properties in a JVM thread.
+such as local properties in a JVM thread, and to avoid resource leaks.
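The inheritance behavior described above can be sketched without Spark at all. The following is an illustrative stand-in, not PySpark code (none of these names are PySpark APIs): the thread subclass snapshots the parent's per-thread properties when `start()` is called and re-applies them in the child before the target runs, which is the same pattern `pyspark.InheritableThread` uses.

```python
import threading

# Stand-in for Spark's per-thread local job properties (illustrative only).
_local_props = threading.local()

def set_local_property(key, value):
    if not hasattr(_local_props, "props"):
        _local_props.props = {}
    _local_props.props[key] = value

def get_local_properties():
    return dict(getattr(_local_props, "props", {}))

class InheritablePropsThread(threading.Thread):
    """Mimics pyspark.InheritableThread: the child inherits the parent's properties."""

    def start(self):
        # Snapshot the parent's properties at start() time.
        self._props = get_local_properties()
        return super().start()

    def run(self):
        # Re-apply the snapshot in the child thread before the target runs.
        _local_props.props = dict(self._props)
        super().run()

seen = {}
set_local_property("spark.jobGroup.id", "etl-job")

def target():
    seen.update(get_local_properties())

t = InheritablePropsThread(target=target)
t.start()
t.join()
print(seen)  # {'spark.jobGroup.id': 'etl-job'}
```

With a plain `threading.Thread` the child would see an empty property map; the `start()`-time snapshot is what carries the job group across the thread boundary.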

2 changes: 1 addition & 1 deletion python/docs/Makefile
@@ -21,7 +21,7 @@ SPHINXBUILD ?= sphinx-build
SOURCEDIR ?= source
BUILDDIR ?= build

-export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.10.9.4-src.zip)
+export PYTHONPATH=$(realpath ..):$(realpath ../lib/py4j-0.10.9.3-src.zip)

# Put it first so that "make" without argument is like "make help".
help:
2 changes: 1 addition & 1 deletion python/docs/make2.bat
@@ -25,7 +25,7 @@ if "%SPHINXBUILD%" == "" (
set SOURCEDIR=source
set BUILDDIR=build

-set PYTHONPATH=..;..\lib\py4j-0.10.9.4-src.zip
+set PYTHONPATH=..;..\lib\py4j-0.10.9.3-src.zip

if "%1" == "" goto help

2 changes: 1 addition & 1 deletion python/docs/source/getting_started/install.rst
@@ -157,7 +157,7 @@ Package Minimum supported version Note
`pandas` 1.0.5 Optional for Spark SQL
`NumPy` 1.7 Required for MLlib DataFrame-based API
`pyarrow` 1.0.0 Optional for Spark SQL
-`Py4J` 0.10.9.4 Required
+`Py4J` 0.10.9.3 Required
`pandas` 1.0.5 Required for pandas API on Spark
`pyarrow` 1.0.0 Required for pandas API on Spark
`Numpy` 1.14 Required for pandas API on Spark
Binary file added python/lib/py4j-0.10.9.3-src.zip
Binary file removed python/lib/py4j-0.10.9.4-src.zip
6 changes: 3 additions & 3 deletions python/pyspark/context.py
@@ -1365,7 +1365,7 @@ def setJobGroup(self, groupId: str, description: str, interruptOnCancel: bool =
        to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.

        If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-        local inheritance.
+        local inheritance, and to prevent resource leaks.

        Examples
        --------
@@ -1405,7 +1405,7 @@ def setLocalProperty(self, key: str, value: str) -> None:
        Notes
        -----
        If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-        local inheritance.
+        local inheritance, and to prevent resource leaks.
        """
        self._jsc.setLocalProperty(key, value)

@@ -1423,7 +1423,7 @@ def setJobDescription(self, value: str) -> None:
        Notes
        -----
        If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-        local inheritance.
+        local inheritance, and to prevent resource leaks.
        """
        self._jsc.setJobDescription(value)

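The docstring notes above exist because plain Python thread-locals, which back the per-thread job properties in pinned-thread mode, are not inherited by child threads. A minimal stdlib demonstration of the problem that `pyspark.InheritableThread` works around:

```python
import threading

local = threading.local()
local.value = "parent"  # visible only in the thread that set it

result = {}

def worker():
    # A new thread sees a fresh threading.local; nothing is inherited.
    result["value"] = getattr(local, "value", None)

t = threading.Thread(target=worker)
t.start()
t.join()
print(result)  # {'value': None}
```

A job group set with `setJobGroup` in the parent is therefore invisible to a plain child thread, which is why these notes steer parallel callers toward `InheritableThread`.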
35 changes: 30 additions & 5 deletions python/pyspark/util.py
@@ -331,10 +331,13 @@ def inheritable_thread_target(f: Callable) -> Callable:

        @functools.wraps(f)
        def wrapped(*args: Any, **kwargs: Any) -> Any:
-            # Set local properties in child thread.
-            assert SparkContext._active_spark_context is not None
-            SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties)
-            return f(*args, **kwargs)
+            try:
+                # Set local properties in child thread.
+                assert SparkContext._active_spark_context is not None
+                SparkContext._active_spark_context._jsc.sc().setLocalProperties(properties)
+                return f(*args, **kwargs)
+            finally:
+                InheritableThread._clean_py4j_conn_for_current_thread()

        return wrapped
    else:
@@ -374,7 +377,10 @@ def copy_local_properties(*a: Any, **k: Any) -> Any:
            assert hasattr(self, "_props")
            assert SparkContext._active_spark_context is not None
            SparkContext._active_spark_context._jsc.sc().setLocalProperties(self._props)
-            return target(*a, **k)
+            try:
+                return target(*a, **k)
+            finally:
+                InheritableThread._clean_py4j_conn_for_current_thread()

        super(InheritableThread, self).__init__(
            target=copy_local_properties, *args, **kwargs  # type: ignore[misc]
@@ -395,6 +401,25 @@ def start(self) -> None:
            self._props = SparkContext._active_spark_context._jsc.sc().getLocalProperties().clone()
        return super(InheritableThread, self).start()

+    @staticmethod
+    def _clean_py4j_conn_for_current_thread() -> None:
+        from pyspark import SparkContext
+
+        jvm = SparkContext._jvm
+        assert jvm is not None
+        thread_connection = jvm._gateway_client.get_thread_connection()
+        if thread_connection is not None:
+            try:
+                # The deque is shared across other threads but it's thread-safe.
+                # If this function has to be invoked one more time in the same thread,
+                # Py4J will create a new connection automatically.
+                jvm._gateway_client.deque.remove(thread_connection)
+            except ValueError:
+                # Should never reach this point
+                return
+            finally:
+                thread_connection.close()


if __name__ == "__main__":
if "pypy" not in platform.python_implementation().lower() and sys.version_info[:2] >= (3, 7):
2 changes: 1 addition & 1 deletion python/setup.py
@@ -258,7 +258,7 @@ def run(self):
license='http://www.apache.org/licenses/LICENSE-2.0',
# Don't forget to update python/docs/source/getting_started/install.rst
# if you're updating the versions or dependencies.
-install_requires=['py4j==0.10.9.4'],
+install_requires=['py4j==0.10.9.3'],
extras_require={
'ml': ['numpy>=1.15'],
'mllib': ['numpy>=1.15'],
2 changes: 1 addition & 1 deletion sbin/spark-config.sh
@@ -28,6 +28,6 @@ export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
# Add the PySpark classes to the PYTHONPATH:
if [ -z "${PYSPARK_PYTHONPATH_SET}" ]; then
export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.4-src.zip:${PYTHONPATH}"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.3-src.zip:${PYTHONPATH}"
export PYSPARK_PYTHONPATH_SET=1
fi