Commit 81e9705

Author: Andrew Or (committed)
Merge branch 'master' of github.com:apache/spark into yarn-shuffle-build
2 parents: fb7f398 + 300887b

File tree: 131 files changed (+2470 / -1236 lines)


README.md

Lines changed: 2 additions & 1 deletion

@@ -13,7 +13,8 @@ and Spark Streaming for stream processing.
 ## Online Documentation
 
 You can find the latest Spark documentation, including a programming
-guide, on the [project web page](http://spark.apache.org/documentation.html).
+guide, on the [project web page](http://spark.apache.org/documentation.html)
+and [project wiki](https://cwiki.apache.org/confluence/display/SPARK).
 This README file only contains basic setup instructions.
 
 ## Building Spark

core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 14 additions & 0 deletions

@@ -120,6 +120,20 @@ pre {
   border: none;
 }
 
+.stacktrace-details {
+  max-height: 300px;
+  overflow-y: auto;
+  margin: 0;
+  transition: max-height 0.5s ease-out, padding 0.5s ease-out;
+}
+
+.stacktrace-details.collapsed {
+  max-height: 0;
+  padding-top: 0;
+  padding-bottom: 0;
+  border: none;
+}
+
 span.expand-additional-metrics {
   cursor: pointer;
 }

core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 1 addition & 0 deletions

@@ -26,6 +26,7 @@ import org.apache.hadoop.mapred._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD
 
 /**

core/src/main/scala/org/apache/spark/TaskEndReason.scala

Lines changed: 34 additions & 1 deletion

@@ -83,15 +83,48 @@ case class FetchFailed(
  * :: DeveloperApi ::
  * Task failed due to a runtime exception. This is the most common failure case and also captures
  * user program exceptions.
+ *
+ * `stackTrace` contains the stack trace of the exception itself. It still exists for backward
+ * compatibility. It's better to use `this(e: Throwable, metrics: Option[TaskMetrics])` to
+ * create `ExceptionFailure` as it will handle the backward compatibility properly.
+ *
+ * `fullStackTrace` is a better representation of the stack trace because it contains the whole
+ * stack trace including the exception and its causes
  */
 @DeveloperApi
 case class ExceptionFailure(
     className: String,
     description: String,
     stackTrace: Array[StackTraceElement],
+    fullStackTrace: String,
     metrics: Option[TaskMetrics])
   extends TaskFailedReason {
-  override def toErrorString: String = Utils.exceptionString(className, description, stackTrace)
+
+  private[spark] def this(e: Throwable, metrics: Option[TaskMetrics]) {
+    this(e.getClass.getName, e.getMessage, e.getStackTrace, Utils.exceptionString(e), metrics)
+  }
+
+  override def toErrorString: String =
+    if (fullStackTrace == null) {
+      // fullStackTrace is added in 1.2.0
+      // If fullStackTrace is null, use the old error string for backward compatibility
+      exceptionString(className, description, stackTrace)
+    } else {
+      fullStackTrace
+    }
+
+  /**
+   * Return a nice string representation of the exception, including the stack trace.
+   * Note: It does not include the exception's causes, and is only used for backward compatibility.
+   */
+  private def exceptionString(
+      className: String,
+      description: String,
+      stackTrace: Array[StackTraceElement]): String = {
+    val desc = if (description == null) "" else description
+    val st = if (stackTrace == null) "" else stackTrace.map(" " + _).mkString("\n")
+    s"$className: $desc\n$st"
+  }
 }
 
 /**
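
To make the backward-compatibility story concrete, here is a minimal sketch of how the two paths through `toErrorString` behave. It assumes Spark-internal scope (the throwable-based constructor is `private[spark]`); the sample exception and `None` metrics are illustrative only, not taken from the commit.

    // Illustrative sketch only; values are made up.
    val e = new IllegalStateException("boom")

    // New-style construction: fullStackTrace is filled in from the throwable,
    // so toErrorString prints the whole trace, including any causes.
    val failure = new ExceptionFailure(e, metrics = None)
    println(failure.toErrorString)

    // An ExceptionFailure replayed from a pre-1.2.0 event log has no fullStackTrace;
    // toErrorString then falls back to the legacy className/description/stackTrace format.
    val legacy = ExceptionFailure(
      e.getClass.getName, e.getMessage, e.getStackTrace, fullStackTrace = null, metrics = None)
    println(legacy.toErrorString)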

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 8 additions & 8 deletions

@@ -493,9 +493,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
-   * Returns the top K elements from this RDD as defined by
+   * Returns the top k (largest) elements from this RDD as defined by
    * the specified Comparator[T].
-   * @param num the number of top elements to return
+   * @param num k, the number of top elements to return
    * @param comp the comparator that defines the order
    * @return an array of top elements
    */
@@ -507,9 +507,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
-   * Returns the top K elements from this RDD using the
+   * Returns the top k (largest) elements from this RDD using the
    * natural ordering for T.
-   * @param num the number of top elements to return
+   * @param num k, the number of top elements to return
    * @return an array of top elements
    */
   def top(num: Int): JList[T] = {
@@ -518,9 +518,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
-   * Returns the first K elements from this RDD as defined by
+   * Returns the first k (smallest) elements from this RDD as defined by
    * the specified Comparator[T] and maintains the order.
-   * @param num the number of top elements to return
+   * @param num k, the number of elements to return
    * @param comp the comparator that defines the order
    * @return an array of top elements
    */
@@ -552,9 +552,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
-   * Returns the first K elements from this RDD using the
+   * Returns the first k (smallest) elements from this RDD using the
    * natural ordering for T while maintain the order.
-   * @param num the number of top elements to return
+   * @param num k, the number of top elements to return
    * @return an array of top elements
    */
   def takeOrdered(num: Int): JList[T] = {
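
The doc changes above only clarify existing semantics: `top` returns the k largest elements and `takeOrdered` the k smallest. A quick sketch using the Scala RDD equivalents, which the Java wrappers delegate to (`sc` is assumed to be a live SparkContext):

    // Illustrative only; `sc` is an existing SparkContext.
    val rdd = sc.parallelize(Seq(3, 1, 5, 2))

    rdd.top(2)          // Array(5, 3) -- the k largest, by natural ordering
    rdd.takeOrdered(2)  // Array(1, 2) -- the k smallest, in ascending order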

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 39 additions & 7 deletions

@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy
 
+import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.conf.Configuration
@@ -133,14 +134,9 @@ class SparkHadoopUtil extends Logging {
    */
   private[spark] def getFSBytesReadOnThreadCallback(path: Path, conf: Configuration)
     : Option[() => Long] = {
-    val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
-    val scheme = qualifiedPath.toUri().getScheme()
-    val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
     try {
-      val threadStats = stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
-      val statisticsDataClass =
-        Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
-      val getBytesReadMethod = statisticsDataClass.getDeclaredMethod("getBytesRead")
+      val threadStats = getFileSystemThreadStatistics(path, conf)
+      val getBytesReadMethod = getFileSystemThreadStatisticsMethod("getBytesRead")
       val f = () => threadStats.map(getBytesReadMethod.invoke(_).asInstanceOf[Long]).sum
       val baselineBytesRead = f()
       Some(() => f() - baselineBytesRead)
@@ -151,6 +147,42 @@ class SparkHadoopUtil extends Logging {
       }
     }
   }
+
+  /**
+   * Returns a function that can be called to find Hadoop FileSystem bytes written. If
+   * getFSBytesWrittenOnThreadCallback is called from thread r at time t, the returned callback will
+   * return the bytes written on r since t. Reflection is required because thread-level FileSystem
+   * statistics are only available as of Hadoop 2.5 (see HADOOP-10688).
+   * Returns None if the required method can't be found.
+   */
+  private[spark] def getFSBytesWrittenOnThreadCallback(path: Path, conf: Configuration)
+    : Option[() => Long] = {
+    try {
+      val threadStats = getFileSystemThreadStatistics(path, conf)
+      val getBytesWrittenMethod = getFileSystemThreadStatisticsMethod("getBytesWritten")
+      val f = () => threadStats.map(getBytesWrittenMethod.invoke(_).asInstanceOf[Long]).sum
+      val baselineBytesWritten = f()
+      Some(() => f() - baselineBytesWritten)
+    } catch {
+      case e: NoSuchMethodException => {
+        logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
+        None
+      }
+    }
+  }
+
+  private def getFileSystemThreadStatistics(path: Path, conf: Configuration): Seq[AnyRef] = {
+    val qualifiedPath = path.getFileSystem(conf).makeQualified(path)
+    val scheme = qualifiedPath.toUri().getScheme()
+    val stats = FileSystem.getAllStatistics().filter(_.getScheme().equals(scheme))
+    stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics"))
+  }
+
+  private def getFileSystemThreadStatisticsMethod(methodName: String): Method = {
+    val statisticsDataClass =
+      Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData")
+    statisticsDataClass.getDeclaredMethod(methodName)
+  }
 }
 
 object SparkHadoopUtil {
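
Both callbacks follow a baseline-then-delta pattern: reflection grabs the per-thread Hadoop FileSystem statistics, the byte counter is read once when the callback is created, and each later call returns the difference. A hedged sketch of the intended call site on the read path (`path`, `conf`, and `inputMetrics` are assumed to be in scope, as they would be in a Spark-internal reader; this is not the literal HadoopRDD code):

    // Sketch of the baseline-then-delta usage.
    val bytesReadCallback: Option[() => Long] =
      SparkHadoopUtil.get.getFSBytesReadOnThreadCallback(path, conf)

    // ... read records on this thread ...

    // Each call reports bytes read on this thread since the callback was created.
    // On Hadoop versions without thread-level statistics the callback is None.
    bytesReadCallback.foreach { getBytesRead =>
      inputMetrics.bytesRead = getBytesRead()
    }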

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion

@@ -263,7 +263,7 @@ private[spark] class Executor(
             m.executorRunTime = serviceTime
             m.jvmGCTime = gcTime - startGCTime
           }
-          val reason = ExceptionFailure(t.getClass.getName, t.getMessage, t.getStackTrace, metrics)
+          val reason = new ExceptionFailure(t, metrics)
          execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
           // Don't forcibly exit unless the exception was inherently fatal, to avoid

core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala

Lines changed: 28 additions & 0 deletions

@@ -82,6 +82,12 @@ class TaskMetrics extends Serializable {
    */
   var inputMetrics: Option[InputMetrics] = None
 
+  /**
+   * If this task writes data externally (e.g. to a distributed filesystem), metrics on how much
+   * data was written are stored here.
+   */
+  var outputMetrics: Option[OutputMetrics] = None
+
   /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here.
    * This includes read metrics aggregated over all the task's shuffle dependencies.
@@ -157,6 +163,16 @@ object DataReadMethod extends Enumeration with Serializable {
   val Memory, Disk, Hadoop, Network = Value
 }
 
+/**
+ * :: DeveloperApi ::
+ * Method by which output data was written.
+ */
+@DeveloperApi
+object DataWriteMethod extends Enumeration with Serializable {
+  type DataWriteMethod = Value
+  val Hadoop = Value
+}
+
 /**
  * :: DeveloperApi ::
  * Metrics about reading input data.
@@ -169,6 +185,18 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
   var bytesRead: Long = 0L
 }
 
+/**
+ * :: DeveloperApi ::
+ * Metrics about writing output data.
+ */
+@DeveloperApi
+case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
+  /**
+   * Total bytes written
+   */
+  var bytesWritten: Long = 0L
+}
+
 /**
  * :: DeveloperApi ::
  * Metrics pertaining to shuffle data read in a given task.
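
A short sketch of how the new write-side metrics are meant to be populated, mirroring the existing `InputMetrics` usage. The `taskMetrics` value and the byte count are placeholders; the full wiring into the Hadoop write path is not shown in this excerpt.

    // Placeholder values; illustrates the API shape only.
    val metrics = OutputMetrics(DataWriteMethod.Hadoop)
    metrics.bytesWritten = 4096L   // e.g. the delta reported by a bytes-written callback
    taskMetrics.outputMetrics = Some(metrics)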

core/src/main/scala/org/apache/hadoop/mapred/SparkHadoopMapRedUtil.scala renamed to core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala

Lines changed: 15 additions & 2 deletions

@@ -15,22 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapred
+package org.apache.spark.mapred
 
-private[apache]
+import java.lang.reflect.Modifier
+
+import org.apache.hadoop.mapred.{TaskAttemptID, JobID, JobConf, JobContext, TaskAttemptContext}
+
+private[spark]
 trait SparkHadoopMapRedUtil {
   def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
     val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
       "org.apache.hadoop.mapred.JobContext")
     val ctor = klass.getDeclaredConstructor(classOf[JobConf],
       classOf[org.apache.hadoop.mapreduce.JobID])
+    // In Hadoop 1.0.x, JobContext is an interface, and JobContextImpl is package private.
+    // Make it accessible if it's not in order to access it.
+    if (!Modifier.isPublic(ctor.getModifiers)) {
+      ctor.setAccessible(true)
+    }
     ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
   }
 
   def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
     val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
       "org.apache.hadoop.mapred.TaskAttemptContext")
     val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
+    // See above
+    if (!Modifier.isPublic(ctor.getModifiers)) {
+      ctor.setAccessible(true)
+    }
     ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
   }
 
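
As the new comments note, the constructor being reflected on can be non-public (the Hadoop 1.0.x `JobContextImpl` case), so it has to be opened with `setAccessible` before `newInstance` is called. A standalone sketch of that same reflection pattern; the helper name and parameters are placeholders, not part of the commit:

    import java.lang.reflect.{Constructor, Modifier}

    // Look up a constructor by class name and open it up if it is not public.
    // Placeholder helper, not part of SparkHadoopMapRedUtil.
    def accessibleConstructor(className: String, paramTypes: Class[_]*): Constructor[_] = {
      val ctor = Class.forName(className).getDeclaredConstructor(paramTypes: _*)
      if (!Modifier.isPublic(ctor.getModifiers)) {
        ctor.setAccessible(true)   // needed for package-private impl classes
      }
      ctor
    }

The two methods in the diff then call `newInstance` on the constructor they obtain this way.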

core/src/main/scala/org/apache/hadoop/mapreduce/SparkHadoopMapReduceUtil.scala renamed to core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala

Lines changed: 3 additions & 2 deletions

@@ -15,13 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.mapreduce
+package org.apache.spark.mapreduce
 
 import java.lang.{Boolean => JBoolean, Integer => JInteger}
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.{JobContext, JobID, TaskAttemptContext, TaskAttemptID}
 
-private[apache]
+private[spark]
 trait SparkHadoopMapReduceUtil {
   def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
     val klass = firstAvailableClass(
