
Commit 1cf7e69

Merge branch 'master' of git://git.apache.org/spark into SPARK-2976
2 parents: d1e0666 + 8c5a222


329 files changed: +9785 / -4030 lines changed

.rat-excludes

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ log4j-defaults.properties
 bootstrap-tooltip.js
 jquery-1.11.1.min.js
 sorttable.js
+.*avsc
 .*txt
 .*json
 .*data

.travis.yml

Lines changed: 0 additions & 32 deletions
This file was deleted.

README.md

Lines changed: 9 additions & 0 deletions
@@ -115,6 +115,15 @@ If your project is built with Maven, add this to your POM file's `<dependencies>
     </dependency>


+## A Note About Thrift JDBC server and CLI for Spark SQL
+
+Spark SQL supports Thrift JDBC server and CLI.
+See sql-programming-guide.md for more information about those features.
+You can use those features by setting `-Phive-thriftserver` when building Spark as follows.
+
+    $ sbt/sbt -Phive-thriftserver assembly
+
+
 ## Configuration

 Please refer to the [Configuration guide](http://spark.apache.org/docs/latest/configuration.html)

bin/spark-shell.cmd

Lines changed: 1 addition & 1 deletion
@@ -19,4 +19,4 @@ rem

 set SPARK_HOME=%~dp0..

-cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell --class org.apache.spark.repl.Main %*
+cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd --class org.apache.spark.repl.Main %* spark-shell

bin/spark-sql

Lines changed: 9 additions & 9 deletions
@@ -65,30 +65,30 @@ while (($#)); do
   case $1 in
     -d | --define | --database | -f | -h | --hiveconf | --hivevar | -i | -p)
       ensure_arg_number $# 2
-      CLI_ARGS+=($1); shift
-      CLI_ARGS+=($1); shift
+      CLI_ARGS+=("$1"); shift
+      CLI_ARGS+=("$1"); shift
       ;;

     -e)
       ensure_arg_number $# 2
-      CLI_ARGS+=($1); shift
-      CLI_ARGS+=(\"$1\"); shift
+      CLI_ARGS+=("$1"); shift
+      CLI_ARGS+=("$1"); shift
       ;;

     -s | --silent)
-      CLI_ARGS+=($1); shift
+      CLI_ARGS+=("$1"); shift
       ;;

     -v | --verbose)
       # Both SparkSubmit and SparkSQLCLIDriver recognizes -v | --verbose
-      CLI_ARGS+=($1)
-      SUBMISSION_ARGS+=($1); shift
+      CLI_ARGS+=("$1")
+      SUBMISSION_ARGS+=("$1"); shift
       ;;

     *)
-      SUBMISSION_ARGS+=($1); shift
+      SUBMISSION_ARGS+=("$1"); shift
       ;;
   esac
 done

-eval exec "$FWDIR"/bin/spark-submit --class $CLASS ${SUBMISSION_ARGS[*]} spark-internal ${CLI_ARGS[*]}
+exec "$FWDIR"/bin/spark-submit --class $CLASS "${SUBMISSION_ARGS[@]}" spark-internal "${CLI_ARGS[@]}"

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 7 additions & 5 deletions
@@ -66,10 +66,15 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

   /**
    * Whether the cleaning thread will block on cleanup tasks.
-   * This is set to true only for tests.
+   *
+   * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
+   * workaround for the issue, which is ultimately caused by the way the BlockManager actors
+   * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
+   * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
+   * longer in scope.
    */
   private val blockOnCleanupTasks = sc.conf.getBoolean(
-    "spark.cleaner.referenceTracking.blocking", false)
+    "spark.cleaner.referenceTracking.blocking", true)

   @volatile private var stopped = false

@@ -174,9 +179,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-
-  // Used for testing. These methods explicitly blocks until cleanup is completed
-  // to ensure that more reliable testing.
 }

 private object ContextCleaner {
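
The new doc comment above explains why spark.cleaner.referenceTracking.blocking now defaults to true, as a temporary workaround for SPARK-3015. If an application wanted the previous non-blocking behaviour back, it could set the flag explicitly; a minimal sketch, assuming a standard SparkConf setup (the app name and master below are placeholders, not part of this commit):

    import org.apache.spark.SparkConf

    object CleanerConfigSketch {
      // Sketch only: opt back into non-blocking cleanup, overriding the new
      // default introduced by this hunk. App name and master are placeholders.
      val conf: SparkConf = new SparkConf()
        .setAppName("cleaner-config-sketch")
        .setMaster("local[*]")
        .set("spark.cleaner.referenceTracking.blocking", "false")
    }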

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 5 additions & 2 deletions
@@ -21,6 +21,7 @@ import akka.actor.Actor
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.scheduler.TaskScheduler
+import org.apache.spark.util.ActorLogReceive

 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -36,8 +37,10 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(scheduler: TaskScheduler) extends Actor {
-  override def receive = {
+private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
+  extends Actor with ActorLogReceive with Logging {
+
+  override def receiveWithLogging = {
     case Heartbeat(executorId, taskMetrics, blockManagerId) =>
       val response = HeartbeatResponse(
         !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
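
HeartbeatReceiver (and MapOutputTrackerMasterActor below) now implements receiveWithLogging from the new org.apache.spark.util.ActorLogReceive trait instead of overriding receive directly. As a rough illustration of that log-then-delegate pattern (a self-contained sketch, not Spark's actual ActorLogReceive source), a trait like the following logs every incoming message before handing it to the concrete handler:

    import akka.actor.Actor

    // Illustrative sketch of the pattern; Spark's real trait lives in
    // org.apache.spark.util.ActorLogReceive and uses Spark's Logging utilities.
    trait LoggingReceiveSketch extends Actor {
      // Concrete actors implement their handling here instead of in receive.
      def receiveWithLogging: Actor.Receive

      final def receive: Actor.Receive = {
        case msg =>
          println(s"[actor] received message $msg") // stand-in for log.debug(...)
          receiveWithLogging.applyOrElse(msg, unhandled)
      }
    }

    // Hypothetical actor using the sketch.
    class EchoActorSketch extends LoggingReceiveSketch {
      def receiveWithLogging: Actor.Receive = {
        case s: String => sender() ! s
      }
    }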

core/src/main/scala/org/apache/spark/InterruptibleIterator.scala

Lines changed: 1 addition & 1 deletion
@@ -33,7 +33,7 @@ class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator
     // is allowed. The assumption is that Thread.interrupted does not have a memory fence in read
     // (just a volatile field in C), while context.interrupted is a volatile in the JVM, which
     // introduces an expensive read fence.
-    if (context.interrupted) {
+    if (context.isInterrupted) {
       throw new TaskKilledException
     } else {
       delegate.hasNext
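
The only change here is the rename from context.interrupted to context.isInterrupted, but the surrounding comment describes the wrapper's design: check a volatile interruption flag before delegating to the wrapped iterator. A self-contained sketch of that pattern (SimpleTaskContext and the InterruptedException stand-in are made up for illustration; Spark's real classes are TaskContext and TaskKilledException):

    // Minimal stand-in for the task context; Spark's real TaskContext is richer.
    class SimpleTaskContext {
      @volatile var isInterrupted: Boolean = false
    }

    // Iterator wrapper that aborts iteration once the task is marked interrupted.
    class InterruptibleIteratorSketch[T](context: SimpleTaskContext, delegate: Iterator[T])
      extends Iterator[T] {

      override def hasNext: Boolean = {
        if (context.isInterrupted) {
          throw new InterruptedException("task killed") // Spark throws TaskKilledException
        }
        delegate.hasNext
      }

      override def next(): T = delegate.next()
    }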

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 2 additions & 2 deletions
@@ -38,10 +38,10 @@ private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage

 /** Actor class for MapOutputTrackerMaster */
 private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster, conf: SparkConf)
-  extends Actor with Logging {
+  extends Actor with ActorLogReceive with Logging {
   val maxAkkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

-  def receive = {
+  override def receiveWithLogging = {
     case GetMapOutputStatuses(shuffleId: Int) =>
       val hostPort = sender.path.address.hostPort
       logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + hostPort)

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 8 additions & 2 deletions
@@ -45,7 +45,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   /** Create a SparkConf that loads defaults from system properties and the classpath */
   def this() = this(true)

-  private val settings = new HashMap[String, String]()
+  private[spark] val settings = new HashMap[String, String]()

   if (loadDefaults) {
     // Load any spark.* system properties
@@ -210,6 +210,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     new SparkConf(false).setAll(settings)
   }

+  /**
+   * By using this instead of System.getenv(), environment variables can be mocked
+   * in unit tests.
+   */
+  private[spark] def getenv(name: String): String = System.getenv(name)
+
   /** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
    * idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
   private[spark] def validateSettings() {
@@ -227,7 +233,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     // Validate spark.executor.extraJavaOptions
     settings.get(executorOptsKey).map { javaOpts =>
       if (javaOpts.contains("-Dspark")) {
-        val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts)'. " +
+        val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts'). " +
           "Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
         throw new Exception(msg)
       }
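
The new private[spark] getenv hook exists so that tests can substitute environment variables without mutating the real process environment. A hedged sketch of how test code living in the org.apache.spark package (required for private[spark] access) might use it; the class name and the environment entries are hypothetical:

    package org.apache.spark

    // Sketch only: route environment lookups through a provided map, falling
    // back to the real environment for anything not mocked.
    class MockEnvSparkConf(env: Map[String, String]) extends SparkConf(false) {
      override private[spark] def getenv(name: String): String =
        env.getOrElse(name, super.getenv(name))
    }

    // Hypothetical usage in a test:
    //   val conf = new MockEnvSparkConf(Map("SPARK_LOCAL_DIRS" -> "/tmp/spark-test"))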
