
Commit 6018122

Merge commit with 2 parents: 0b93785 + a96b727

File tree: 22 files changed (+358, −420 lines)


core/src/main/resources/org/apache/spark/ui/static/webui.css

Lines changed: 10 additions & 0 deletions
@@ -169,6 +169,16 @@ span.additional-metric-title {
   display: inline-block;
 }
 
+.version {
+  line-height: 30px;
+  vertical-align: bottom;
+  font-size: 12px;
+  padding: 0;
+  margin: 0;
+  font-weight: bold;
+  color: #777;
+}
+
 /* Hide all additional metrics by default. This is done here rather than using JavaScript to
  * avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
 .scheduler_delay, .deserialization_time, .serialization_time, .getting_result_time {

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 3 additions & 0 deletions
@@ -60,6 +60,9 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to None,
  *                   the default serializer, as specified by `spark.serializer` config option, will
  *                   be used.
+ * @param keyOrdering key ordering for RDD's shuffles
+ * @param aggregator map/reduce-side aggregator for RDD's shuffle
+ * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
  */
 @DeveloperApi
 class ShuffleDependency[K, V, C](

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 0 deletions
@@ -172,6 +172,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
     this(master, appName, sparkHome, jars, Map(), Map())
 
+  // log out Spark Version in Spark driver log
+  logInfo(s"Running Spark version $SPARK_VERSION")
+
   private[spark] val conf = config.clone()
   conf.validateSettings()
 

core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

Lines changed: 21 additions & 6 deletions
@@ -21,11 +21,12 @@ import java.io.{InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
-import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
+import org.apache.spark.Logging
 
 /**
  * :: DeveloperApi ::
@@ -44,25 +45,33 @@ trait CompressionCodec {
   def compressedInputStream(s: InputStream): InputStream
 }
 
-
 private[spark] object CompressionCodec {
 
+  private val configKey = "spark.io.compression.codec"
   private val shortCompressionCodecNames = Map(
     "lz4" -> classOf[LZ4CompressionCodec].getName,
     "lzf" -> classOf[LZFCompressionCodec].getName,
     "snappy" -> classOf[SnappyCompressionCodec].getName)
 
   def createCodec(conf: SparkConf): CompressionCodec = {
-    createCodec(conf, conf.get("spark.io.compression.codec", DEFAULT_COMPRESSION_CODEC))
+    createCodec(conf, conf.get(configKey, DEFAULT_COMPRESSION_CODEC))
   }
 
   def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
     val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
-    val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader)
-      .getConstructor(classOf[SparkConf])
-    ctor.newInstance(conf).asInstanceOf[CompressionCodec]
+    val codec = try {
+      val ctor = Class.forName(codecClass, true, Utils.getContextOrSparkClassLoader)
+        .getConstructor(classOf[SparkConf])
+      Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
+    } catch {
+      case e: ClassNotFoundException => None
+      case e: IllegalArgumentException => None
+    }
+    codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " +
+      s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
   }
 
+  val FALLBACK_COMPRESSION_CODEC = "lzf"
   val DEFAULT_COMPRESSION_CODEC = "snappy"
   val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
 }
@@ -120,6 +129,12 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
 @DeveloperApi
 class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
+  try {
+    Snappy.getNativeLibraryVersion
+  } catch {
+    case e: Error => throw new IllegalArgumentException
+  }
+
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768)
     new SnappyOutputStream(s, blockSize)
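To illustrate the new fallback behaviour, here is a minimal sketch (for illustration only; `CompressionCodec` is `private[spark]`, so real callers live under the `org.apache.spark` packages, as the test added in CompressionCodecSuite below does):

    import org.apache.spark.SparkConf
    import org.apache.spark.io.CompressionCodec

    val conf = new SparkConf()

    // Short names still resolve through shortCompressionCodecNames.
    val snappy = CompressionCodec.createCodec(conf, "snappy")

    // An unknown codec class now surfaces as an IllegalArgumentException that
    // points at the lzf fallback, instead of a raw ClassNotFoundException.
    try {
      CompressionCodec.createCodec(conf, "foobar")
    } catch {
      case e: IllegalArgumentException =>
        println(e.getMessage)
        // e.g. Codec [foobar] is not available. Consider setting spark.io.compression.codec=lzf
    }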

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 8 additions & 8 deletions
@@ -483,7 +483,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
     this.cogroup(other, partitioner).flatMapValues( pair =>
-      for (v <- pair._1; w <- pair._2) yield (v, w)
+      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
     )
   }
 
@@ -496,9 +496,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
     this.cogroup(other, partitioner).flatMapValues { pair =>
       if (pair._2.isEmpty) {
-        pair._1.map(v => (v, None))
+        pair._1.iterator.map(v => (v, None))
       } else {
-        for (v <- pair._1; w <- pair._2) yield (v, Some(w))
+        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
       }
     }
   }
@@ -513,9 +513,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       : RDD[(K, (Option[V], W))] = {
     this.cogroup(other, partitioner).flatMapValues { pair =>
       if (pair._1.isEmpty) {
-        pair._2.map(w => (None, w))
+        pair._2.iterator.map(w => (None, w))
       } else {
-        for (v <- pair._1; w <- pair._2) yield (Some(v), w)
+        for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
      }
     }
   }
@@ -531,9 +531,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
       : RDD[(K, (Option[V], Option[W]))] = {
     this.cogroup(other, partitioner).flatMapValues {
-      case (vs, Seq()) => vs.map(v => (Some(v), None))
-      case (Seq(), ws) => ws.map(w => (None, Some(w)))
-      case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+      case (vs, Seq()) => vs.iterator.map(v => (Some(v), None))
+      case (Seq(), ws) => ws.iterator.map(w => (None, Some(w)))
+      case (vs, ws) => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
     }
   }
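The `.iterator` calls matter because a for-comprehension over the cogrouped `Iterable`s builds the whole per-key cross product strictly, while iterators produce the pairs lazily. A self-contained sketch with plain Scala collections (the sizes are made up):

    // 1,000 x 1,000 values under a single key.
    val vs: Iterable[Int] = 1 to 1000
    val ws: Iterable[Int] = 1 to 1000

    // Strict: materializes all 1,000,000 pairs before anything downstream runs.
    val eager = for (v <- vs; w <- ws) yield (v, w)

    // Lazy: an Iterator that yields pairs one at a time, as flatMapValues consumes them.
    val lazyPairs = for (v <- vs.iterator; w <- ws.iterator) yield (v, w)

    println(lazyPairs.take(3).toList) // List((1,1), (1,2), (1,3))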

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 14 additions & 2 deletions
@@ -1186,7 +1186,13 @@ abstract class RDD[T: ClassTag](
     // same bytecodes for `saveAsTextFile`.
     val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
     val textClassTag = implicitly[ClassTag[Text]]
-    val r = this.map(x => (NullWritable.get(), new Text(x.toString)))
+    val r = this.mapPartitions { iter =>
+      val text = new Text()
+      iter.map { x =>
+        text.set(x.toString)
+        (NullWritable.get(), text)
+      }
+    }
     RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
       .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
   }
@@ -1198,7 +1204,13 @@ abstract class RDD[T: ClassTag](
     // https://issues.apache.org/jira/browse/SPARK-2075
     val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
     val textClassTag = implicitly[ClassTag[Text]]
-    val r = this.map(x => (NullWritable.get(), new Text(x.toString)))
+    val r = this.mapPartitions { iter =>
+      val text = new Text()
+      iter.map { x =>
+        text.set(x.toString)
+        (NullWritable.get(), text)
+      }
+    }
     RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
       .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
   }
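Reusing one `Text` per partition is safe here because `saveAsHadoopFile` serializes each record as soon as it is produced; the same pattern breaks when records are retained instead. A hedged sketch of the pitfall, using a plain iterator and a mutable `StringBuilder` rather than anything from the patch:

    // Every record is the same mutable object, so materializing the iterator
    // leaves three references to the final state rather than three values.
    val sb = new StringBuilder
    val records = Iterator("a", "b", "c").map { x =>
      sb.clear()
      sb.append(x)
      sb
    }
    println(records.toArray.map(_.toString).toList) // List(c, c, c), not List(a, b, c)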

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 6 additions & 13 deletions
@@ -194,9 +194,12 @@ private[spark] object UIUtils extends Logging {
     <body>
       <div class="navbar navbar-static-top">
         <div class="navbar-inner">
-          <a href={prependBaseUri("/")} class="brand">
-            <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
-          </a>
+          <div class="brand">
+            <a href={prependBaseUri("/")} class="brand">
+              <img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
+              <span class="version">{org.apache.spark.SPARK_VERSION}</span>
+            </a>
+          </div>
           <ul class="nav">{header}</ul>
           <p class="navbar-text pull-right">
             <strong title={appName}>{shortAppName}</strong> application UI
@@ -214,11 +217,6 @@ private[spark] object UIUtils extends Logging {
         </div>
         {content}
       </div>
-      <div id="footer">
-        <div class="container-fluid">
-          <p class="muted credit">Spark {org.apache.spark.SPARK_VERSION}</p>
-        </div>
-      </div>
     </body>
   </html>
 }
@@ -245,11 +243,6 @@ private[spark] object UIUtils extends Logging {
       </div>
       {content}
     </div>
-    <div id="footer">
-      <div class="container-fluid">
-        <p class="muted credit">Spark {org.apache.spark.SPARK_VERSION}</p>
-      </div>
-    </div>
   </body>
 </html>
 }

core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala

Lines changed: 6 additions & 0 deletions
@@ -85,4 +85,10 @@ class CompressionCodecSuite extends FunSuite {
     assert(codec.getClass === classOf[SnappyCompressionCodec])
     testCodec(codec)
   }
+
+  test("bad compression codec") {
+    intercept[IllegalArgumentException] {
+      CompressionCodec.createCodec(conf, "foobar")
+    }
+  }
 }

docs/configuration.md

Lines changed: 35 additions & 0 deletions
@@ -852,6 +852,41 @@ Apart from these, the following properties are also available, and may be useful
     between nodes leading to flooding the network with those.
   </td>
 </tr>
+<tr>
+  <td><code>spark.shuffle.io.preferDirectBufs</code></td>
+  <td>true</td>
+  <td>
+    (Netty only) Off-heap buffers are used to reduce garbage collection during shuffle and cache
+    block transfer. For environments where off-heap memory is tightly limited, users may wish to
+    turn this off to force all allocations from Netty to be on-heap.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.io.numConnectionsPerPeer</code></td>
+  <td>1</td>
+  <td>
+    (Netty only) Connections between hosts are reused in order to reduce connection buildup for
+    large clusters. For clusters with many hard disks and few hosts, this may result in insufficient
+    concurrency to saturate all disks, and so users may consider increasing this value.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.io.maxRetries</code></td>
+  <td>3</td>
+  <td>
+    (Netty only) Fetches that fail due to IO-related exceptions are automatically retried if this is
+    set to a non-zero value. This retry logic helps stabilize large shuffles in the face of long GC
+    pauses or transient network connectivity issues.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.io.retryWait</code></td>
+  <td>5</td>
+  <td>
+    (Netty only) Seconds to wait between retries of fetches. The maximum delay caused by retrying
+    is simply <code>maxRetries * retryWait</code>, by default 15 seconds.
+  </td>
+</tr>
 </table>
 
 #### Scheduling
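For reference, these properties can be set like any other Spark configuration, for example on a SparkConf before the context is created; the values below are illustrative only, not recommendations:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("netty-shuffle-tuning-example")
      .set("spark.shuffle.io.preferDirectBufs", "false")  // force on-heap Netty buffers
      .set("spark.shuffle.io.numConnectionsPerPeer", "4") // more concurrency between host pairs
      .set("spark.shuffle.io.maxRetries", "6")            // tolerate longer transient failures
      .set("spark.shuffle.io.retryWait", "10")            // up to 6 * 10 = 60 seconds of retrying
    val sc = new SparkContext(conf)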

docs/job-scheduling.md

Lines changed: 1 addition & 1 deletion
@@ -98,7 +98,7 @@ pre-packaged distribution.
 2. Add this jar to the classpath of all `NodeManager`s in your cluster.
 3. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,
    then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
-   `org.apache.spark.yarn.network.YarnShuffleService`. Additionally, set all relevant
+   `org.apache.spark.network.yarn.YarnShuffleService`. Additionally, set all relevant
    `spark.shuffle.service.*` [configurations](configuration.html).
 4. Restart all `NodeManager`s in your cluster.
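On the application side, the relevant `spark.shuffle.service.*` setting is `spark.shuffle.service.enabled`; below is a hedged sketch of a typical pairing with dynamic allocation (exact settings depend on the deployment):

    import org.apache.spark.{SparkConf, SparkContext}

    // Fetch shuffle blocks from the external YarnShuffleService running in each
    // NodeManager, so executors can be released without losing shuffle output.
    val conf = new SparkConf()
      .setAppName("external-shuffle-service-example")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.enabled", "true")
    val sc = new SparkContext(conf)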
