diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 60bc243ebf40..5e9f9eb9241e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -356,6 +356,24 @@ object SparkSubmit {
       }
     }
 
+    if (args.isPython && clusterManager == YARN) {
+      sys.env.get("PYSPARK_ARCHIVES_PATH") match {
+        case Some(archivesPath) => args.files = mergeFileLists(args.files, archivesPath)
+        case None =>
+          // Zip PySpark from ${SPARK_HOME}/python/pyspark into ${SPARK_HOME}/lib/pyspark.zip
+          // and ship it to the executors via YARN.
+          for (sparkHome <- sys.env.get("SPARK_HOME")) {
+            val srcFile = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
+            val archives = new File(Seq(sparkHome, "lib", "pyspark.zip").mkString(File.separator))
+            if (archives.exists() || Utils.createZipArchives(archives, srcFile, "pyspark")) {
+              val py4jPath = Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip")
+                .mkString(File.separator)
+              args.files = mergeFileLists(args.files, archives.getAbsolutePath, py4jPath)
+            }
+          }
+      }
+    }
+
     // Special flag to avoid deprecation warnings at the client
     sysProps("SPARK_SUBMIT") = "true"
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0fdfaf300e95..bf7ef37d6ed8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,6 +21,7 @@ import java.io._
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
+import java.util.zip.{ZipEntry, ZipOutputStream}
 import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
 import javax.net.ssl.HttpsURLConnection
@@ -2106,6 +2107,58 @@ private[spark] object Utils extends Logging {
       .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
   }
 
+  /**
+   * Recursively zip `srcFile` into `archives`, with entry names rooted at `rootPath`.
+   * Returns true if at least one file was written to the archive.
+   */
+  def createZipArchives(archives: File, srcFile: File, rootPath: String): Boolean = {
+    var flag = false
+    try {
+      val zipOutStream = new ZipOutputStream(
+        new BufferedOutputStream(new FileOutputStream(archives)))
+      try {
+        flag = doZip(zipOutStream, rootPath, srcFile)
+      } finally {
+        // Closing the ZipOutputStream also closes the wrapped streams.
+        zipOutStream.close()
+      }
+    } catch {
+      case e: FileNotFoundException => logError("File to zip not found: " + srcFile, e)
+    }
+    flag
+  }
+
+  private def doZip(zipOutStream: ZipOutputStream, curPath: String, file: File): Boolean = {
+    var flag = false
+    if (file.isDirectory) {
+      val files = file.listFiles()
+      if (files != null && files.length > 0) {
+        // Zip entry names always use '/' as the separator, regardless of platform.
+        zipOutStream.putNextEntry(new ZipEntry(curPath + "/"))
+        val nextPath = if (curPath.isEmpty) "" else curPath + "/"
+        for (subFile <- files) {
+          flag |= doZip(zipOutStream, nextPath + subFile.getName, subFile)
+        }
+      }
+    } else {
+      zipOutStream.putNextEntry(new ZipEntry(curPath))
+      val buffInStream = new BufferedInputStream(new FileInputStream(file))
+      try {
+        val buf = new Array[Byte](8192)
+        var len = buffInStream.read(buf)
+        while (len != -1) {
+          zipOutStream.write(buf, 0, len)
+          len = buffInStream.read(buf)
+        }
+        zipOutStream.flush()
+        flag = true
+      } finally {
+        buffInStream.close()
+      }
+    }
+    flag
+  }
+
 }
 
 /**
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index c1effd3c8a71..126a6a9f9642 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.io.File
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 
@@ -386,6 +387,32 @@
     val appStagingDir = getAppStagingDir(appId)
     val localResources = prepareLocalResources(appStagingDir)
     val launchEnv = setupLaunchEnv(appStagingDir)
+
+    // SPARK-1920 and SPARK-1520 show that PySpark on YARN does not work when the assembly jar is
+    // built with JDK 1.7+, so ship the PySpark archives to the executors separately instead of
+    // relying on the assembly jar, and add them to PYTHONPATH.
+    val pysparkArchives = new ArrayBuffer[String]()
+    sys.env.get("PYSPARK_ARCHIVES_PATH") match {
+      case Some(archivesPath) =>
+        archivesPath.split(",").foreach { path =>
+          val uri = new URI(path)
+          if (uri.getScheme == LOCAL_SCHEME) {
+            pysparkArchives += uri.getPath
+          } else {
+            pysparkArchives += new File(path).getName
+          }
+        }
+      case None =>
+        for ((resLink, _) <- localResources) {
+          if (resLink.contains("pyspark") || resLink.contains("py4j")) {
+            pysparkArchives += resLink
+          }
+        }
+    }
+    val pythonPath = pysparkArchives.mkString(File.pathSeparator)
+    launchEnv("PYTHONPATH") = pythonPath
+    sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
+
     val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
     amContainer.setLocalResources(localResources)
     amContainer.setEnvironment(launchEnv)
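
Reviewer note, not part of the patch: below is a minimal sketch of how the new Utils.createZipArchives helper could be exercised end to end. The object name ZipArchivesCheck and the tmp/ paths are hypothetical, and the sketch sits in the org.apache.spark.util package only because Utils is private[spark].

    package org.apache.spark.util

    import java.io.{File, PrintWriter}
    import java.util.zip.ZipFile

    import scala.collection.JavaConverters._

    object ZipArchivesCheck {
      def main(args: Array[String]): Unit = {
        // Hypothetical layout: build a tiny source tree, tmp/pyspark/__init__.py.
        val srcDir = new File("tmp" + File.separator + "pyspark")
        srcDir.mkdirs()
        val init = new PrintWriter(new File(srcDir, "__init__.py"))
        try init.write("# placeholder\n") finally init.close()

        // Root the entries at "pyspark", matching what SparkSubmit passes above.
        val archive = new File("tmp" + File.separator + "pyspark.zip")
        assert(archive.exists() || Utils.createZipArchives(archive, srcDir, "pyspark"))

        // Python's zipimport resolves the package only if the archive holds a
        // top-level "pyspark/" entry, e.g. pyspark/__init__.py.
        val zip = new ZipFile(archive)
        try zip.entries().asScala.foreach(e => println(e.getName)) finally zip.close()
      }
    }

The entry names deliberately use '/' rather than File.separator, so zipimport on the Linux executors can read an archive produced by a Windows client, and the archive.exists() short-circuit mirrors the SparkSubmit path, which only rebuilds pyspark.zip when it is missing.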