
Commit 77fad08

Merge branch 'master' of https://github.com/apache/spark into format_pom

2 parents 62d0862 + 640f9a0

110 files changed: 2,448 additions and 1,183 deletions

assembly/pom.xml
Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@

   <properties>
     <spark.jar.dir>scala-${scala.binary.version}</spark.jar.dir>
-    <spark.jar.basename>${project.artifactId}-${project.version}-hadoop${hadoop.version}.jar</spark.jar.basename>
+    <spark.jar.basename>spark-assembly-${project.version}-hadoop${hadoop.version}.jar</spark.jar.basename>
     <spark.jar>${project.build.directory}/${spark.jar.dir}/${spark.jar.basename}</spark.jar>
     <deb.pkg.name>spark</deb.pkg.name>
     <deb.install.path>/usr/share/spark</deb.install.path>

bin/compute-classpath.sh
Lines changed: 2 additions & 2 deletions

@@ -50,9 +50,9 @@ if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
 else
   # Else use spark-assembly jar from either RELEASE or assembly directory
   if [ -f "$FWDIR/RELEASE" ]; then
-    ASSEMBLY_JAR=`ls "$FWDIR"/jars/spark*-assembly*.jar`
+    ASSEMBLY_JAR=`ls "$FWDIR"/lib/spark-assembly*hadoop*.jar`
   else
-    ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark*-assembly*hadoop*.jar`
+    ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*.jar`
   fi
   CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
 fi

bin/run-example
Lines changed: 7 additions & 4 deletions

@@ -40,12 +40,15 @@ fi
 # Figure out the JAR file that our examples were packaged into. This includes a bit of a hack
 # to avoid the -sources and -doc packages that are built by publish-local.
 EXAMPLES_DIR="$FWDIR"/examples
-SPARK_EXAMPLES_JAR=""
-if [ -e "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/*assembly*[0-9Tg].jar ]; then
-  export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/*assembly*[0-9Tg].jar`
+
+if [ -f "$FWDIR/RELEASE" ]; then
+  export SPARK_EXAMPLES_JAR=`ls "$FWDIR"/lib/spark-examples-*hadoop*.jar`
+elif [ -e "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/spark-examples-*hadoop*.jar ]; then
+  export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/spark-examples-*hadoop*.jar`
 fi
+
 if [[ -z $SPARK_EXAMPLES_JAR ]]; then
-  echo "Failed to find Spark examples assembly in $FWDIR/examples/target" >&2
+  echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" >&2
   echo "You need to build Spark with sbt/sbt assembly before running this program" >&2
   exit 1
 fi

conf/spark-env.sh.template
Lines changed: 2 additions & 0 deletions

@@ -5,6 +5,7 @@

 # Options read when launching programs locally with
 # ./bin/run-example or ./bin/spark-submit
+# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
 # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
 # - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
 # - SPARK_CLASSPATH, default classpath entries to append

@@ -17,6 +18,7 @@
 # - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos

 # Options read in YARN client mode
+# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
 # - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2)
 # - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1).
 # - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)

core/pom.xml
Lines changed: 1 addition & 1 deletion

@@ -247,7 +247,7 @@
     <dependency>
       <groupId>org.spark-project</groupId>
       <artifactId>pyrolite</artifactId>
-      <version>2.0</version>
+      <version>2.0.1</version>
     </dependency>
   </dependencies>
   <build>

core/src/main/scala/org/apache/spark/SparkContext.scala
Lines changed: 41 additions & 26 deletions

@@ -216,10 +216,33 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val ui = new SparkUI(this)
   ui.bind()

+  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
+  val hadoopConfiguration: Configuration = {
+    val env = SparkEnv.get
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
+    // Explicitly check for S3 environment variables
+    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+    }
+    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+    conf.getAll.foreach { case (key, value) =>
+      if (key.startsWith("spark.hadoop.")) {
+        hadoopConf.set(key.substring("spark.hadoop.".length), value)
+      }
+    }
+    val bufferSize = conf.get("spark.buffer.size", "65536")
+    hadoopConf.set("io.file.buffer.size", bufferSize)
+    hadoopConf
+  }
+
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf)
+      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
       logger.start()
       listenerBus.addListener(logger)
       Some(logger)

@@ -294,29 +317,6 @@ class SparkContext(config: SparkConf) extends Logging {
   postEnvironmentUpdate()
   postApplicationStart()

-  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
-  val hadoopConfiguration: Configuration = {
-    val env = SparkEnv.get
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
-    // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
-        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-    }
-    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    conf.getAll.foreach { case (key, value) =>
-      if (key.startsWith("spark.hadoop.")) {
-        hadoopConf.set(key.substring("spark.hadoop.".length), value)
-      }
-    }
-    val bufferSize = conf.get("spark.buffer.size", "65536")
-    hadoopConf.set("io.file.buffer.size", bufferSize)
-    hadoopConf
-  }
-
   private[spark] var checkpointDir: Option[String] = None

   // Thread Local variable that can be used by users to pass information down the stack

@@ -381,16 +381,27 @@ class SparkContext(config: SparkConf) extends Logging {
    * // In a separate thread:
    * sc.cancelJobGroup("some_job_to_cancel")
    * }}}
+   *
+   * If interruptOnCancel is set to true for the job group, then job cancellation will result
+   * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+   * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+   * where HDFS may respond to Thread.interrupt() by marking nodes as dead.
    */
-  def setJobGroup(groupId: String, description: String) {
+  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+    // Note: Specifying interruptOnCancel in setJobGroup (rather than cancelJobGroup) avoids
+    // changing several public APIs and allows Spark cancellations outside of the cancelJobGroup
+    // APIs to also take advantage of this property (e.g., internal job failures or canceling from
+    // JobProgressTab UI) on a per-job basis.
+    setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString)
   }

   /** Clear the current thread's job group ID and its description. */
   def clearJobGroup() {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
+    setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
   }

   // Post init

@@ -1244,6 +1255,8 @@ object SparkContext extends Logging {

   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

+  private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
+
   private[spark] val SPARK_UNKNOWN_USER = "<unknown>"

   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {

@@ -1268,8 +1281,10 @@ object SparkContext extends Logging {

   // TODO: Add AccumulatorParams for other types, e.g. lists and strings

-  implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
+  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
+      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
     new PairRDDFunctions(rdd)
+  }

   implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
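For readers scanning this diff, here is a minimal usage sketch of the new setJobGroup signature added above. It assumes an existing SparkContext named sc; the group name, description, and HDFS path are hypothetical, and the snippet is illustrative rather than part of the commit.

// Illustrative sketch only; assumes an existing SparkContext `sc`.
// The third parameter is the interruptOnCancel flag introduced in this commit.
sc.setJobGroup("nightly-report", "Aggregate daily error counts", interruptOnCancel = true)
val errorCount = sc.textFile("hdfs:///logs/2014-04-24/*")   // hypothetical input path
  .filter(_.contains("ERROR"))
  .count()

// From another thread: cancels all jobs tagged with this group id; because
// interruptOnCancel was set, executor task threads may also be interrupted.
sc.cancelJobGroup("nightly-report")

// Clearing the group also clears the interrupt-on-cancel property (see clearJobGroup above).
sc.clearJobGroup()

Per the constants added to object SparkContext, the group id and flag are carried as the thread-local properties "spark.jobGroup.id" and "spark.job.interruptOnCancel".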

core/src/main/scala/org/apache/spark/TaskContext.scala
Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ class TaskContext(
   val attemptId: Long,
   val runningLocally: Boolean = false,
   @volatile var interrupted: Boolean = false,
-  private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty()
+  private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty
 ) extends Serializable {

   @deprecated("use partitionId", "0.8.1")

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
Lines changed: 21 additions & 12 deletions

@@ -24,6 +24,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio

 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
+import scala.util.Try

 import net.razorvine.pickle.{Pickler, Unpickler}

@@ -89,16 +90,22 @@ private[spark] class PythonRDD[T: ClassTag](
         dataOut.flush()
         worker.shutdownOutput()
       } catch {
+
         case e: java.io.FileNotFoundException =>
           readerException = e
-          // Kill the Python worker process:
-          worker.shutdownOutput()
+          Try(worker.shutdownOutput()) // kill Python worker process
+
         case e: IOException =>
           // This can happen for legitimate reasons if the Python code stops returning data
-          // before we are done passing elements through, e.g., for take(). Just log a message
-          // to say it happened.
-          logInfo("stdin writer to Python finished early")
-          logDebug("stdin writer to Python finished early", e)
+          // before we are done passing elements through, e.g., for take(). Just log a message to
+          // say it happened (as it could also be hiding a real IOException from a data source).
+          logInfo("stdin writer to Python finished early (may not be an error)", e)
+
+        case e: Exception =>
+          // We must avoid throwing exceptions here, because the thread uncaught exception handler
+          // will kill the whole executor (see Executor).
+          readerException = e
+          Try(worker.shutdownOutput()) // kill Python worker process
       }
     }
   }.start()

@@ -152,7 +159,7 @@ private[spark] class PythonRDD[T: ClassTag](
           val exLength = stream.readInt()
           val obj = new Array[Byte](exLength)
           stream.readFully(obj)
-          throw new PythonException(new String(obj))
+          throw new PythonException(new String(obj), readerException)
         case SpecialLengths.END_OF_DATA_SECTION =>
           // We've finished the data section of the output, but we can still
           // read some accumulator updates:

@@ -162,15 +169,17 @@ private[spark] class PythonRDD[T: ClassTag](
             val update = new Array[Byte](updateLen)
             stream.readFully(update)
             accumulator += Collections.singletonList(update)
-
           }
           Array.empty[Byte]
       }
     } catch {
-      case eof: EOFException => {
+      case e: Exception if readerException != null =>
+        logError("Python worker exited unexpectedly (crashed)", e)
+        logError("Python crash may have been caused by prior exception:", readerException)
+        throw readerException
+
+      case eof: EOFException =>
         throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
-      }
-      case e: Throwable => throw e
     }
   }

@@ -185,7 +194,7 @@ private[spark] class PythonRDD[T: ClassTag](
 }

 /** Thrown for exceptions in user Python code. */
-private class PythonException(msg: String) extends Exception(msg)
+private class PythonException(msg: String, cause: Exception) extends RuntimeException(msg, cause)

 /**
  * Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
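The writer-thread changes above follow a general pattern: remember the first exception so the reader side can surface it as the root cause, and make the cleanup call best-effort so a secondary failure cannot mask it. A standalone sketch of that pattern follows; it is not Spark code, and the object and method names are made up for illustration.

import java.io.OutputStream
import scala.util.Try

// Illustrative sketch of the error-handling pattern used in PythonRDD above.
object WriterErrorHandling {
  @volatile var readerException: Exception = null

  def writeLoop(out: OutputStream): Unit = {
    try {
      out.write(Array[Byte](1, 2, 3))
      out.flush()
    } catch {
      case e: Exception =>
        readerException = e   // recorded so the reading side can re-throw the root cause
        Try(out.close())      // best-effort shutdown; a secondary failure is ignored
    }
  }
}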

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Lines changed: 1 addition & 1 deletion

@@ -196,7 +196,7 @@ object SparkSubmit {
       childArgs ++= appArgs.childArgs
     } else if (clusterManager == YARN) {
       for (arg <- appArgs.childArgs) {
-        childArgs += ("--args", arg)
+        childArgs += ("--arg", arg)
       }
     }
   }

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
Lines changed: 9 additions & 0 deletions

@@ -116,6 +116,15 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
     if (args.length == 0) printUsageAndExit(-1)
     if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource")
     if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class")
+
+    if (master.startsWith("yarn")) {
+      val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
+      val testing = sys.env.contains("SPARK_TESTING")
+      if (!hasHadoopEnv && !testing) {
+        throw new Exception(s"When running with master '$master' " +
+          "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
+      }
+    }
   }

   override def toString = {
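To see when the new YARN validation fires, the same check is restated below as a standalone snippet that can be run outside SparkSubmit. The object name is hypothetical; the logic simply mirrors the addition above.

// Standalone, illustrative restatement of the YARN environment check.
object YarnEnvCheck {
  def validate(master: String): Unit = {
    if (master.startsWith("yarn")) {
      val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
      val testing = sys.env.contains("SPARK_TESTING")
      if (!hasHadoopEnv && !testing) {
        throw new Exception(s"When running with master '$master' " +
          "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    validate("yarn-client") // throws unless HADOOP_CONF_DIR, YARN_CONF_DIR, or SPARK_TESTING is set
  }
}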
