Commit 420da39

Merge remote-tracking branch 'upstream/master' into addToString
2 parents: c22f352 + 8d2a36c

164 files changed: 2191 additions & 1373 deletions


bin/load-spark-env.cmd

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
+@echo off
+
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem    http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+rem This script loads spark-env.cmd if it exists, and ensures it is only loaded once.
+rem spark-env.cmd is loaded from SPARK_CONF_DIR if set, or within the current directory's
+rem conf/ subdirectory.
+
+if [%SPARK_ENV_LOADED%] == [] (
+  set SPARK_ENV_LOADED=1
+
+  if not [%SPARK_CONF_DIR%] == [] (
+    set user_conf_dir=%SPARK_CONF_DIR%
+  ) else (
+    set user_conf_dir=%~dp0..\..\conf
+  )
+
+  call :LoadSparkEnv
+)
+
+rem Setting SPARK_SCALA_VERSION if not already set.
+
+set ASSEMBLY_DIR2=%SPARK_HOME%/assembly/target/scala-2.11
+set ASSEMBLY_DIR1=%SPARK_HOME%/assembly/target/scala-2.10
+
+if [%SPARK_SCALA_VERSION%] == [] (
+
+  if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
+    echo "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected."
+    echo "Either clean one of them or, set SPARK_SCALA_VERSION=2.11 in spark-env.cmd."
+    exit 1
+  )
+  if exist %ASSEMBLY_DIR2% (
+    set SPARK_SCALA_VERSION=2.11
+  ) else (
+    set SPARK_SCALA_VERSION=2.10
+  )
+)
+exit /b 0
+
+:LoadSparkEnv
+if exist "%user_conf_dir%\spark-env.cmd" (
+  call "%user_conf_dir%\spark-env.cmd"
+)

bin/pyspark2.cmd

Lines changed: 1 addition & 2 deletions
@@ -20,8 +20,7 @@ rem
 rem Figure out where the Spark framework is installed
 set SPARK_HOME=%~dp0..
 
-rem Load environment variables from conf\spark-env.cmd, if it exists
-if exist "%SPARK_HOME%\conf\spark-env.cmd" call "%SPARK_HOME%\conf\spark-env.cmd"
+call %SPARK_HOME%\bin\load-spark-env.cmd
 
 rem Figure out which Python to use.
 if "x%PYSPARK_DRIVER_PYTHON%"=="x" (

bin/run-example2.cmd

Lines changed: 1 addition & 2 deletions
@@ -25,8 +25,7 @@ set FWDIR=%~dp0..\
 rem Export this as SPARK_HOME
 set SPARK_HOME=%FWDIR%
 
-rem Load environment variables from conf\spark-env.cmd, if it exists
-if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+call %SPARK_HOME%\bin\load-spark-env.cmd
 
 rem Test that an argument was given
 if not "x%1"=="x" goto arg_given

bin/spark-class2.cmd

Lines changed: 1 addition & 2 deletions
@@ -20,8 +20,7 @@ rem
 rem Figure out where the Spark framework is installed
 set SPARK_HOME=%~dp0..
 
-rem Load environment variables from conf\spark-env.cmd, if it exists
-if exist "%SPARK_HOME%\conf\spark-env.cmd" call "%SPARK_HOME%\conf\spark-env.cmd"
+call %SPARK_HOME%\bin\load-spark-env.cmd
 
 rem Test that an argument was given
 if "x%1"=="x" (

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 18 additions & 20 deletions
@@ -17,10 +17,12 @@
 
 package org.apache.spark
 
+import java.util.concurrent.{Executors, TimeUnit}
+
 import scala.collection.mutable
 
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{SystemClock, Clock}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -129,6 +131,10 @@ private[spark] class ExecutorAllocationManager(
   // Listener for Spark events that impact the allocation policy
   private val listener = new ExecutorAllocationListener
 
+  // Executor that handles the scheduling task.
+  private val executor = Executors.newSingleThreadScheduledExecutor(
+    Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -173,32 +179,24 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Register for scheduler callbacks to decide when to add and remove executors.
+   * Register for scheduler callbacks to decide when to add and remove executors, and start
+   * the scheduling task.
    */
   def start(): Unit = {
     listenerBus.addListener(listener)
-    startPolling()
+
+    val scheduleTask = new Runnable() {
+      override def run(): Unit = Utils.logUncaughtExceptions(schedule())
+    }
+    executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
   }
 
   /**
-   * Start the main polling thread that keeps track of when to add and remove executors.
+   * Stop the allocation manager.
    */
-  private def startPolling(): Unit = {
-    val t = new Thread {
-      override def run(): Unit = {
-        while (true) {
-          try {
-            schedule()
-          } catch {
-            case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
-          }
-          Thread.sleep(intervalMillis)
-        }
-      }
-    }
-    t.setName("spark-dynamic-executor-allocation")
-    t.setDaemon(true)
-    t.start()
+  def stop(): Unit = {
+    executor.shutdown()
+    executor.awaitTermination(10, TimeUnit.SECONDS)
   }
 
   /**

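Note on the change above: the ExecutorAllocationManager diff swaps a hand-rolled daemon polling thread for a single-threaded ScheduledExecutorService. The following is a minimal, self-contained sketch of that pattern, not Spark code; the namedThreadFactory helper here is a hypothetical stand-in for the Utils.namedThreadFactory referenced in the diff, and schedule() is a placeholder. It also shows why the task body is wrapped in a catch-all (the diff uses Utils.logUncaughtExceptions for the same purpose): scheduleAtFixedRate suppresses all later runs once one run throws.

import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

object AllocationPollingSketch {

  // Hypothetical stand-in for Utils.namedThreadFactory: daemon threads with a readable name.
  private def namedThreadFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val count = new AtomicInteger(0)
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${count.incrementAndGet()}")
      t.setDaemon(true)
      t
    }
  }

  private val executor = Executors.newSingleThreadScheduledExecutor(
    namedThreadFactory("sketch-dynamic-executor-allocation"))

  // Placeholder for the per-tick allocation logic.
  private def schedule(): Unit = println("schedule() tick")

  def start(intervalMillis: Long): Unit = {
    val scheduleTask = new Runnable {
      // A throwing run() would suppress all later executions, so log and continue instead.
      override def run(): Unit =
        try schedule() catch { case t: Throwable => t.printStackTrace() }
    }
    executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }
}

A caller invokes start(intervalMillis) once and stop() on shutdown, which mirrors the start()/stop() lifecycle the diff gives ExecutorAllocationManager.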
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 63 additions & 25 deletions
@@ -17,15 +17,15 @@
 
 package org.apache.spark
 
-import scala.concurrent.duration._
-import scala.collection.mutable
+import java.util.concurrent.{ScheduledFuture, TimeUnit, Executors}
 
-import akka.actor.{Actor, Cancellable}
+import scala.collection.mutable
 
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
-import org.apache.spark.util.ActorLogReceive
+import org.apache.spark.util.Utils
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -37,15 +37,25 @@ private[spark] case class Heartbeat(
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)
 
+/**
+ * An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
+ * created.
+ */
+private[spark] case object TaskSchedulerIsSet
+
 private[spark] case object ExpireDeadHosts
 
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
-  extends Actor with ActorLogReceive with Logging {
+private[spark] class HeartbeatReceiver(sc: SparkContext)
+  extends ThreadSafeRpcEndpoint with Logging {
+
+  override val rpcEnv: RpcEnv = sc.env.rpcEnv
+
+  private[spark] var scheduler: TaskScheduler = null
 
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new mutable.HashMap[String, Long]
@@ -61,24 +71,44 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
   sc.conf.getOption("spark.network.timeoutInterval").map(_.toLong * 1000).
     getOrElse(sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000))
 
-  private var timeoutCheckingTask: Cancellable = null
-
-  override def preStart(): Unit = {
-    import context.dispatcher
-    timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
-      checkTimeoutIntervalMs.milliseconds, self, ExpireDeadHosts)
-    super.preStart()
+  private var timeoutCheckingTask: ScheduledFuture[_] = null
+
+  private val timeoutCheckingThread = Executors.newSingleThreadScheduledExecutor(
+    Utils.namedThreadFactory("heartbeat-timeout-checking-thread"))
+
+  private val killExecutorThread = Executors.newSingleThreadExecutor(
+    Utils.namedThreadFactory("kill-executor-thread"))
+
+  override def onStart(): Unit = {
+    timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit = Utils.tryLogNonFatalError {
+        Option(self).foreach(_.send(ExpireDeadHosts))
+      }
+    }, 0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
   }
-
-  override def receiveWithLogging: PartialFunction[Any, Unit] = {
-    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
-      val unknownExecutor = !scheduler.executorHeartbeatReceived(
-        executorId, taskMetrics, blockManagerId)
-      val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
-      executorLastSeen(executorId) = System.currentTimeMillis()
-      sender ! response
+
+  override def receive: PartialFunction[Any, Unit] = {
     case ExpireDeadHosts =>
       expireDeadHosts()
+    case TaskSchedulerIsSet =>
+      scheduler = sc.taskScheduler
+  }
+
+  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
+      if (scheduler != null) {
+        val unknownExecutor = !scheduler.executorHeartbeatReceived(
+          executorId, taskMetrics, blockManagerId)
+        val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+        executorLastSeen(executorId) = System.currentTimeMillis()
+        context.reply(response)
+      } else {
+        // Because Executor will sleep several seconds before sending the first "Heartbeat", this
+        // case rarely happens. However, if it really happens, log it and ask the executor to
+        // register itself again.
+        logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
+        context.reply(HeartbeatResponse(reregisterBlockManager = true))
+      }
   }
 
   private def expireDeadHosts(): Unit = {
@@ -91,17 +121,25 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
         scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
           s"timed out after ${now - lastSeenMs} ms"))
         if (sc.supportDynamicAllocation) {
-          sc.killExecutor(executorId)
+          // Asynchronously kill the executor to avoid blocking the current thread
+          killExecutorThread.submit(new Runnable {
+            override def run(): Unit = sc.killExecutor(executorId)
+          })
         }
         executorLastSeen.remove(executorId)
       }
     }
   }
 
-  override def postStop(): Unit = {
+  override def onStop(): Unit = {
     if (timeoutCheckingTask != null) {
-      timeoutCheckingTask.cancel()
+      timeoutCheckingTask.cancel(true)
    }
-    super.postStop()
+    timeoutCheckingThread.shutdownNow()
+    killExecutorThread.shutdownNow()
  }
 }
+
+object HeartbeatReceiver {
+  val ENDPOINT_NAME = "HeartbeatReceiver"
+}

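Note on the change above: the HeartbeatReceiver diff moves from an Akka Actor to Spark's RPC endpoint model, where fire-and-forget messages (ExpireDeadHosts, TaskSchedulerIsSet) are handled in receive and request-reply traffic (Heartbeat) goes through receiveAndReply, answering via context.reply. The sketch below only illustrates that split; the traits are simplified, hypothetical stand-ins for the org.apache.spark.rpc abstractions the diff uses, not their actual implementation.

// Simplified, hypothetical stand-ins for the rpc abstractions referenced in the diff;
// the real ThreadSafeRpcEndpoint and RpcCallContext live in org.apache.spark.rpc.
trait RpcCallContext {
  def reply(response: Any): Unit
}

trait RpcEndpointSketch {
  // One-way messages (no reply expected) are dispatched here.
  def receive: PartialFunction[Any, Unit] = PartialFunction.empty
  // Messages that expect an answer are dispatched here and reply through the context.
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = PartialFunction.empty
}

case object Expire
case class Ask(question: String)

class ExampleEndpoint extends RpcEndpointSketch {
  override def receive: PartialFunction[Any, Unit] = {
    case Expire => println("expiration tick, nothing to send back")
  }
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Ask(question) => context.reply(s"answer to: $question")
  }
}

object RpcEndpointDemo extends App {
  val endpoint = new ExampleEndpoint
  endpoint.receive(Expire)                       // one-way path, like ExpireDeadHosts
  endpoint.receiveAndReply(new RpcCallContext {  // request-reply path, like Heartbeat
    def reply(response: Any): Unit = println(response)
  })(Ask("is the executor known?"))
}

In the actual diff, the Heartbeat handler additionally guards against the TaskScheduler not being set yet and replies with reregisterBlockManager = true so the executor re-registers.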