Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 56 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,29 @@

package org.apache.spark.deploy

import java.net.URI

import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._

import org.apache.spark.api.python.{PythonUtils, RedirectThread}
import org.apache.spark.util.Utils

/**
* A main class used by spark-submit to launch Python applications. It executes python as a
* subprocess and then has it connect back to the JVM to access system properties, etc.
*/
object PythonRunner {
def main(args: Array[String]) {
val primaryResource = args(0)
val pythonFile = args(0)
val pyFiles = args(1)
val otherArgs = args.slice(2, args.length)

val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf

// Format python file paths before adding them to the PYTHONPATH
val formattedPythonFile = formatPath(pythonFile)
val formattedPyFiles = formatPaths(pyFiles)

// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
val gatewayServer = new py4j.GatewayServer(null, 0)
Expand All @@ -42,13 +48,13 @@ object PythonRunner {
// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
val pathElements = new ArrayBuffer[String]
pathElements ++= Option(pyFiles).getOrElse("").split(",")
pathElements ++= formattedPyFiles
pathElements += PythonUtils.sparkPythonPath
pathElements += sys.env.getOrElse("PYTHONPATH", "")
val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)

// Launch Python process
val builder = new ProcessBuilder(Seq(pythonExec, "-u", primaryResource) ++ otherArgs)
val builder = new ProcessBuilder(Seq(pythonExec, "-u", formattedPythonFile) ++ otherArgs)
val env = builder.environment()
env.put("PYTHONPATH", pythonPath)
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
Expand All @@ -59,4 +65,50 @@ object PythonRunner {

System.exit(process.waitFor())
}

/**
 * Format the python file path so that it can be added to the PYTHONPATH correctly.
 *
 * Python does not understand URI schemes in paths. Before adding python files to the
 * PYTHONPATH, we need to extract the path from the URI. This is safe to do because we
 * currently only support local python files.
 *
 * @param path a single local path or URI pointing at a python file
 * @param testWindows treat the path as a Windows path regardless of the actual OS
 * @return the path with any URI scheme removed
 * @throws IllegalArgumentException if the path is not local, or if the URI has no path
 */
def formatPath(path: String, testWindows: Boolean = false): String = {
  if (Utils.nonLocalPaths(path, testWindows).nonEmpty) {
    throw new IllegalArgumentException("Launching Python applications through " +
      s"spark-submit is currently only supported for local files: $path")
  }
  val windows = Utils.isWindows || testWindows
  val normalizedPath = if (windows) Utils.formatWindowsPath(path) else path

  // Strip the URI scheme from the path. The URI is parsed once and reused.
  val uri = new URI(normalizedPath)
  val schemelessPath = uri.getScheme match {
    // On Windows a drive letter such as "C:" parses as a scheme; it is part of the path
    case Utils.windowsDrive(d) if windows => normalizedPath
    case null => normalizedPath
    case _ => uri.getPath
  }

  // Guard against malformed paths potentially throwing NPE (e.g. "file:x" has no path)
  if (schemelessPath == null) {
    throw new IllegalArgumentException(s"Python file path is malformed: $path")
  }

  // In Windows, the drive should not be prefixed with "/"
  // For instance, python does not understand "/C:/path/to/sheep.py"
  // Only strip the slash when it precedes a drive letter; stripping it from a rooted path
  // such as "/spark.py" would silently turn it into a relative path.
  if (windows && schemelessPath.matches("/[a-zA-Z]:.*")) {
    schemelessPath.stripPrefix("/")
  } else {
    schemelessPath
  }
}

/**
 * Split a comma-delimited list of python file paths and format each entry so that it can be
 * added to the PYTHONPATH correctly. A null list and empty entries produce no elements.
 */
def formatPaths(paths: String, testWindows: Boolean = false): Array[String] = {
  val rawList = if (paths == null) "" else paths
  for (entry <- rawList.split(",") if entry.nonEmpty) yield formatPath(entry, testWindows)
}

}
25 changes: 15 additions & 10 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ object SparkSubmit {
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
args.files = mergeFileLists(args.files, args.primaryResource)
}
val pyFiles = Option(args.pyFiles).getOrElse("")
args.files = mergeFileLists(args.files, pyFiles)
sysProps("spark.submit.pyFiles") = pyFiles
args.files = mergeFileLists(args.files, args.pyFiles)
// Format python file paths properly before adding them to the PYTHONPATH
sysProps("spark.submit.pyFiles") = PythonRunner.formatPaths(args.pyFiles).mkString(",")
}

// If we're deploying into YARN, use yarn.Client as a wrapper around the user class
Expand Down Expand Up @@ -299,13 +299,18 @@ object SparkSubmit {
}

