From 9f0ccc958518ecc1050304c27b848c6806350167 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Fri, 29 Jan 2016 11:43:24 +0800 Subject: [PATCH 1/8] temp save --- .../scala/org/apache/spark/SparkEnv.scala | 2 +- .../apache/spark/api/python/PythonRDD.scala | 9 ++ .../api/python/PythonWorkerFactory.scala | 84 ++++++++++++++++++- python/pyspark/context.py | 9 ++ .../org/apache/spark/deploy/yarn/Client.scala | 1 + 5 files changed, 100 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 1ffeb129880f..44afc808d84f 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -112,7 +112,7 @@ class SparkEnv ( def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = { synchronized { val key = (pythonExec, envVars) - pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create() + pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars, conf)).create() } } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 0ca91b9bf86c..64c3604f9ac2 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -58,6 +58,15 @@ private[spark] class PythonRDD( val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) +// { +// logInfo("parent:" + parent.toString) +// logInfo("command:" + command) +// logInfo("envVars:" + envVars) +// logInfo("pythonIncludes:" + pythonIncludes) +// logInfo("pythonExec:" + pythonExec) +// logInfo("pythonVer:" + pythonVer) +// } + override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { val runner = PythonRunner(func, bufferSize, reuse_worker) runner.compute(firstParent.iterator(split, context), split.index, context) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 6a5e6f7c5afb..337d388d8531 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -17,10 +17,13 @@ package org.apache.spark.api.python -import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStreamWriter} +import java.io._ import java.net.{InetAddress, ServerSocket, Socket, SocketException} import java.nio.charset.StandardCharsets import java.util.Arrays +import java.util.{UUID, Arrays} +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.collection.JavaConverters._ @@ -29,7 +32,10 @@ import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.util.{RedirectThread, Utils} -private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String]) +import scala.collection.mutable.ArrayBuffer +import scala.io.Source + +private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String], conf: SparkConf) extends Logging { import PythonWorkerFactory._ @@ -46,6 +52,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String val daemonWorkers = new mutable.WeakHashMap[Socket, Int]() val idleWorkers = new mutable.Queue[Socket]() var lastActivity = 0L + val 
virtualEnvSetup = new AtomicBoolean(false) + var virtualEnvDir: Option[String] = _ new MonitorThread().start() var simpleWorkers = new mutable.WeakHashMap[Socket, Process]() @@ -55,6 +63,14 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String envVars.getOrElse("PYTHONPATH", ""), sys.env.getOrElse("PYTHONPATH", "")) + synchronized { + if (conf.getBoolean("spark.pyspark.virtualenv.enabled", false) && !virtualEnvSetup.get()) { + createVirtualEnv() + initVirutalEnv() + virtualEnvSetup.set(true) + } + } + def create(): Socket = { if (useDaemon) { synchronized { @@ -68,6 +84,61 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String } } + class StreamGobbler(is:InputStream, isError: Boolean) extends Thread { + + var output: List[String] = _ + + override def run() : Unit = { + import scala.collection.JavaConverters._ + try { + val isr = new InputStreamReader(is); + val br = new BufferedReader(isr); + val lines = Source.fromInputStream(is).bufferedReader().lines() + output = lines.iterator().asScala.toList + output.foreach(log.info) + } catch { + case ioe :IOException => ioe.printStackTrace(); + } + } + } + + + def createVirtualEnv(): Unit = { + virtualEnvDir = Some(UUID.randomUUID().toString) + val pb = new ProcessBuilder( + Arrays.asList(conf.get("spark.pyspark.virtualenv.path","virtualenv"), + "-p", pythonExec, + "--no-site-packages", virtualEnvDir.get)) + val proc = pb.start() + val stderr = new StreamGobbler(proc.getErrorStream, true) + stderr.start() + val stdin = new StreamGobbler(proc.getInputStream, false) + stdin.start() + val exitCode = proc.waitFor() + if (exitCode != 0) { + throw new RuntimeException(stdin.output + "\n" + stderr.output) + } + } + + def initVirutalEnv(): Unit = { + val pyspark_requirement = + if (conf.get("spark.master").contains("local")) + conf.get("spark.pyspark.virtualenv.requirements") + else + conf.get("spark.pyspark.virtualenv.requirements").split("/").last + val pb = new ProcessBuilder(Arrays.asList(virtualEnvDir.get + "/bin/pip", "install", + "-r" , pyspark_requirement)) + val proc = pb.start() + val stderr = new StreamGobbler(proc.getErrorStream, true) + stderr.start() + val stdin = new StreamGobbler(proc.getInputStream, false) + stdin.start() + val exitCode = proc.waitFor() + if (exitCode != 0) { + throw new RuntimeException(stdin.output + "\n" + stderr.output) + } + } + /** * Connect to a worker launched through pyspark/daemon.py, which forks python processes itself * to avoid the high cost of forking from Java. This currently only works on UNIX-based systems. 
@@ -111,7 +182,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1))) // Create and start the worker - val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.worker")) + val realPythonExec = if (virtualEnvDir.isDefined) virtualEnvDir.get + "/bin/python" + else pythonExec + val pb = new ProcessBuilder(Arrays.asList(realPythonExec, + "-m", "pyspark.worker")) val workerEnv = pb.environment() workerEnv.putAll(envVars.asJava) workerEnv.put("PYTHONPATH", pythonPath) @@ -154,7 +228,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String try { // Create and start the daemon - val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.daemon")) + val realPythonExec = if (virtualEnvDir.isDefined) virtualEnvDir.get + "/bin/python" + else pythonExec + val pb = new ProcessBuilder(Arrays.asList(realPythonExec, "-m", "pyspark.daemon")) val workerEnv = pb.environment() workerEnv.putAll(envVars.asJava) workerEnv.put("PYTHONPATH", pythonPath) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index a3dd1950a522..0fd038f76a1e 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -177,6 +177,15 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, self._jsc.sc().register(self._javaAccumulator) self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python') + print("*************************pythonExec:" + self.pythonExec) + if self._conf.get("spark.pyspark.virtualenv.enabled") == "true": + requirements = self._conf.get("spark.pyspark.virtualenv.requirements") + if not requirements: + raise Exception("spark.pyspark.virtualenv.enabled is set as true but no value for " + "spark.pyspark.virtualenv.requirements") + else: + self.addPyFile(self._conf.get("spark.pyspark.virtualenv.requirements")) + self.pythonVer = "%d.%d" % sys.version_info[:2] # Broadcast's __reduce__ method stores Broadcast instances here. 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index ea4e1160b767..9cb504ca6cba 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -748,6 +748,7 @@ private[spark] class Client( env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() +// env("PATH") = "$PATH:" + "/Users/jzhang/anaconda/bin" if (loginFromKeytab) { val credentialsFile = "credentials-" + UUID.randomUUID().toString sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString) From 9386c319fdc92ee71054a402721d2fb9e6e4de0b Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Mon, 1 Feb 2016 15:54:54 +0800 Subject: [PATCH 2/8] change it to java 7 stule --- .../org/apache/spark/api/python/PythonWorkerFactory.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 337d388d8531..93f5375a3e19 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -25,6 +25,8 @@ import java.util.{UUID, Arrays} import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean +import org.apache.commons.io.IOUtils + import scala.collection.mutable import scala.collection.JavaConverters._ @@ -93,7 +95,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String try { val isr = new InputStreamReader(is); val br = new BufferedReader(isr); - val lines = Source.fromInputStream(is).bufferedReader().lines() + val lines = IOUtils.readLines(br) output = lines.iterator().asScala.toList output.foreach(log.info) } catch { From 2e9c9fecefc2383cb84596f8500c631b8a9a8afb Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Mon, 1 Feb 2016 16:14:03 +0800 Subject: [PATCH 3/8] minor fix --- .../scala/org/apache/spark/api/python/PythonWorkerFactory.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 93f5375a3e19..6b1f168ca822 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -55,7 +55,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String val idleWorkers = new mutable.Queue[Socket]() var lastActivity = 0L val virtualEnvSetup = new AtomicBoolean(false) - var virtualEnvDir: Option[String] = _ + var virtualEnvDir: Option[String] = None new MonitorThread().start() var simpleWorkers = new mutable.WeakHashMap[Socket, Process]() From 567126c591951ec7db400876c9c7394a71184712 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Tue, 2 Feb 2016 09:42:53 +0800 Subject: [PATCH 4/8] fix shebang line limitation --- .../org/apache/spark/api/python/PythonWorkerFactory.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 6b1f168ca822..01c07173722e 100644 --- 
a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -107,6 +107,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String def createVirtualEnv(): Unit = { virtualEnvDir = Some(UUID.randomUUID().toString) + logInfo("**********current working directory=" + new File(".").getAbsolutePath) val pb = new ProcessBuilder( Arrays.asList(conf.get("spark.pyspark.virtualenv.path","virtualenv"), "-p", pythonExec, @@ -128,8 +129,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String conf.get("spark.pyspark.virtualenv.requirements") else conf.get("spark.pyspark.virtualenv.requirements").split("/").last - val pb = new ProcessBuilder(Arrays.asList(virtualEnvDir.get + "/bin/pip", "install", - "-r" , pyspark_requirement)) + val pb = new ProcessBuilder(Arrays.asList(virtualEnvDir.get + "/bin/python", "-m", "pip", + "install", "-r" , pyspark_requirement)) val proc = pb.start() val stderr = new StreamGobbler(proc.getErrorStream, true) stderr.start() From 12fb7731d8040f77972eb03a1f93985bd74790c5 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Tue, 2 Feb 2016 11:55:38 +0800 Subject: [PATCH 5/8] minor refactoring --- .../api/python/PythonWorkerFactory.scala | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 01c07173722e..531b6e41a4ed 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -67,8 +67,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String synchronized { if (conf.getBoolean("spark.pyspark.virtualenv.enabled", false) && !virtualEnvSetup.get()) { - createVirtualEnv() - initVirutalEnv() + setupVirtualEnv() virtualEnvSetup.set(true) } } @@ -105,32 +104,27 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String } - def createVirtualEnv(): Unit = { + def setupVirtualEnv(): Unit = { virtualEnvDir = Some(UUID.randomUUID().toString) logInfo("**********current working directory=" + new File(".").getAbsolutePath) - val pb = new ProcessBuilder( - Arrays.asList(conf.get("spark.pyspark.virtualenv.path","virtualenv"), + logInfo("Starting creating virtualenv") + execCommand(Arrays.asList(conf.get("spark.pyspark.virtualenv.path","virtualenv"), "-p", pythonExec, "--no-site-packages", virtualEnvDir.get)) - val proc = pb.start() - val stderr = new StreamGobbler(proc.getErrorStream, true) - stderr.start() - val stdin = new StreamGobbler(proc.getInputStream, false) - stdin.start() - val exitCode = proc.waitFor() - if (exitCode != 0) { - throw new RuntimeException(stdin.output + "\n" + stderr.output) - } - } - - def initVirutalEnv(): Unit = { + logInfo("Complete creating virtualenv") + logInfo("Starting initing virtualenv") val pyspark_requirement = if (conf.get("spark.master").contains("local")) conf.get("spark.pyspark.virtualenv.requirements") else conf.get("spark.pyspark.virtualenv.requirements").split("/").last - val pb = new ProcessBuilder(Arrays.asList(virtualEnvDir.get + "/bin/python", "-m", "pip", + execCommand(Arrays.asList(virtualEnvDir.get + "/bin/python", "-m", "pip", "install", "-r" , pyspark_requirement)) + logInfo("Finish initing virtualenv") + } + + def execCommand(commands: 
java.util.List[String]): Unit ={ + val pb = new ProcessBuilder(commands) val proc = pb.start() val stderr = new StreamGobbler(proc.getErrorStream, true) stderr.start() From 6397c560a2994432090b695df24cd165197e8d72 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Wed, 3 Feb 2016 15:22:26 +0800 Subject: [PATCH 6/8] fix cache_dir issue --- .../scala/org/apache/spark/SparkEnv.scala | 3 +- .../apache/spark/api/python/PythonRDD.scala | 9 -- .../api/python/PythonWorkerFactory.scala | 123 +++++++++--------- python/pyspark/context.py | 17 ++- .../org/apache/spark/deploy/yarn/Client.scala | 1 - 5 files changed, 75 insertions(+), 78 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 44afc808d84f..3cb1b37d194b 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -112,7 +112,8 @@ class SparkEnv ( def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = { synchronized { val key = (pythonExec, envVars) - pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars, conf)).create() + pythonWorkers.getOrElseUpdate(key, + new PythonWorkerFactory(pythonExec, envVars, conf)).create() } } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 64c3604f9ac2..0ca91b9bf86c 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -58,15 +58,6 @@ private[spark] class PythonRDD( val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) -// { -// logInfo("parent:" + parent.toString) -// logInfo("command:" + command) -// logInfo("envVars:" + envVars) -// logInfo("pythonIncludes:" + pythonIncludes) -// logInfo("pythonExec:" + pythonExec) -// logInfo("pythonVer:" + pythonVer) -// } - override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { val runner = PythonRunner(func, bufferSize, reuse_worker) runner.compute(firstParent.iterator(split, context), split.index, context) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 531b6e41a4ed..e0a9dcaa45bc 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -21,23 +21,19 @@ import java.io._ import java.net.{InetAddress, ServerSocket, Socket, SocketException} import java.nio.charset.StandardCharsets import java.util.Arrays -import java.util.{UUID, Arrays} -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger -import org.apache.commons.io.IOUtils - -import scala.collection.mutable import scala.collection.JavaConverters._ +import scala.collection.mutable import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.util.{RedirectThread, Utils} -import scala.collection.mutable.ArrayBuffer -import scala.io.Source -private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String], conf: SparkConf) +private[spark] class PythonWorkerFactory(pythonExec: String, + envVars: Map[String, String], + conf: SparkConf) extends Logging { import PythonWorkerFactory._ @@ -54,8 +50,12 @@ private[spark] class 
PythonWorkerFactory(pythonExec: String, envVars: Map[String val daemonWorkers = new mutable.WeakHashMap[Socket, Int]() val idleWorkers = new mutable.Queue[Socket]() var lastActivity = 0L - val virtualEnvSetup = new AtomicBoolean(false) - var virtualEnvDir: Option[String] = None + val virtualEnvEnabled = conf.getBoolean("spark.pyspark.virtualenv.enabled", false) + val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + val virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + var virtualEnvName: String = _ + var virtualPythonExec: String = _ + new MonitorThread().start() var simpleWorkers = new mutable.WeakHashMap[Socket, Process]() @@ -65,11 +65,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String envVars.getOrElse("PYTHONPATH", ""), sys.env.getOrElse("PYTHONPATH", "")) - synchronized { - if (conf.getBoolean("spark.pyspark.virtualenv.enabled", false) && !virtualEnvSetup.get()) { - setupVirtualEnv() - virtualEnvSetup.set(true) - } + if (conf.getBoolean("spark.pyspark.virtualenv.enabled", false)) { + setupVirtualEnv() } def create(): Socket = { @@ -85,54 +82,59 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String } } - class StreamGobbler(is:InputStream, isError: Boolean) extends Thread { - - var output: List[String] = _ - - override def run() : Unit = { - import scala.collection.JavaConverters._ - try { - val isr = new InputStreamReader(is); - val br = new BufferedReader(isr); - val lines = IOUtils.readLines(br) - output = lines.iterator().asScala.toList - output.foreach(log.info) - } catch { - case ioe :IOException => ioe.printStackTrace(); - } - } - } - - + /** + * Create virtualenv using native virtualenv or conda + * + * Native Virtualenv: + * - Execute command: virtualenv -p pythonExec --no-site-packages virtualenvName + * - Execute command: python -m pip --cache-dir cache-dir install -r requirement_file + * + * Conda + * - Execute command: conda create --name virtualenvName --file requirement_file -y + * + */ def setupVirtualEnv(): Unit = { - virtualEnvDir = Some(UUID.randomUUID().toString) - logInfo("**********current working directory=" + new File(".").getAbsolutePath) - logInfo("Starting creating virtualenv") - execCommand(Arrays.asList(conf.get("spark.pyspark.virtualenv.path","virtualenv"), - "-p", pythonExec, - "--no-site-packages", virtualEnvDir.get)) - logInfo("Complete creating virtualenv") - logInfo("Starting initing virtualenv") - val pyspark_requirement = - if (conf.get("spark.master").contains("local")) + logDebug("Start to setup virtualenv...") + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + WORKER_Id.getAndIncrement() + // use the absolute path when it is local mode otherwise just use filename as it would be + // fetched from FileServer + val pyspark_requirements = + if (Utils.isLocalMaster(conf)) { conf.get("spark.pyspark.virtualenv.requirements") - else + } else { conf.get("spark.pyspark.virtualenv.requirements").split("/").last - execCommand(Arrays.asList(virtualEnvDir.get + "/bin/python", "-m", "pip", - "install", "-r" , pyspark_requirement)) - logInfo("Finish initing virtualenv") + } + + val createEnvCommand = + if (virtualEnvType == "native") { + Arrays.asList(virtualEnvPath, + "-p", pythonExec, + "--no-site-packages", virtualEnvName) + } else { + Arrays.asList(virtualEnvPath, + "create", "--prefix", System.getProperty("user.dir") + "/" + virtualEnvName, + "--file", pyspark_requirements, "-y") + } + execCommand(createEnvCommand) + // virtualenv will be created 
in the working directory of Executor. + virtualPythonExec = virtualEnvName + "/bin/python" + if (virtualEnvType == "native") { + execCommand(Arrays.asList(virtualPythonExec, "-m", "pip", + "--cache-dir", System.getProperty("user.home"), + "install", "-r", pyspark_requirements)) + } } - def execCommand(commands: java.util.List[String]): Unit ={ - val pb = new ProcessBuilder(commands) + def execCommand(commands: java.util.List[String]): Unit = { + logDebug("Running command:" + commands.asScala.mkString(" ")) + val pb = new ProcessBuilder(commands).inheritIO() + pb.environment().putAll(envVars.asJava) + pb.environment().putAll(System.getenv()) + pb.environment().put("HOME", System.getProperty("user.home")) val proc = pb.start() - val stderr = new StreamGobbler(proc.getErrorStream, true) - stderr.start() - val stdin = new StreamGobbler(proc.getInputStream, false) - stdin.start() val exitCode = proc.waitFor() if (exitCode != 0) { - throw new RuntimeException(stdin.output + "\n" + stderr.output) + throw new RuntimeException("Fail to run command: " + commands.asScala.mkString(" ")) } } @@ -179,8 +181,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1))) // Create and start the worker - val realPythonExec = if (virtualEnvDir.isDefined) virtualEnvDir.get + "/bin/python" - else pythonExec + val realPythonExec = if (virtualEnvEnabled) virtualPythonExec else pythonExec + logDebug(s"Starting worker with pythonExec: ${realPythonExec}") val pb = new ProcessBuilder(Arrays.asList(realPythonExec, "-m", "pyspark.worker")) val workerEnv = pb.environment() @@ -225,8 +227,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String try { // Create and start the daemon - val realPythonExec = if (virtualEnvDir.isDefined) virtualEnvDir.get + "/bin/python" - else pythonExec + val realPythonExec = if (virtualEnvEnabled) virtualPythonExec else pythonExec + logDebug(s"Starting daemon with pythonExec: ${realPythonExec}") val pb = new ProcessBuilder(Arrays.asList(realPythonExec, "-m", "pyspark.daemon")) val workerEnv = pb.environment() workerEnv.putAll(envVars.asJava) @@ -380,6 +382,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String } private object PythonWorkerFactory { + val WORKER_Id = new AtomicInteger() val PROCESS_WAIT_TIMEOUT_MS = 10000 val IDLE_WORKER_TIMEOUT_MS = 60000 // kill idle workers after 1 minute } diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 0fd038f76a1e..8b57c64ae520 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -177,14 +177,17 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, self._jsc.sc().register(self._javaAccumulator) self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python') - print("*************************pythonExec:" + self.pythonExec) if self._conf.get("spark.pyspark.virtualenv.enabled") == "true": - requirements = self._conf.get("spark.pyspark.virtualenv.requirements") - if not requirements: - raise Exception("spark.pyspark.virtualenv.enabled is set as true but no value for " - "spark.pyspark.virtualenv.requirements") - else: - self.addPyFile(self._conf.get("spark.pyspark.virtualenv.requirements")) + requirements = self._conf.get("spark.pyspark.virtualenv.requirements") + virtualEnvBinPath = self._conf.get("spark.pyspark.virtualenv.bin.path") + if not requirements: + raise Exception("spark.pyspark.virtualenv.enabled is set as 
true but no value for " + "spark.pyspark.virtualenv.requirements") + if not virtualEnvBinPath: + raise Exception("spark.pyspark.virtualenv.enabled is set as true but no value for " + "spark.pyspark.virtualenv.bin.path") + else: + self.addFile(self._conf.get("spark.pyspark.virtualenv.requirements")) self.pythonVer = "%d.%d" % sys.version_info[:2] diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 9cb504ca6cba..ea4e1160b767 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -748,7 +748,6 @@ private[spark] class Client( env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() -// env("PATH") = "$PATH:" + "/Users/jzhang/anaconda/bin" if (loginFromKeytab) { val credentialsFile = "credentials-" + UUID.randomUUID().toString sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString) From 7e3063ed47003d16ec08be6ed8f79ed3408cdc7d Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Fri, 10 Jun 2016 19:39:18 +0800 Subject: [PATCH 7/8] Revert "[SPARK-15803][PYSPARK] Support with statement syntax for SparkSession" This reverts commit 2ab64b41137374b935f939d919fec7cb2f56cd63. --- .../spark/api/python/PythonWorkerFactory.scala | 4 ++-- python/pyspark/sql/session.py | 16 ---------------- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index e0a9dcaa45bc..f1af03f03d69 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -17,14 +17,14 @@ package org.apache.spark.api.python -import java.io._ +import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStreamWriter} import java.net.{InetAddress, ServerSocket, Socket, SocketException} import java.nio.charset.StandardCharsets import java.util.Arrays import java.util.concurrent.atomic.AtomicInteger -import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.collection.JavaConverters._ import org.apache.spark._ import org.apache.spark.internal.Logging diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 8418abf99c8d..e1d495e31a45 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -597,22 +597,6 @@ def stop(self): self._sc.stop() SparkSession._instantiatedContext = None - @since(2.0) - def __enter__(self): - """ - Enable 'with SparkSession.builder.(...).getOrCreate() as session: app' syntax. - """ - return self - - @since(2.0) - def __exit__(self, exc_type, exc_val, exc_tb): - """ - Enable 'with SparkSession.builder.(...).getOrCreate() as session: app' syntax. - - Specifically stop the SparkSession on exit of the with block. - """ - self.stop() - def _test(): import os From cf977e4f7dca5be43e51a1b7788646d7a4f2d775 Mon Sep 17 00:00:00 2001 From: Gaetan Semet Date: Wed, 7 Sep 2016 11:41:51 +0200 Subject: [PATCH 8/8] [SPARK-16367][PYSPARK] Add wheelhouse support - Merge of #13599 ("virtualenv in pyspark", Bug SPARK-13587) - and #5408 (wheel package support for Pyspark", bug SPARK-6764) - Documentation updated - only Standalone and YARN supported. 
Mesos not supported - only tested with virtualenv/pip. Conda not tested - client deployment + pip install w/ index: ok (1 min 30 exec) - client deployment + wheelhouse w/o index: ko (cffi refuse the builded wheel) Signed-off-by: Gaetan Semet --- .../api/python/PythonWorkerFactory.scala | 198 +++++++- .../spark/deploy/SparkSubmitArguments.scala | 4 +- docs/cluster-overview.md | 15 +- docs/configuration.md | 2 +- docs/programming-guide.md | 16 +- docs/submitting-applications.md | 444 +++++++++++++++++- .../apache/spark/launcher/SparkLauncher.java | 2 +- python/pyspark/context.py | 68 ++- 8 files changed, 687 insertions(+), 62 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index f1af03f03d69..129c12120fea 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -18,14 +18,18 @@ package org.apache.spark.api.python import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStreamWriter} +import java.io.{File, FileInputStream, FileOutputStream, IOException} import java.net.{InetAddress, ServerSocket, Socket, SocketException} import java.nio.charset.StandardCharsets +import java.nio.file.{Paths, Files} import java.util.Arrays import java.util.concurrent.atomic.AtomicInteger +import java.util.zip.{ZipEntry, ZipInputStream} import scala.collection.mutable import scala.collection.JavaConverters._ +import org.apache.commons.io.IOUtils import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.util.{RedirectThread, Utils} @@ -50,12 +54,32 @@ private[spark] class PythonWorkerFactory(pythonExec: String, val daemonWorkers = new mutable.WeakHashMap[Socket, Int]() val idleWorkers = new mutable.Queue[Socket]() var lastActivity = 0L + val sparkFiles = conf.getOption("spark.files") val virtualEnvEnabled = conf.getBoolean("spark.pyspark.virtualenv.enabled", false) val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") - val virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + val virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "virtualenv") + val virtualEnvSystemSitePackages = conf.getBoolean( + "spark.pyspark.virtualenv.system_site_packages", false) + val virtualWheelhouse = conf.get("spark.pyspark.virtualenv.wheelhouse", "wheelhouse.zip") + // virtualRequirements is empty string by default + val virtualRequirements = conf.get("spark.pyspark.virtualenv.requirements", "") + val virtualIndexUrl = conf.get("spark.pyspark.virtualenv.index_url", null) + val virtualTrustedHost = conf.get("spark.pyspark.virtualenv.trusted_host", null) + val virtualInstallPackage = conf.get("spark.pyspark.virtualenv.install_package", null) + val upgradePip = conf.getBoolean("spark.pyspark.virtualenv.upgrade_pip", false) + val virtualUseIndex = conf.getBoolean("spark.pyspark.virtualenv.use_index", true) var virtualEnvName: String = _ var virtualPythonExec: String = _ + // search for "wheelhouse.zip" to trigger unzipping and installation of wheelhouse + // also search for "requirements.txt if provided" + for (filename <- sparkFiles.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten) { + logDebug("Looking inside" + filename) + val file = new File(filename) + val prefixes = Iterator.iterate(file)(_.getParentFile).takeWhile(_ != null).toList.reverse + logDebug("=> prefixes" + prefixes) + } + new 
MonitorThread().start() var simpleWorkers = new mutable.WeakHashMap[Socket, Process]() @@ -65,7 +89,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars.getOrElse("PYTHONPATH", ""), sys.env.getOrElse("PYTHONPATH", "")) - if (conf.getBoolean("spark.pyspark.virtualenv.enabled", false)) { + if (virtualEnvEnabled) { setupVirtualEnv() } @@ -82,15 +106,73 @@ private[spark] class PythonWorkerFactory(pythonExec: String, } } + + def unzipWheelhouse(zipFile: String, outputFolder: String): Unit = { + val buffer = new Array[Byte](1024) + try { + // output directory + val folder = new File(outputFolder); + if (!folder.exists()) { + folder.mkdir(); + } + + // zip file content + val zis: ZipInputStream = new ZipInputStream(new FileInputStream(zipFile)); + // get the zipped file list entry + var ze: ZipEntry = zis.getNextEntry(); + + while (ze != null) { + if (!ze.isDirectory()) { + val fileName = ze.getName(); + val newFile = new File(outputFolder + File.separator + fileName); + logDebug("Unzipping file " + newFile.getAbsoluteFile()); + + // create folders + new File(newFile.getParent()).mkdirs(); + val fos = new FileOutputStream(newFile); + var len: Int = zis.read(buffer); + + while (len > 0) { + fos.write(buffer, 0, len) + len = zis.read(buffer) + } + fos.close() + } + ze = zis.getNextEntry() + } + zis.closeEntry() + zis.close() + } catch { + case e: IOException => logError("exception caught: " + e.getMessage) + } + } + /** * Create virtualenv using native virtualenv or conda * * Native Virtualenv: - * - Execute command: virtualenv -p pythonExec --no-site-packages virtualenvName - * - Execute command: python -m pip --cache-dir cache-dir install -r requirement_file + * - Install virtualenv: + * virtualenv -p pythonExec [--system-site-packages] virtualenvName + * - if wheelhouse specified: + * - unzip wheelhouse + * - upgrade pip if set by conf (default: no) + * - install using pip: + * + * pip install -r requirement_file.txt \ + * --find-links=wheelhouse \ + * [--no-index] \ + * [--index-url http://pypi.mirror/simple] [--trusted-host pypi.mirror] \ + * [package.whl] + * + * else, if no wheelhouse is set: + * + * pip install -r requirement_file.txt \ + * [--no-index] \ + * [--index-url http://pypi.mirror/simple] [--trusted-host pypi.mirror] \ + * [package.whl] * * Conda - * - Execute command: conda create --name virtualenvName --file requirement_file -y + * - Execute command: conda create --name virtualenvName --file requirement_file.txt -y * */ def setupVirtualEnv(): Unit = { @@ -100,41 +182,114 @@ private[spark] class PythonWorkerFactory(pythonExec: String, // fetched from FileServer val pyspark_requirements = if (Utils.isLocalMaster(conf)) { - conf.get("spark.pyspark.virtualenv.requirements") + virtualRequirements } else { - conf.get("spark.pyspark.virtualenv.requirements").split("/").last + virtualRequirements.split("/").last } + logDebug("wheelhouse: " + virtualWheelhouse) + if (virtualWheelhouse != null && + !virtualWheelhouse.isEmpty && + Files.exists(Paths.get(virtualWheelhouse))) { + logDebug("Unziping wheelhouse archive " + virtualWheelhouse) + unzipWheelhouse(virtualWheelhouse, "wheelhouse") + } + val createEnvCommand = if (virtualEnvType == "native") { - Arrays.asList(virtualEnvPath, - "-p", pythonExec, - "--no-site-packages", virtualEnvName) + if (virtualEnvSystemSitePackages) { + Arrays.asList(virtualEnvPath, "-p", pythonExec, "--system-site-packages", virtualEnvName) + } + else { + Arrays.asList(virtualEnvPath, "-p", pythonExec, virtualEnvName) + } } else { - 
Arrays.asList(virtualEnvPath, - "create", "--prefix", System.getProperty("user.dir") + "/" + virtualEnvName, - "--file", pyspark_requirements, "-y") + // Conda creates everything and install the packages + var basePipArgs = mutable.ListBuffer[String]() + basePipArgs += (virtualEnvPath, + "create", + "--prefix", + System.getProperty("user.dir") + "/" + virtualEnvName) + if (pyspark_requirements != null && !pyspark_requirements.isEmpty) { + basePipArgs += ("--file", pyspark_requirements) + } + basePipArgs += ("-y") + basePipArgs.toList.asJava } execCommand(createEnvCommand) - // virtualenv will be created in the working directory of Executor. virtualPythonExec = virtualEnvName + "/bin/python" + + // virtualenv will be created in the working directory of Executor. if (virtualEnvType == "native") { - execCommand(Arrays.asList(virtualPythonExec, "-m", "pip", - "--cache-dir", System.getProperty("user.home"), - "install", "-r", pyspark_requirements)) + var virtualenvPipExec = virtualEnvName + "/bin/pip" + var pipUpgradeArgs = mutable.ListBuffer[String]() + if (upgradePip){ + pipUpgradeArgs += (virtualenvPipExec, "install", "--upgrade", "pip") + } + var basePipArgs = mutable.ListBuffer[String]() + basePipArgs += (virtualenvPipExec, "install") + if (pyspark_requirements != null && !pyspark_requirements.isEmpty) { + basePipArgs += ("-r", pyspark_requirements) + } + if (virtualWheelhouse != null && + !virtualWheelhouse.isEmpty && + Files.exists(Paths.get(virtualWheelhouse))) { + basePipArgs += ("--find-links=wheelhouse") + pipUpgradeArgs += ("--find-links=wheelhouse") + } + if (virtualIndexUrl != null && !virtualIndexUrl.isEmpty) { + basePipArgs += ("--index-url", virtualIndexUrl) + pipUpgradeArgs += ("--index-url", virtualIndexUrl) + } else if (! virtualUseIndex){ + basePipArgs += ("--no-index") + pipUpgradeArgs += ("--no-index") + } + if (virtualTrustedHost != null && !virtualTrustedHost.isEmpty) { + basePipArgs += ("--trusted-host", virtualTrustedHost) + pipUpgradeArgs += ("--trusted-host", virtualTrustedHost) + } + if (upgradePip){ + // upgrade pip in the virtualenv + execCommand(pipUpgradeArgs.toList.asJava) + } + if (virtualInstallPackage != null && !virtualInstallPackage.isEmpty) { + basePipArgs += (virtualInstallPackage) + } + execCommand(basePipArgs.toList.asJava) } + // do not execute a second command line in "conda" mode } def execCommand(commands: java.util.List[String]): Unit = { - logDebug("Running command:" + commands.asScala.mkString(" ")) - val pb = new ProcessBuilder(commands).inheritIO() + logDebug("Running command: " + commands.asScala.mkString(" ")) + + val pb = new ProcessBuilder(commands) pb.environment().putAll(envVars.asJava) pb.environment().putAll(System.getenv()) pb.environment().put("HOME", System.getProperty("user.home")) + val proc = pb.start() + val exitCode = proc.waitFor() if (exitCode != 0) { - throw new RuntimeException("Fail to run command: " + commands.asScala.mkString(" ")) + val errString = try { + val err = Option(proc.getErrorStream()) + err.map(IOUtils.toString) + } catch { + case io: IOException => None + } + + val outString = try { + val out = Option(proc.getInputStream()) + out.map(IOUtils.toString) + } catch { + case io: IOException => None + } + + throw new RuntimeException("Fail to run command: " + commands.asScala.mkString(" ") + + "\nOutput: " + outString + + "\nStderr: " + errString + ) } } @@ -183,8 +338,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, // Create and start the worker val realPythonExec = if (virtualEnvEnabled) 
virtualPythonExec else pythonExec logDebug(s"Starting worker with pythonExec: ${realPythonExec}") - val pb = new ProcessBuilder(Arrays.asList(realPythonExec, - "-m", "pyspark.worker")) + val pb = new ProcessBuilder(Arrays.asList(realPythonExec, "-m", "pyspark.worker")) val workerEnv = pb.environment() workerEnv.putAll(envVars.asJava) workerEnv.put("PYTHONPATH", pythonPath) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index f1761e7c1ec9..f9c4a1544fe9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -505,8 +505,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | dependency conflicts. | --repositories Comma-separated list of additional remote repositories to | search for the maven coordinates given with --packages. - | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place - | on the PYTHONPATH for Python apps. + | --py-files PY_FILES Comma-separated list of .zip, .egg, .whl or .py files to + | place on the PYTHONPATH for Python apps. | --files FILES Comma-separated list of files to be placed in the working | directory of each executor. | diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md index 814e4406cf43..6bf5ae6aab65 100644 --- a/docs/cluster-overview.md +++ b/docs/cluster-overview.md @@ -87,9 +87,18 @@ The following table summarizes terms you'll see used to refer to cluster concept Application jar - A jar containing the user's Spark application. In some cases users will want to create - an "uber jar" containing their application along with its dependencies. The user's jar - should never include Hadoop or Spark libraries, however, these will be added at runtime. + A jar containing the user's Spark application (for Java and Scala driver). In some cases + users will want to create an "uber jar" containing their application along with its + dependencies. The user's jar should never include Hadoop or Spark libraries, however, these + will be added at runtime. + + + + Application Wheelhouse + + An archive containing precompiled wheels of the user's PySpark application and dependencies + (for Python driver). The user's wheelhouse should not include jars, only Python Wheel files + for one or more architectures. diff --git a/docs/configuration.md b/docs/configuration.md index 82ce232b336d..04c51252d214 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -392,7 +392,7 @@ Apart from these, the following properties are also available, and may be useful spark.submit.pyFiles - Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. + Comma-separated list of .zip, .egg, .whl or .py files to place on the PYTHONPATH for Python apps. diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 74d5ee1ca6b3..032f7fd01245 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -24,7 +24,7 @@ along with if you launch Spark's interactive shell -- either `bin/spark-shell` f
-Spark {{site.SPARK_VERSION}} is built and distributed to work with Scala {{site.SCALA_BINARY_VERSION}} +Spark {{site.SPARK_VERSION}} is built and distributed to work with Scala {{site.SCALA_BINARY_VERSION}} by default. (Spark can be built to work with other versions of Scala, too.) To write applications in Scala, you will need to use a compatible Scala version (e.g. {{site.SCALA_BINARY_VERSION}}.X). @@ -211,7 +211,7 @@ For a complete list of options, run `spark-shell --help`. Behind the scenes, In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the -context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files +context connects to using the `--master` argument, and you can add Python .zip, .egg, .whl or .py files to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies (e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. SonaType) @@ -240,13 +240,13 @@ use IPython, set the `PYSPARK_DRIVER_PYTHON` variable to `ipython` when running $ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark {% endhighlight %} -To use the Jupyter notebook (previously known as the IPython notebook), +To use the Jupyter notebook (previously known as the IPython notebook), {% highlight bash %} $ PYSPARK_DRIVER_PYTHON=jupyter ./bin/pyspark {% endhighlight %} -You can customize the `ipython` or `jupyter` commands by setting `PYSPARK_DRIVER_PYTHON_OPTS`. +You can customize the `ipython` or `jupyter` commands by setting `PYSPARK_DRIVER_PYTHON_OPTS`. After the Jupyter Notebook server is launched, you can create a new "Python 2" notebook from the "Files" tab. Inside the notebook, you can input the command `%pylab inline` as part of @@ -812,7 +812,7 @@ The variables within the closure sent to each executor are now copies and thus, In local mode, in some circumstances the `foreach` function will actually execute within the same JVM as the driver and will reference the same original **counter**, and may actually update it. -To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail. +To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail. In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed. 
@@ -1231,8 +1231,8 @@ storage levels is:
-**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
-so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
+**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
+so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
 `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, and `DISK_ONLY_2`.*
 Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.
@@ -1374,7 +1374,7 @@ res2: Long = 10
 While this code used the built-in support for accumulators of type Long, programmers can also create their own types by subclassing [AccumulatorV2](api/scala/index.html#org.apache.spark.AccumulatorV2).
-The AccumulatorV2 abstract class has several methods which need to override:
+The AccumulatorV2 abstract class has several methods which need to override:
 `reset` for resetting the accumulator to zero, and `add` for add anothor value into the accumulator, `merge` for merging another same-type accumulator into this one. Other methods need to override can refer to scala API document. For example, supposing we had a `MyVector` class representing mathematical vectors, we could write:
diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md
index 6fe304999587..57d3c8c2dafe 100644
--- a/docs/submitting-applications.md
+++ b/docs/submitting-applications.md
@@ -20,7 +20,8 @@ script as shown here while passing your jar.
 For Python, you can use the `--py-files` argument of `spark-submit` to add `.py`, `.zip` or `.egg` files to be distributed with your application. If you depend on multiple Python files we recommend
-packaging them into a `.zip` or `.egg`.
+packaging them into a `.zip` or `.egg`. For a more complex packaging system for Python, see the
+section *Advanced Dependency Management* below.
 # Launching Applications with spark-submit
@@ -192,8 +193,445 @@ with `--packages`. All transitive dependencies will be handled when using this c
 repositories (or resolvers in SBT) can be added in a comma-delimited fashion with the flag `--repositories`. These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to include Spark Packages.
-For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries
-to executors.
+# Wheel Support for PySpark
+
+For Python, the `--py-files` option can be used to distribute `.egg`, `.zip`, `.whl` and `.py`
+libraries to executors. This modifies the `PYTHONPATH` environment variable to inject this
+dependency into the executed script environment.
+
+This solution does not scale well with complex projects with multiple dependencies.
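+
+For example, a minimal submission that simply ships one pre-built wheel next to the application
+script might look like the following sketch (the file names here are only illustrative):
+
+{% highlight bash %}
+# The wheel is copied to the cluster and added to the PYTHONPATH of the
+# driver and executors; no virtualenv is created in this simple case.
+bin/spark-submit
+  --master spark://localhost
+  --deploy-mode client
+  --py-files /path/to/dependency/some_dependency.whl
+  /path/to/launch/my_script.py
+{% endhighlight %}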
+
+There are, however, other solutions to deploy projects with dependencies:
+- Describe dependencies inside a `requirements.txt` and have everything installed into an isolated
+  virtual environment
+- Distribute a Python Source Distribution Package or a Wheel with a complete Python module
+
+Dependency distribution is also configurable:
+- Let each node of the Spark Cluster automatically fetch and install dependencies from Pypi or any
+  configured Pypi mirror
+- Distribute a single archive *wheelhouse* with all dependencies precompiled as Wheel files
+
+**What is a Wheel?**
+
+  A Wheel is a Python packaging format that contains a fully prepared module for a given system or
+  environment. Wheels can also be installed on several systems. For example, modules such as
+  Numpy require compilation of some C files, so wheels allow the already compiled object files to be
+  stored inside the various wheel packages. That is why a wheel might be system dependent (e.g. 32
+  bits/64 bits, Linux/Windows/Mac OS, ...). The Wheel format also provides a unified metadata
+  description, so tools such as `pip install` will automatically select the precompiled wheel
+  package that best fits the current system.
+
+  Said differently, if the wheel is already prepared, no compilation occurs during installation.
+
+**How to deploy Wheels or Wheelhouse**
+
+The deployment methods described in the current section of this documentation are
+recommended for the following cases:
+
+- your PySpark script has increased in complexity and dependencies. For example, it now depends on
+  numpy for numerical computation, and a bunch of extra packages from Pypi you are used to working with
+- your project might also depend on packages that are *not* on Pypi, for example, Python libraries
+  internal to your company
+- you do not want to deal with the IT department each time you need a new Python package on each
+  Spark node. For example, you need an upgraded version of a module A
+- you have version conflicts among dependencies: you need a certain version of a given
+  package, while another team wants another version of the same package
+
+All deployment methods described below involve the creation of a temporary virtual environment,
+using `virtualenv`, on each node of the Spark Cluster. This is done automatically during the
+`spark-submit` step.
+
+Please also be aware that wheelhouse support with a virtualenv is only available for YARN and
+standalone clusters. Mesos clusters do not support wheelhouses yet. You can however send wheels with
+`--py-files` to inject them into the `PYTHONPATH`.
+
+The following methods only differ in the way wheel files are sent to or retrieved by your Spark
+cluster nodes:
+
+- use a *Big Fat Wheelhouse* when the nodes of your Spark Cluster do not have access to the Internet
+  and so cannot reach the `pypi.python.org` website, and you do not have any Pypi mirror internal to
+  your organization.
+- if your Spark cluster has access to the official Pypi or a mirror, you can configure
+  `spark-submit` so `pip` will automatically find and download the dependency wheels.
+- You can use `--py-files` to send all wheels manually.
+
+**Pypi Mirror**
+
+Mirroring might be useful for companies with a strict Internet access policy or unusual proxy settings.
+
+There are several solutions for mirroring Pypi internally to your organization:
+- http://doc.devpi.net/latest/: a free mirroring solution
+- Artifactory provides
+  [Pypi mirroring](https://www.jfrog.com/confluence/display/RTF/PyPI+Repositories) in its
+  non-commercial license
+
+**Project packaging Workflow**
+
+The following workflow describes how to deploy a complex Python project with several dependencies
+to your Spark Cluster, for example, when you are writing a PySpark job that depends on a library that is
+only available from inside your organization, or you use a special version of a Pypi package such as
+Numpy.
+
+The workflow is as follows:
+
+- create a virtualenv in a directory, for instance `env`, if not already in one:
+
+{% highlight bash %}
+virtualenv env
+{% endhighlight %}
+
+- Turn your script into a real standard Python package. You need to write a standard `setup.py` to
+  make your package installable through `pip install -e . -r requirements.txt` (do not use `python
+  setup.py develop` or `python setup.py install` with `pip`, [it is not
+  recommended](https://pip.pypa.io/en/stable/reference/pip_install/#hash-checking-mode) and does not
+  handle editable dependencies correctly).
+
+- Write a `requirements.txt`. It is recommended to freeze the version of each dependency in this
+  description file. This way your job is guaranteed to be deployable every time, even if a buggy
+  version of a dependency is released on Pypi.
+
+  Example of `requirements.txt` with frozen package versions:
+
+    astroid==1.4.6
+    autopep8==1.2.4
+    click==6.6
+    colorama==0.3.7
+    enum34==1.1.6
+    findspark==1.0.0
+    first==2.0.1
+    hypothesis==3.4.0
+    lazy-object-proxy==1.2.2
+    linecache2==1.0.0
+    pbr==1.10.0
+    pep8==1.7.0
+    pip-tools==1.6.5
+    py==1.4.31
+    pyflakes==1.2.3
+    pylint==1.5.6
+    pytest==2.9.2
+    six==1.10.0
+    spark-testing-base==0.0.7.post2
+    traceback2==1.4.0
+    unittest2==1.1.0
+    wheel==0.29.0
+    wrapt==1.10.8
+
+  Ensure you write a clean `setup.py` that refers to this `requirements.txt` file:
+
+    from pip.req import parse_requirements
+
+    # parse_requirements() returns generator of pip.req.InstallRequirement objects
+    install_reqs = parse_requirements("requirements.txt", session=False)
+
+    # reqs is a list of requirement
+    # e.g. ['django==1.5.1', 'mezzanine==1.4.6']
+    reqs = [str(ir.req) for ir in install_reqs]
+
+    setup(
+        ...
+        install_requires=reqs
+    )
+
+- If you want to deploy a *Big Fat Wheelhouse* archive, i.e., a zip file containing all Wheels:
+
+  Create the wheelhouse for your current project:
+{% highlight bash %}
+pip install wheelhouse
+pip wheel . --wheel-dir wheelhouse -r requirements.txt
+{% endhighlight %}
+
+  Note: please use Pip >= 8.1.3 to generate all wheels, even for "editable" dependencies. To specify
+  a reference to another internal project, you can use the following syntax in your `requirements.txt`
+
+    -e git+ssh://local.server/a/project/name@7f4a7623aa219743e9b96b228b4cd86fe9bc5595#egg=package_name
+
+  It is highly recommended to specify the SHA-1, a tag or a branch of the internal dependency, and
+  update it at each release. Do not forget to specify the `egg` package name.
+
+  Documentation: [Requirements Files](https://pip.pypa.io/en/latest/user_guide/#requirements-files)
+
+  Usually, most dependencies will not be recompiled if this has been done previously: `pip` caches
+  all wheel files by default in the user's home cache, and if wheels can be found on Pypi they
+  are automatically retrieved.
+
+  At the end you have all the `.whl` files required *for your current system* in the `wheelhouse`
+  directory.
+
+  Zip the `wheelhouse` directory into a `wheelhouse.zip`:
+
+{% highlight bash %}
+rm -fv wheelhouse.zip
+zip -vrj wheelhouse.zip wheelhouse
+{% endhighlight %}
+
+  You now have a `wheelhouse.zip` archive with all the dependencies of your project *and also your
+  project module*. For example, if you have defined a module `mymodule` in `setup.py`, it will be
+  automatically installed with `spark-submit`.
+
+- If you want to let the Spark cluster access Pypi or a Pypi mirror, just build the source
+  distribution package:
+
+    python setup.py sdist
+
+  Or the wheel with:
+
+    python setup.py bdist_wheel
+
+  You now have a tar.gz or a whl file containing your current project.
+
+  Note: most of the time, your Spark job will not have low-level dependencies, so building a source
+  distribution package is enough.
+
+To execute your application, you need a *launcher script*, i.e., a script which is executed directly
+and calls your built package. There is no equivalent to the `--class` argument of `spark-submit`
+for Python jobs. Please note that this file might only contain a few lines. For example:
+
+{% highlight python %}
+#!/usr/bin/env python
+
+from mypackage import run
+run()
+{% endhighlight %}
+
+Note that all the logic is stored in the `mypackage` module, which has been declared in your
+`setup.py`.
+
+**Deployment Modes support**
+
+In **Standalone**, only the `client` deployment mode is supported. You cannot use `cluster`
+deployment. This means the driver will be executed from the machine that executes `spark-submit`.
+So you **need** to execute `spark-submit` from within your development virtualenv.
+
+Workers will each install a dedicated virtualenv if `spark.pyspark.virtualenv.enabled` is
+set to `true`.
+
+In **YARN**, if you use the `client` deployment mode, you also need to execute `spark-submit` from
+your virtualenv. If you use `cluster` deployment, the virtualenv installation is performed as on
+all workers.
+
+There is no support for **Mesos** clusters with virtualenv. Note that you can send wheel files with
+`--py-files` and they will be added to the `PYTHONPATH`. This is not recommended since you will not
+benefit from the advantages of installing with `pip`:
+
+- you cannot have `pip` automatically retrieve missing Python dependencies from `pypi.python.org`
+- you cannot prepare and send several versions of the same package to support different
+  architectures (e.g. workers running different Python versions, some machines being 32 bits
+  and others 64 bits, or some running Mac OS X and others Linux, ...)
+
+To have these advantages, you can use the Wheelhouse deployment described below.
+
+**Submitting your package to the Spark Cluster:**
+
+Please remember that on a "standalone" Spark instance, only the `client` deployment mode is supported.

Deploy a script with many dependencies to your standalone cluster:

{% highlight bash %}
source /path/to/your/project/env/bin/activate  # ensure you are inside the virtualenv
bin/spark-submit \
  --master spark://localhost \
  --deploy-mode client \
  --jars java-dependencies.jar \
  --files /path/to/your/project/requirements.txt \
  --conf "spark.pyspark.virtualenv.enabled=true" \
  --conf "spark.pyspark.virtualenv.requirements=requirements.txt" \
  --conf "spark.pyspark.virtualenv.index_url=https://pypi.python.org/simple" \
  /path/to/launch/simple_script_with_some_dependencies.py
{% endhighlight %}

Execution:
- a virtualenv is created on each worker
- the dependencies described in `requirements.txt` are installed on each worker
- dependencies are downloaded from the Pypi repository
- the driver is executed on the client, so this command line should be executed from *within* a
  virtualenv


Deploy a simple runner script along with a source distribution package of the complete job project:

{% highlight bash %}
source /path/to/your/project/env/bin/activate  # ensure you are inside the virtualenv
bin/spark-submit \
  --master spark://localhost \
  --deploy-mode client \
  --jars some-java-dependencies.jar \
  --files /path/to/mypackage_sdist.tar.gz \
  --conf "spark.pyspark.virtualenv.enabled=true" \
  --conf "spark.pyspark.virtualenv.install_package=mypackage_sdist.tar.gz" \
  --conf "spark.pyspark.virtualenv.index_url=https://pypi.python.org/simple" \
  /path/to/launch/runner_script.py
{% endhighlight %}

Execution:
- a virtualenv is created on each worker
- the package `mypackage_sdist.tar.gz` is installed with `pip`, so if the `setup.py` refers to
  `requirements.txt` properly, all the dependencies are installed on each worker
- dependencies are downloaded from the Pypi repository
- the driver is executed on the client, so this command line should be executed from *within* a
  virtualenv
- the runner script simply calls an entry point within `mypackage`


Deploy a wheelhouse package to your YARN cluster with:

{% highlight bash %}
bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --jars java-dependencies.jar \
  --files /path/to/your/project/requirements.txt,/path/to/your/project/wheelhouse.zip \
  --conf "spark.pyspark.virtualenv.enabled=true" \
  --conf "spark.pyspark.virtualenv.requirements=requirements.txt" \
  --conf "spark.pyspark.virtualenv.install_package=a_package.whl" \
  --conf "spark.pyspark.virtualenv.index_url=https://pypi.python.org/simple" \
  /path/to/launch/launcher_script.py
{% endhighlight %}

Execution:
- a virtualenv is created on each worker
- the dependencies described in `requirements.txt` are installed on each worker
- dependencies are resolved from the wheelhouse archive; any that are not found there are
  downloaded from the Pypi repository (to avoid this, remove the
  `spark.pyspark.virtualenv.index_url` option)
- the driver is executed on the cluster, so this command line does *not* have to be executed from
  within a virtualenv
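
For reference, the `a_package.whl` and `wheelhouse.zip` files used in the example above can be
produced with the packaging commands described earlier; a minimal sketch (file names are
illustrative and depend on your package name and version):

{% highlight bash %}
# Build the wheel of your own project; the result lands in dist/
python setup.py bdist_wheel

# Build and zip the wheelhouse containing all dependencies
pip wheel . --wheel-dir wheelhouse -r requirements.txt
rm -fv wheelhouse.zip
zip -vrj wheelhouse.zip wheelhouse
{% endhighlight %}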

To deploy against an internal Pypi mirror (an HTTPS mirror without a full certificate chain), force
a pip upgrade (it is good practice to always run the latest version of pip), and inject some wheels
manually into the `PYTHONPATH`:

{% highlight bash %}
bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --jars java-dependencies.jar \
  --files /path/to/your/project/requirements.txt \
  --py-files /path/to/your/project/binary/myproject.whl,/path/to/internal/dependency/other_project.whl \
  --conf "spark.pyspark.virtualenv.enabled=true" \
  --conf "spark.pyspark.virtualenv.requirements=requirements.txt" \
  --conf "spark.pyspark.virtualenv.upgrade_pip=true" \
  --conf "spark.pyspark.virtualenv.index_url=https://pypi.mycompany.com/" \
  --conf "spark.pyspark.virtualenv.trusted_host=pypi.mycompany.com" \
  /path/to/launch/script.py
{% endhighlight %}

Execution:
- a virtualenv is created on each worker
- the pip tool is upgraded to the latest version
- the dependencies described in `requirements.txt` are installed on each worker
- dependencies are resolved from the wheelhouse archive; any that are not found there are
  downloaded from the Pypi mirror
- the two wheels passed with `--py-files` are added to the `PYTHONPATH`. You can use this to send
  them directly instead of describing them in `requirements.txt`. This can be useful for
  development; for production you probably want these dependency projects available on an internal
  repository and referenced by URL
- the driver is executed on the cluster, so this command line does *not* have to be executed from
  within a virtualenv


Here is a description of the configuration options for wheel and wheelhouse support in Python:

- `--jars java-dependencies.jar`: you still need to provide the Java jars your job requires, for
  instance as a big fat jar file, with this argument, for example if you use Spark Streaming.
- `spark.pyspark.virtualenv.enabled`: enable the creation of a virtualenv environment at each
  deployment and trigger the installation of wheels. Creating the virtual environment has a time
  and disk space cost. Please note that, when deploying a Big Fat Wheelhouse, *no network*
  connection to pypi.python.org or any mirror is made.
- `--files /path/to/your/project/requirements.txt,/path/to/your/project/wheelhouse.zip`: this
  simply copies these two files to the root of the job working directory on each node. When
  'virtualenv' is enabled, these files are used automatically when they are found. Having at least
  `requirements.txt` is mandatory.
- `--conf spark.pyspark.virtualenv.type=conda`: specify the format of your `requirements.txt`.
  This parameter is optional. The default value, `native`, uses the native `pip` tool to install
  your packages on each Spark node. You can also use `conda` for the Conda package manager.
- `--conf spark.pyspark.virtualenv.requirements=other_requirement.txt`: specify the name of the
  requirements file. This parameter is optional. The default value is `requirements.txt`. Do not
  forget to copy this file to the cluster with the `--files` argument.
- `--conf spark.pyspark.virtualenv.bin.path=venv`: specify the command used to create the virtual
  environment. This parameter is optional. The default value, `virtualenv`, should work on every
  kind of system, but if you need a different command name (e.g. `venv` for Python 3) or a full
  path, set this value.
- `--conf spark.pyspark.virtualenv.wheelhouse=mywheelhouse.zip`: name of the wheelhouse archive.
  This parameter is optional. The default value is `wheelhouse.zip`. Do not forget to ship this
  file to the cluster with the `--files` argument. If found, the file is unzipped into the
  `wheelhouse` directory. It is not mandatory to use this archive to transfer modules available on
  Pypi if you have Internet connectivity or a Pypi mirror reachable from each worker. Use it
  primarily for transferring precompiled, internal module dependencies.
- `--conf spark.pyspark.virtualenv.upgrade_pip=true`: upgrade `pip` automatically. It is good
  practice to always run the latest `pip` version. Default: `false`.
- `--conf spark.pyspark.virtualenv.index_url=http://internalserver/pypimirror`: change the Pypi
  repository URL (default: `https://pypi.python.org/simple`, which requires network connectivity).
- `--conf spark.pyspark.virtualenv.trusted_host=internalserver`: execute `pip` with the
  `--trusted-host` argument, i.e., provide the hostname of a server to trust even though it does
  not serve valid HTTPS (or any HTTPS at all). Useful when using a Pypi mirror behind HTTPS without
  a full certificate chain.
- `--conf spark.pyspark.virtualenv.system_site_packages=true`: makes the virtual environment also
  reference the packages installed on the system. The default value, `false`, forces developers to
  specify all dependencies and let `pip` install them from `requirements.txt`. Set the value to
  `true` to use preinstalled packages on each node. A virtualenv is still created, so installing
  new packages will not compromise the worker's Python installation.
- `--conf spark.pyspark.virtualenv.use_index=false`: if set to `false`, do not try to download
  missing dependencies from Pypi or from the index URL set by `spark.pyspark.virtualenv.index_url`;
  in that case all dependencies should be packaged in the `wheelhouse.zip` archive. The default is
  `true`. Please note that if `spark.pyspark.virtualenv.index_url` is set manually,
  `spark.pyspark.virtualenv.use_index` is forced to `true`.
- `--py-files /path/to/a/project/aproject.whl,/path/to/internal/dependency/other_project.whl`:
  this copies wheels to the cluster nodes and installs them with `pip`. Using this argument implies
  two things:
  - all the wheels will be installed; you cannot have one wheel for 32-bit Linux and another one
    for 64-bit Linux. In that situation, zip them into a single archive and use
    `--files wheelhouse.zip`
  - you need to build the wheels of other internal dependencies (i.e. those that are not on Pypi)
    manually, or select them after running `pip wheel`
- `/path/to/launch/script.py`: path to the runner script. As said earlier, it is recommended to
  keep this file as short as possible and only call a `run()`-like method from a package defined in
  your `setup.py`.

**Advantages**

- Installation is fast and does not require compilation
- No Internet connectivity is needed when using a Big Fat Wheelhouse: no need to fight with your
  corporate proxy or even to maintain a local mirror of Pypi
- Package versions are isolated, so two Spark jobs can depend on two different versions of a given
  library without any conflict
- Wheels are cached automatically (for pip versions > 7.0); in the worst case, compilation only
  takes time the first time a package is downloaded. Note that compilation is quite rare, since
  most packages on Pypi already provide precompiled wheels for the major Python versions and
  systems (e.g. look at all the wheels provided by [Numpy](https://pypi.python.org/pypi/numpy)).
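
For reference, here is a quick way to look at the wheel cache mentioned above; the cache path is
the usual default on Linux and may differ on other systems or with a custom pip configuration:

{% highlight bash %}
# Inspect the local wheel cache that speeds up repeated builds
ls ~/.cache/pip/wheels

# Rebuild the wheelhouse while ignoring the cache, if a cached wheel is suspected to be stale
pip wheel . --wheel-dir wheelhouse -r requirements.txt --no-cache-dir
{% endhighlight %}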

**Disadvantages**

- Creating a virtualenv at each execution takes time; not much, but still a few seconds
- It consumes more disk space than a simple script without any dependencies
- It is slightly more complex to set up than sending a simple Python script
- Supporting heterogeneous Spark nodes (e.g. a mix of 32-bit and 64-bit Linux, ...) is possible,
  but you need to make sure **all** the wheels are in the wheelhouse so that pip can install every
  required package on each node of your Spark cluster. The complexity of this task, which may not
  be trivial, falls on the script developer rather than on the IT department


**Configuring a Pypi Proxy**

To tell `spark-submit` to use a Pypi mirror internal to your company, you can use the
`--conf "spark.pyspark.virtualenv.index_url=http://pypi.mycompany.com/"` argument.

You can also update the `~/.pip/pip.conf` file of each node of your Spark cluster to point to your
mirror by default:

{% highlight ini %}
[global]
; Low timeout
timeout = 20
index-url = https://<user>:<pass>@pypi.mycompany.org/
{% endhighlight %}

Note: pip does not use the system certificates; if you need to set one up manually, add this line
in the `[global]` section of `pip.conf`:

{% highlight ini %}
cert = /path/to/your/internal/certificates.pem
{% endhighlight %}

# More Information

diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index ea56214d2390..c0a787f7b759 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -351,7 +351,7 @@ public SparkLauncher addFile(String file) {
   }
 
   /**
-   * Adds a python file / zip / egg to be submitted with the application.
+   * Adds a python file / zip / whl / egg to be submitted with the application.
    *
    * @param file Path to the file.
    * @return This launcher.
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 8b57c64ae520..49823a83d068 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -22,7 +22,6 @@
 import signal
 import sys
 import threading
-from threading import RLock
 from tempfile import NamedTemporaryFile
 
 from pyspark import accumulators
@@ -39,6 +38,7 @@
 from pyspark.status import StatusTracker
 from pyspark.profiler import ProfilerCollector, BasicProfiler
 
+
 if sys.version > '3':
     xrange = range
 
@@ -66,10 +66,10 @@ class SparkContext(object):
     _jvm = None
     _next_accum_id = 0
     _active_spark_context = None
-    _lock = RLock()
-    _python_includes = None  # zip and egg files that need to be added to PYTHONPATH
+    _lock = threading.RLock()
+    _python_includes = None  # zip, egg, whl and jar files that need to be added to PYTHONPATH
 
-    PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
+    PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar', '.whl')
 
     def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
                  environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
@@ -82,9 +82,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
             (e.g. mesos://host:port, spark://host:port, local[4]).
         :param appName: A name for your job, to display on the cluster web UI.
         :param sparkHome: Location where Spark is installed on cluster nodes.
-        :param pyFiles: Collection of .zip or .py files to send to the cluster
-            and add to PYTHONPATH. These can be paths on the local file
-            system or HDFS, HTTP, HTTPS, or FTP URLs.
+ :param pyFiles: Collection of .zip, .egg, .whl or .py files to send + to the cluster and add to PYTHONPATH. + These can be paths on the local file system or HDFS, HTTP, HTTPS, or FTP URLs. :param environment: A dictionary of environment variables to set on worker nodes. :param batchSize: The number of Python objects represented as a single @@ -178,16 +178,15 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python') if self._conf.get("spark.pyspark.virtualenv.enabled") == "true": - requirements = self._conf.get("spark.pyspark.virtualenv.requirements") - virtualEnvBinPath = self._conf.get("spark.pyspark.virtualenv.bin.path") + requirements = self._conf.get("spark.pyspark.virtualenv.requirements", + "requirements.txt") + virtualEnvBinPath = self._conf.get("spark.pyspark.virtualenv.bin.path", "virtualenv") if not requirements: raise Exception("spark.pyspark.virtualenv.enabled is set as true but no value for " "spark.pyspark.virtualenv.requirements") if not virtualEnvBinPath: raise Exception("spark.pyspark.virtualenv.enabled is set as true but no value for " "spark.pyspark.virtualenv.bin.path") - else: - self.addFile(self._conf.get("spark.pyspark.virtualenv.requirements")) self.pythonVer = "%d.%d" % sys.version_info[:2] @@ -201,19 +200,34 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, root_dir = SparkFiles.getRootDirectory() sys.path.insert(1, root_dir) + self._python_wheels = set() + # Deploy any code dependencies specified in the constructor + # Wheel files will be installed by pip later. self._python_includes = list() for path in (pyFiles or []): self.addPyFile(path) # Deploy code dependencies set by spark-submit; these will already have been added # with SparkContext.addFile, so we just need to add them to the PYTHONPATH + # Wheel files will be installed by pip later. for path in self._conf.get("spark.submit.pyFiles", "").split(","): - if path != "": - (dirname, filename) = os.path.split(path) - if filename[-4:].lower() in self.PACKAGE_EXTENSIONS: - self._python_includes.append(filename) - sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) + if path: + (_dirname, filename) = os.path.split(path) + extname = os.path.splitext(path)[1].lower() + if extname in self.PACKAGE_EXTENSIONS: + if extname == ".whl": + self._python_wheels.add(path) + else: + self._python_includes.append(filename) + sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) + + # Install all wheel files at once. + if self._python_wheels: + if 'VIRTUAL_ENV' not in os.environ or not os.environ['VIRTUAL_ENV']: + raise Exception("Whl installation requires to run inside a virtualenv. " + "You may have forgotten to activate a virtualenv " + "when using spark-submit in 'client' deployment mode?") # Create a temporary directory inside spark.local.dir: local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf()) @@ -808,17 +822,27 @@ def addFile(self, path, recursive=False): def addPyFile(self, path): """ - Add a .py or .zip dependency for all tasks to be executed on this + Add a .py, .zip or .egg dependency for all tasks to be executed on this SparkContext in the future. The C{path} passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI. 
+ Note that .whl should not be handled by this method """ + if not path: + return self.addFile(path) - (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix - if filename[-4:].lower() in self.PACKAGE_EXTENSIONS: + + (_dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix + extname = os.path.splitext(path)[1].lower() + if extname == '.whl': + return + + if extname in self.PACKAGE_EXTENSIONS: self._python_includes.append(filename) - # for tests in local mode - sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) + if extname != '.whl': + # for tests in local mode + # Prepend the python package (except for *.whl) to sys.path + sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename)) if sys.version > '3': import importlib importlib.invalidate_caches() @@ -975,7 +999,7 @@ def _test(): globs['sc'] = SparkContext('local[4]', 'PythonTest') globs['tempdir'] = tempfile.mkdtemp() atexit.register(lambda: shutil.rmtree(globs['tempdir'])) - (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + (failure_count, _test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() if failure_count: exit(-1)