
Commit 0171d3c

Merge branch 'master' of https://github.com/apache/spark into reduce-locations
2 parents bc4dfd6 + 2db6a85 commit 0171d3c


324 files changed: +10954 -4487 lines changed


LICENSE

Lines changed: 16 additions & 0 deletions
@@ -771,6 +771,22 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 
+========================================================================
+For TestTimSort (core/src/test/java/org/apache/spark/util/collection/TestTimSort.java):
+========================================================================
+Copyright (C) 2015 Stijn de Gouw
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
 
 ========================================================================
 For LimitedInputStream

assembly/pom.xml

Lines changed: 3 additions & 3 deletions
@@ -119,9 +119,9 @@
         <artifact>org.jblas:jblas</artifact>
         <excludes>
           <!-- Linux amd64 is OK; not statically linked -->
-          <exclude>lib/Linux/i386/**</exclude>
-          <exclude>lib/Mac OS X/**</exclude>
-          <exclude>lib/Windows/**</exclude>
+          <exclude>lib/static/Linux/i386/**</exclude>
+          <exclude>lib/static/Mac OS X/**</exclude>
+          <exclude>lib/static/Windows/**</exclude>
         </excludes>
       </filter>
     </filters>

conf/metrics.properties.template

Lines changed: 9 additions & 0 deletions
@@ -122,6 +122,15 @@
 
 #worker.sink.csv.unit=minutes
 
+# Enable Slf4jSink for all instances by class name
+#*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink
+
+# Polling period for Slf4JSink
+#*.sink.sl4j.period=1
+
+#*.sink.sl4j.unit=minutes
+
+
 # Enable jvm source for instance master, worker, driver and executor
 #master.source.jvm.class=org.apache.spark.metrics.source.JvmSource

core/src/main/java/org/apache/spark/util/collection/TimSort.java

Lines changed: 4 additions & 5 deletions
@@ -425,15 +425,14 @@ private void pushRun(int runBase, int runLen) {
   private void mergeCollapse() {
     while (stackSize > 1) {
       int n = stackSize - 2;
-      if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {
+      if ( (n >= 1 && runLen[n-1] <= runLen[n] + runLen[n+1])
+        || (n >= 2 && runLen[n-2] <= runLen[n] + runLen[n-1])) {
         if (runLen[n - 1] < runLen[n + 1])
           n--;
-        mergeAt(n);
-      } else if (runLen[n] <= runLen[n + 1]) {
-        mergeAt(n);
-      } else {
+      } else if (runLen[n] > runLen[n + 1]) {
         break; // Invariant is established
       }
+      mergeAt(n);
     }
   }
 
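This is the mergeCollapse fix for the TimSort run-stack bug reported by Stijn de Gouw and collaborators (hence the TestTimSort attribution added to the LICENSE above): the old condition only examined the top three runs, so the run-length invariant could be broken deeper in the stack. As an illustration only (hypothetical names, not part of this commit), the invariant the patched loop is meant to restore can be stated as a small Scala property check:

// Hypothetical sketch: every run on the stack must be longer than the sum of the two
// runs above it, and each run must be longer than the run directly above it.
object TimSortInvariantSketch {
  def holds(runLen: IndexedSeq[Int]): Boolean =
    runLen.sliding(3).forall {
      case Seq(a, b, c) => a > b + c && b > c
      case _            => true // fewer than three runs: nothing to check
    }

  def main(args: Array[String]): Unit = {
    println(holds(Vector(120, 80, 25, 20, 30))) // false: 25 <= 20 + 30 deeper in the stack
    println(holds(Vector(128, 64, 32, 16, 8)))  // true
  }
}

Checking both n-1 and n-2 in the new condition is what guarantees the invariant holds across the whole stack after each push, which is the property TimSort's bounded run-stack size depends on.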

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 34 additions & 8 deletions
@@ -23,6 +23,7 @@ import java.lang.ThreadLocal
 
 import scala.collection.generic.Growable
 import scala.collection.mutable.Map
+import scala.ref.WeakReference
 import scala.reflect.ClassTag
 
 import org.apache.spark.serializer.JavaSerializer
@@ -279,13 +280,24 @@ object AccumulatorParam {
 
 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
-private[spark] object Accumulators {
-  // TODO: Use soft references? => need to make readObject work properly then
-  val originals = Map[Long, Accumulable[_, _]]()
-  val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
+private[spark] object Accumulators extends Logging {
+  /**
+   * This global map holds the original accumulator objects that are created on the driver.
+   * It keeps weak references to these objects so that accumulators can be garbage-collected
+   * once the RDDs and user-code that reference them are cleaned up.
+   */
+  val originals = Map[Long, WeakReference[Accumulable[_, _]]]()
+
+  /**
+   * This thread-local map holds per-task copies of accumulators; it is used to collect the set
+   * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
+   * this map is cleared by `Accumulators.clear()` (see Executor.scala).
+   */
+  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
     override protected def initialValue() = Map[Long, Accumulable[_, _]]()
   }
-  var lastId: Long = 0
+
+  private var lastId: Long = 0
 
   def newId(): Long = synchronized {
     lastId += 1
@@ -294,7 +306,7 @@ private[spark] object Accumulators {
 
   def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
     if (original) {
-      originals(a.id) = a
+      originals(a.id) = new WeakReference[Accumulable[_, _]](a)
     } else {
       localAccums.get()(a.id) = a
     }
@@ -303,7 +315,13 @@ private[spark] object Accumulators {
   // Clear the local (non-original) accumulators for the current thread
   def clear() {
     synchronized {
-      localAccums.get.clear
+      localAccums.get.clear()
+    }
+  }
+
+  def remove(accId: Long) {
+    synchronized {
+      originals.remove(accId)
     }
   }
 
@@ -320,7 +338,15 @@ private[spark] object Accumulators {
   def add(values: Map[Long, Any]): Unit = synchronized {
     for ((id, value) <- values) {
       if (originals.contains(id)) {
-        originals(id).asInstanceOf[Accumulable[Any, Any]] ++= value
+        // Since we are now storing weak references, we must check whether the underlying data
+        // is valid.
+        originals(id).get match {
+          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
+          case None =>
+            throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
+        }
+      } else {
+        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
      }
    }
  }
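The substantive change here is that originals now holds WeakReference values, so every lookup must handle the case where the accumulator has already been garbage-collected. A minimal sketch of that pattern (illustrative only, hypothetical names, not the Spark API):

import scala.collection.mutable
import scala.ref.WeakReference

// Sketch of a weak-reference-backed registry: entries do not keep their values alive,
// so a lookup can come back empty once the referent has been collected.
object WeakRegistrySketch {
  private val registry = mutable.Map[Long, WeakReference[AnyRef]]()

  def register(id: Long, value: AnyRef): Unit =
    registry(id) = new WeakReference[AnyRef](value)

  // None if the id was never registered or the value has been garbage-collected.
  def lookup(id: Long): Option[AnyRef] =
    registry.get(id).flatMap(_.get)

  def remove(id: Long): Unit = registry.remove(id)
}

Note that garbage collection does not remove the map entry itself, which is why the commit also adds Accumulators.remove; the ContextCleaner change below calls it.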

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 22 additions & 2 deletions
@@ -32,6 +32,7 @@ private sealed trait CleanupTask
 private case class CleanRDD(rddId: Int) extends CleanupTask
 private case class CleanShuffle(shuffleId: Int) extends CleanupTask
 private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
+private case class CleanAccum(accId: Long) extends CleanupTask
 
 /**
  * A WeakReference associated with a CleanupTask.
@@ -114,6 +115,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     registerForCleanup(rdd, CleanRDD(rdd.id))
   }
 
+  def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = {
+    registerForCleanup(a, CleanAccum(a.id))
+  }
+
   /** Register a ShuffleDependency for cleanup when it is garbage collected. */
   def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]) {
     registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
@@ -145,6 +150,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
             doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
           case CleanBroadcast(broadcastId) =>
             doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+          case CleanAccum(accId) =>
+            doCleanupAccum(accId, blocking = blockOnCleanupTasks)
         }
       }
     } catch {
@@ -181,15 +188,27 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   /** Perform broadcast cleanup. */
   def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
     try {
-      logDebug("Cleaning broadcast " + broadcastId)
+      logDebug(s"Cleaning broadcast $broadcastId")
       broadcastManager.unbroadcast(broadcastId, true, blocking)
       listeners.foreach(_.broadcastCleaned(broadcastId))
-      logInfo("Cleaned broadcast " + broadcastId)
+      logDebug(s"Cleaned broadcast $broadcastId")
     } catch {
       case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
     }
   }
 
+  /** Perform accumulator cleanup. */
+  def doCleanupAccum(accId: Long, blocking: Boolean) {
+    try {
+      logDebug("Cleaning accumulator " + accId)
+      Accumulators.remove(accId)
+      listeners.foreach(_.accumCleaned(accId))
+      logInfo("Cleaned accumulator " + accId)
+    } catch {
+      case e: Exception => logError("Error cleaning accumulator " + accId, e)
+    }
+  }
+
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
@@ -206,4 +225,5 @@ private[spark] trait CleanerListener {
   def rddCleaned(rddId: Int)
   def shuffleCleaned(shuffleId: Int)
   def broadcastCleaned(broadcastId: Long)
+  def accumCleaned(accId: Long)
 }
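CleanAccum rides on the same mechanism ContextCleaner already uses for RDDs, shuffles, and broadcasts: each registered object is paired with a cleanup task through a weak reference on a reference queue, and a background thread runs the task once the object has been collected. A stripped-down sketch of that pattern (illustrative only, not the actual ContextCleaner code):

import java.lang.ref.{ReferenceQueue, WeakReference}

// Sketch: pair tracked objects with cleanup tasks via WeakReferences registered on a
// ReferenceQueue, then drain the queue and run the task after the object is collected.
object CleanupQueueSketch {
  sealed trait CleanupTask
  final case class CleanAccum(accId: Long) extends CleanupTask

  private val queue = new ReferenceQueue[AnyRef]
  private class TaskRef(obj: AnyRef, val task: CleanupTask)
    extends WeakReference[AnyRef](obj, queue)

  // Keep the TaskRefs themselves strongly reachable so the queue can deliver them.
  private var refs = List.empty[TaskRef]

  def register(obj: AnyRef, task: CleanupTask): Unit =
    refs ::= new TaskRef(obj, task)

  // In ContextCleaner this polling happens on a daemon thread.
  def drainOnce(): Unit = queue.poll() match {
    case ref: TaskRef => println(s"cleaning ${ref.task}")
    case _            => // nothing has been collected yet
  }
}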

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 2 additions & 26 deletions
@@ -20,6 +20,7 @@ package org.apache.spark
 import scala.collection.mutable
 
 import org.apache.spark.scheduler._
+import org.apache.spark.util.{SystemClock, Clock}
 
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -123,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
   private val intervalMillis: Long = 100
 
   // Clock used to schedule when executors should be added and removed
-  private var clock: Clock = new RealClock
+  private var clock: Clock = new SystemClock()
 
   // Listener for Spark events that impact the allocation policy
   private val listener = new ExecutorAllocationListener
@@ -588,28 +589,3 @@ private[spark] class ExecutorAllocationManager(
 private object ExecutorAllocationManager {
   val NOT_SET = Long.MaxValue
 }
-
-/**
- * An abstract clock for measuring elapsed time.
- */
-private trait Clock {
-  def getTimeMillis: Long
-}
-
-/**
- * A clock backed by a monotonically increasing time source.
- * The time returned by this clock does not correspond to any notion of wall-clock time.
- */
-private class RealClock extends Clock {
-  override def getTimeMillis: Long = System.nanoTime / (1000 * 1000)
-}
-
-/**
- * A clock that allows the caller to customize the time.
- * This is used mainly for testing.
- */
-private class TestClock(startTimeMillis: Long) extends Clock {
-  private var time: Long = startTimeMillis
-  override def getTimeMillis: Long = time
-  def tick(ms: Long): Unit = { time += ms }
-}
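The private Clock/RealClock/TestClock trio is deleted in favor of the shared clock utilities in org.apache.spark.util, so components that need deterministic timing in tests can reuse one abstraction. A rough sketch of that kind of abstraction (hypothetical names, not the actual org.apache.spark.util.Clock API):

// Sketch of a shared clock abstraction: a system-backed clock for production and a
// manually advanced clock for deterministic tests.
trait ClockSketch {
  def getTimeMillis: Long
}

class SystemClockSketch extends ClockSketch {
  override def getTimeMillis: Long = System.currentTimeMillis()
}

class ManualClockSketch(start: Long = 0L) extends ClockSketch {
  private var now = start
  override def getTimeMillis: Long = now
  def advance(ms: Long): Unit = { now += ms }
}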

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 59 additions & 6 deletions
@@ -17,33 +17,86 @@
 
 package org.apache.spark
 
-import akka.actor.Actor
+import scala.concurrent.duration._
+import scala.collection.mutable
+
+import akka.actor.{Actor, Cancellable}
+
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
 import org.apache.spark.util.ActorLogReceive
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
- * components to convey liveness or execution information for in-progress tasks.
+ * components to convey liveness or execution information for in-progress tasks. It will also
+ * expire the hosts that have not heartbeated for more than spark.network.timeout.
  */
 private[spark] case class Heartbeat(
     executorId: String,
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)
 
+private[spark] case object ExpireDeadHosts
+
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
   extends Actor with ActorLogReceive with Logging {
 
+  // executor ID -> timestamp of when the last heartbeat from this executor was received
+  private val executorLastSeen = new mutable.HashMap[String, Long]
+
+  private val executorTimeoutMs = sc.conf.getLong("spark.network.timeout",
+    sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs", 120)) * 1000
+
+  private val checkTimeoutIntervalMs = sc.conf.getLong("spark.network.timeoutInterval",
+    sc.conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60)) * 1000
+
+  private var timeoutCheckingTask: Cancellable = null
+
+  override def preStart(): Unit = {
+    import context.dispatcher
+    timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
+      checkTimeoutIntervalMs.milliseconds, self, ExpireDeadHosts)
+    super.preStart()
+  }
+
   override def receiveWithLogging = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
-      val response = HeartbeatResponse(
-        !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
+      val unknownExecutor = !scheduler.executorHeartbeatReceived(
+        executorId, taskMetrics, blockManagerId)
+      val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+      executorLastSeen(executorId) = System.currentTimeMillis()
       sender ! response
+    case ExpireDeadHosts =>
+      expireDeadHosts()
+  }
+
+  private def expireDeadHosts(): Unit = {
+    logTrace("Checking for hosts with no recent heartbeats in HeartbeatReceiver.")
+    val now = System.currentTimeMillis()
+    for ((executorId, lastSeenMs) <- executorLastSeen) {
+      if (now - lastSeenMs > executorTimeoutMs) {
+        logWarning(s"Removing executor $executorId with no recent heartbeats: " +
+          s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
+        scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
+          "timed out after ${now - lastSeenMs} ms"))
+        if (sc.supportDynamicAllocation) {
+          sc.killExecutor(executorId)
+        }
+        executorLastSeen.remove(executorId)
+      }
+    }
+  }
+
+  override def postStop(): Unit = {
+    if (timeoutCheckingTask != null) {
+      timeoutCheckingTask.cancel()
+    }
+    super.postStop()
   }
 }
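The new ExpireDeadHosts handler applies a simple rule: any executor whose most recent heartbeat is older than the configured timeout (spark.network.timeout, falling back to spark.storage.blockManagerSlaveTimeoutMs) is treated as lost. A minimal sketch of just that expiry rule (hypothetical names, not Spark's API):

// Sketch: given last-seen timestamps, return the executors whose silence exceeds the timeout.
object HeartbeatExpirySketch {
  def expired(lastSeen: Map[String, Long], nowMs: Long, timeoutMs: Long): Seq[String] =
    lastSeen.collect {
      case (executorId, lastSeenMs) if nowMs - lastSeenMs > timeoutMs => executorId
    }.toSeq

  def main(args: Array[String]): Unit = {
    val lastSeen = Map("exec-1" -> 1000L, "exec-2" -> 90000L)
    // With a 120 s timeout and "now" at 130 s, only exec-1 has been silent too long.
    println(expired(lastSeen, nowMs = 130000L, timeoutMs = 120000L)) // List(exec-1)
  }
}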