private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
val localJarFile = new File(localJar)
if (!localJarFile.exists()) {
printWarning(s"Jar $localJar does not exist, skipping.")
val uri = Utils.resolveURI(localJar)
uri.getScheme match {
case "file" | "local" =>
val file = new File(uri.getPath)
if (file.exists()) {
loader.addURL(file.toURI.toURL)
} else {
printWarning(s"Local jar $file does not exist, skipping.")
}
case _ =>
printWarning(s"Skip remote jar $uri.")
}

val url = localJarFile.getAbsoluteFile.toURI.toURL
loader.addURL(url)
}

/**
Expand All @@ -318,7 +323,7 @@ object SparkSubmit {
/**
* Return whether the given primary resource represents a shell.
*/
private def isShell(primaryResource: String): Boolean = {
private[spark] def isShell(primaryResource: String): Boolean = {
primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
} catch {
case e: Exception =>
SparkSubmit.printErrorAndExit("Failed to read JAR: " + primaryResource)
SparkSubmit.printErrorAndExit("Cannot load main class from JAR: " + primaryResource)
return
}
}
Expand Down Expand Up @@ -148,6 +148,18 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
}

// Require all python files to be local, so we can add them to the PYTHONPATH
if (isPython) {
if (Utils.nonLocalPaths(primaryResource).nonEmpty) {
SparkSubmit.printErrorAndExit(s"Only local python files are supported: $primaryResource")
}
val nonLocalPyFiles = Utils.nonLocalPaths(pyFiles).mkString(",")
if (nonLocalPyFiles.nonEmpty) {
SparkSubmit.printErrorAndExit(
s"Only local additional python files are supported: $nonLocalPyFiles")
}
}

