Commit a6d5c50

Author: Marcelo Vanzin (committed)

Merge branch 'master' into hist-server-single-log

Conflicts:
	core/src/main/scala/org/apache/spark/util/FileLogger.scala
	core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
	core/src/test/scala/org/apache/spark/util/FileLoggerSuite.scala

2 parents (16fd491 + 4c589ca), commit a6d5c50

File tree

299 files changed: +5780 / -1406 lines changed


bin/pyspark

Lines changed: 38 additions & 13 deletions
@@ -50,22 +50,47 @@ fi
 
 . "$FWDIR"/bin/load-spark-env.sh
 
-# Figure out which Python executable to use
+# In Spark <= 1.1, setting IPYTHON=1 would cause the driver to be launched using the `ipython`
+# executable, while the worker would still be launched using PYSPARK_PYTHON.
+#
+# In Spark 1.2, we removed the documentation of the IPYTHON and IPYTHON_OPTS variables and added
+# PYSPARK_DRIVER_PYTHON and PYSPARK_DRIVER_PYTHON_OPTS to allow IPython to be used for the driver.
+# Now, users can simply set PYSPARK_DRIVER_PYTHON=ipython to use IPython and set
+# PYSPARK_DRIVER_PYTHON_OPTS to pass options when starting the Python driver
+# (e.g. PYSPARK_DRIVER_PYTHON_OPTS='notebook'). This supports full customization of the IPython
+# and executor Python executables.
+#
+# For backwards-compatibility, we retain the old IPYTHON and IPYTHON_OPTS variables.
+
+# Determine the Python executable to use if PYSPARK_PYTHON or PYSPARK_DRIVER_PYTHON isn't set:
+if hash python2.7 2>/dev/null; then
+  # Attempt to use Python 2.7, if installed:
+  DEFAULT_PYTHON="python2.7"
+else
+  DEFAULT_PYTHON="python"
+fi
+
+# Determine the Python executable to use for the driver:
+if [[ -n "$IPYTHON_OPTS" || "$IPYTHON" == "1" ]]; then
+  # If IPython options are specified, assume user wants to run IPython
+  # (for backwards-compatibility)
+  PYSPARK_DRIVER_PYTHON_OPTS="$PYSPARK_DRIVER_PYTHON_OPTS $IPYTHON_OPTS"
+  PYSPARK_DRIVER_PYTHON="ipython"
+elif [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
+  PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"$DEFAULT_PYTHON"}"
+fi
+
+# Determine the Python executable to use for the executors:
 if [[ -z "$PYSPARK_PYTHON" ]]; then
-  if [[ "$IPYTHON" = "1" || -n "$IPYTHON_OPTS" ]]; then
-    # for backward compatibility
-    PYSPARK_PYTHON="ipython"
+  if [[ $PYSPARK_DRIVER_PYTHON == *ipython* && $DEFAULT_PYTHON != "python2.7" ]]; then
+    echo "IPython requires Python 2.7+; please install python2.7 or set PYSPARK_PYTHON" 1>&2
+    exit 1
   else
-    PYSPARK_PYTHON="python"
+    PYSPARK_PYTHON="$DEFAULT_PYTHON"
   fi
 fi
 export PYSPARK_PYTHON
 
-if [[ -z "$PYSPARK_PYTHON_OPTS" && -n "$IPYTHON_OPTS" ]]; then
-  # for backward compatibility
-  PYSPARK_PYTHON_OPTS="$IPYTHON_OPTS"
-fi
-
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
 export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
@@ -93,9 +118,9 @@ if [[ -n "$SPARK_TESTING" ]]; then
   unset YARN_CONF_DIR
   unset HADOOP_CONF_DIR
   if [[ -n "$PYSPARK_DOC_TEST" ]]; then
-    exec "$PYSPARK_PYTHON" -m doctest $1
+    exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
   else
-    exec "$PYSPARK_PYTHON" $1
+    exec "$PYSPARK_DRIVER_PYTHON" $1
   fi
   exit
 fi
@@ -111,5 +136,5 @@ if [[ "$1" =~ \.py$ ]]; then
 else
   # PySpark shell requires special handling downstream
   export PYSPARK_SHELL=1
-  exec "$PYSPARK_PYTHON" $PYSPARK_PYTHON_OPTS
+  exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS
 fi
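The comment block added above describes the new split between the driver and executor Python executables. A minimal usage sketch of the new variables (not part of the commit; assumes IPython and python2.7 are installed locally):

# Run the PySpark driver under the IPython notebook while executors keep using
# python2.7. Variable names come from the script above; the notebook option is
# just the example mentioned in its comments.
export PYSPARK_DRIVER_PYTHON=ipython
export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
export PYSPARK_PYTHON=python2.7   # executor Python, chosen independently of the driver
./bin/pyspark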

bin/spark-class

Lines changed: 1 addition & 1 deletion
@@ -105,7 +105,7 @@ else
     exit 1
   fi
 fi
-JAVA_VERSION=$("$RUNNER" -version 2>&1 | sed 's/.* version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
+JAVA_VERSION=$("$RUNNER" -version 2>&1 | grep 'version' | sed 's/.* version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
 
 # Set JAVA_OPTS to be able to load native libraries and to set heap size
 if [ "$JAVA_VERSION" -ge 18 ]; then
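The added grep keeps the sed parsing anchored to the line that actually contains the version string, even when the JVM prints extra output first. A quick sketch (not from the commit; the _JAVA_OPTIONS line is one hypothetical source of such extra output):

# Without the grep, sed's "1q" parses whatever line comes first:
#   $ java -version 2>&1
#   Picked up _JAVA_OPTIONS: -Xmx512m
#   java version "1.7.0_67"
#   ...
# With the grep, only the line containing "version" reaches sed:
java -version 2>&1 | grep 'version' | sed 's/.* version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q'
# -> 17, which the script then compares against 18 (Java 8) a few lines below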

bin/spark-shell.cmd

Lines changed: 3 additions & 2 deletions
@@ -17,6 +17,7 @@ rem See the License for the specific language governing permissions and
 rem limitations under the License.
 rem
 
-set SPARK_HOME=%~dp0..
+rem This is the entry point for running Spark shell. To avoid polluting the
+rem environment, it just launches a new cmd to do the real work.
 
-cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd --class org.apache.spark.repl.Main %* spark-shell
+cmd /V /E /C %~dp0spark-shell2.cmd %*

bin/spark-shell2.cmd

Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+@echo off
+
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem    http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+set SPARK_HOME=%~dp0..
+
+cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd --class org.apache.spark.repl.Main %* spark-shell

bin/spark-submit.cmd

Lines changed: 3 additions & 48 deletions
@@ -17,52 +17,7 @@ rem See the License for the specific language governing permissions and
 rem limitations under the License.
 rem
 
-rem NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+rem This is the entry point for running Spark submit. To avoid polluting the
+rem environment, it just launches a new cmd to do the real work.
 
-set SPARK_HOME=%~dp0..
-set ORIG_ARGS=%*
-
-rem Reset the values of all variables used
-set SPARK_SUBMIT_DEPLOY_MODE=client
-set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_HOME%\conf\spark-defaults.conf
-set SPARK_SUBMIT_DRIVER_MEMORY=
-set SPARK_SUBMIT_LIBRARY_PATH=
-set SPARK_SUBMIT_CLASSPATH=
-set SPARK_SUBMIT_OPTS=
-set SPARK_SUBMIT_BOOTSTRAP_DRIVER=
-
-:loop
-if [%1] == [] goto continue
-  if [%1] == [--deploy-mode] (
-    set SPARK_SUBMIT_DEPLOY_MODE=%2
-  ) else if [%1] == [--properties-file] (
-    set SPARK_SUBMIT_PROPERTIES_FILE=%2
-  ) else if [%1] == [--driver-memory] (
-    set SPARK_SUBMIT_DRIVER_MEMORY=%2
-  ) else if [%1] == [--driver-library-path] (
-    set SPARK_SUBMIT_LIBRARY_PATH=%2
-  ) else if [%1] == [--driver-class-path] (
-    set SPARK_SUBMIT_CLASSPATH=%2
-  ) else if [%1] == [--driver-java-options] (
-    set SPARK_SUBMIT_OPTS=%2
-  )
-  shift
-goto loop
-:continue
-
-rem For client mode, the driver will be launched in the same JVM that launches
-rem SparkSubmit, so we may need to read the properties file for any extra class
-rem paths, library paths, java options and memory early on. Otherwise, it will
-rem be too late by the time the driver JVM has started.
-
-if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
-  if exist %SPARK_SUBMIT_PROPERTIES_FILE% (
-    rem Parse the properties file only if the special configs exist
-    for /f %%i in ('findstr /r /c:"^[\t ]*spark.driver.memory" /c:"^[\t ]*spark.driver.extra" ^
-      %SPARK_SUBMIT_PROPERTIES_FILE%') do (
-      set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
-    )
-  )
-)
-
-cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit %ORIG_ARGS%
+cmd /V /E /C %~dp0spark-submit2.cmd %*

bin/spark-submit2.cmd

Lines changed: 68 additions & 0 deletions
@@ -0,0 +1,68 @@
+@echo off
+
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements. See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License. You may obtain a copy of the License at
+rem
+rem    http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+rem NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+
+set SPARK_HOME=%~dp0..
+set ORIG_ARGS=%*
+
+rem Reset the values of all variables used
+set SPARK_SUBMIT_DEPLOY_MODE=client
+set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_HOME%\conf\spark-defaults.conf
+set SPARK_SUBMIT_DRIVER_MEMORY=
+set SPARK_SUBMIT_LIBRARY_PATH=
+set SPARK_SUBMIT_CLASSPATH=
+set SPARK_SUBMIT_OPTS=
+set SPARK_SUBMIT_BOOTSTRAP_DRIVER=
+
+:loop
+if [%1] == [] goto continue
+  if [%1] == [--deploy-mode] (
+    set SPARK_SUBMIT_DEPLOY_MODE=%2
+  ) else if [%1] == [--properties-file] (
+    set SPARK_SUBMIT_PROPERTIES_FILE=%2
+  ) else if [%1] == [--driver-memory] (
+    set SPARK_SUBMIT_DRIVER_MEMORY=%2
+  ) else if [%1] == [--driver-library-path] (
+    set SPARK_SUBMIT_LIBRARY_PATH=%2
+  ) else if [%1] == [--driver-class-path] (
+    set SPARK_SUBMIT_CLASSPATH=%2
+  ) else if [%1] == [--driver-java-options] (
+    set SPARK_SUBMIT_OPTS=%2
+  )
+  shift
+goto loop
+:continue
+
+rem For client mode, the driver will be launched in the same JVM that launches
+rem SparkSubmit, so we may need to read the properties file for any extra class
+rem paths, library paths, java options and memory early on. Otherwise, it will
+rem be too late by the time the driver JVM has started.
+
+if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
+  if exist %SPARK_SUBMIT_PROPERTIES_FILE% (
+    rem Parse the properties file only if the special configs exist
+    for /f %%i in ('findstr /r /c:"^[\t ]*spark.driver.memory" /c:"^[\t ]*spark.driver.extra" ^
+      %SPARK_SUBMIT_PROPERTIES_FILE%') do (
+      set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
+    )
+  )
+)
+
+cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit %ORIG_ARGS%

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 1 deletion
@@ -21,6 +21,7 @@ import scala.language.implicitConversions
 
 import java.io._
 import java.net.URI
+import java.util.Arrays
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
@@ -1429,7 +1430,10 @@ object SparkContext extends Logging {
     simpleWritableConverter[Boolean, BooleanWritable](_.get)
 
   implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
-    simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
+    simpleWritableConverter[Array[Byte], BytesWritable](bw =>
+      // getBytes method returns array which is longer then data to be returned
+      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
+    )
   }
 
   implicit def stringWritableConverter(): WritableConverter[String] =

core/src/main/scala/org/apache/spark/TestUtils.scala

Lines changed: 3 additions & 2 deletions
@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
 import com.google.common.io.Files
 
+import org.apache.spark.util.Utils
+
 /**
  * Utilities for tests. Included in main codebase since it's used by multiple
  * projects.
@@ -42,8 +44,7 @@ private[spark] object TestUtils {
    * in order to avoid interference between tests.
    */
   def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
-    val tempDir = Files.createTempDir()
-    tempDir.deleteOnExit()
+    val tempDir = Utils.createTempDir()
     val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
     val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
     createJar(files, jarFile)

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 4 additions & 6 deletions
@@ -25,8 +25,6 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.language.existentials
-import scala.reflect.ClassTag
-import scala.util.{Try, Success, Failure}
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
@@ -42,7 +40,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
 
 private[spark] class PythonRDD(
-    parent: RDD[_],
+    @transient parent: RDD[_],
     command: Array[Byte],
     envVars: JMap[String, String],
     pythonIncludes: JList[String],
@@ -55,9 +53,9 @@ private[spark] class PythonRDD(
   val bufferSize = conf.getInt("spark.buffer.size", 65536)
   val reuse_worker = conf.getBoolean("spark.python.worker.reuse", true)
 
-  override def getPartitions = parent.partitions
+  override def getPartitions = firstParent.partitions
 
-  override val partitioner = if (preservePartitoning) parent.partitioner else None
+  override val partitioner = if (preservePartitoning) firstParent.partitioner else None
 
   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
     val startTime = System.currentTimeMillis
@@ -234,7 +232,7 @@ private[spark] class PythonRDD(
         dataOut.writeInt(command.length)
         dataOut.write(command)
         // Data values
-        PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
+        PythonRDD.writeIteratorToStream(firstParent.iterator(split, context), dataOut)
         dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION)
         dataOut.flush()
       } catch {

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 6 additions & 2 deletions
@@ -108,10 +108,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
 
       // Create and start the worker
-      val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
       workerEnv.put("PYTHONPATH", pythonPath)
+      // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
+      workerEnv.put("PYTHONUNBUFFERED", "YES")
       val worker = pb.start()
 
       // Redirect worker stdout and stderr
@@ -149,10 +151,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
 
     try {
      // Create and start the daemon
-      val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
      val workerEnv = pb.environment()
      workerEnv.putAll(envVars)
      workerEnv.put("PYTHONPATH", pythonPath)
+      // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
+      workerEnv.put("PYTHONUNBUFFERED", "YES")
      daemon = pb.start()
 
      val in = new DataInputStream(daemon.getInputStream)
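The new comments state that PYTHONUNBUFFERED stands in for the -u flag that ipython does not accept. A shell sketch of that equivalence (not part of the commit; any throwaway script shows the same buffering behaviour):

# Block-buffered (the default when stdout is a pipe): "tick" only appears once the sleep ends.
python -c 'import sys, time; sys.stdout.write("tick\n"); time.sleep(3)' | cat

# Unbuffered via the environment variable the factory now sets -- "tick" appears
# immediately, matching `python -u`.
PYTHONUNBUFFERED=YES python -c 'import sys, time; sys.stdout.write("tick\n"); time.sleep(3)' | cat
python -u -c 'import sys, time; sys.stdout.write("tick\n"); time.sleep(3)' | cat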
