
Commit 04167a6

Merge remote-tracking branch 'apache-github/master' into streaming-guide-update-1.3
2 parents: 0b77486 + 8767565

167 files changed: +2145 −1433 lines


assembly/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
     <version>1.3.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
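
The rename brings the parent artifactId in line with the <name>_<scalaBinaryVersion> cross-build convention. As a hedged illustration of how that convention is consumed downstream, an sbt build would typically resolve such suffixed artifacts via the %% operator (module and version below are examples, not taken from this commit):

// build.sbt sketch (illustrative): '%%' appends the Scala binary version,
// so the dependency below resolves to the artifact "spark-core_2.10",
// the same naming scheme as the "spark-parent_2.10" rename in this diff.
scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"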

bagel/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
     <version>1.3.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

core/pom.xml

Lines changed: 7 additions & 1 deletion
@@ -20,7 +20,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
     <version>1.3.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
@@ -319,6 +319,12 @@
       <artifactId>selenium-java</artifactId>
       <scope>test</scope>
     </dependency>
+    <!-- Added for selenium: -->
+    <dependency>
+      <groupId>xml-apis</groupId>
+      <artifactId>xml-apis</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 27 additions & 13 deletions
@@ -105,9 +105,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     cleaningThread.start()
   }

-  /** Stop the cleaner. */
+  /**
+   * Stop the cleaning thread and wait until the thread has finished running its current task.
+   */
  def stop() {
     stopped = true
+    // Interrupt the cleaning thread, but wait until the current task has finished before
+    // doing so. This guards against the race condition where a cleaning thread may
+    // potentially clean similarly named variables created by a different SparkContext,
+    // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
+    synchronized {
+      cleaningThread.interrupt()
+    }
+    cleaningThread.join()
   }

   /** Register a RDD for cleanup when it is garbage collected. */
@@ -140,21 +150,25 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
           .map(_.asInstanceOf[CleanupTaskWeakReference])
-        reference.map(_.task).foreach { task =>
-          logDebug("Got cleaning task " + task)
-          referenceBuffer -= reference.get
-          task match {
-            case CleanRDD(rddId) =>
-              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
-            case CleanShuffle(shuffleId) =>
-              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-            case CleanBroadcast(broadcastId) =>
-              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
-            case CleanAccum(accId) =>
-              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+        // Synchronize here to avoid being interrupted on stop()
+        synchronized {
+          reference.map(_.task).foreach { task =>
+            logDebug("Got cleaning task " + task)
+            referenceBuffer -= reference.get
+            task match {
+              case CleanRDD(rddId) =>
+                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+              case CleanShuffle(shuffleId) =>
+                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+              case CleanBroadcast(broadcastId) =>
+                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+              case CleanAccum(accId) =>
+                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+            }
           }
         }
       } catch {
+        case ie: InterruptedException if stopped => // ignore
         case e: Exception => logError("Error in cleaning thread", e)
       }
     }
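
The fix works because the cleaning thread only processes a task while holding the cleaner's monitor, and stop() takes the same monitor before interrupting, so the interrupt can only land while the thread is blocked waiting for the next reference. A minimal, self-contained sketch of that "interrupt under lock, then join" shutdown pattern (hypothetical names, not Spark's API):

import java.util.concurrent.ConcurrentLinkedQueue

// Sketch of the shutdown protocol used by ContextCleaner.stop():
// tasks run under the object's monitor, and shutdown() interrupts
// under the same monitor, so an interrupt never lands mid-task.
class PoliteWorker extends Thread("polite-worker") {
  @volatile private var stopped = false
  private val tasks = new ConcurrentLinkedQueue[Runnable]()

  def submit(task: Runnable): Unit = tasks.add(task)

  override def run(): Unit = {
    while (!stopped) {
      try {
        // Holding the lock guarantees shutdown() cannot interrupt us mid-task.
        synchronized {
          Option(tasks.poll()).foreach(_.run())
        }
        Thread.sleep(100) // interruptible wait happens outside the lock
      } catch {
        case _: InterruptedException if stopped => // expected during shutdown
        case e: Exception => e.printStackTrace()
      }
    }
  }

  def shutdown(): Unit = {
    stopped = true
    // Wait for any in-flight task to finish, then interrupt the sleep.
    synchronized {
      interrupt()
    }
    join()
  }
}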

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 11 additions & 4 deletions
@@ -68,7 +68,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     if (value == null) {
       throw new NullPointerException("null value for " + key)
     }
-    settings.put(translateConfKey(key, warn = true), value)
+    settings.put(key, value)
     this
   }

@@ -140,7 +140,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

   /** Set a parameter if it isn't already configured */
   def setIfMissing(key: String, value: String): SparkConf = {
-    settings.putIfAbsent(translateConfKey(key, warn = true), value)
+    settings.putIfAbsent(key, value)
     this
   }

@@ -176,7 +176,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

   /** Get a parameter as an Option */
   def getOption(key: String): Option[String] = {
-    Option(settings.get(translateConfKey(key)))
+    Option(settings.get(key))
   }

   /** Get all parameters as a list of pairs */
@@ -229,7 +229,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   def getAppId: String = get("spark.app.id")

   /** Does the configuration contain a given parameter? */
-  def contains(key: String): Boolean = settings.containsKey(translateConfKey(key))
+  def contains(key: String): Boolean = settings.containsKey(key)

   /** Copy this object */
   override def clone: SparkConf = {
@@ -343,6 +343,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
         }
       }
     }
+
+    // Warn against the use of deprecated configs
+    deprecatedConfigs.values.foreach { dc =>
+      if (contains(dc.oldName)) {
+        dc.warn()
+      }
+    }
   }

   /**
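
The net effect of these changes is that keys are stored and looked up verbatim, and deprecation is reported in one pass when the configuration is validated, rather than being translated on every get/set. A rough sketch of that warn-on-validate shape, with illustrative names and a made-up deprecated key (not the actual SparkConf internals):

import java.util.concurrent.ConcurrentHashMap

// Sketch of warn-on-validate for deprecated keys; names are illustrative.
case class DeprecatedKey(oldName: String, newName: String) {
  def warn(): Unit =
    println(s"WARN: '$oldName' is deprecated, use '$newName' instead")
}

class SimpleConf {
  private val settings = new ConcurrentHashMap[String, String]()
  private val deprecated = Seq(DeprecatedKey("spark.old.option", "spark.new.option"))

  def set(key: String, value: String): this.type = { settings.put(key, value); this }
  def contains(key: String): Boolean = settings.containsKey(key)

  /** Called once, e.g. when a context is created, instead of on every get/set. */
  def validateSettings(): Unit = {
    deprecated.foreach { dk =>
      if (contains(dk.oldName)) dk.warn()
    }
  }
}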

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 11 additions & 2 deletions
@@ -51,6 +51,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.executor.TriggerThreadDump
 import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
   FixedLengthBinaryInputFormat}
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
@@ -233,6 +234,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       None
     }
   }
+  private[spark] val eventLogCodec: Option[String] = {
+    val compress = conf.getBoolean("spark.eventLog.compress", false)
+    if (compress && isEventLogEnabled) {
+      Some(CompressionCodec.getCodecName(conf)).map(CompressionCodec.getShortName)
+    } else {
+      None
+    }
+  }

   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
@@ -1383,10 +1392,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /** Shut down the SparkContext. */
   def stop() {
     SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
-      postApplicationEnd()
-      ui.foreach(_.stop())
       if (!stopped) {
         stopped = true
+        postApplicationEnd()
+        ui.foreach(_.stop())
         env.metricsSystem.report()
         metadataCleaner.cancel()
         cleaner.foreach(_.stop())
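
The stop() hunk is an idempotence fix: postApplicationEnd() and the UI shutdown used to run outside the `if (!stopped)` guard, so a second stop() repeated them. A generic sketch of the guarded-shutdown shape this moves to (hypothetical service, not SparkContext itself):

// Sketch of an idempotent stop(): every side effect lives inside the guard,
// so calling stop() twice performs the shutdown work exactly once.
class Service {
  private val lock = new Object
  private var stopped = false

  def stop(): Unit = lock.synchronized {
    if (!stopped) {
      stopped = true
      publishShutdownEvent() // analogous to postApplicationEnd()
      releaseResources()     // analogous to ui.foreach(_.stop()), metricsSystem.report()
    }
  }

  private def publishShutdownEvent(): Unit = println("shutdown event published")
  private def releaseResources(): Unit = println("resources released")
}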

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 59 additions & 17 deletions
@@ -19,26 +19,27 @@ package org.apache.spark.api.python

 import java.io._
 import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, UUID, Collections}
-
-import org.apache.spark.input.PortableDataStream
+import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JMap}

 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.language.existentials

 import com.google.common.base.Charsets.UTF_8
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.mapred.{InputFormat, OutputFormat, JobConf}
+import org.apache.hadoop.mapred.{InputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, OutputFormat => NewOutputFormat}
+
 import org.apache.spark._
-import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.input.PortableDataStream
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils

+import scala.util.control.NonFatal
+
 private[spark] class PythonRDD(
     @transient parent: RDD[_],
     command: Array[Byte],
@@ -341,21 +342,33 @@
   /**
    * Adapter for calling SparkContext#runJob from Python.
    *
-   * This method will return an iterator of an array that contains all elements in the RDD
+   * This method will serve an iterator of an array that contains all elements in the RDD
    * (effectively a collect()), but allows you to run on a certain subset of partitions,
    * or to enable local execution.
+   *
+   * @return the port number of a local socket which serves the data collected from this job.
    */
   def runJob(
       sc: SparkContext,
       rdd: JavaRDD[Array[Byte]],
       partitions: JArrayList[Int],
-      allowLocal: Boolean): Iterator[Array[Byte]] = {
+      allowLocal: Boolean): Int = {
     type ByteArray = Array[Byte]
     type UnrolledPartition = Array[ByteArray]
     val allPartitions: Array[UnrolledPartition] =
       sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
     val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
-    flattenedPartition.iterator
+    serveIterator(flattenedPartition.iterator,
+      s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}")
+  }
+
+  /**
+   * A helper function to collect an RDD as an iterator, then serve it via socket.
+   *
+   * @return the port number of a local socket which serves the data collected from this job.
+   */
+  def collectAndServe[T](rdd: RDD[T]): Int = {
+    serveIterator(rdd.collect().iterator, s"serve RDD ${rdd.id}")
   }

   def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
@@ -575,15 +588,44 @@
     dataOut.write(bytes)
   }

-  def writeToFile[T](items: java.util.Iterator[T], filename: String) {
-    import scala.collection.JavaConverters._
-    writeToFile(items.asScala, filename)
-  }
+  /**
+   * Create a socket server and a background thread to serve the data in `items`,
+   *
+   * The socket server can only accept one connection, or close if no connection
+   * in 3 seconds.
+   *
+   * Once a connection comes in, it tries to serialize all the data in `items`
+   * and send them into this connection.
+   *
+   * The thread will terminate after all the data are sent or any exceptions happen.
+   */
+  private def serveIterator[T](items: Iterator[T], threadName: String): Int = {
+    val serverSocket = new ServerSocket(0, 1)
+    serverSocket.setReuseAddress(true)
+    // Close the socket if no connection in 3 seconds
+    serverSocket.setSoTimeout(3000)
+
+    new Thread(threadName) {
+      setDaemon(true)
+      override def run() {
+        try {
+          val sock = serverSocket.accept()
+          val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
+          try {
+            writeIteratorToStream(items, out)
+          } finally {
+            out.close()
+          }
+        } catch {
+          case NonFatal(e) =>
+            logError(s"Error while sending iterator", e)
+        } finally {
+          serverSocket.close()
+        }
+      }
+    }.start()

-  def writeToFile[T](items: Iterator[T], filename: String) {
-    val file = new DataOutputStream(new FileOutputStream(filename))
-    writeIteratorToStream(items, file)
-    file.close()
+    serverSocket.getLocalPort
   }

   private def getMergedConf(confAsMap: java.util.HashMap[String, String],
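
Taken together, these hunks replace "dump the collected bytes to a temporary file for Python to read" with "serve them once over an ephemeral loopback socket and return the port". A self-contained sketch of that handshake, using plain strings and writeUTF in place of Spark's pickled byte frames (names and framing are illustrative):

import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
import java.net.{ServerSocket, Socket}
import scala.util.control.NonFatal

object LoopbackServeExample {
  /** Serve the items once over an ephemeral local port and return that port. */
  def serve(items: Iterator[String], threadName: String): Int = {
    val serverSocket = new ServerSocket(0, 1) // port 0 = pick a free port, backlog 1
    serverSocket.setSoTimeout(3000)           // give up if nobody connects in 3 seconds
    new Thread(threadName) {
      setDaemon(true)
      override def run(): Unit = {
        try {
          val sock = serverSocket.accept()
          val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream))
          try items.foreach(out.writeUTF) finally out.close()
        } catch {
          case NonFatal(e) => e.printStackTrace()
        } finally {
          serverSocket.close()
        }
      }
    }.start()
    serverSocket.getLocalPort // the caller connects to this
  }

  def main(args: Array[String]): Unit = {
    val port = serve(Iterator("a", "b", "c"), "serve example")
    // The client side of the handshake: connect to the port and drain the stream.
    val sock = new Socket("localhost", port)
    val in = new DataInputStream(sock.getInputStream)
    try {
      while (true) println(in.readUTF())
    } catch {
      case _: java.io.EOFException => // stream exhausted
    } finally {
      sock.close()
    }
  }
}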

core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 7 additions & 3 deletions
@@ -23,7 +23,9 @@ private[spark] class ApplicationDescription(
     val memoryPerSlave: Int,
     val command: Command,
     var appUiUrl: String,
-    val eventLogDir: Option[String] = None)
+    val eventLogDir: Option[String] = None,
+    // short name of compression codec used when writing event logs, if any (e.g. lzf)
+    val eventLogCodec: Option[String] = None)
   extends Serializable {

   val user = System.getProperty("user.name", "<unknown>")
@@ -34,8 +36,10 @@ private[spark] class ApplicationDescription(
       memoryPerSlave: Int = memoryPerSlave,
       command: Command = command,
       appUiUrl: String = appUiUrl,
-      eventLogDir: Option[String] = eventLogDir): ApplicationDescription =
-    new ApplicationDescription(name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir)
+      eventLogDir: Option[String] = eventLogDir,
+      eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
+    new ApplicationDescription(
+      name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)

   override def toString: String = "ApplicationDescription(" + name + ")"
 }
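
The copy method gains a matching eventLogCodec parameter whose default is the current field, which is why existing copy(...) call sites keep compiling. A small sketch of that copy-with-defaults idiom on a hypothetical class:

// Sketch of the copy-with-defaults idiom used by ApplicationDescription:
// each parameter defaults to the current field, so callers override only
// what they need and new fields don't break existing call sites.
class JobSpec(val name: String, val cores: Int, val logDir: Option[String] = None) {
  def copy(
      name: String = name,
      cores: Int = cores,
      logDir: Option[String] = logDir): JobSpec =
    new JobSpec(name, cores, logDir)

  override def toString = s"JobSpec($name, $cores, $logDir)"
}

object JobSpecExample {
  def main(args: Array[String]): Unit = {
    val base = new JobSpec("etl", cores = 4)
    println(base.copy(cores = 8)) // JobSpec(etl, 8, None)
  }
}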

core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ class LocalSparkCluster(
     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
       val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
-        memoryPerWorker, masters, null, Some(workerNum))
+        memoryPerWorker, masters, null, Some(workerNum), _conf)
       workerActorSystems += workerSystem
     }
