Commit 65e4ebc

Merge remote-tracking branch 'upstream/master' into mllib-stats-api-check

2 parents 8195c78 + 482c5af

51 files changed: +835 -318 lines


bin/pyspark

Lines changed: 14 additions & 4 deletions
@@ -23,12 +23,18 @@ FWDIR="$(cd `dirname $0`/..; pwd)"
 # Export this as SPARK_HOME
 export SPARK_HOME="$FWDIR"
 
+source $FWDIR/bin/utils.sh
+
 SCALA_VERSION=2.10
 
-if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+function usage() {
   echo "Usage: ./bin/pyspark [options]" 1>&2
   $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
   exit 0
+}
+
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+  usage
 fi
 
 # Exit if the user hasn't compiled Spark
@@ -66,10 +72,11 @@ fi
 # Build up arguments list manually to preserve quotes and backslashes.
 # We export Spark submit arguments as an environment variable because shell.py must run as a
 # PYTHONSTARTUP script, which does not take in arguments. This is required for IPython notebooks.
-
+SUBMIT_USAGE_FUNCTION=usage
+gatherSparkSubmitOpts "$@"
 PYSPARK_SUBMIT_ARGS=""
 whitespace="[[:space:]]"
-for i in "$@"; do
+for i in "${SUBMISSION_OPTS[@]}"; do
   if [[ $i =~ \" ]]; then i=$(echo $i | sed 's/\"/\\\"/g'); fi
   if [[ $i =~ $whitespace ]]; then i=\"$i\"; fi
   PYSPARK_SUBMIT_ARGS="$PYSPARK_SUBMIT_ARGS $i"
@@ -90,7 +97,10 @@ fi
 if [[ "$1" =~ \.py$ ]]; then
   echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." 1>&2
   echo -e "Use ./bin/spark-submit <python file>\n" 1>&2
-  exec $FWDIR/bin/spark-submit "$@"
+  primary=$1
+  shift
+  gatherSparkSubmitOpts "$@"
+  exec $FWDIR/bin/spark-submit "${SUBMISSION_OPTS[@]}" $primary "${APPLICATION_OPTS[@]}"
 else
   # Only use ipython if no command line arguments were provided [SPARK-1134]
   if [[ "$IPYTHON" = "1" ]]; then
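
With this change, spark-submit options given to ./bin/pyspark are separated from application arguments before being forwarded. A minimal sketch of the resulting behavior (script and file names are hypothetical):

# Hypothetical invocation: the .py file is taken as the primary resource,
# "--master local[4]" is recognized as a spark-submit option, and input.txt
# falls through to the application arguments.
./bin/pyspark my_app.py --master local[4] input.txt
# effectively becomes:
#   ./bin/spark-submit --master local[4] my_app.py input.txt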

bin/spark-shell

Lines changed: 14 additions & 6 deletions
@@ -31,13 +31,21 @@ set -o posix
 ## Global script variables
 FWDIR="$(cd `dirname $0`/..; pwd)"
 
+function usage() {
+  echo "Usage: ./bin/spark-shell [options]"
+  $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  exit 0
+}
+
 if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./bin/spark-shell [options]"
-  $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
-  exit 0
+  usage
 fi
 
-function main(){
+source $FWDIR/bin/utils.sh
+SUBMIT_USAGE_FUNCTION=usage
+gatherSparkSubmitOpts "$@"
+
+function main() {
   if $cygwin; then
     # Workaround for issue involving JLine and Cygwin
     # (see http://sourceforge.net/p/jline/bugs/40/).
@@ -46,11 +54,11 @@ function main(){
     # (see https://github.com/sbt/sbt/issues/562).
     stty -icanon min 1 -echo > /dev/null 2>&1
     export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
-    $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
+    $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"
     stty icanon echo > /dev/null 2>&1
   else
     export SPARK_SUBMIT_OPTS
-    $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
+    $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main "${SUBMISSION_OPTS[@]}" spark-shell "${APPLICATION_OPTS[@]}"
   fi
 }
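
The same utility lets spark-shell accept spark-submit flags directly. A hedged example (the master URL and memory value are made up):

# Flags recognized by gatherSparkSubmitOpts are forwarded to spark-submit
# ahead of the "spark-shell" primary resource; anything unrecognized follows it.
./bin/spark-shell --master spark://host:7077 --executor-memory 2g
# runs roughly:
#   spark-submit --class org.apache.spark.repl.Main \
#     --master spark://host:7077 --executor-memory 2g spark-shell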

bin/utils.sh

Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Gather all spark-submit options into SUBMISSION_OPTS
+function gatherSparkSubmitOpts() {
+
+  if [ -z "$SUBMIT_USAGE_FUNCTION" ]; then
+    echo "Function for printing usage of $0 is not set." 1>&2
+    echo "Please set usage function to shell variable 'SUBMIT_USAGE_FUNCTION' in $0" 1>&2
+    exit 1
+  fi
+
+  # NOTE: If you add or remove spark-submit options,
+  # modify NOT ONLY this script but also SparkSubmitArguments.scala
+  SUBMISSION_OPTS=()
+  APPLICATION_OPTS=()
+  while (($#)); do
+    case "$1" in
+      --master | --deploy-mode | --class | --name | --jars | --py-files | --files | \
+      --conf | --properties-file | --driver-memory | --driver-java-options | \
+      --driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
+      --total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
+        if [[ $# -lt 2 ]]; then
+          "$SUBMIT_USAGE_FUNCTION"
+          exit 1;
+        fi
+        SUBMISSION_OPTS+=("$1"); shift
+        SUBMISSION_OPTS+=("$1"); shift
+        ;;
+
+      --verbose | -v | --supervise)
+        SUBMISSION_OPTS+=("$1"); shift
+        ;;
+
+      *)
+        APPLICATION_OPTS+=("$1"); shift
+        ;;
+    esac
+  done
+
+  export SUBMISSION_OPTS
+  export APPLICATION_OPTS
+}
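
A minimal sketch of how gatherSparkSubmitOpts splits a mixed argument list (the option values and application arguments below are made up):

#!/usr/bin/env bash
source ./bin/utils.sh                     # assumes we are in the Spark root

function usage() { echo "Usage: demo [options]"; exit 0; }
SUBMIT_USAGE_FUNCTION=usage

gatherSparkSubmitOpts --master local[2] --name demo --verbose app-arg1 app-arg2
echo "submission opts:  ${SUBMISSION_OPTS[@]}"    # --master local[2] --name demo --verbose
echo "application opts: ${APPLICATION_OPTS[@]}"   # app-arg1 app-arg2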

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 18 additions & 9 deletions
@@ -156,11 +156,9 @@ object SparkEnv extends Logging {
       conf.set("spark.driver.port", boundPort.toString)
     }
 
-    // Create an instance of the class named by the given Java system property, or by
-    // defaultClassName if the property is not set, and return it as a T
-    def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
-      val name = conf.get(propertyName, defaultClassName)
-      val cls = Class.forName(name, true, Utils.getContextOrSparkClassLoader)
+    // Create an instance of the class with the given name, possibly initializing it with our conf
+    def instantiateClass[T](className: String): T = {
+      val cls = Class.forName(className, true, Utils.getContextOrSparkClassLoader)
       // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
       // SparkConf, then one taking no arguments
       try {
@@ -178,11 +176,17 @@
       }
     }
 
-    val serializer = instantiateClass[Serializer](
+    // Create an instance of the class named by the given SparkConf property, or defaultClassName
+    // if the property is not set, possibly initializing it with our conf
+    def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
+      instantiateClass[T](conf.get(propertyName, defaultClassName))
+    }
+
+    val serializer = instantiateClassFromConf[Serializer](
       "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
     logDebug(s"Using serializer: ${serializer.getClass}")
 
-    val closureSerializer = instantiateClass[Serializer](
+    val closureSerializer = instantiateClassFromConf[Serializer](
      "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")
 
     def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
@@ -246,8 +250,13 @@
       "."
     }
 
-    val shuffleManager = instantiateClass[ShuffleManager](
-      "spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
+    // Let the user specify short names for shuffle managers
+    val shortShuffleMgrNames = Map(
+      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
+      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
+    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
+    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
 
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)
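
With the short-name map above, spark.shuffle.manager accepts "hash" or "sort" instead of a fully qualified class name. A hedged spark-submit example (the application class and jar are hypothetical):

# "sort" resolves to org.apache.spark.shuffle.sort.SortShuffleManager;
# any other value is still interpreted as a class name.
./bin/spark-submit --master local[4] \
  --conf spark.shuffle.manager=sort \
  --class com.example.MyApp my-app.jar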

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 25 additions & 6 deletions
@@ -17,14 +17,15 @@
 
 package org.apache.spark.broadcast
 
-import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream}
+import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream,
+  ObjectInputStream, ObjectOutputStream, OutputStream}
 
 import scala.reflect.ClassTag
 import scala.util.Random
 
 import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
-import org.apache.spark.util.Utils
 
 /**
  * A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
@@ -214,11 +215,15 @@ private[broadcast] object TorrentBroadcast extends Logging {
   private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024
   private var initialized = false
   private var conf: SparkConf = null
+  private var compress: Boolean = false
+  private var compressionCodec: CompressionCodec = null
 
   def initialize(_isDriver: Boolean, conf: SparkConf) {
     TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
     synchronized {
       if (!initialized) {
+        compress = conf.getBoolean("spark.broadcast.compress", true)
+        compressionCodec = CompressionCodec.createCodec(conf)
         initialized = true
       }
     }
@@ -228,8 +233,13 @@ private[broadcast] object TorrentBroadcast extends Logging {
     initialized = false
   }
 
-  def blockifyObject[T](obj: T): TorrentInfo = {
-    val byteArray = Utils.serialize[T](obj)
+  def blockifyObject[T: ClassTag](obj: T): TorrentInfo = {
+    val bos = new ByteArrayOutputStream()
+    val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos
+    val ser = SparkEnv.get.serializer.newInstance()
+    val serOut = ser.serializeStream(out)
+    serOut.writeObject[T](obj).close()
+    val byteArray = bos.toByteArray
     val bais = new ByteArrayInputStream(byteArray)
 
     var blockNum = byteArray.length / BLOCK_SIZE
@@ -255,7 +265,7 @@ private[broadcast] object TorrentBroadcast extends Logging {
     info
   }
 
-  def unBlockifyObject[T](
+  def unBlockifyObject[T: ClassTag](
       arrayOfBlocks: Array[TorrentBlock],
       totalBytes: Int,
      totalBlocks: Int): T = {
@@ -264,7 +274,16 @@ private[broadcast] object TorrentBroadcast extends Logging {
       System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray,
         i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length)
     }
-    Utils.deserialize[T](retByteArray, Thread.currentThread.getContextClassLoader)
+
+    val in: InputStream = {
+      val arrIn = new ByteArrayInputStream(retByteArray)
+      if (compress) compressionCodec.compressedInputStream(arrIn) else arrIn
+    }
+    val ser = SparkEnv.get.serializer.newInstance()
+    val serIn = ser.deserializeStream(in)
+    val obj = serIn.readObject[T]()
+    serIn.close()
+    obj
   }
 
   /**

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 4 additions & 1 deletion
@@ -219,12 +219,15 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
 
   /** Fill in values by parsing user options. */
   private def parseOpts(opts: Seq[String]): Unit = {
-    var inSparkOpts = true
     val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r
 
     // Delineates parsing of Spark options from parsing of user options.
     parse(opts)
 
+    /**
+     * NOTE: If you add or remove spark-submit options,
+     * modify NOT ONLY this file but also utils.sh
+     */
    def parse(opts: Seq[String]): Unit = opts match {
      case ("--name") :: value :: tail =>
        name = value

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 3 additions & 2 deletions
@@ -136,7 +136,7 @@ private[spark] class Worker(
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
-    webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
+    webUi = new WorkerWebUI(this, workDir, webUiPort)
     webUi.bind()
     registerWithMaster()
@@ -373,7 +373,8 @@
 private[spark] object Worker extends Logging {
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
-    val args = new WorkerArguments(argStrings)
+    val conf = new SparkConf
+    val args = new WorkerArguments(argStrings, conf)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
       args.memory, args.masters, args.workDir)
     actorSystem.awaitTermination()

core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala

Lines changed: 5 additions & 1 deletion
@@ -20,11 +20,12 @@ package org.apache.spark.deploy.worker
 import java.lang.management.ManagementFactory
 
 import org.apache.spark.util.{IntParam, MemoryParam, Utils}
+import org.apache.spark.SparkConf
 
 /**
  * Command-line parser for the worker.
  */
-private[spark] class WorkerArguments(args: Array[String]) {
+private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
  var host = Utils.localHostName()
  var port = 0
  var webUiPort = 8081
@@ -46,6 +47,9 @@ private[spark] class WorkerArguments(args: Array[String]) {
   if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
     webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
   }
+  if (conf.contains("spark.worker.ui.port")) {
+    webUiPort = conf.get("spark.worker.ui.port").toInt
+  }
   if (System.getenv("SPARK_WORKER_DIR") != null) {
     workDir = System.getenv("SPARK_WORKER_DIR")
   }
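
The worker web UI port can now come from Spark configuration as well as the environment, with the config value taking precedence. A sketch for conf/spark-env.sh, assuming the standalone scripts forward SPARK_WORKER_OPTS to the worker JVM as system properties:

# Both are set; WorkerArguments reads the env var first and then overrides it
# with spark.worker.ui.port, so the worker UI binds to 8090 here.
export SPARK_WORKER_WEBUI_PORT=8081
export SPARK_WORKER_OPTS="-Dspark.worker.ui.port=8090"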

core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala

Lines changed: 2 additions & 7 deletions
@@ -34,8 +34,8 @@ private[spark]
 class WorkerWebUI(
     val worker: Worker,
     val workDir: File,
-    port: Option[Int] = None)
-  extends WebUI(worker.securityMgr, getUIPort(port, worker.conf), worker.conf, name = "WorkerUI")
+    requestedPort: Int)
+  extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
   with Logging {
 
   val timeout = AkkaUtils.askTimeout(worker.conf)
@@ -55,10 +55,5 @@ class WorkerWebUI(
 }
 
 private[spark] object WorkerWebUI {
-  val DEFAULT_PORT = 8081
   val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
-
-  def getUIPort(requestedPort: Option[Int], conf: SparkConf): Int = {
-    requestedPort.getOrElse(conf.getInt("spark.worker.ui.port", WorkerWebUI.DEFAULT_PORT))
-  }
 }

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 0 deletions
@@ -374,6 +374,7 @@ private[spark] class Executor(
        for (taskRunner <- runningTasks.values()) {
          if (!taskRunner.attemptedTask.isEmpty) {
            Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
+             metrics.updateShuffleReadMetrics
              tasksMetrics += ((taskRunner.taskId, metrics))
            }
          }
        }
