Skip to content

Commit 1c66070

Merge remote-tracking branch 'origin/master' into SPARK-4180
2 parents: 06c5c54 + cb0eae3

232 files changed: +7643 −2022 lines


LICENSE

Lines changed: 20 additions & 1 deletion
@@ -754,7 +754,7 @@ SUCH DAMAGE.
 
 
 ========================================================================
-For Timsort (core/src/main/java/org/apache/spark/util/collection/Sorter.java):
+For Timsort (core/src/main/java/org/apache/spark/util/collection/TimSort.java):
 ========================================================================
 Copyright (C) 2008 The Android Open Source Project
 
@@ -771,6 +771,25 @@ See the License for the specific language governing permissions and
 limitations under the License.
 
 
+========================================================================
+For LimitedInputStream
+(network/common/src/main/java/org/apache/spark/network/util/LimitedInputStream.java):
+========================================================================
+Copyright (C) 2007 The Guava Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+
 ========================================================================
 BSD-style licenses
 ========================================================================

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 27 additions & 10 deletions
@@ -66,7 +66,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
   // Lower and upper bounds on the number of executors. These are required.
   private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
   private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
-  verifyBounds()
 
   // How long there must be backlogged tasks for before an addition is triggered
   private val schedulerBacklogTimeout = conf.getLong(
@@ -77,9 +76,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
     "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", schedulerBacklogTimeout)
 
   // How long an executor must be idle for before it is removed
-  private val removeThresholdSeconds = conf.getLong(
+  private val executorIdleTimeout = conf.getLong(
     "spark.dynamicAllocation.executorIdleTimeout", 600)
 
+  // During testing, the methods to actually kill and add executors are mocked out
+  private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
+
+  validateSettings()
+
   // Number of executors to add in the next round
   private var numExecutorsToAdd = 1
 
@@ -103,17 +107,14 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
   // Polling loop interval (ms)
   private val intervalMillis: Long = 100
 
-  // Whether we are testing this class. This should only be used internally.
-  private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
-
   // Clock used to schedule when executors should be added and removed
   private var clock: Clock = new RealClock
 
   /**
-   * Verify that the lower and upper bounds on the number of executors are valid.
+   * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
    */
-  private def verifyBounds(): Unit = {
+  private def validateSettings(): Unit = {
     if (minNumExecutors < 0 || maxNumExecutors < 0) {
       throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
     }
@@ -124,6 +125,22 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
       throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
         s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
     }
+    if (schedulerBacklogTimeout <= 0) {
+      throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
+    }
+    if (sustainedSchedulerBacklogTimeout <= 0) {
+      throw new SparkException(
+        "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
+    }
+    if (executorIdleTimeout <= 0) {
+      throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be > 0!")
+    }
+    // Require external shuffle service for dynamic allocation
+    // Otherwise, we may lose shuffle files when killing executors
+    if (!conf.getBoolean("spark.shuffle.service.enabled", false) && !testing) {
+      throw new SparkException("Dynamic allocation of executors requires the external " +
+        "shuffle service. You may enable this through spark.shuffle.service.enabled.")
+    }
   }
 
   /**
@@ -254,7 +271,7 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
     val removeRequestAcknowledged = testing || sc.killExecutor(executorId)
     if (removeRequestAcknowledged) {
       logInfo(s"Removing executor $executorId because it has been idle for " +
-        s"$removeThresholdSeconds seconds (new desired total will be ${numExistingExecutors - 1})")
+        s"$executorIdleTimeout seconds (new desired total will be ${numExistingExecutors - 1})")
       executorsPendingToRemove.add(executorId)
       true
     } else {
@@ -329,8 +346,8 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
   private def onExecutorIdle(executorId: String): Unit = synchronized {
     if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
       logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
-        s"scheduled to run on the executor (to expire in $removeThresholdSeconds seconds)")
-      removeTimes(executorId) = clock.getTimeMillis + removeThresholdSeconds * 1000
+        s"scheduled to run on the executor (to expire in $executorIdleTimeout seconds)")
+      removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeout * 1000
     }
   }

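Note: the new validateSettings() folds the old bounds check together with checks on the timeouts and, unless testing, a hard requirement on the external shuffle service so shuffle files survive when idle executors are killed. A minimal configuration sketch that would pass these checks (the specific values are illustrative, not taken from the commit):

  import org.apache.spark.SparkConf

  // Illustrative settings that satisfy validateSettings(): both executor bounds
  // are set, every timeout is positive, and the external shuffle service is
  // enabled so shuffle files are preserved when idle executors are removed.
  val conf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "2")
    .set("spark.dynamicAllocation.maxExecutors", "20")
    .set("spark.dynamicAllocation.schedulerBacklogTimeout", "5")   // seconds, must be > 0
    .set("spark.dynamicAllocation.executorIdleTimeout", "600")     // seconds, must be > 0
    .set("spark.shuffle.service.enabled", "true")
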
core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 20 additions & 3 deletions
@@ -22,6 +22,7 @@ import java.net.{Authenticator, PasswordAuthentication}
 import org.apache.hadoop.io.Text
 
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.network.sasl.SecretKeyHolder
 
 /**
  * Spark class responsible for security.
@@ -84,7 +85,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
  *            Authenticator installed in the SecurityManager to how it does the authentication
  *            and in this case gets the user name and password from the request.
  *
- *  - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
+ *  - BlockTransferService -> The Spark BlockTransferServices uses java nio to asynchronously
  *            exchange messages. For this we use the Java SASL
  *            (Simple Authentication and Security Layer) API and again use DIGEST-MD5
  *            as the authentication mechanism. This means the shared secret is not passed
@@ -98,7 +99,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
  *            of protection they want. If we support those, the messages will also have to
 *            be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
 *
-*            Since the connectionManager does asynchronous messages passing, the SASL
+*            Since the NioBlockTransferService does asynchronous messages passing, the SASL
 *            authentication is a bit more complex. A ConnectionManager can be both a client
 *            and a Server, so for a particular connection is has to determine what to do.
 *            A ConnectionId was added to be able to track connections and is used to
@@ -107,6 +108,10 @@ import org.apache.spark.deploy.SparkHadoopUtil
 *            and waits for the response from the server and does the handshake before sending
 *            the real message.
 *
+*            The NettyBlockTransferService ensures that SASL authentication is performed
+*            synchronously prior to any other communication on a connection. This is done in
+*            SaslClientBootstrap on the client side and SaslRpcHandler on the server side.
+*
 *  - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
 *            can be used. Yarn requires a specific AmIpFilter be installed for security to work
 *            properly. For non-Yarn deployments, users can write a filter to go through a
@@ -139,7 +144,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 * can take place.
 */
 
-private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
+private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging with SecretKeyHolder {
 
   // key used to store the spark secret in the Hadoop UGI
   private val sparkSecretLookupKey = "sparkCookie"
@@ -337,4 +342,16 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
   * @return the secret key as a String if authentication is enabled, otherwise returns null
   */
  def getSecretKey(): String = secretKey
+
+  override def getSaslUser(appId: String): String = {
+    val myAppId = sparkConf.getAppId
+    require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
+    getSaslUser()
+  }
+
+  override def getSecretKey(appId: String): String = {
+    val myAppId = sparkConf.getAppId
+    require(appId == myAppId, s"SASL appId $appId did not match my appId ${myAppId}")
+    getSecretKey()
+  }
 }

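Note: with SecurityManager acting as the SecretKeyHolder for the SASL bootstraps, both appId-keyed lookups require the caller's application id to match spark.app.id. A rough sketch of that behavior (the app id and secret below are placeholders; SecurityManager is private[spark], so this only compiles from code inside the org.apache.spark package):

  import org.apache.spark.{SecurityManager, SparkConf}

  // Placeholder values, not taken from the commit.
  val conf = new SparkConf()
    .set("spark.app.id", "app-20141107120000-0001")
    .set("spark.authenticate", "true")
    .set("spark.authenticate.secret", "not-a-real-secret")

  val securityManager = new SecurityManager(conf)

  securityManager.getSecretKey("app-20141107120000-0001")  // matching id: returns the secret
  // securityManager.getSecretKey("some-other-app")        // mismatched id: fails the require()
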
core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 6 additions & 0 deletions
@@ -217,6 +217,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
    */
    getAll.filter { case (k, _) => isAkkaConf(k) }
 
+  /**
+   * Returns the Spark application id, valid in the Driver after TaskScheduler registration and
+   * from the start in the Executor.
+   */
+  def getAppId: String = get("spark.app.id")
+
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)

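Note: getAppId is a fail-fast accessor, since spark.app.id is only set on the driver once the TaskScheduler has registered (see the SparkContext change below). A short usage sketch, assuming an already-constructed SparkContext named sc:

  // After SparkContext construction, spark.app.id has been set by the scheduler,
  // so getAppId returns the application id (the format shown is illustrative).
  val appId: String = sc.getConf.getAppId   // e.g. "app-20141107120000-0001"

  // Before the id is set, get("spark.app.id") has no value, so getAppId would
  // throw a NoSuchElementException rather than return null or a default.
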
core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 28 additions & 3 deletions
@@ -21,9 +21,8 @@ import scala.language.implicitConversions
 
 import java.io._
 import java.net.URI
-import java.util.Arrays
+import java.util.{Arrays, Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
-import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
 import scala.collection.generic.Growable
@@ -41,6 +40,7 @@ import akka.actor.Props
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
+import org.apache.spark.executor.TriggerThreadDump
 import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat, FixedLengthBinaryInputFormat}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -51,7 +51,7 @@ import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.ui.jobs.JobProgressListener
-import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util._
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -340,6 +340,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   val applicationId: String = taskScheduler.applicationId()
   conf.set("spark.app.id", applicationId)
 
+  env.blockManager.initialize(applicationId)
+
   val metricsSystem = env.metricsSystem
 
   // The metrics system for Driver need to be set spark.app.id to app ID.
@@ -388,6 +390,29 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
     override protected def childValue(parent: Properties): Properties = new Properties(parent)
   }
 
+  /**
+   * Called by the web UI to obtain executor thread dumps. This method may be expensive.
+   * Logs an error and returns None if we failed to obtain a thread dump, which could occur due
+   * to an executor being dead or unresponsive or due to network issues while sending the thread
+   * dump message back to the driver.
+   */
+  private[spark] def getExecutorThreadDump(executorId: String): Option[Array[ThreadStackTrace]] = {
+    try {
+      if (executorId == SparkContext.DRIVER_IDENTIFIER) {
+        Some(Utils.getThreadDump())
+      } else {
+        val (host, port) = env.blockManager.master.getActorSystemHostPortForExecutor(executorId).get
+        val actorRef = AkkaUtils.makeExecutorRef("ExecutorActor", conf, host, port, env.actorSystem)
+        Some(AkkaUtils.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump, actorRef,
+          AkkaUtils.numRetries(conf), AkkaUtils.retryWaitMs(conf), AkkaUtils.askTimeout(conf)))
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Exception getting thread dump from executor $executorId", e)
+        None
+    }
+  }
+
   private[spark] def getLocalProperties: Properties = localProperties.get()
 
   private[spark] def setLocalProperties(props: Properties) {

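Note: getExecutorThreadDump dumps the driver's own threads locally and reaches remote executors over Akka, returning None on any failure. A usage sketch, assuming Spark-internal code (for example the web UI page that renders thread dumps) with a SparkContext named sc; the executor id is illustrative:

  // Driver threads: pass the driver identifier rather than a numeric executor id.
  val driverDump = sc.getExecutorThreadDump(SparkContext.DRIVER_IDENTIFIER)

  // Remote executor threads; "1" is an illustrative executor id.
  sc.getExecutorThreadDump("1") match {
    case Some(stackTraces) => stackTraces.foreach(println)
    case None => // executor dead or unresponsive, or the ask failed; error already logged
  }
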
core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 4 additions & 3 deletions
@@ -274,9 +274,9 @@ object SparkEnv extends Logging {
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)
 
     val blockTransferService =
-      conf.get("spark.shuffle.blockTransferService", "nio").toLowerCase match {
+      conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
         case "netty" =>
-          new NettyBlockTransferService(conf)
+          new NettyBlockTransferService(conf, securityManager)
         case "nio" =>
           new NioBlockTransferService(conf, securityManager)
       }
@@ -285,8 +285,9 @@ object SparkEnv extends Logging {
       "BlockManagerMaster",
       new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
 
+    // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
+      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
 
     val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

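Note: netty is now the default block transfer service, and both implementations are constructed with the SecurityManager so SASL authentication works with either transport. A hedged sketch of opting back into the old nio transport:

  import org.apache.spark.SparkConf

  // The default is now "netty"; set "nio" explicitly to restore the previous transport.
  // Either choice receives the SecurityManager, so spark.authenticate-based SASL
  // applies to both.
  val conf = new SparkConf()
    .set("spark.shuffle.blockTransferService", "nio")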