
Commit f6de871

Author: Andrew Or (committed)
Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
2 parents: 0ca1801 + 8e3822a

File tree

26 files changed: +292 additions, -239 deletions


core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 3 additions & 2 deletions
@@ -186,8 +186,9 @@ function renderDagVizForJob(svgContainer) {
     var stageId = metadata.attr("stage-id");
     var containerId = VizConstants.graphPrefix + stageId;
     // Link each graph to the corresponding stage page (TODO: handle stage attempts)
-    var stageLink = "/stages/stage/?id=" +
-      stageId.replace(VizConstants.stagePrefix, "") + "&attempt=0&expandDagViz=true";
+    var stageLink = $("#stage-" + stageId.replace(VizConstants.stagePrefix, "") + "-0")
+      .find("a")
+      .attr("href") + "&expandDagViz=true";
     var container = svgContainer
       .append("a")
       .attr("xlink:href", stageLink)

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 8 additions & 0 deletions
@@ -371,6 +371,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       throw new SparkException("An application name must be set in your configuration")
     }

+    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
+    // yarn-standalone is deprecated, but still supported
+    if ((master == "yarn-cluster" || master == "yarn-standalone") &&
+        !_conf.contains("spark.yarn.app.id")) {
+      throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " +
+        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
+    }
+
     if (_conf.getBoolean("spark.logConf", false)) {
       logInfo("Spark configuration:\n" + _conf.toDebugString)
     }
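
In effect, constructing a SparkContext directly with a yarn-cluster (or the deprecated yarn-standalone) master now fails fast unless spark-submit / the YARN AM has set spark.yarn.app.id. A minimal sketch of the resulting behavior, with a made-up app name that is not part of this commit:

    // Sketch only: the conf values here are illustrative.
    import org.apache.spark.{SparkConf, SparkContext, SparkException}

    val conf = new SparkConf()
      .setAppName("yarn-check-demo")  // hypothetical app name
      .setMaster("yarn-cluster")      // spark.yarn.app.id is not set here

    try {
      new SparkContext(conf)
    } catch {
      case e: SparkException =>
        // Prints the new message pointing the user at spark-submit
        println(e.getMessage)
    }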

core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala

Lines changed: 3 additions & 13 deletions
@@ -239,15 +239,6 @@ private[spark] object ClosureCleaner extends Logging {
     logDebug(s" + fields accessed by starting closure: " + accessedFields.size)
     accessedFields.foreach { f => logDebug(" " + f) }

-    val inInterpreter = {
-      try {
-        val interpClass = Class.forName("spark.repl.Main")
-        interpClass.getMethod("interp").invoke(null) != null
-      } catch {
-        case _: ClassNotFoundException => true
-      }
-    }
-
     // List of outer (class, object) pairs, ordered from outermost to innermost
     // Note that all outer objects but the outermost one (first one in this list) must be closures
     var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
@@ -274,7 +265,7 @@ private[spark] object ClosureCleaner extends Logging {
       // required fields from the original object. We need the parent here because the Java
       // language specification requires the first constructor parameter of any closure to be
       // its enclosing object.
-      val clone = instantiateClass(cls, parent, inInterpreter)
+      val clone = instantiateClass(cls, parent)
      for (fieldName <- accessedFields(cls)) {
        val field = cls.getDeclaredField(fieldName)
        field.setAccessible(true)
@@ -327,9 +318,8 @@ private[spark] object ClosureCleaner extends Logging {

   private def instantiateClass(
       cls: Class[_],
-      enclosingObject: AnyRef,
-      inInterpreter: Boolean): AnyRef = {
-    if (!inInterpreter) {
+      enclosingObject: AnyRef): AnyRef = {
+    if (!Utils.isInInterpreter) {
       // This is a bona fide closure class, whose constructor has no effects
       // other than to set its fields, so use its constructor
       val cons = cls.getConstructors()(0)

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 14 additions & 0 deletions
@@ -1795,6 +1795,20 @@ private[spark] object Utils extends Logging {
     }
   }

+  lazy val isInInterpreter: Boolean = {
+    try {
+      val interpClass = classForName("spark.repl.Main")
+      interpClass.getMethod("interp").invoke(null) != null
+    } catch {
+      // Returning true seems to be a mistake.
+      // Currently changing it to false causes tests failures in Streaming.
+      // For a more detailed discussion, please, refer to
+      // https://github.com/apache/spark/pull/5835#issuecomment-101042271 and subsequent comments.
+      // Addressing this changed is tracked as https://issues.apache.org/jira/browse/SPARK-7527
+      case _: ClassNotFoundException => true
+    }
+  }
+
   /**
    * Return a well-formed URI for the file described by a user input string.
    *
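
This lazy val centralizes (and caches) the REPL check that ClosureCleaner previously re-ran inline on every clean. A standalone sketch of the same reflective probe, using a hypothetical helper object and plain Class.forName instead of Utils.classForName:

    // Sketch, not Spark code: probe for the REPL entry class once and cache the result.
    object ReplProbe {
      lazy val isInInterpreter: Boolean =
        try {
          val interpClass = Class.forName("spark.repl.Main")
          interpClass.getMethod("interp").invoke(null) != null
        } catch {
          // Mirrors the fallback questioned in the comment above (see SPARK-7527).
          case _: ClassNotFoundException => true
        }
    }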

core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala

Lines changed: 8 additions & 2 deletions
@@ -165,6 +165,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     }

     // Test that GC causes RDD cleanup after dereferencing the RDD
+    // Note rdd is used after previous GC to avoid early collection by the JVM
     val postGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id))
     rdd = null // Make RDD out of scope
     runGC()
@@ -181,9 +182,9 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     intercept[Exception] {
       preGCTester.assertCleanup()(timeout(1000 millis))
     }
+    rdd.count() // Defeat early collection by the JVM

     // Test that GC causes shuffle cleanup after dereferencing the RDD
-    rdd.count() // Defeat any early collection of rdd variable by the JVM
     val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
     rdd = null // Make RDD out of scope, so that corresponding shuffle goes out of scope
     runGC()
@@ -201,6 +202,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     }

     // Test that GC causes broadcast cleanup after dereferencing the broadcast variable
+    // Note broadcast is used after previous GC to avoid early collection by the JVM
     val postGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id))
     broadcast = null // Make broadcast variable out of scope
     runGC()
@@ -226,7 +228,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {

     // the checkpoint is not cleaned by default (without the configuration set)
     var postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Nil)
-    rdd = null // Make RDD out of scope
+    rdd = null // Make RDD out of scope, ok if collected earlier
     runGC()
     postGCTester.assertCleanup()
     assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))
@@ -245,6 +247,9 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     // Confirm the checkpoint directory exists
     assert(fs.exists(RDDCheckpointData.rddCheckpointDataPath(sc, rddId).get))

+    // Reference rdd to defeat any early collection by the JVM
+    rdd.count()
+
     // Test that GC causes checkpoint data cleanup after dereferencing the RDD
     postGCTester = new CleanerTester(sc, Seq(rddId), Nil, Nil, Seq(rddId))
     rdd = null // Make RDD out of scope
@@ -352,6 +357,7 @@ class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[Sor
     intercept[Exception] {
       preGCTester.assertCleanup()(timeout(1000 millis))
     }
+    rdd.count() // Defeat early collection by the JVM

     // Test that GC causes shuffle cleanup after dereferencing the RDD
     val postGCTester = new CleanerTester(sc, shuffleIds = Seq(0))
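
The added rdd.count() calls keep the local reference observably live until after the "nothing cleaned yet" assertion, so the JVM cannot collect it early and let cleanup fire before the test intends. The general keep-alive pattern, sketched outside Spark with illustrative names:

    // Sketch of the keep-alive pattern used in the test changes above.
    var data: Array[Int] = Array.fill(1 << 20)(1)
    // ... assert that nothing has been cleaned up yet ...
    println(data.length) // use `data` here so it stays strongly reachable up to this point
    data = null          // now deliberately drop the last strong reference
    System.gc()          // request a GC so reference-based cleanup can observe it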

mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala

Lines changed: 3 additions & 2 deletions
@@ -273,7 +273,8 @@ class DenseMatrix(

   override def copy: DenseMatrix = new DenseMatrix(numRows, numCols, values.clone())

-  private[mllib] def map(f: Double => Double) = new DenseMatrix(numRows, numCols, values.map(f))
+  private[mllib] def map(f: Double => Double) = new DenseMatrix(numRows, numCols, values.map(f),
+    isTransposed)

   private[mllib] def update(f: Double => Double): DenseMatrix = {
     val len = values.length
@@ -535,7 +536,7 @@ class SparseMatrix(
   }

   private[mllib] def map(f: Double => Double) =
-    new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values.map(f))
+    new SparseMatrix(numRows, numCols, colPtrs, rowIndices, values.map(f), isTransposed)

   private[mllib] def update(f: Double => Double): SparseMatrix = {
     val len = values.length
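
Both map implementations now thread isTransposed through, so mapping a transposed matrix no longer silently reinterprets its values in the wrong layout. A hypothetical miniature of the bug class (not MLlib's actual types):

    // Sketch: a copy-like method that drops a layout flag vs. one that preserves it.
    case class Mat(numRows: Int, numCols: Int, values: Array[Double],
                   isTransposed: Boolean = false) {
      def mapLossy(f: Double => Double): Mat =
        Mat(numRows, numCols, values.map(f))                // flag silently reset to false
      def mapFixed(f: Double => Double): Mat =
        Mat(numRows, numCols, values.map(f), isTransposed)  // flag preserved, as in the patch
    }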

python/pyspark/mllib/clustering.py

Lines changed: 6 additions & 0 deletions
@@ -212,6 +212,9 @@ def predict(self, x):
         if isinstance(x, RDD):
             cluster_labels = self.predictSoft(x).map(lambda z: z.index(max(z)))
             return cluster_labels
+        else:
+            raise TypeError("x should be represented by an RDD, "
+                            "but got %s." % type(x))

     def predictSoft(self, x):
         """
@@ -225,6 +228,9 @@ def predictSoft(self, x):
             membership_matrix = callMLlibFunc("predictSoftGMM", x.map(_convert_to_vector),
                                               _convert_to_vector(self._weights), means, sigmas)
             return membership_matrix.map(lambda x: pyarray.array('d', x))
+        else:
+            raise TypeError("x should be represented by an RDD, "
+                            "but got %s." % type(x))


 class GaussianMixture(object):

sbin/start-master.sh

Lines changed: 5 additions & 1 deletion
@@ -22,6 +22,8 @@
 sbin="`dirname "$0"`"
 sbin="`cd "$sbin"; pwd`"

+ORIGINAL_ARGS="$@"
+
 START_TACHYON=false

 while (( "$#" )); do
@@ -53,7 +55,9 @@ if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
   SPARK_MASTER_WEBUI_PORT=8080
 fi

-"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
+"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 \
+  --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
+  $ORIGINAL_ARGS

 if [ "$START_TACHYON" == "true" ]; then
   "$sbin"/../tachyon/bin/tachyon bootstrap-conf $SPARK_MASTER_IP

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 7 additions & 7 deletions
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.ParserDialect
 import org.apache.spark.sql.execution.{Filter, _}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.json._
-import org.apache.spark.sql.parquet.FSBasedParquetRelation
+import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -610,7 +610,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     } else if (conf.parquetUseDataSourceApi) {
       val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
       baseRelationToDataFrame(
-        new FSBasedParquetRelation(
+        new ParquetRelation2(
           globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this))
     } else {
       DataFrame(this, parquet.ParquetRelation(
@@ -989,7 +989,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def jdbc(url: String, table: String): DataFrame = {
     jdbc(url, table, JDBCRelation.columnPartition(null), new Properties())
   }
-
+
   /**
    * :: Experimental ::
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1002,7 +1002,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def jdbc(url: String, table: String, properties: Properties): DataFrame = {
     jdbc(url, table, JDBCRelation.columnPartition(null), properties)
   }
-
+
   /**
    * :: Experimental ::
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1020,7 +1020,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @Experimental
   def jdbc(
       url: String,
-      table: String, 
+      table: String,
       columnName: String,
       lowerBound: Long,
       upperBound: Long,
@@ -1056,7 +1056,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     val parts = JDBCRelation.columnPartition(partitioning)
     jdbc(url, table, parts, properties)
   }
-
+
   /**
    * :: Experimental ::
    * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1093,7 +1093,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     }
     jdbc(url, table, parts, properties)
   }
-
+
   private def jdbc(
       url: String,
       table: String,
