Commit 3b1e4c8

address tgravescs's comments
1 parent 9396346 commit 3b1e4c8

File tree: 3 files changed (+19, -46 lines changed)

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 7 additions & 3 deletions
@@ -343,10 +343,14 @@ object SparkSubmit {
           val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
           val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
           if (!pyArchivesFile.exists()) {
-            val pySrc = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
-            Utils.zipRecursive(pySrc, pyArchivesFile)
+            printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
           }
-          pythonPath += pyArchivesFile.getAbsolutePath
+          val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip")
+          if (!py4jFile.exists()) {
+            printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
+              "in yarn mode.")
+          }
+          pythonPath += Seq(pyLibPath, "pyspark.zip").mkString(File.separator)
           pythonPath += Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator)
         }
         pyArchives = pythonPath.mkString(",")
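With this change, spark-submit stops zipping the pyspark sources on the fly and instead fails fast when the pre-built archives are missing. Below is a minimal standalone sketch of the resulting lookup logic; PyArchivesCheck, resolvePyArchives, and the local printErrorAndExit are illustrative stand-ins for SparkSubmit's internals, not Spark's actual API.

import java.io.File
import scala.collection.mutable.ArrayBuffer

// Hypothetical standalone object; names are illustrative only.
object PyArchivesCheck {
  // Stand-in for SparkSubmit's private printErrorAndExit helper.
  private def printErrorAndExit(msg: String): Unit = {
    System.err.println("Error: " + msg)
    sys.exit(1)
  }

  // Resolve the comma-separated archive list, failing fast when a zip is
  // missing instead of building pyspark.zip at submit time.
  def resolvePyArchives(sparkHome: String): String = {
    val pythonPath = new ArrayBuffer[String]
    val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
    if (!new File(pyLibPath, "pyspark.zip").exists()) {
      printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.")
    }
    if (!new File(pyLibPath, "py4j-0.8.2.1-src.zip").exists()) {
      printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " +
        "in yarn mode.")
    }
    pythonPath += Seq(pyLibPath, "pyspark.zip").mkString(File.separator)
    pythonPath += Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator)
    pythonPath.mkString(",")
  }
}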

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 0 additions & 35 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.util

 import java.io._
-import java.util.zip.{ZipOutputStream, ZipEntry}
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
@@ -1001,40 +1000,6 @@ private[spark] object Utils extends Logging {
     !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())
   }

-  /**
-   * recursively add files to the zip file
-   */
-  def addFilesToZip(parent: String, source: File, output: ZipOutputStream): Unit = {
-    if (source.isDirectory()) {
-      output.putNextEntry(new ZipEntry(parent + source.getName()))
-      for (file <- source.listFiles()) {
-        addFilesToZip(parent + source.getName + File.separator, file, output)
-      }
-    } else {
-      val in = new FileInputStream(source)
-      output.putNextEntry(new ZipEntry(parent + source.getName()))
-      val buf = new Array[Byte](8192)
-      var n = 0
-      while (n != -1) {
-        n = in.read(buf)
-        if (n != -1) {
-          output.write(buf, 0, n)
-        }
-      }
-      in.close()
-    }
-  }
-
-  /**
-   * zip source file to dest ZipFile
-   */
-  def zipRecursive(source: File, destZipFile: File) = {
-    val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile))
-    addFilesToZip("", source, destOutput)
-    destOutput.flush()
-    destOutput.close()
-  }
-
   /**
    * Determines if a directory contains any files newer than cutoff seconds.
    *

yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 12 additions & 8 deletions
@@ -17,6 +17,7 @@

 package org.apache.spark.deploy.yarn

+import java.io.File
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer

@@ -326,14 +327,6 @@ private[spark] class Client(
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)

-    // if spark.submit.pyArchives is in sparkConf, set PYTHONPATH to be passed
-    // on to the ApplicationMaster and the executors.
-    if (sparkConf.contains("spark.submit.pyArchives")) {
-      val archives = sparkConf.get("spark.submit.pyArchives")
-      env("PYTHONPATH") = archives
-      sparkConf.setExecutorEnv("PYTHONPATH", archives)
-    }
-
     // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
     val amEnvPrefix = "spark.yarn.appMasterEnv."
     sparkConf.getAll
@@ -349,6 +342,17 @@ private[spark] class Client(
       env("SPARK_YARN_USER_ENV") = userEnvs
     }

+    // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH
+    // that can be passed on to the ApplicationMaster and the executors.
+    if (sparkConf.contains("spark.submit.pyArchives")) {
+      var pythonPath = sparkConf.get("spark.submit.pyArchives")
+      if (env.contains("PYTHONPATH")) {
+        pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
+      }
+      env("PYTHONPATH") = pythonPath
+      sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
+    }
+
     // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
     // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
     // SparkContext will not let that set spark* system properties, which is expected behavior for
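The relocated block now appends the archives to any PYTHONPATH already present in the container environment instead of overwriting it. A small self-contained sketch of the append semantics, with hypothetical values standing in for the env map and the spark.submit.pyArchives setting:

import java.io.File
import scala.collection.mutable.HashMap

object PythonPathMergeDemo extends App {
  // env mirrors the mutable map the Client builds for the container environment.
  val env = new HashMap[String, String]()
  env("PYTHONPATH") = "/opt/existing/pythonpath"       // pre-existing entry (illustrative)
  val pyArchives = "pyspark.zip,py4j-0.8.2.1-src.zip"  // hypothetical spark.submit.pyArchives value

  var pythonPath = pyArchives
  if (env.contains("PYTHONPATH")) {
    // Append to, rather than overwrite, any PYTHONPATH already set, joining
    // with File.pathSeparator (':' on POSIX, ';' on Windows).
    pythonPath = Seq(env("PYTHONPATH"), pythonPath).mkString(File.pathSeparator)
  }
  env("PYTHONPATH") = pythonPath
  println(env("PYTHONPATH"))
  // Linux output: /opt/existing/pythonpath:pyspark.zip,py4j-0.8.2.1-src.zip
}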
