
Commit 6ce2020

Arnoldosmium and rshkv authored
Always offer File type conda instructions to executors using driver's transitive packages (#710)
* use verbosity flag in install
* always pass down file instruction for executors conda env
* check style
* reuse yarn test to double check executor conda env
* fix yarn cluster test python file
* better documentation, tests
* update doc w/ checkstyle
* remove unnecessary import test

Co-authored-by: Willi Raschkowski <[email protected]>
1 parent 8128a21 commit 6ce2020

3 files changed: +84 −15 lines

core/src/main/scala/org/apache/spark/api/conda/CondaEnvironment.scala

Lines changed: 28 additions & 9 deletions

```diff
@@ -92,6 +92,7 @@ final class CondaEnvironment(
     manager.runCondaProcess(rootPath,
       List("install", "-n", envName, "-y")
         ::: extraArgs.toList
+        ::: manager.verbosityFlags
         ::: "--" :: packages.toList,
       description = s"install dependencies in conda env $condaEnvDir",
       channels = channels.iterator.map(_.url).toList,
@@ -113,17 +114,35 @@
   }
 
   /**
-   * This is for sending the instructions to the executors so they can replicate the same steps.
+   * This is for sending the instructions to the executors so they can replicate the same conda
+   * environment.
+   * <p><ul>
+   * <li>In {@code File} mode, re-use the same specfile on executors.</li>
+   * <li>In {@code Solve} mode, list resolved packages into a specfile
+   * and use that on executors.</li>
+   * </ul>
    */
   def buildSetupInstructions: CondaSetupInstructions = {
-    CondaSetupInstructions(
-      bootstrapMode,
-      packages.toList,
-      bootstrapPackageUrls,
-      bootstrapPackageUrlsUserInfo,
-      channels.toList,
-      extraArgs,
-      envVars)
+    bootstrapMode match {
+      case CondaBootstrapMode.Solve =>
+        CondaSetupInstructions(
+          CondaBootstrapMode.File,
+          packages.toList,
+          getTransitivePackageUrls(),
+          bootstrapPackageUrlsUserInfo,
+          channels.toList,
+          extraArgs,
+          envVars)
+      case CondaBootstrapMode.File =>
+        CondaSetupInstructions(
+          bootstrapMode,
+          packages.toList,
+          bootstrapPackageUrls,
+          bootstrapPackageUrlsUserInfo,
+          channels.toList,
+          extraArgs,
+          envVars)
+    }
   }
 }
```
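The Solve-to-File conversion means executors never re-run the conda solver: a driver bootstrapped in `Solve` mode ships `File`-mode instructions built from the exact package URLs it resolved. `getTransitivePackageUrls()` is referenced here but not shown in this commit; as a rough sketch of what such a helper could do, assuming it builds on the manager's existing `listPackagesExplicit` (which runs `conda list -p <env> --explicit`):

```scala
// Hypothetical sketch only -- the real getTransitivePackageUrls() is not part
// of this diff. `conda list --explicit` emits '#' comment lines and an
// '@EXPLICIT' marker before the resolved package URLs, so a minimal version
// could filter those out and keep only the URLs:
private def getTransitivePackageUrls(): List[String] = {
  manager.listPackagesExplicit(condaEnvDir.toString)
    .filterNot(line => line.startsWith("#") || line.startsWith("@"))
}
```

Either way, every executor installs from the driver's resolved package list, so driver and executors end up with identical environments.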

core/src/main/scala/org/apache/spark/api/conda/CondaEnvironmentManager.scala

Lines changed: 13 additions & 6 deletions

```diff
@@ -71,6 +71,10 @@ final class CondaEnvironmentManager(condaBinaryPath: String,
     defaultInfo("pkgs_dirs").extract[List[String]]
   }
 
+  private[conda] lazy val verbosityFlags: List[String] = {
+    0.until(verbosity).map(_ => "-v").toList
+  }
+
   def listPackagesExplicit(envDir: String): List[String] = {
     logInfo("Retrieving a conda environment's list of installed packages")
     val command = Process(List(condaBinaryPath, "list", "-p", envDir, "--explicit"), None)
@@ -90,7 +94,13 @@
       condaEnvVars: Map[String, String] = Map.empty): CondaEnvironment = {
     condaMode match {
       case CondaBootstrapMode.Solve =>
-        create(baseDir, condaPackages, condaChannelUrls, condaExtraArgs, condaEnvVars)
+        create(
+          baseDir,
+          condaPackages,
+          condaPackageUrlsUserInfo,
+          condaChannelUrls,
+          condaExtraArgs,
+          condaEnvVars)
       case CondaBootstrapMode.File =>
         createWithFile(
           baseDir, condaPackageUrls, condaPackageUrlsUserInfo, condaExtraArgs, condaEnvVars)
@@ -100,6 +110,7 @@
   def create(
       baseDir: String,
       condaPackages: Seq[String],
+      condaPackageUrlsUserInfo: Option[String],
       condaChannelUrls: Seq[String],
       condaExtraArgs: Seq[String] = Nil,
       condaEnvVars: Map[String, String] = Map.empty): CondaEnvironment = {
@@ -114,8 +125,6 @@
     logInfo(s"Creating symlink $linkedBaseDir -> $baseDir")
     Files.createSymbolicLink(linkedBaseDir, Paths.get(baseDir))
 
-    val verbosityFlags = 0.until(verbosity).map(_ => "-v").toList
-
     // Attempt to create environment
     runCondaProcess(
       linkedBaseDir,
@@ -135,7 +144,7 @@
       CondaBootstrapMode.Solve,
       condaPackages,
       Nil,
-      None,
+      condaPackageUrlsUserInfo,
       condaChannelUrls,
       condaExtraArgs)
   }
@@ -159,8 +168,6 @@
     logInfo(s"Creating symlink $linkedBaseDir -> $baseDir")
     Files.createSymbolicLink(linkedBaseDir, Paths.get(baseDir))
 
-    val verbosityFlags = 0.until(verbosity).map(_ => "-v").toList
-
     // Authenticate URLs if we have a UserInfo argument
     val finalCondaPackageUrls = if (condaPackageUrlsUserInfo.isDefined) {
       condaPackageUrls.map { packageUrl =>
```
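Hoisting `verbosityFlags` into a `private[conda] lazy val` removes the two duplicated locals and lets `CondaEnvironment` append the same flags when installing additional packages. As a minimal standalone sketch of the expansion (with `verbosity` passed explicitly instead of read from the manager's configuration):

```scala
// Repeat "-v" once per verbosity level; conda increases log detail with each
// additional -v flag.
def verbosityFlags(verbosity: Int): List[String] =
  List.fill(verbosity)("-v")

// verbosity = 2 makes the install run as e.g.
//   conda install -n <env> -y <extraArgs> -v -v -- <packages>
assert(verbosityFlags(2) == List("-v", "-v"))
assert(verbosityFlags(0).isEmpty)
```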

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala

Lines changed: 43 additions & 0 deletions

```diff
@@ -160,6 +160,35 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     |  sc.stop()
     """.stripMargin
 
+  private val TEST_CONDA_DRIVER_INIT_PYFILE = """
+    |import os
+    |import sys
+    |
+    |from pyspark import SparkConf, SparkContext
+    |if __name__ == "__main__":
+    |    if len(sys.argv) != 2:
+    |        print("Usage: test.py [result file]", file=sys.stderr)
+    |        exit(-1)
+    |    sc = SparkContext(conf=SparkConf())
+    |
+    |    status = open(sys.argv[1], 'w')
+    |
+    |    def numpy_multiply(x):
+    |        # Ensure executors have the same packages as the driver.
+    |        import numpy
+    |        return numpy.multiply(x, 2)
+    |
+    |    rdd = sc.parallelize(range(10)).map(numpy_multiply)
+    |    rdd_sum = rdd.sum()
+    |    if rdd_sum == 90:  # sum(0..9) * 2 = 90
+    |        result = "success"
+    |    else:
+    |        result = "failure"
+    |    status.write(result)
+    |    status.close()
+    |    sc.stop()
+    """.stripMargin
+
   private val TEST_PYMODULE = """
     |def func():
     |    return 42
@@ -282,6 +311,20 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     testCondaPySparkAllModes(clientMode = false)
   }
 
+  test("Python application within Conda in yarn-cluster mode has necessary pkgs") {
+    // Run a conda-bootstrapped driver and verify executors can import numpy.
+    val extraConfForCreate: Map[String, String] = Map(
+      "spark.conda.binaryPath" -> sys.env("CONDA_BIN"),
+      "spark.conda.channelUrls" -> "https://repo.continuum.io/pkgs/main",
+      "spark.conda.bootstrapPackages" -> "python=3.6,numpy=1.14.0"
+    )
+
+    testCondaPySpark(
+      clientMode = false,
+      TEST_CONDA_DRIVER_INIT_PYFILE,
+      extraConf = extraConfForCreate)
+  }
+
   test("run Python application in yarn-cluster mode using " +
     "spark.yarn.appMasterEnv to override local envvar") {
     testPySpark(
```
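The new test reads `sys.env("CONDA_BIN")`, so the suite must be launched with that variable pointing at a conda binary. For reference, the same bootstrap settings applied to a plain `SparkConf` outside the test harness might look like this (a sketch; the configuration keys are copied from the test above, and `CONDA_BIN` is assumed to be set):

```scala
import org.apache.spark.SparkConf

// Sketch: conda bootstrap configuration as in the new test. Assumes the
// CONDA_BIN environment variable points at a conda executable.
val conf = new SparkConf()
  .set("spark.conda.binaryPath", sys.env("CONDA_BIN"))
  .set("spark.conda.channelUrls", "https://repo.continuum.io/pkgs/main")
  .set("spark.conda.bootstrapPackages", "python=3.6,numpy=1.14.0")
```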
