From 6f59d3401c1bd231c85f9b6df865077e67a14cf0 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Thu, 28 Dec 2023 12:53:59 +0900 Subject: [PATCH 1/4] Check Python executable when looking up available Data Sources --- .../scala/org/apache/spark/TestUtils.scala | 13 ++----- .../apache/spark/api/python/PythonUtils.scala | 10 +++--- .../scala/org/apache/spark/util/Utils.scala | 17 ++++++++++ .../datasources/DataSourceManager.scala | 34 +++++++++++++------ 4 files changed, 48 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index fafde3cf12c6..e85f98ff55c5 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -34,7 +34,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.io.Source import scala.reflect.{classTag, ClassTag} -import scala.sys.process.{Process, ProcessLogger} +import scala.sys.process.Process import scala.util.Try import com.google.common.io.{ByteStreams, Files} @@ -204,16 +204,7 @@ private[spark] object TestUtils extends SparkTestUtils { /** * Test if a command is available. */ - def testCommandAvailable(command: String): Boolean = { - val attempt = if (Utils.isWindows) { - Try(Process(Seq( - "cmd.exe", "/C", s"where $command")).run(ProcessLogger(_ => ())).exitValue()) - } else { - Try(Process(Seq( - "sh", "-c", s"command -v $command")).run(ProcessLogger(_ => ())).exitValue()) - } - attempt.isSuccess && attempt.get == 0 - } + def testCommandAvailable(command: String): Boolean = Utils.checkCommandAvailable(command) // SPARK-40053: This string needs to be updated when the // minimum python supported version changes. diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index 093047ea72f5..26c790a12447 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -153,10 +153,10 @@ private[spark] object PythonUtils extends Logging { // Only for testing. private[spark] var additionalTestingPath: Option[String] = None - private[spark] def createPythonFunction(command: Array[Byte]): SimplePythonFunction = { - val pythonExec: String = sys.env.getOrElse( - "PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python3")) + private[spark] val defaultPythonExec: String = sys.env.getOrElse( + "PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python3")) + private[spark] def createPythonFunction(command: Array[Byte]): SimplePythonFunction = { val sourcePython = if (Utils.isTesting) { // Put PySpark source code instead of the build zip archive so we don't need // to build PySpark every time during development. 
@@ -180,7 +180,7 @@ private[spark] object PythonUtils extends Logging { val pythonVer: String = Process( - Seq(pythonExec, "-c", "import sys; print('%d.%d' % sys.version_info[:2])"), + Seq(defaultPythonExec, "-c", "import sys; print('%d.%d' % sys.version_info[:2])"), None, "PYTHONPATH" -> pythonPath).!!.trim() @@ -188,7 +188,7 @@ private[spark] object PythonUtils extends Logging { command = command.toImmutableArraySeq, envVars = mutable.Map("PYTHONPATH" -> pythonPath).asJava, pythonIncludes = List.empty.asJava, - pythonExec = pythonExec, + pythonExec = defaultPythonExec, pythonVer = pythonVer, broadcastVars = List.empty.asJava, accumulator = null) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index aa5bd73e8535..e3d30725d1c9 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -3027,6 +3027,23 @@ private[spark] object Utils } } + /** + * Check if a command is available. + */ + def checkCommandAvailable(command: String): Boolean = { + // To avoid conflicts with java.lang.Process + import scala.sys.process.{Process, ProcessLogger} + + val attempt = if (Utils.isWindows) { + Try(Process(Seq( + "cmd.exe", "/C", s"where $command")).run(ProcessLogger(_ => ())).exitValue()) + } else { + Try(Process(Seq( + "sh", "-c", s"command -v $command")).run(ProcessLogger(_ => ())).exitValue()) + } + attempt.isSuccess && attempt.get == 0 + } + /** * Return whether we are using G1GC or not */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala index c207645ce526..eaf5965bd214 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala @@ -17,8 +17,10 @@ package org.apache.spark.sql.execution.datasources +import java.io.File import java.util.Locale import java.util.concurrent.ConcurrentHashMap +import java.util.regex.Pattern import scala.jdk.CollectionConverters._ @@ -26,6 +28,7 @@ import org.apache.spark.api.python.PythonUtils import org.apache.spark.internal.Logging import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.python.UserDefinedPythonDataSource +import org.apache.spark.util.Utils /** @@ -83,17 +86,28 @@ class DataSourceManager extends Logging { object DataSourceManager { - // Visiable for testing + // Visible for testing private[spark] var dataSourceBuilders: Option[Map[String, UserDefinedPythonDataSource]] = None - private def initialDataSourceBuilders = this.synchronized { - if (dataSourceBuilders.isEmpty) { - val result = UserDefinedPythonDataSource.lookupAllDataSourcesInPython() - val builders = result.names.zip(result.dataSources).map { case (name, dataSource) => - name -> - UserDefinedPythonDataSource(PythonUtils.createPythonFunction(dataSource)) - }.toMap - dataSourceBuilders = Some(builders) + private lazy val shouldLoadPythonDataSources: Boolean = { + Utils.checkCommandAvailable(PythonUtils.defaultPythonExec) && + // Make sure PySpark zipped files also exist. 
+      PythonUtils.sparkPythonPath
+        .split(Pattern.quote(File.pathSeparator)).forall(new File(_).exists())
+  }
+
+  private def initialDataSourceBuilders: Map[String, UserDefinedPythonDataSource] = {
+    if (Utils.isTesting || shouldLoadPythonDataSources) this.synchronized {
+      if (dataSourceBuilders.isEmpty) {
+        val result = UserDefinedPythonDataSource.lookupAllDataSourcesInPython()
+        val builders = result.names.zip(result.dataSources).map { case (name, dataSource) =>
+          name ->
+            UserDefinedPythonDataSource(PythonUtils.createPythonFunction(dataSource))
+        }.toMap
+        dataSourceBuilders = Some(builders)
+      }
+      dataSourceBuilders.get
+    } else {
+      Map.empty
     }
-    dataSourceBuilders.get
   }
 }

From 355ca7352835f0735a3619ad155204005ac60e85 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Fri, 29 Dec 2023 16:17:13 +0900
Subject: [PATCH 2/4] try-catch as well

---
 .../datasources/DataSourceManager.scala       | 27 +++++++++++++------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
index eaf5965bd214..0905df5fc98f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
@@ -85,7 +85,7 @@ class DataSourceManager extends Logging {
 }
 
-object DataSourceManager {
+object DataSourceManager extends Logging {
   // Visible for testing
   private[spark] var dataSourceBuilders: Option[Map[String, UserDefinedPythonDataSource]] = None
   private lazy val shouldLoadPythonDataSources: Boolean = {
@@ -98,14 +98,25 @@ object DataSourceManager {
   private def initialDataSourceBuilders: Map[String, UserDefinedPythonDataSource] = {
     if (Utils.isTesting || shouldLoadPythonDataSources) this.synchronized {
       if (dataSourceBuilders.isEmpty) {
-        val result = UserDefinedPythonDataSource.lookupAllDataSourcesInPython()
-        val builders = result.names.zip(result.dataSources).map { case (name, dataSource) =>
-          name ->
-            UserDefinedPythonDataSource(PythonUtils.createPythonFunction(dataSource))
-        }.toMap
-        dataSourceBuilders = Some(builders)
+        val maybeResult = try {
+          Some(UserDefinedPythonDataSource.lookupAllDataSourcesInPython())
+        } catch {
+          case e: Throwable =>
+            // Even if it fails for whatever reason, we shouldn't make the whole
+            // application fail.
+ logWarning( + s"Failed to execute Python worker: $e, giving up looking Python Data Sources") + None + } + + dataSourceBuilders = maybeResult.map { result => + result.names.zip(result.dataSources).map { case (name, dataSource) => + name -> + UserDefinedPythonDataSource(PythonUtils.createPythonFunction(dataSource)) + }.toMap + } } - dataSourceBuilders.get + dataSourceBuilders.getOrElse(Map.empty) } else { Map.empty } From 101296b26b4dfeb88d668738d3c1effafa3975c1 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Fri, 29 Dec 2023 16:19:54 +0900 Subject: [PATCH 3/4] Fix warning message --- .../spark/sql/execution/datasources/DataSourceManager.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala index 0905df5fc98f..66b408b406c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala @@ -105,7 +105,8 @@ object DataSourceManager extends Logging { // Even if it fails for whatever reason, we shouldn't make the whole // application fail. logWarning( - s"Failed to execute Python worker: $e, giving up looking Python Data Sources") + "Skipping the lookup of Python Data Sources " + + s"as it failed to execute the Python worker: $e") None } From eb56b579d1aa4362513bb473d9bbff90df176e11 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Fri, 29 Dec 2023 16:22:51 +0900 Subject: [PATCH 4/4] fix msg --- .../spark/sql/execution/datasources/DataSourceManager.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala index 66b408b406c7..4fc636a59e5a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala @@ -105,8 +105,7 @@ object DataSourceManager extends Logging { // Even if it fails for whatever reason, we shouldn't make the whole // application fail. logWarning( - "Skipping the lookup of Python Data Sources " + - s"as it failed to execute the Python worker: $e") + s"Skipping the lookup of Python Data Sources due to the failure: $e") None }
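
---

The net effect of the series is a guarded, memoized lookup: probe the default Python executable once, verify that the PySpark archives on the Python path actually exist, and degrade to an empty data source registry (with a warning) when the worker cannot run or the lookup throws, instead of failing the whole application. A minimal, self-contained Scala sketch of that shape follows; pythonExec, pythonPath, and lookupAllDataSources are illustrative stand-ins for PythonUtils.defaultPythonExec, PythonUtils.sparkPythonPath, and the real Python worker round-trip, and GuardedLookupSketch itself is hypothetical, not the patched Spark code.

import java.io.File
import java.util.regex.Pattern

import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try

object GuardedLookupSketch {
  // Mirrors how PythonUtils.defaultPythonExec is resolved from the environment.
  private val pythonExec: String = sys.env.getOrElse(
    "PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python3"))

  // Stand-in for PythonUtils.sparkPythonPath, which in the real code is a
  // File.pathSeparator-joined list of zip archives (pyspark.zip, py4j).
  private val pythonPath: String = sys.env.getOrElse("PYTHONPATH", "")

  // Same probe as Utils.checkCommandAvailable: exit code 0 from
  // "command -v" (or "where" on Windows) means the executable resolves.
  private def commandAvailable(command: String): Boolean = {
    val isWindows = sys.props("os.name").toLowerCase.contains("windows")
    val probe =
      if (isWindows) Seq("cmd.exe", "/C", s"where $command")
      else Seq("sh", "-c", s"command -v $command")
    Try(Process(probe).run(ProcessLogger(_ => ())).exitValue()).toOption.contains(0)
  }

  // Evaluated at most once. Note the split on the *path* separator
  // (':' or ';'), not the file separator: the string is a list of paths.
  private lazy val shouldLoad: Boolean =
    commandAvailable(pythonExec) &&
      pythonPath.split(Pattern.quote(File.pathSeparator)).forall(new File(_).exists())

  // Hypothetical stand-in for the Python worker round-trip; may throw.
  private def lookupAllDataSources(): Map[String, String] =
    Map("my_source" -> "pickled-create-function")

  private var cached: Option[Map[String, String]] = None

  // Returns the registry, or an empty map when Python is unusable or the
  // lookup itself fails; a failed lookup stays uncached and is retried on
  // the next call, matching the patched initialDataSourceBuilders.
  def builders: Map[String, String] =
    if (shouldLoad) this.synchronized {
      if (cached.isEmpty) {
        cached = Try(lookupAllDataSources()).toOption // the real code logs a warning here
      }
      cached.getOrElse(Map.empty)
    } else {
      Map.empty
    }
}

On a host without python3 on PATH, builders returns Map.empty rather than throwing; once the environment is fixed, the next call retries the lookup.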