Commit f23e15b

Author: Andrew Or (committed)
Merge branch 'master' of github.com:apache/spark into viz-emphasize-rdd
2 parents: 565801f + 7740996

137 files changed, +35960 -573 lines changed


R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@ exportMethods("cache",
1313
"collect",
1414
"columns",
1515
"count",
16+
"describe",
1617
"distinct",
1718
"dtypes",
1819
"except",

R/pkg/R/DataFrame.R

Lines changed: 37 additions & 0 deletions
@@ -1276,3 +1276,40 @@ setMethod("saveAsTable",
12761276
callJMethod(df@sdf, "saveAsTable", tableName, source, jmode, options)
12771277
})
12781278

1279+
#' describe
1280+
#'
1281+
#' Computes statistics for numeric columns.
1282+
#' If no columns are given, this function computes statistics for all numerical columns.
1283+
#'
1284+
#' @param x A DataFrame to be computed.
1285+
#' @param col A string of name
1286+
#' @param ... Additional expressions
1287+
#' @return A DataFrame
1288+
#' @rdname describe
1289+
#' @export
1290+
#' @examples
1291+
#'\dontrun{
1292+
#' sc <- sparkR.init()
1293+
#' sqlCtx <- sparkRSQL.init(sc)
1294+
#' path <- "path/to/file.json"
1295+
#' df <- jsonFile(sqlCtx, path)
1296+
#' describe(df)
1297+
#' describe(df, "col1")
1298+
#' describe(df, "col1", "col2")
1299+
#' }
1300+
setMethod("describe",
1301+
signature(x = "DataFrame", col = "character"),
1302+
function(x, col, ...) {
1303+
colList <- list(col, ...)
1304+
sdf <- callJMethod(x@sdf, "describe", listToSeq(colList))
1305+
dataFrame(sdf)
1306+
})
1307+
1308+
#' @rdname describe
1309+
setMethod("describe",
1310+
signature(x = "DataFrame"),
1311+
function(x) {
1312+
colList <- as.list(c(columns(x)))
1313+
sdf <- callJMethod(x@sdf, "describe", listToSeq(colList))
1314+
dataFrame(sdf)
1315+
})
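For orientation, here is a minimal sketch (not part of this diff) of the JVM-side call the wrapper above reaches through callJMethod: x@sdf holds the Java/Scala DataFrame handle, so describe(df, "col1") ends up invoking DataFrame.describe, which returns a new DataFrame with a "summary" column (count, mean, stddev, min, max). The master URL, app name, JSON path, and column names below are placeholder assumptions.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object DescribeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("describe-sketch"))
    val sqlCtx = new SQLContext(sc)

    // Same flow as the roxygen example above: load JSON, then summarize numeric columns.
    val df = sqlCtx.jsonFile("path/to/file.json")  // placeholder path, as in the R example
    df.describe().show()                           // statistics for all numeric columns
    df.describe("col1", "col2").show()             // statistics for the named columns only

    sc.stop()
  }
}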

R/pkg/R/generics.R

Lines changed: 4 additions & 0 deletions
@@ -384,6 +384,10 @@ setGeneric("value", function(bcast) { standardGeneric("value") })
384384
#' @export
385385
setGeneric("columns", function(x) {standardGeneric("columns") })
386386

387+
#' @rdname describe
388+
#' @export
389+
setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
390+
387391
#' @rdname schema
388392
#' @export
389393
setGeneric("dtypes", function(x) { standardGeneric("dtypes") })

R/pkg/inst/tests/test_sparkSQL.R

Lines changed: 11 additions & 0 deletions
@@ -705,5 +705,16 @@ test_that("parquetFile works with multiple input paths", {
705705
expect_true(count(parquetDF) == count(df)*2)
706706
})
707707

708+
test_that("describe() on a DataFrame", {
709+
df <- jsonFile(sqlCtx, jsonPath)
710+
stats <- describe(df, "age")
711+
expect_true(collect(stats)[1, "summary"] == "count")
712+
expect_true(collect(stats)[2, "age"] == 24.5)
713+
expect_true(collect(stats)[3, "age"] == 5.5)
714+
stats <- describe(df)
715+
expect_true(collect(stats)[4, "name"] == "Andy")
716+
expect_true(collect(stats)[5, "age"] == 30.0)
717+
})
718+
708719
unlink(parquetPath)
709720
unlink(jsonPath)

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 1 addition & 3 deletions
@@ -143,9 +143,7 @@ function renderDagViz(forJob) {
   resizeSvg(svg);
 }
 
-/*
- * Render the RDD DAG visualization on the stage page.
- */
+/* Render the RDD DAG visualization on the stage page. */
 function renderDagVizForStage(svgContainer) {
   var metadata = metadataContainer().select(".stage-metadata");
   var dot = metadata.select(".dot-file").text();

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 3 deletions
@@ -407,15 +407,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
 
+    // "_jobProgressListener" should be set up before creating SparkEnv because when creating
+    // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
+    _jobProgressListener = new JobProgressListener(_conf)
+    listenerBus.addListener(jobProgressListener)
+
     // Create the Spark execution environment (cache, map output tracker, etc)
     _env = createSparkEnv(_conf, isLocal, listenerBus)
     SparkEnv.set(_env)
 
     _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
 
-    _jobProgressListener = new JobProgressListener(_conf)
-    listenerBus.addListener(jobProgressListener)
-
     _statusTracker = new SparkStatusTracker(this)
 
     _progressBar =
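The reordering above exists because a listener only sees events posted after it has been registered on the bus. A toy sketch (not Spark code; ToyBus, ListenerOrderSketch, and the event strings are made up for illustration) of the failure mode the added comment describes:

import scala.collection.mutable.ArrayBuffer

object ListenerOrderSketch {
  // A drastically simplified stand-in for Spark's listener bus.
  class ToyBus {
    private val listeners = ArrayBuffer.empty[String => Unit]
    def addListener(l: String => Unit): Unit = listeners += l
    // Events are delivered only to listeners that are already registered.
    def post(event: String): Unit = listeners.foreach(_.apply(event))
  }

  def main(args: Array[String]): Unit = {
    val bus = new ToyBus
    bus.post("posted while creating SparkEnv")   // silently missed: no listener yet
    bus.addListener(e => println(s"JobProgressListener saw: $e"))
    bus.post("posted after registration")        // observed
  }
}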
Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * A class that is considered private to the internals of Spark -- there is a high-likelihood
+ * they will be changed in future versions of Spark.
+ *
+ * This should be used only when the standard Scala / Java means of protecting classes are
+ * insufficient.  In particular, Java has no equivalent of private[spark], so we use this annotation
+ * in its place.
+ *
+ * NOTE: If there exists a Scaladoc comment that immediately precedes this annotation, the first
+ * line of the comment must be ":: Private ::" with no trailing blank line.  This is because
+ * of the known issue that Scaladoc displays only either the annotation or the comment, whichever
+ * comes first.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER,
+  ElementType.CONSTRUCTOR, ElementType.LOCAL_VARIABLE, ElementType.PACKAGE})
+public @interface Private {}
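An illustrative sketch (not part of the commit) of how the annotation and the ":: Private ::" Scaladoc convention it documents would be used together; the package name and the InternalHelper class are made up for the example.

package org.apache.spark.example  // hypothetical package

import org.apache.spark.annotation.Private

/**
 * :: Private ::
 * Internal to Spark; may change or be removed in any release without notice.
 */
@Private
class InternalHelper {
  def doWork(): Unit = ()
}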

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 7 additions & 0 deletions
@@ -125,6 +125,13 @@ class KryoSerializer(conf: SparkConf)
   override def newInstance(): SerializerInstance = {
     new KryoSerializerInstance(this)
   }
+
+  private[spark] override lazy val supportsRelocationOfSerializedObjects: Boolean = {
+    // If auto-reset is disabled, then Kryo may store references to duplicate occurrences of objects
+    // in the stream rather than writing those objects' serialized bytes, breaking relocation. See
+    // https://groups.google.com/d/msg/kryo-users/6ZUSyfjjtdo/FhGG1KHDXPgJ for more details.
+    newInstance().asInstanceOf[KryoSerializerInstance].getAutoReset()
+  }
 }
 
 private[spark]
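A hedged sketch (not from the diff) of the Kryo behaviour the new comment points at, using the Kryo library directly: with auto-reset disabled and reference tracking on, a repeated object can be written as a back reference into earlier bytes, so per-record byte ranges are no longer self-contained and cannot be relocated. The buffer size and payload below are arbitrary choices for the example.

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

object AutoResetSketch {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    kryo.setReferences(true)    // reference tracking is what produces back references
    kryo.setAutoReset(false)    // keep reference state across top-level writes

    val out = new Output(1 << 16)
    val payload = java.util.Arrays.asList("a", "b", "c")

    kryo.writeClassAndObject(out, payload)
    val firstLen = out.position()
    kryo.writeClassAndObject(out, payload)       // same instance written again
    val secondLen = out.position() - firstLen

    // With auto-reset disabled the second record is typically just a reference into
    // the first one, so its bytes cannot stand alone or be reordered in the stream.
    println(s"first record: $firstLen bytes, second record: $secondLen bytes")
  }
}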

core/src/main/scala/org/apache/spark/serializer/Serializer.scala

Lines changed: 34 additions & 1 deletion
@@ -23,7 +23,7 @@ import java.nio.ByteBuffer
 import scala.reflect.ClassTag
 
 import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{DeveloperApi, Private}
 import org.apache.spark.util.{Utils, ByteBufferInputStream, NextIterator}
 
 /**
@@ -63,6 +63,39 @@ abstract class Serializer {
 
   /** Creates a new [[SerializerInstance]]. */
   def newInstance(): SerializerInstance
+
+  /**
+   * :: Private ::
+   * Returns true if this serializer supports relocation of its serialized objects and false
+   * otherwise. This should return true if and only if reordering the bytes of serialized objects
+   * in serialization stream output is equivalent to having re-ordered those elements prior to
+   * serializing them. More specifically, the following should hold if a serializer supports
+   * relocation:
+   *
+   * {{{
+   * serOut.open()
+   * position = 0
+   * serOut.write(obj1)
+   * serOut.flush()
+   * position = # of bytes written to stream so far
+   * obj1Bytes = output[0:position-1]
+   * serOut.write(obj2)
+   * serOut.flush()
+   * position2 = # of bytes written to stream so far
+   * obj2Bytes = output[position:position2-1]
+   * serIn.open([obj2bytes] concatenate [obj1bytes]) should return (obj2, obj1)
+   * }}}
+   *
+   * In general, this property should hold for serializers that are stateless and that do not
+   * write special metadata at the beginning or end of the serialization stream.
+   *
+   * This API is private to Spark; this method should not be overridden in third-party subclasses
+   * or called in user code and is subject to removal in future Spark releases.
+   *
+   * See SPARK-7311 for more details.
+   */
+  @Private
+  private[spark] def supportsRelocationOfSerializedObjects: Boolean = false
 }
 
 
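A small sketch (not in the diff) that exercises the property described in the Scaladoc above against Spark's stream API. JavaSerializer is used here only because it is easy to construct; since it writes a stream header, the swapped read is expected to fail, which is consistent with its supportsRelocationOfSerializedObjects staying at the default of false. The object RelocationSketch and its variable names are made up for illustration.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.util.Try
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

object RelocationSketch {
  def main(args: Array[String]): Unit = {
    val ser = new JavaSerializer(new SparkConf()).newInstance()

    // Write obj1 then obj2, recording where the first record's bytes end.
    val bytes = new ByteArrayOutputStream()
    val serOut = ser.serializeStream(bytes)
    serOut.writeObject("obj1")
    serOut.flush()
    val position = bytes.size()
    serOut.writeObject("obj2")
    serOut.close()

    val all = bytes.toByteArray
    val obj1Bytes = all.slice(0, position)
    val obj2Bytes = all.slice(position, all.length)

    // Relocation would require that reading [obj2Bytes ++ obj1Bytes] yields (obj2, obj1).
    val swapped = Try {
      val serIn = ser.deserializeStream(new ByteArrayInputStream(obj2Bytes ++ obj1Bytes))
      (serIn.readObject[String](), serIn.readObject[String]())
    }
    println(swapped)  // Success(("obj2", "obj1")) only for a relocation-friendly serializer
  }
}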

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 1 addition & 2 deletions
@@ -131,8 +131,7 @@ private[spark] class ExternalSorter[K, V, C](
   private val kvChunkSize = conf.getInt("spark.shuffle.sort.kvChunkSize", 1 << 22) // 4 MB
   private val useSerializedPairBuffer =
     !ordering.isDefined && conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) &&
-      ser.isInstanceOf[KryoSerializer] &&
-      serInstance.asInstanceOf[KryoSerializerInstance].getAutoReset
+      ser.supportsRelocationOfSerializedObjects
 
   // Data structures to store in-memory objects before we spill. Depending on whether we have an
   // Aggregator set, we either put objects into an AppendOnlyMap where we combine them, or we
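For orientation, a hedged sketch (not from the diff) of the settings under which the serialized-buffer path above can now be taken: no key ordering in play, spark.shuffle.sort.serializeMapOutputs left at its default of true, and a serializer that reports relocation support, which after this commit means Kryo with auto-reset enabled. The object name is made up; the configuration keys come from the diff above.

import org.apache.spark.SparkConf

object SortShuffleConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Default is already true; shown here only to surface the flag checked above.
      .set("spark.shuffle.sort.serializeMapOutputs", "true")
    println(conf.get("spark.serializer"))
  }
}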
