diff --git a/core/src/main/scala/org/apache/spark/api/python/Py4JServer.scala b/core/src/main/scala/org/apache/spark/api/python/Py4JServer.scala
index ef85bbd69f77..2e00e79f57f5 100644
--- a/core/src/main/scala/org/apache/spark/api/python/Py4JServer.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/Py4JServer.scala
@@ -35,7 +35,7 @@ private[spark] class Py4JServer(sparkConf: SparkConf) extends Logging {
   // Java system properties and such
   private val localhost = InetAddress.getLoopbackAddress()
   private[spark] val server = if (sys.env.getOrElse(
-      "PYSPARK_PIN_THREAD", "true").toLowerCase(Locale.ROOT) == "true") {
+      "PYSPARK_PIN_THREAD", "false").toLowerCase(Locale.ROOT) == "true") {
     new py4j.ClientServer.ClientServerBuilder()
       .authToken(secret)
       .javaPort(0)
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index 4ed2aa911222..526171dd95e4 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -303,6 +303,10 @@
 in each corresponding JVM thread. Due to this limitation, it is impossible to set a different job group
 via `sc.setJobGroup` in a separate PVM thread, which also disallows cancelling the job
 via `sc.cancelJobGroup` later.
 
+In order to synchronize PVM threads with JVM threads, set the `PYSPARK_PIN_THREAD` environment variable
+to `true`. This pinned thread mode gives each PVM thread one corresponding JVM thread. With this mode,
+using `pyspark.InheritableThread` is recommended so that a PVM thread inherits the inheritable attributes,
+such as local properties of its JVM thread, and avoids a resource leak.
+
+Note that `PYSPARK_PIN_THREAD` is currently experimental and not recommended for use in production.
diff --git a/python/docs/source/migration_guide/pyspark_3.1_to_3.2.rst b/python/docs/source/migration_guide/pyspark_3.1_to_3.2.rst
index 908d4d34d6e1..0d74a4f796f5 100644
--- a/python/docs/source/migration_guide/pyspark_3.1_to_3.2.rst
+++ b/python/docs/source/migration_guide/pyspark_3.1_to_3.2.rst
@@ -23,9 +23,3 @@ Upgrading from PySpark 3.1 to 3.2
 * In Spark 3.2, the PySpark methods from sql, ml, spark_on_pandas modules raise the ``TypeError`` instead of ``ValueError`` when applied to a param of inappropriate type.
 * In Spark 3.2, the traceback from Python UDFs, pandas UDFs and pandas function APIs is simplified by default without the traceback from the internal Python workers. In Spark 3.1 or earlier, the traceback from Python workers was printed out. To restore the behavior before Spark 3.2, you can set ``spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled`` to ``false``.
-
-* In Spark 3.2, pinned thread mode is enabled by default to map each Python thread to the corresponding JVM thread. Previously,
-  one JVM thread could be reused for multiple Python threads, which resulted in one JVM thread local being shared to multiple Python threads.
-  Also, note that now ``pyspark.InheritableThread`` or ``pyspark.inheritable_thread_target`` is recommended to use together for a Python thread
-  to properly inherit the inheritable attributes such as local properties in a JVM thread, and to avoid a potential resource leak issue.
-  To restore the behavior before Spark 3.2, you can set ``PYSPARK_PIN_THREAD`` environment variable to ``false``.
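To make the `job-scheduling.md` guidance above concrete, here is a minimal sketch of the documented workflow. It assumes `PYSPARK_PIN_THREAD` is exported before the JVM is launched; the group name and dataset are illustrative, not part of this patch.

```python
import os

# PYSPARK_PIN_THREAD is read when the JVM is launched (see Py4JServer.scala
# above), so it must be set before SparkContext is created.
os.environ["PYSPARK_PIN_THREAD"] = "true"

from pyspark import SparkContext, InheritableThread

sc = SparkContext.getOrCreate()

def run_job():
    # In pinned thread mode this PVM thread has its own JVM thread, so the
    # job group set here does not leak into other PVM threads.
    sc.setJobGroup("group-a", "parallel job A", interruptOnCancel=True)
    sc.parallelize(range(100)).count()

# InheritableThread propagates inheritable attributes (such as local
# properties) from the parent thread and cleans up its pinned JVM thread
# when the thread exits, avoiding the resource leak the docs mention.
t = InheritableThread(target=run_job)
t.start()
t.join()
```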
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6c9410645e09..918f3e82882e 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -1112,8 +1112,13 @@ def setJobGroup(self, groupId, description, interruptOnCancel=False):
         ensure that the tasks are actually stopped in a timely manner, but is off by default due
         to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
 
-        If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-        local inheritance, and preventing resource leak.
+        Currently, setting a group ID (set to local properties) with multiple threads
+        does not properly work. Internally, threads on PVM and JVM are not synced, and a JVM
+        thread can be reused for multiple threads on PVM, which fails to isolate local
+        properties for each thread on PVM.
+
+        To avoid this, enable the pinned thread mode by setting the ``PYSPARK_PIN_THREAD``
+        environment variable to ``true`` and use :class:`pyspark.InheritableThread`.
 
         Examples
         --------
@@ -1152,8 +1157,13 @@ def setLocalProperty(self, key, value):
 
         Notes
         -----
-        If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-        local inheritance, and preventing resource leak.
+        Currently, setting a local property with multiple threads does not properly work.
+        Internally, threads on PVM and JVM are not synced, and a JVM thread
+        can be reused for multiple threads on PVM, which fails to isolate local properties
+        for each thread on PVM.
+
+        To avoid this, enable the pinned thread mode by setting the ``PYSPARK_PIN_THREAD``
+        environment variable to ``true`` and use :class:`pyspark.InheritableThread`.
         """
         self._jsc.setLocalProperty(key, value)
@@ -1170,8 +1180,13 @@ def setJobDescription(self, value):
 
         Notes
         -----
-        If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread
-        local inheritance, and preventing resource leak.
+        Currently, setting a job description (set to local properties) with multiple
+        threads does not properly work. Internally, threads on PVM and JVM are not synced,
+        and a JVM thread can be reused for multiple threads on PVM, which fails to isolate
+        local properties for each thread on PVM.
+
+        To avoid this, enable the pinned thread mode by setting the ``PYSPARK_PIN_THREAD``
+        environment variable to ``true`` and use :class:`pyspark.InheritableThread`.
         """
         self._jsc.setJobDescription(value)
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index bffdc0b7c84d..afc3ea740cbb 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -130,7 +130,7 @@ def killChild():
         atexit.register(killChild)
 
     # Connect to the gateway (or client server to pin the thread between JVM and Python)
-    if os.environ.get("PYSPARK_PIN_THREAD", "true").lower() == "true":
+    if os.environ.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
         gateway = ClientServer(
             java_parameters=JavaParameters(
                 port=gateway_port,
diff --git a/python/pyspark/tests/test_context.py b/python/pyspark/tests/test_context.py
index 4611d038f963..c116738159da 100644
--- a/python/pyspark/tests/test_context.py
+++ b/python/pyspark/tests/test_context.py
@@ -220,7 +220,7 @@ def test_progress_api(self):
         def run():
             # When thread is pinned, job group should be set for each thread for now.
             # Local properties seem not being inherited like Scala side does.
-            if os.environ.get("PYSPARK_PIN_THREAD", "true").lower() == "true":
+            if os.environ.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
                 sc.setJobGroup('test_progress_api', '', True)
             try:
                 rdd.count()
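A hedged sketch of the scenario the docstrings and `test_progress_api` describe: per-thread job groups under pinned thread mode, with cancellation of exactly one group. The group names and sleep durations are illustrative assumptions, not part of this patch.

```python
import os
import time

os.environ["PYSPARK_PIN_THREAD"] = "true"  # assumed set before the JVM starts

from pyspark import SparkContext, InheritableThread

sc = SparkContext.getOrCreate()

def run(group):
    # Each pinned PVM thread writes the group ID to its own JVM thread's
    # local properties, so concurrent threads stay isolated.
    sc.setJobGroup(group, "job for " + group, interruptOnCancel=True)
    try:
        sc.parallelize(range(4)).map(lambda _: time.sleep(100)).count()
    except Exception:
        pass  # cancellation surfaces here as a Py4J error

threads = [InheritableThread(target=run, args=(g,)) for g in ("group_a", "group_b")]
for t in threads:
    t.start()
time.sleep(5)  # give both jobs time to start

# Because the group IDs were isolated per thread, each cancellation stops
# exactly one job.
sc.cancelJobGroup("group_a")
sc.cancelJobGroup("group_b")
for t in threads:
    t.join()
```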