
Commit 6edf052

Merge github.com:apache/spark

Conflicts:
    core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
    core/src/main/scala/org/apache/spark/ui/SparkUI.scala
    core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

2 parents 19e1fb4 + 9689b66 · commit 6edf052

File tree: 193 files changed, +6520 −2728 lines


.rat-excludes

Lines changed: 1 addition & 1 deletion
@@ -39,4 +39,4 @@ work
 .*\.q
 golden
 test.out/*
-.*iml
+.*iml

bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala

Lines changed: 12 additions & 8 deletions
@@ -220,27 +220,31 @@ object Bagel extends Logging {
    */
   private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
       sc: SparkContext,
-      grouped: RDD[(K, (Seq[C], Seq[V]))],
+      grouped: RDD[(K, (Iterable[C], Iterable[V]))],
       compute: (V, Option[C]) => (V, Array[M]),
       storageLevel: StorageLevel
     ): (RDD[(K, (V, Array[M]))], Int, Int) = {
     var numMsgs = sc.accumulator(0)
     var numActiveVerts = sc.accumulator(0)
-    val processed = grouped.flatMapValues {
-      case (_, vs) if vs.size == 0 => None
-      case (c, vs) =>
+    val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator))
+      .flatMapValues {
+      case (_, vs) if !vs.hasNext => None
+      case (c, vs) => {
         val (newVert, newMsgs) =
-          compute(vs(0), c match {
-            case Seq(comb) => Some(comb)
-            case Seq() => None
-          })
+          compute(vs.next,
+            c.hasNext match {
+              case true => Some(c.next)
+              case false => None
+            }
+          )
 
         numMsgs += newMsgs.size
         if (newVert.active) {
           numActiveVerts += 1
         }
 
         Some((newVert, newMsgs))
+      }
     }.persist(storageLevel)
 
     // Force evaluation of processed RDD for accurate performance measurements
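
The hunk above switches comp from Seq-typed groups to Iterable-typed groups and walks them as iterators: an empty vertex group is detected with hasNext, and the single combiner, if any, is read with next. A minimal, self-contained sketch of that idiom, with illustrative names that are not part of the Spark code:

// A minimal, self-contained sketch of the iterator idiom used above (illustrative names only):
// an empty group is detected with hasNext and at most one element is consumed with next,
// so the group never has to be materialized as a Seq.
object IteratorIdiomSketch {
  // Old style: pattern-match a materialized Seq.
  def headOptionSeq[A](xs: Seq[A]): Option[A] = xs match {
    case Seq(single) => Some(single)
    case Seq()       => None
    case _           => xs.headOption
  }

  // New style: consume an Iterator lazily.
  def headOptionIter[A](xs: Iterator[A]): Option[A] =
    if (xs.hasNext) Some(xs.next()) else None

  def main(args: Array[String]) {
    println(headOptionSeq(Seq("combiner")))   // Some(combiner)
    println(headOptionIter(Iterator.empty))   // None
  }
}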

bin/compute-classpath.sh

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ fi
 # built with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
 # assembly is built for Hive, before actually populating the CLASSPATH with the jars.
 # Note that this check order is faster (by up to half a second) in the case where Hive is not used.
-num_datanucleus_jars=$(ls "$FWDIR"/lib_managed/jars/ | grep "datanucleus-.*\\.jar" | wc -l)
+num_datanucleus_jars=$(ls "$FWDIR"/lib_managed/jars/ 2>/dev/null | grep "datanucleus-.*\\.jar" | wc -l)
 if [ $num_datanucleus_jars -gt 0 ]; then
   AN_ASSEMBLY_JAR=${ASSEMBLY_JAR:-$DEPS_ASSEMBLY_JAR}
   num_hive_files=$(jar tvf "$AN_ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 2>/dev/null | wc -l)

bin/spark-shell

Lines changed: 2 additions & 2 deletions
@@ -34,7 +34,7 @@ set -o posix
 FWDIR="$(cd `dirname $0`/..; pwd)"
 
 SPARK_REPL_OPTS="${SPARK_REPL_OPTS:-""}"
-DEFAULT_MASTER="local"
+DEFAULT_MASTER="local[*]"
 MASTER=${MASTER:-""}
 
 info_log=0
@@ -64,7 +64,7 @@ ${txtbld}OPTIONS${txtrst}:
                         is followed by m for megabytes or g for gigabytes, e.g. "1g".
     -dm --driver-memory : The memory used by the Spark Shell, the number is followed
                         by m for megabytes or g for gigabytes, e.g. "1g".
-    -m --master : A full string that describes the Spark Master, defaults to "local"
+    -m --master : A full string that describes the Spark Master, defaults to "local[*]"
                         e.g. "spark://localhost:7077".
     --log-conf : Enables logging of the supplied SparkConf as INFO at start of the
                         Spark Context.
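
"local" runs the shell with a single worker thread, while the new default "local[*]" asks Spark for one worker thread per available core. A minimal sketch, not part of this commit, of setting the same master from application code with SparkConf:

// A minimal sketch (not part of this commit) showing the same master URL set from application
// code via SparkConf; "local[*]" requests one worker thread per available core.
import org.apache.spark.{SparkConf, SparkContext}

object LocalStarExample {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local[*]")            // the value the shell now defaults to
      .setAppName("local-star-example")
    val sc = new SparkContext(conf)
    println("defaultParallelism = " + sc.defaultParallelism)
    sc.stop()
  }
}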
core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 192 additions & 0 deletions
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.lang.ref.{ReferenceQueue, WeakReference}
+
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+
+/**
+ * Classes that represent cleaning tasks.
+ */
+private sealed trait CleanupTask
+private case class CleanRDD(rddId: Int) extends CleanupTask
+private case class CleanShuffle(shuffleId: Int) extends CleanupTask
+private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
+
+/**
+ * A WeakReference associated with a CleanupTask.
+ *
+ * When the referent object becomes only weakly reachable, the corresponding
+ * CleanupTaskWeakReference is automatically added to the given reference queue.
+ */
+private class CleanupTaskWeakReference(
+    val task: CleanupTask,
+    referent: AnyRef,
+    referenceQueue: ReferenceQueue[AnyRef])
+  extends WeakReference(referent, referenceQueue)
+
+/**
+ * An asynchronous cleaner for RDD, shuffle, and broadcast state.
+ *
+ * This maintains a weak reference for each RDD, ShuffleDependency, and Broadcast of interest,
+ * to be processed when the associated object goes out of scope of the application. Actual
+ * cleanup is performed in a separate daemon thread.
+ */
+private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
+
+  private val referenceBuffer = new ArrayBuffer[CleanupTaskWeakReference]
+    with SynchronizedBuffer[CleanupTaskWeakReference]
+
+  private val referenceQueue = new ReferenceQueue[AnyRef]
+
+  private val listeners = new ArrayBuffer[CleanerListener]
+    with SynchronizedBuffer[CleanerListener]
+
+  private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
+
+  /**
+   * Whether the cleaning thread will block on cleanup tasks.
+   * This is set to true only for tests.
+   */
+  private val blockOnCleanupTasks = sc.conf.getBoolean(
+    "spark.cleaner.referenceTracking.blocking", false)
+
+  @volatile private var stopped = false
+
+  /** Attach a listener object to get information of when objects are cleaned. */
+  def attachListener(listener: CleanerListener) {
+    listeners += listener
+  }
+
+  /** Start the cleaner. */
+  def start() {
+    cleaningThread.setDaemon(true)
+    cleaningThread.setName("Spark Context Cleaner")
+    cleaningThread.start()
+  }
+
+  /** Stop the cleaner. */
+  def stop() {
+    stopped = true
+  }
+
+  /** Register a RDD for cleanup when it is garbage collected. */
+  def registerRDDForCleanup(rdd: RDD[_]) {
+    registerForCleanup(rdd, CleanRDD(rdd.id))
+  }
+
+  /** Register a ShuffleDependency for cleanup when it is garbage collected. */
+  def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _]) {
+    registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
+  }
+
+  /** Register a Broadcast for cleanup when it is garbage collected. */
+  def registerBroadcastForCleanup[T](broadcast: Broadcast[T]) {
+    registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
+  }
+
+  /** Register an object for cleanup. */
+  private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
+    referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
+  }
+
+  /** Keep cleaning RDD, shuffle, and broadcast state. */
+  private def keepCleaning() {
+    while (!stopped) {
+      try {
+        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
+          .map(_.asInstanceOf[CleanupTaskWeakReference])
+        reference.map(_.task).foreach { task =>
+          logDebug("Got cleaning task " + task)
+          referenceBuffer -= reference.get
+          task match {
+            case CleanRDD(rddId) =>
+              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+            case CleanShuffle(shuffleId) =>
+              doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
+            case CleanBroadcast(broadcastId) =>
+              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+          }
+        }
+      } catch {
+        case t: Throwable => logError("Error in cleaning thread", t)
+      }
+    }
+  }
+
+  /** Perform RDD cleanup. */
+  def doCleanupRDD(rddId: Int, blocking: Boolean) {
+    try {
+      logDebug("Cleaning RDD " + rddId)
+      sc.unpersistRDD(rddId, blocking)
+      listeners.foreach(_.rddCleaned(rddId))
+      logInfo("Cleaned RDD " + rddId)
+    } catch {
+      case t: Throwable => logError("Error cleaning RDD " + rddId, t)
+    }
+  }
+
+  /** Perform shuffle cleanup, asynchronously. */
+  def doCleanupShuffle(shuffleId: Int, blocking: Boolean) {
+    try {
+      logDebug("Cleaning shuffle " + shuffleId)
+      mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+      blockManagerMaster.removeShuffle(shuffleId, blocking)
+      listeners.foreach(_.shuffleCleaned(shuffleId))
+      logInfo("Cleaned shuffle " + shuffleId)
+    } catch {
+      case t: Throwable => logError("Error cleaning shuffle " + shuffleId, t)
+    }
+  }
+
+  /** Perform broadcast cleanup. */
+  def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
+    try {
+      logDebug("Cleaning broadcast " + broadcastId)
+      broadcastManager.unbroadcast(broadcastId, true, blocking)
+      listeners.foreach(_.broadcastCleaned(broadcastId))
+      logInfo("Cleaned broadcast " + broadcastId)
+    } catch {
+      case t: Throwable => logError("Error cleaning broadcast " + broadcastId, t)
+    }
+  }
+
+  private def blockManagerMaster = sc.env.blockManager.master
+  private def broadcastManager = sc.env.broadcastManager
+  private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
+
+  // Used for testing. These methods explicitly block until cleanup is completed
+  // to ensure more reliable testing.
+}
+
+private object ContextCleaner {
+  private val REF_QUEUE_POLL_TIMEOUT = 100
+}
+
+/**
+ * Listener class used for testing when any item has been cleaned by the Cleaner class.
+ */
+private[spark] trait CleanerListener {
+  def rddCleaned(rddId: Int)
+  def shuffleCleaned(shuffleId: Int)
+  def broadcastCleaned(broadcastId: Long)
+}
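
ContextCleaner is built on the standard java.lang.ref machinery: each registered object gets a CleanupTaskWeakReference, and once the referent becomes only weakly reachable the JVM enqueues that reference on the ReferenceQueue, which keepCleaning() polls with a timeout. A minimal, self-contained sketch of that mechanism, plain JVM code rather than Spark code:

// A minimal, self-contained sketch (not Spark code) of the weak-reference pattern ContextCleaner
// builds on: once the referent is only weakly reachable, the JVM enqueues its WeakReference on
// the ReferenceQueue, where a polling thread can pick it up and run the matching cleanup.
import java.lang.ref.{ReferenceQueue, WeakReference}

object WeakRefSketch {
  def main(args: Array[String]) {
    val queue = new ReferenceQueue[AnyRef]
    var payload: AnyRef = new Object
    val ref = new WeakReference[AnyRef](payload, queue)

    payload = null   // drop the only strong reference to the referent
    System.gc()      // request a collection; not guaranteed, hence the timeout below

    // Like ContextCleaner, poll the queue with a timeout instead of blocking forever.
    val enqueued = Option(queue.remove(1000))
    println("reference enqueued: " + enqueued.isDefined)
    println("referent collected: " + (ref.get eq null))
  }
}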

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 2 additions & 0 deletions
@@ -55,6 +55,8 @@ class ShuffleDependency[K, V](
   extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
 
   val shuffleId: Int = rdd.context.newShuffleId()
+
+  rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
 }
 
 
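
The new registration call goes through the SparkContext's cleaner, which the foreach treats as optional: when no cleaner is configured, nothing happens and the call site needs no explicit check. A minimal sketch of that idiom, with illustrative names that are not the Spark definitions:

// A minimal sketch (illustrative names, not Spark code) of the Option-based registration idiom
// used above: when no cleaner is configured the Option is None and foreach does nothing,
// so call sites need no explicit null/None checks.
object OptionalCleanerSketch {
  case class Resource(id: Int)
  class Cleaner { def register(r: Resource) { println("registered resource " + r.id) } }

  def main(args: Array[String]) {
    val enabled: Option[Cleaner]  = Some(new Cleaner)
    val disabled: Option[Cleaner] = None

    enabled.foreach(_.register(Resource(1)))    // prints "registered resource 1"
    disabled.foreach(_.register(Resource(2)))   // silently does nothing
  }
}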
core/src/main/scala/org/apache/spark/FutureAction.scala

Lines changed: 1 addition & 1 deletion
@@ -141,7 +141,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
   private def awaitResult(): Try[T] = {
     jobWaiter.awaitResult() match {
       case JobSucceeded => scala.util.Success(resultFunc)
-      case JobFailed(e: Exception, _) => scala.util.Failure(e)
+      case JobFailed(e: Exception) => scala.util.Failure(e)
     }
   }
 }
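
The hunk above only drops the second field from the JobFailed extractor; the mapping from a job outcome onto scala.util.Try is unchanged. A minimal, self-contained sketch of that mapping, using standalone stand-ins rather than the Spark definitions:

// A minimal, self-contained sketch (hypothetical stand-in names, not the Spark classes) of the
// pattern above: a job-outcome ADT is translated into scala.util.Try so callers can handle
// Success and Failure uniformly.
import scala.util.{Failure, Success, Try}

object JobResultSketch {
  sealed trait JobResult
  case object JobSucceeded extends JobResult
  case class JobFailed(exception: Exception) extends JobResult   // single field, as in the new signature

  def toTry[T](result: JobResult, value: => T): Try[T] = result match {
    case JobSucceeded => Success(value)
    case JobFailed(e) => Failure(e)
  }

  def main(args: Array[String]) {
    println(toTry(JobSucceeded, 42))                          // Success(42)
    println(toTry(JobFailed(new Exception("boom")), 42))      // Failure(java.lang.Exception: boom)
  }
}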
