Commit 473a7c5

Commit message: merge master
2 parents: c9d7301 + 7b4f39f

188 files changed: +1633 -261 lines changed

bin/spark-class

Lines changed: 1 addition & 1 deletion
@@ -105,7 +105,7 @@ else
     exit 1
   fi
 fi
-JAVA_VERSION=$("$RUNNER" -version 2>&1 | sed 's/.* version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
+JAVA_VERSION=$("$RUNNER" -version 2>&1 | grep 'version' | sed 's/.* version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
 
 # Set JAVA_OPTS to be able to load native libraries and to set heap size
 if [ "$JAVA_VERSION" -ge 18 ]; then

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 2 additions & 1 deletion
@@ -111,13 +111,14 @@ private[spark] class ExecutorRunner(
     case "{{EXECUTOR_ID}}" => execId.toString
     case "{{HOSTNAME}}" => host
     case "{{CORES}}" => cores.toString
+    case "{{APP_ID}}" => appId
     case other => other
   }
 
   def getCommandSeq = {
     val command = Command(
       appDesc.command.mainClass,
-      appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
+      appDesc.command.arguments.map(substituteVariables),
       appDesc.command.environment,
       appDesc.command.classPathEntries,
       appDesc.command.libraryPathEntries,

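The change above moves the application ID from an argument appended after substitution into a {{APP_ID}} template token handled by substituteVariables. A minimal stand-alone sketch of that substitution pattern (the object and parameter names here are illustrative, not the real ExecutorRunner members):

// Illustrative sketch only: mirrors the token-substitution idea from the diff above.
object SubstitutionSketch {
  def substitute(execId: Int, host: String, cores: Int, appId: String)(arg: String): String =
    arg match {
      case "{{EXECUTOR_ID}}" => execId.toString
      case "{{HOSTNAME}}"    => host
      case "{{CORES}}"       => cores.toString
      case "{{APP_ID}}"      => appId   // the token this commit adds
      case other             => other   // unknown tokens pass through unchanged
    }

  def main(args: Array[String]): Unit = {
    val template = Seq("{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}", "{{WORKER_URL}}")
    val resolved = template.map(t => substitute(execId = 1, host = "worker321", cores = 8, appId = "app-123")(t))
    println(resolved) // List(1, worker321, 8, app-123, {{WORKER_URL}})
  }
}
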
core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala

Lines changed: 11 additions & 2 deletions
@@ -41,8 +41,8 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
   if (System.getenv("SPARK_WORKER_CORES") != null) {
     cores = System.getenv("SPARK_WORKER_CORES").toInt
   }
-  if (System.getenv("SPARK_WORKER_MEMORY") != null) {
-    memory = Utils.memoryStringToMb(System.getenv("SPARK_WORKER_MEMORY"))
+  if (conf.getenv("SPARK_WORKER_MEMORY") != null) {
+    memory = Utils.memoryStringToMb(conf.getenv("SPARK_WORKER_MEMORY"))
   }
   if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
     webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
@@ -56,6 +56,8 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
 
   parse(args.toList)
 
+  checkWorkerMemory()
+
   def parse(args: List[String]): Unit = args match {
     case ("--ip" | "-i") :: value :: tail =>
       Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
@@ -153,4 +155,11 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
     // Leave out 1 GB for the operating system, but don't return a negative memory size
     math.max(totalMb - 1024, 512)
   }
+
+  def checkWorkerMemory(): Unit = {
+    if (memory <= 0) {
+      val message = "Memory can't be 0, missing a M or G on the end of the memory specification?"
+      throw new IllegalStateException(message)
+    }
+  }
 }

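The new checkWorkerMemory() guard catches the case where SPARK_WORKER_MEMORY is given without a unit suffix: a bare byte count such as 512 rounds down to 0 MB, so the worker would otherwise start with no usable memory. A rough, self-contained approximation of that behaviour (this is not the real Utils.memoryStringToMb, just a sketch of the same idea):

// Illustrative only: an approximation of the memory-string handling, showing
// why the new checkWorkerMemory() guard is useful. Not the real Utils code.
object MemoryCheckSketch {
  // Assumed convention for the sketch: suffix-less values are bytes, so "512" collapses to 0 MB.
  def memoryStringToMb(str: String): Int = {
    val lower = str.toLowerCase
    if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
    else if (lower.endsWith("m")) lower.dropRight(1).toInt
    else (lower.toLong / 1024 / 1024).toInt
  }

  def checkWorkerMemory(memory: Int): Unit = {
    if (memory <= 0) {
      throw new IllegalStateException(
        "Memory can't be 0, missing a M or G on the end of the memory specification?")
    }
  }

  def main(args: Array[String]): Unit = {
    println(memoryStringToMb("2g"))   // 2048
    println(memoryStringToMb("512m")) // 512
    checkWorkerMemory(memoryStringToMb("512")) // "512" -> 0 MB -> IllegalStateException
  }
}
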
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 3 additions & 0 deletions
@@ -152,6 +152,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
           "<cores> <appid> [<workerUrl>] ")
         System.exit(1)
+
+      // NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode)
+      // and CoarseMesosSchedulerBackend (for mesos mode).
       case 5 =>
         run(args(0), args(1), args(2), args(3).toInt, args(4), None)
       case x if x > 5 =>

core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

Lines changed: 7 additions & 5 deletions
@@ -78,16 +78,18 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
         // greater than totalParts because we actually cap it at totalParts in runJob.
         var numPartsToTry = 1
         if (partsScanned > 0) {
-          // If we didn't find any rows after the first iteration, just try all partitions next.
+          // If we didn't find any rows after the previous iteration, quadruple and retry.
           // Otherwise, interpolate the number of partitions we need to try, but overestimate it
-          // by 50%.
+          // by 50%. We also cap the estimation in the end.
           if (results.size == 0) {
-            numPartsToTry = totalParts - 1
+            numPartsToTry = partsScanned * 4
           } else {
-            numPartsToTry = (1.5 * num * partsScanned / results.size).toInt
+            // the left side of max is >=1 whenever partsScanned >= 2
+            numPartsToTry = Math.max(1,
+              (1.5 * num * partsScanned / results.size).toInt - partsScanned)
+            numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
           }
         }
-        numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions
 
         val left = num - results.size
         val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)

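The estimation rule this diff introduces can be read in isolation: with no rows found yet, try four times as many partitions as last round; otherwise extrapolate from the observed row density, overestimate by 50%, and clamp the result between 1 and four times the partitions already scanned. A stand-alone sketch (names are illustrative, not the AsyncRDDActions code itself):

// Illustrative sketch of the partition-growth rule introduced here.
object NumPartsSketch {
  def nextPartsToTry(partsScanned: Int, resultsSoFar: Int, num: Int): Int = {
    if (partsScanned == 0) {
      1 // first round: look at a single partition
    } else if (resultsSoFar == 0) {
      partsScanned * 4 // nothing found yet: quadruple and retry
    } else {
      // Extrapolate from observed density, overestimate by 50%, then clamp to [1, 4 * partsScanned].
      val estimate = (1.5 * num * partsScanned / resultsSoFar).toInt - partsScanned
      Math.min(Math.max(1, estimate), partsScanned * 4)
    }
  }

  def main(args: Array[String]): Unit = {
    // Asking for num = 1000 rows; after scanning 1 partition we found 10 rows:
    println(nextPartsToTry(partsScanned = 1, resultsSoFar = 10, num = 1000))  // 4 (capped at 1 * 4)
    // After scanning 5 partitions we found 400 rows:
    println(nextPartsToTry(partsScanned = 5, resultsSoFar = 400, num = 1000)) // 13 (18 - 5, within the cap of 20)
  }
}
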
core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 5 additions & 3 deletions
@@ -1079,15 +1079,17 @@ abstract class RDD[T: ClassTag](
       // greater than totalParts because we actually cap it at totalParts in runJob.
       var numPartsToTry = 1
       if (partsScanned > 0) {
-        // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
+        // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
         // interpolate the number of partitions we need to try, but overestimate it by 50%.
+        // We also cap the estimation in the end.
         if (buf.size == 0) {
           numPartsToTry = partsScanned * 4
         } else {
-          numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
+          // the left side of max is >=1 whenever partsScanned >= 2
+          numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
+          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
         }
       }
-      numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions
 
       val left = num - buf.size
       val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)

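For a feel of how the clamped estimate behaves across take() rounds, here is a small illustrative loop that replays the rule above with made-up numbers (1000 rows requested, roughly 30 matching rows per partition). It is a toy trace, not the real RDD.take:

// Illustrative trace of the take() partition-estimation loop; not RDD.take itself.
object TakeTraceSketch {
  def main(args: Array[String]): Unit = {
    val num = 1000            // rows requested
    val rowsPerPartition = 30 // assumed density, for the trace only
    val totalParts = 200
    var partsScanned = 0
    var collected = 0
    while (collected < num && partsScanned < totalParts) {
      var numPartsToTry = 1
      if (partsScanned > 0) {
        if (collected == 0) {
          numPartsToTry = partsScanned * 4
        } else {
          numPartsToTry = Math.max((1.5 * num * partsScanned / collected).toInt - partsScanned, 1)
          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
        }
      }
      val scanNow = Math.min(numPartsToTry, totalParts - partsScanned)
      partsScanned += scanNow
      collected += scanNow * rowsPerPartition
      println(s"scanned $partsScanned partitions, collected $collected rows")
    }
  }
}
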
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 2 additions & 1 deletion
@@ -51,7 +51,8 @@ private[spark] class SparkDeploySchedulerBackend(
       conf.get("spark.driver.host"),
       conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
+    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}",
+      "{{WORKER_URL}}")
     val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
       .map(Utils.splitCommandString).getOrElse(Seq.empty)
     val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala

Lines changed: 4 additions & 4 deletions
@@ -150,17 +150,17 @@ private[spark] class CoarseMesosSchedulerBackend(
     if (uri == null) {
       val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
       command.setValue(
-        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
-          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
+          runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores, appId))
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.split('/').last.split('.').head
       command.setValue(
         ("cd %s*; " +
-          "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d")
+          "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s")
           .format(basename, driverUrl, offer.getSlaveId.getValue,
-            offer.getHostname, numCores))
+            offer.getHostname, numCores, appId))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
     command.build()

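The extra %s appended here lines up with the appId argument that CoarseGrainedExecutorBackend now expects as its fifth positional argument (see the comment added to its main method above). A small illustrative pairing of the two sides, with made-up URLs and IDs:

// Illustrative only: shows how the appended appId matches the executor's expected
// argument order after this commit. Not the real backend code.
object LaunchCommandSketch {
  def mesosLaunchCommand(runScript: String, driverUrl: String, slaveId: String,
                         hostname: String, numCores: Int, appId: String): String =
    "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
      runScript, driverUrl, slaveId, hostname, numCores, appId)

  // Mirrors the case 5 / case x > 5 dispatch: appId is the fifth positional argument,
  // and an optional workerUrl may follow it.
  def parseExecutorArgs(args: Array[String]): (String, String, String, Int, String, Option[String]) =
    args.length match {
      case 5          => (args(0), args(1), args(2), args(3).toInt, args(4), None)
      case x if x > 5 => (args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
      case _          => sys.error("Usage: <driverUrl> <executorId> <hostname> <cores> <appid> [<workerUrl>]")
    }

  def main(args: Array[String]): Unit = {
    val cmd = mesosLaunchCommand("./bin/spark-class", "akka://driver", "slave-1", "host-1", 4, "app-42")
    println(cmd)
    println(parseExecutorArgs(cmd.split(" ").drop(2))) // drop the script and class name
  }
}
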
core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 14 additions & 12 deletions
@@ -340,8 +340,8 @@ private[spark] object Utils extends Logging {
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
     val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
-    uri.getScheme match {
-      case "http" | "https" | "ftp" =>
+    Option(uri.getScheme) match {
+      case Some("http") | Some("https") | Some("ftp") =>
         logInfo("Fetching " + url + " to " + tempFile)
 
         var uc: URLConnection = null
@@ -374,7 +374,7 @@
           }
         }
         Files.move(tempFile, targetFile)
-      case "file" | null =>
+      case Some("file") | None =>
         // In the case of a local file, copy the local file to the target directory.
         // Note the difference between uri vs url.
         val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
@@ -403,7 +403,7 @@
           logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
           Files.copy(sourceFile, targetFile)
         }
-      case _ =>
+      case Some(other) =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
         val fs = getHadoopFileSystem(uri, hadoopConf)
         val in = fs.open(new Path(uri))
@@ -1368,16 +1368,17 @@
     if (uri.getPath == null) {
       throw new IllegalArgumentException(s"Given path is malformed: $uri")
     }
-    uri.getScheme match {
-      case windowsDrive(d) if windows =>
+
+    Option(uri.getScheme) match {
+      case Some(windowsDrive(d)) if windows =>
        new URI("file:/" + uri.toString.stripPrefix("/"))
-      case null =>
+      case None =>
        // Preserve fragments for HDFS file name substitution (denoted by "#")
        // For instance, in "abc.py#xyz.py", "xyz.py" is the name observed by the application
        val fragment = uri.getFragment
        val part = new File(uri.getPath).toURI
        new URI(part.getScheme, part.getPath, fragment)
-      case _ =>
+      case Some(other) =>
        uri
     }
   }
@@ -1399,10 +1400,11 @@
     } else {
       paths.split(",").filter { p =>
         val formattedPath = if (windows) formatWindowsPath(p) else p
-        new URI(formattedPath).getScheme match {
-          case windowsDrive(d) if windows => false
-          case "local" | "file" | null => false
-          case _ => true
+        val uri = new URI(formattedPath)
+        Option(uri.getScheme) match {
+          case Some(windowsDrive(d)) if windows => false
+          case Some("local") | Some("file") | None => false
+          case Some(other) => true
         }
       }
     }

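All five hunks converge on the same pattern: java.net.URI.getScheme returns null when a path carries no scheme, and wrapping it in Option(...) turns that null into an explicit None case instead of relying on bare-null matches. A minimal stand-alone illustration (the paths are made up, and the branch labels only paraphrase what the Utils code does):

// Illustrative only: the Option-wrapping pattern used in the Utils.scala hunks above.
import java.net.URI

object SchemeMatchSketch {
  def describe(path: String): String = {
    val uri = new URI(path)
    Option(uri.getScheme) match {                      // Option(null) == None, so no bare-null case is needed
      case Some("http") | Some("https") | Some("ftp") => "download over the network"
      case Some("file") | None                        => "treat as a local file"
      case Some(other)                                => s"hand off to Hadoop FileSystem ($other)"
    }
  }

  def main(args: Array[String]): Unit = {
    println(describe("http://example.com/jar.jar")) // download over the network
    println(describe("/tmp/local.jar"))             // treat as a local file (scheme is null)
    println(describe("hdfs://nn:8020/user/x.jar"))  // hand off to Hadoop FileSystem (hdfs)
  }
}
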
core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala

Lines changed: 4 additions & 6 deletions
@@ -26,14 +26,12 @@ import org.apache.spark.SparkConf
 
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
-    def f(s:String) = new File(s)
+    val appId = "12345-worker321-9876"
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     val appDesc = new ApplicationDescription("app name", Some(8), 500,
-      Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
-    val appId = "12345-worker321-9876"
-    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
-      f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
-
+      Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
+    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
+      new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
     assert(er.getCommandSeq.last === appId)
   }
 }
