13 changes: 2 additions & 11 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -34,7 +34,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.{classTag, ClassTag}
import scala.sys.process.{Process, ProcessLogger}
import scala.sys.process.Process
import scala.util.Try

import com.google.common.io.{ByteStreams, Files}
@@ -204,16 +204,7 @@ private[spark] object TestUtils extends SparkTestUtils {
/**
* Test if a command is available.
*/
def testCommandAvailable(command: String): Boolean = {
val attempt = if (Utils.isWindows) {
Try(Process(Seq(
"cmd.exe", "/C", s"where $command")).run(ProcessLogger(_ => ())).exitValue())
} else {
Try(Process(Seq(
"sh", "-c", s"command -v $command")).run(ProcessLogger(_ => ())).exitValue())
}
attempt.isSuccess && attempt.get == 0
}
def testCommandAvailable(command: String): Boolean = Utils.checkCommandAvailable(command)

// SPARK-40053: This string needs to be updated when the
// minimum python supported version changes.
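Note on the TestUtils change above: testCommandAvailable keeps its signature and now simply forwards to the shared Utils.checkCommandAvailable, so existing test call sites are untouched. A minimal sketch of such a call site, assuming a hypothetical suite under the org.apache.spark package (TestUtils is private[spark]); nothing below is part of this PR:

```scala
package org.apache.spark.example

import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.TestUtils

// Hypothetical suite; illustrates the unchanged caller-side contract.
class PythonDependentSuite extends AnyFunSuite {
  test("runs only when python3 is on PATH") {
    // Skip (rather than fail) when no python3 interpreter is installed.
    assume(TestUtils.testCommandAvailable("python3"))
  }
}
```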
core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -153,10 +153,10 @@ private[spark] object PythonUtils extends Logging {
// Only for testing.
private[spark] var additionalTestingPath: Option[String] = None

private[spark] def createPythonFunction(command: Array[Byte]): SimplePythonFunction = {
val pythonExec: String = sys.env.getOrElse(
"PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python3"))
private[spark] val defaultPythonExec: String = sys.env.getOrElse(
"PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python3"))

private[spark] def createPythonFunction(command: Array[Byte]): SimplePythonFunction = {
val sourcePython = if (Utils.isTesting) {
// Put PySpark source code instead of the build zip archive so we don't need
// to build PySpark every time during development.
@@ -180,15 +180,15 @@ private[spark] object PythonUtils extends Logging {

val pythonVer: String =
Process(
Seq(pythonExec, "-c", "import sys; print('%d.%d' % sys.version_info[:2])"),
Seq(defaultPythonExec, "-c", "import sys; print('%d.%d' % sys.version_info[:2])"),
None,
"PYTHONPATH" -> pythonPath).!!.trim()

SimplePythonFunction(
command = command.toImmutableArraySeq,
envVars = mutable.Map("PYTHONPATH" -> pythonPath).asJava,
pythonIncludes = List.empty.asJava,
pythonExec = pythonExec,
pythonExec = defaultPythonExec,
pythonVer = pythonVer,
broadcastVars = List.empty.asJava,
accumulator = null)
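Note on the PythonUtils change above: the interpreter lookup is hoisted out of createPythonFunction into the reusable defaultPythonExec val, so other callers (such as DataSourceManager below) can probe the same executable. The resolution order it captures is unchanged, as in this standalone sketch taken from the diff:

```scala
// PYSPARK_DRIVER_PYTHON wins, then PYSPARK_PYTHON, then the "python3" default.
val defaultPythonExec: String = sys.env.getOrElse(
  "PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python3"))
```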
17 changes: 17 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -3027,6 +3027,23 @@ private[spark] object Utils
}
}

/**
* Check if a command is available.
*/
def checkCommandAvailable(command: String): Boolean = {
// To avoid conflicts with java.lang.Process
import scala.sys.process.{Process, ProcessLogger}

val attempt = if (Utils.isWindows) {
Try(Process(Seq(
"cmd.exe", "/C", s"where $command")).run(ProcessLogger(_ => ())).exitValue())
} else {
Try(Process(Seq(
"sh", "-c", s"command -v $command")).run(ProcessLogger(_ => ())).exitValue())
}
attempt.isSuccess && attempt.get == 0
}

/**
* Return whether we are using G1GC or not
*/
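Note on the Utils change above: checkCommandAvailable shells out to `command -v` on POSIX systems and `where` on Windows, discards the output, and treats a zero exit code as success. A minimal usage sketch, assuming the caller lives under org.apache.spark (the object is private[spark]) and with illustrative command names:

```scala
import org.apache.spark.util.Utils

// True when the probe process exits with 0, i.e. the command is on PATH.
val hasPython: Boolean = Utils.checkCommandAvailable("python3")
// Expected false: the command name is made up for illustration.
val hasMissing: Boolean = Utils.checkCommandAvailable("no-such-command")
```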
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
@@ -17,15 +17,18 @@

package org.apache.spark.sql.execution.datasources

import java.io.File
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import java.util.regex.Pattern

import scala.jdk.CollectionConverters._

import org.apache.spark.api.python.PythonUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.python.UserDefinedPythonDataSource
import org.apache.spark.util.Utils


/**
@@ -82,18 +85,40 @@ class DataSourceManager extends Logging {
}


object DataSourceManager {
// Visiable for testing
object DataSourceManager extends Logging {
// Visible for testing
private[spark] var dataSourceBuilders: Option[Map[String, UserDefinedPythonDataSource]] = None
private def initialDataSourceBuilders = this.synchronized {
if (dataSourceBuilders.isEmpty) {
val result = UserDefinedPythonDataSource.lookupAllDataSourcesInPython()
val builders = result.names.zip(result.dataSources).map { case (name, dataSource) =>
name ->
UserDefinedPythonDataSource(PythonUtils.createPythonFunction(dataSource))
}.toMap
dataSourceBuilders = Some(builders)
private lazy val shouldLoadPythonDataSources: Boolean = {
Utils.checkCommandAvailable(PythonUtils.defaultPythonExec) &&
// Make sure PySpark zipped files also exist.
PythonUtils.sparkPythonPath
.split(Pattern.quote(File.pathSeparator)).forall(new File(_).exists())
}

private def initialDataSourceBuilders: Map[String, UserDefinedPythonDataSource] = {
if (Utils.isTesting || shouldLoadPythonDataSources) this.synchronized {
if (dataSourceBuilders.isEmpty) {
val maybeResult = try {
Some(UserDefinedPythonDataSource.lookupAllDataSourcesInPython())
} catch {
case e: Throwable =>
// Even if it fails for whatever reason, we shouldn't make the whole
// application fail.
logWarning(
s"Skipping the lookup of Python Data Sources due to the failure: $e")
None
}

dataSourceBuilders = maybeResult.map { result =>
result.names.zip(result.dataSources).map { case (name, dataSource) =>
name ->
UserDefinedPythonDataSource(PythonUtils.createPythonFunction(dataSource))
}.toMap
}
}
dataSourceBuilders.getOrElse(Map.empty)
} else {
Map.empty
}
dataSourceBuilders.get
}
}
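Note on the DataSourceManager change above: outside of testing, the Python-side lookup now runs only when a usable Python executable and the PySpark archives on sparkPythonPath are actually present, runs at most once, and degrades to an empty builder map if it throws instead of failing the application. A condensed, generic sketch of that pattern; GuardedLazyLookup and its members are illustrative names, not part of this PR:

```scala
// Guarded, one-shot, failure-tolerant initialization that degrades to an empty result.
object GuardedLazyLookup {
  @volatile private var cache: Option[Map[String, String]] = None

  def get(enabled: Boolean)(lookup: => Map[String, String]): Map[String, String] = {
    if (enabled) this.synchronized {
      if (cache.isEmpty) {
        cache =
          try Some(lookup)
          catch {
            case e: Throwable =>
              // Never fail the caller because an optional lookup broke.
              println(s"Skipping optional lookup due to: $e")
              None
          }
      }
      // Serve the cached result, or nothing if the lookup was skipped or failed.
      cache.getOrElse(Map.empty)
    } else {
      Map.empty
    }
  }
}
```

Because the guard and the cache live on a singleton, repeated calls are cheap once the first lookup has either succeeded or been skipped.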