if (master.startsWith("yarn")) {
val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
if (!hasHadoopEnv && !Utils.isTesting) {
Expand Down Expand Up @@ -263,19 +275,19 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
parse(tail)

case ("--files") :: value :: tail =>
files = value
files = Utils.resolveURIs(value)
parse(tail)

case ("--py-files") :: value :: tail =>
pyFiles = value
pyFiles = Utils.resolveURIs(value)
parse(tail)

case ("--archives") :: value :: tail =>
archives = value
archives = Utils.resolveURIs(value)
parse(tail)

case ("--jars") :: value :: tail =>
jars = value
jars = Utils.resolveURIs(value)
parse(tail)

case ("--help" | "-h") :: tail =>
Expand All @@ -296,7 +308,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
val errMessage = s"Unrecognized option '$value'."
SparkSubmit.printErrorAndExit(errMessage)
case v =>
primaryResource = v
primaryResource =
if (!SparkSubmit.isShell(v)) {
Utils.resolveURI(v).toString
} else {
v
}
inSparkOpts = false
isPython = SparkSubmit.isPython(v)
parse(tail)
Expand Down Expand Up @@ -327,8 +344,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
| --name NAME A name of your application.
| --jars JARS Comma-separated list of local jars to include on the driver
| and executor classpaths.
| --py-files PY_FILES Comma-separated list of .zip or .egg files to place on the
| PYTHONPATH for Python apps.
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
| on the PYTHONPATH for Python apps.
| --files FILES Comma-separated list of files to be placed in the working
| directory of each executor.
| --properties-file FILE Path to a file from which to load extra properties. If not
Expand Down
71 changes: 69 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1086,9 +1086,19 @@ private[spark] object Utils extends Logging {
}

/**
* Return true if this is Windows.
* Whether the underlying operating system is Windows.
*/
def isWindows = SystemUtils.IS_OS_WINDOWS
val isWindows = SystemUtils.IS_OS_WINDOWS

/**
* Pattern for matching a Windows drive letter, which is a single alphabetic character.
*/
val windowsDrive = "([a-zA-Z])".r

/**
 * Normalize a Windows path for use in a URI by turning every backslash into a forward slash.
 */
def formatWindowsPath(path: String): String = path.replace('\\', '/')

/**
* Indicates whether Spark is currently running unit tests.
Expand Down Expand Up @@ -1166,4 +1176,61 @@ private[spark] object Utils extends Logging {
true
}
}

/**
* Return a well-formed URI for the file described by a user input string.
*
* If the supplied path does not contain a scheme, or is a relative path, it will be
* converted into an absolute path with a file:// scheme.
*/
def resolveURI(path: String, testWindows: Boolean = false): URI = {

// In Windows, the file separator is a backslash, but this is inconsistent with the URI format
val windows = isWindows || testWindows
val formattedPath = if (windows) formatWindowsPath(path) else path

val uri = new URI(formattedPath)
if (uri.getPath == null) {
throw new IllegalArgumentException(s"Given path is malformed: $uri")
}
uri.getScheme match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a validation check here to ensure that the URI (and hence the original path given by the user) is valid and not malformed? Just for fun, I entered "file:x", and it gave me an NPE at a random location, because uri.getPath was null.

scala> new URI("file:x").getPath
res1: String = null

That is what we should check for here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, please add test cases for some bad paths that you can think of.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, I think, at least for local files, we can check early, in spark submit whether that file exists or not. But that can be done in a different PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should delegate checking whether file exists to somewhere else outside of this function. This is intended to be a general utils function, where the file may not need to exist locally even if it is a file:/.

Yes, NPEs are bad error messages, and I will add a guard against them.

case windowsDrive(d) if windows =>
new URI("file:/" + uri.toString.stripPrefix("/"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little confused by this. This function will convert a Windows path like c:\hello\world.txt to file:/c:/hello/world.txt, so the backslashes get permanently replaced by forward slashes. How does resolving paths on Windows work after that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Windows, both backslashes and forward slashes are valid file separators

case null =>
// Preserve fragments for HDFS file name substitution (denoted by "#")
// For instance, in "abc.py#xyz.py", "xyz.py" is the name observed by the application
val fragment = uri.getFragment
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to document that we preserve the fragment because it has a special meaning for YARN URIs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a small bullet point that explains this: http://people.apache.org/~pwendell/spark-1.0.0-rc7-docs/running-on-yarn.html#important-notes

I'll add a comment here.

val part = new File(uri.getPath).toURI
new URI(part.getScheme, part.getPath, fragment)
case _ =>
uri
}
}

/**
 * Resolve each entry of a comma-separated list of paths and join the results with commas.
 * A null or blank list resolves to the empty string.
 */
def resolveURIs(paths: String, testWindows: Boolean = false): String = {
  Option(paths).filter(_.trim.nonEmpty) match {
    case Some(list) =>
      list.split(",").map(entry => Utils.resolveURI(entry, testWindows)).mkString(",")
    case None =>
      ""
  }
}

/**
 * Return the entries of a comma-separated list of paths that are not local.
 *
 * An entry counts as local when it has no scheme, a "local" or "file" scheme, or (when
 * Windows handling is active) a single-letter drive in the scheme position.
 */
def nonLocalPaths(paths: String, testWindows: Boolean = false): Array[String] = {
  val onWindows = isWindows || testWindows

  // Decide from the URI scheme alone whether a single entry points somewhere remote
  def isRemote(entry: String): Boolean = {
    val normalized = if (onWindows) formatWindowsPath(entry) else entry
    new URI(normalized).getScheme match {
      case windowsDrive(d) if onWindows => false
      case "local" | "file" | null => false
      case _ => true
    }
  }

  val blank = paths == null || paths.trim.isEmpty
  if (blank) Array.empty[String] else paths.split(",").filter(isRemote)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

import org.scalatest.FunSuite

class PythonRunnerSuite extends FunSuite {

  // A single path must come back without its URI scheme so it can go on the PYTHONPATH
  test("format path") {
    val defaultCases = Seq(
      "spark.py" -> "spark.py",
      "file:/spark.py" -> "/spark.py",
      "file:///spark.py" -> "/spark.py",
      "local:/spark.py" -> "/spark.py",
      "local:///spark.py" -> "/spark.py")
    for ((input, expected) <- defaultCases) {
      assert(PythonRunner.formatPath(input) === expected)
    }

    // Every spelling of the same Windows file should collapse to the bare drive path
    val windowsInputs = Seq("C:/a/b/spark.py", "/C:/a/b/spark.py", "file:/C:/a/b/spark.py")
    for (input <- windowsInputs) {
      assert(PythonRunner.formatPath(input, testWindows = true) === "C:/a/b/spark.py")
    }

    // These inputs must be rejected
    for (input <- Seq("one:two", "hdfs:s3:xtremeFS", "hdfs:/path/to/some.py")) {
      intercept[IllegalArgumentException] { PythonRunner.formatPath(input) }
    }
  }

  // A comma-separated list must be formatted entry by entry, preserving order
  test("format paths") {
    val defaultCases = Seq(
      "spark.py" -> Array("spark.py"),
      "file:/spark.py" -> Array("/spark.py"),
      "file:/app.py,local:/spark.py" -> Array("/app.py", "/spark.py"),
      "me.py,file:/you.py,local:/we.py" -> Array("me.py", "/you.py", "/we.py"))
    for ((input, expected) <- defaultCases) {
      assert(PythonRunner.formatPaths(input) === expected)
    }

    val windowsCases = Seq(
      "C:/a/b/spark.py" -> Array("C:/a/b/spark.py"),
      "/C:/a/b/spark.py" -> Array("C:/a/b/spark.py"),
      "C:/free.py,pie.py" -> Array("C:/free.py", "pie.py"),
      "lovely.py,C:/free.py,file:/d:/fry.py" -> Array("lovely.py", "C:/free.py", "d:/fry.py"))
    for ((input, expected) <- windowsCases) {
      assert(PythonRunner.formatPaths(input, testWindows = true) === expected)
    }

    // One bad entry anywhere in the list must reject the whole list
    val rejected = Seq(
      "one:two,three",
      "two,three,four:five:six",
      "hdfs:/some.py,foo.py",
      "foo.py,hdfs:/some.py")
    for (input <- rejected) {
      intercept[IllegalArgumentException] { PythonRunner.formatPaths(input) }
    }
  }
}
Loading