Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,24 @@ object SparkSubmit {
}
}

if (args.isPython && clusterManager == YARN) {
sys.env.get("PYSPARK_ARCHIVES_PATH") match {
case Some(archivesPath) => args.files = mergeFileLists(args.files, archivesPath)
case None =>
// Zip PySpark from ${SPARK_HOME}/python/pyspark to ${SPARK_HOME}/lib/pyspark.zip
// and ship to executors by Yarn.
for (sparkHome <- sys.env.get("SPARK_HOME")) {
val srcFile = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
val archives = new File(Seq(sparkHome, "lib", "pyspark.zip").mkString(File.separator))
if (archives.exists() || Utils.createZipArchives(archives, srcFile, "pyspark")) {
val py4jPath = Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip")
.mkString(File.separator)
args.files = mergeFileLists(args.files, archives.getAbsolutePath, py4jPath)
}
}
}
}

// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"

Expand Down
51 changes: 51 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
import java.util.zip.{ZipEntry, ZipOutputStream}
import java.util.{Properties, Locale, Random, UUID}
import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
import javax.net.ssl.HttpsURLConnection
Expand Down Expand Up @@ -2106,6 +2107,56 @@ private[spark] object Utils extends Logging {
.getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
}

/**
* Create zip archives.
*/
def createZipArchives(archives: File, srcFile: File, rootPath: String): Boolean = {
var flag = false
try {
val fileOutStream = new FileOutputStream(archives)
val buffOutStream = new BufferedOutputStream(fileOutStream)
val zipOutStream = new ZipOutputStream(buffOutStream)
flag = doZip(zipOutStream, rootPath, srcFile)
zipOutStream.close()
buffOutStream.close()
fileOutStream.close()

} catch {
case e: FileNotFoundException => logError("File to zip not found")
}
flag
}

private def doZip(zipOutStream: ZipOutputStream, curPath: String, file: File): Boolean = {
var flag = false
if (file.isDirectory) {
val files = file.listFiles()
if (files != null && files.length > 0) {
zipOutStream.putNextEntry(new ZipEntry(curPath + File.separator))
val nextPath = if (curPath.length == 0) "" else curPath + File.separator
for (subFile <- files) {
flag = doZip(zipOutStream, nextPath + subFile.getName, subFile)
}
}
} else {
zipOutStream.putNextEntry(new ZipEntry(curPath))
val fileInStream = new FileInputStream(file)
val buffInStream = new BufferedInputStream(fileInStream)
val bufSize = 8192
val buf = new Array[Byte](bufSize)
var len: Int = buffInStream.read(buf, 0, bufSize)
while (len != -1) {
zipOutStream.write(buf, 0, len)
len = buffInStream.read(buf, 0, bufSize)
}
zipOutStream.flush()
flag = true
buffInStream.close()
fileInStream.close()
}
flag
}

}

/**
Expand Down
27 changes: 27 additions & 0 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.deploy.yarn

import java.io.File
import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer

Expand Down Expand Up @@ -386,6 +387,32 @@ private[spark] class Client(
val appStagingDir = getAppStagingDir(appId)
val localResources = prepareLocalResources(appStagingDir)
val launchEnv = setupLaunchEnv(appStagingDir)

// From SPARK-1920 and SPARK-1520 we know PySpark on Yarn can not work when the assembly jar are
// package by JDK 1.7+, so we ship PySpark archives to executors as assembly jar, and add this
// path to PYTHONPATH.
val pysparkArchives = new ArrayBuffer[String]()
sys.env.get("PYSPARK_ARCHIVES_PATH") match {
case Some(archivesPath) =>
archivesPath.split(",").foreach { path =>
val uri = new URI(path)
if (uri.getScheme == LOCAL_SCHEME) {
pysparkArchives.+=(uri.getPath)
} else {
pysparkArchives.+=(new File(path).getName)
}
}
case None =>
for ((resLink, res) <- localResources) {
if (resLink.contains("pyspark") || resLink.contains("py4j")) {
pysparkArchives.+=(resLink)
}
}
}
val pythonPath = pysparkArchives.toArray.mkString(File.pathSeparator)
launchEnv("PYTHONPATH") = pythonPath
sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)

val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources)
amContainer.setEnvironment(launchEnv)
Expand Down