Commit c268cb5

Merge remote-tracking branch 'origin/master' into SPARK-9266
2 parents: c1f0167 + 2f5cbd8

52 files changed: +777 / -296 lines


R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ exportMethods("arrange",
               "collect",
               "columns",
               "count",
+              "crosstab",
               "describe",
               "distinct",
               "dropna",

R/pkg/R/DataFrame.R

Lines changed: 28 additions & 0 deletions
@@ -1554,3 +1554,31 @@ setMethod("fillna",
             }
             dataFrame(sdf)
           })
+
+#' crosstab
+#'
+#' Computes a pair-wise frequency table of the given columns. Also known as a contingency
+#' table. The number of distinct values for each column should be less than 1e4. At most 1e6
+#' non-zero pair frequencies will be returned.
+#'
+#' @param col1 name of the first column. Distinct items will make the first item of each row.
+#' @param col2 name of the second column. Distinct items will make the column names of the output.
+#' @return a local R data.frame representing the contingency table. The first column of each row
+#'         will be the distinct values of `col1` and the column names will be the distinct values
+#'         of `col2`. The name of the first column will be `$col1_$col2`. Pairs that have no
+#'         occurrences will have `null` as their counts.
+#'
+#' @rdname statfunctions
+#' @export
+#' @examples
+#' \dontrun{
+#' df <- jsonFile(sqlCtx, "/path/to/file.json")
+#' ct = crosstab(df, "title", "gender")
+#' }
+setMethod("crosstab",
+          signature(x = "DataFrame", col1 = "character", col2 = "character"),
+          function(x, col1, col2) {
+            statFunctions <- callJMethod(x@sdf, "stat")
+            sct <- callJMethod(statFunctions, "crosstab", col1, col2)
+            collect(dataFrame(sct))
+          })
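
For context (not part of the commit): the R method above reaches the JVM-side stat functions through callJMethod(x@sdf, "stat") followed by "crosstab". A minimal Scala sketch of the equivalent call, assuming a spark-shell session of this era where sqlContext is predefined; the JSON path and the "title"/"gender" columns are placeholders:

  // Hedged sketch, not from this commit: the Scala-side equivalent of the R wrapper above.
  // `sqlContext`, the JSON path, and the column names are assumptions.
  val df = sqlContext.jsonFile("/path/to/file.json")
  val ct = df.stat.crosstab("title", "gender")  // DataFrame of pair-wise counts
  ct.show()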

R/pkg/R/generics.R

Lines changed: 4 additions & 0 deletions
@@ -59,6 +59,10 @@ setGeneric("count", function(x) { standardGeneric("count") })
 # @export
 setGeneric("countByValue", function(x) { standardGeneric("countByValue") })
 
+# @rdname statfunctions
+# @export
+setGeneric("crosstab", function(x, col1, col2) { standardGeneric("crosstab") })
+
 # @rdname distinct
 # @export
 setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })

R/pkg/inst/tests/test_sparkSQL.R

Lines changed: 13 additions & 0 deletions
@@ -987,6 +987,19 @@ test_that("fillna() on a DataFrame", {
   expect_identical(expected, actual)
 })
 
+test_that("crosstab() on a DataFrame", {
+  rdd <- lapply(parallelize(sc, 0:3), function(x) {
+    list(paste0("a", x %% 3), paste0("b", x %% 2))
+  })
+  df <- toDF(rdd, list("a", "b"))
+  ct <- crosstab(df, "a", "b")
+  ordered <- ct[order(ct$a_b),]
+  row.names(ordered) <- NULL
+  expected <- data.frame("a_b" = c("a0", "a1", "a2"), "b0" = c(1, 0, 1), "b1" = c(1, 1, 0),
+                         stringsAsFactors = FALSE, row.names = NULL)
+  expect_identical(expected, ordered)
+})
+
 unlink(parquetPath)
 unlink(jsonPath)
 unlink(jsonPathNa)

bin/spark-shell

Lines changed: 2 additions & 2 deletions
@@ -47,11 +47,11 @@ function main() {
     # (see https://github.com/sbt/sbt/issues/562).
     stty -icanon min 1 -echo > /dev/null 2>&1
     export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
-    "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "$@"
+    "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
    stty icanon echo > /dev/null 2>&1
   else
     export SPARK_SUBMIT_OPTS
-    "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "$@"
+    "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
   fi
 }

bin/spark-shell2.cmd

Lines changed: 1 addition & 1 deletion
@@ -32,4 +32,4 @@ if "x%SPARK_SUBMIT_OPTS%"=="x" (
 set SPARK_SUBMIT_OPTS="%SPARK_SUBMIT_OPTS% -Dscala.usejavacp=true"
 
 :run_shell
-%SPARK_HOME%\bin\spark-submit2.cmd --class org.apache.spark.repl.Main %*
+%SPARK_HOME%\bin\spark-submit2.cmd --class org.apache.spark.repl.Main --name "Spark shell" %*

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 66 additions & 20 deletions
@@ -1758,16 +1758,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   /**
    * Run a function on a given set of partitions in an RDD and pass the results to the given
-   * handler function. This is the main entry point for all actions in Spark. The allowLocal
-   * flag specifies whether the scheduler can run the computation on the driver rather than
-   * shipping it out to the cluster, for short actions like first().
+   * handler function. This is the main entry point for all actions in Spark.
    */
   def runJob[T, U: ClassTag](
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
-      allowLocal: Boolean,
-      resultHandler: (Int, U) => Unit) {
+      resultHandler: (Int, U) => Unit): Unit = {
     if (stopped.get()) {
       throw new IllegalStateException("SparkContext has been shutdown")
     }
@@ -1777,54 +1774,104 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     if (conf.getBoolean("spark.logLineage", false)) {
       logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
     }
-    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
-      resultHandler, localProperties.get)
+    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
     progressBar.foreach(_.finishAll())
     rdd.doCheckpoint()
   }
 
   /**
-   * Run a function on a given set of partitions in an RDD and return the results as an array. The
-   * allowLocal flag specifies whether the scheduler can run the computation on the driver rather
-   * than shipping it out to the cluster, for short actions like first().
+   * Run a function on a given set of partitions in an RDD and return the results as an array.
+   */
+  def runJob[T, U: ClassTag](
+      rdd: RDD[T],
+      func: (TaskContext, Iterator[T]) => U,
+      partitions: Seq[Int]): Array[U] = {
+    val results = new Array[U](partitions.size)
+    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
+    results
+  }
+
+  /**
+   * Run a job on a given set of partitions of an RDD, but take a function of type
+   * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
+   */
+  def runJob[T, U: ClassTag](
+      rdd: RDD[T],
+      func: Iterator[T] => U,
+      partitions: Seq[Int]): Array[U] = {
+    val cleanedFunc = clean(func)
+    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
+  }
+
+
+  /**
+   * Run a function on a given set of partitions in an RDD and pass the results to the given
+   * handler function. This is the main entry point for all actions in Spark.
+   *
+   * The allowLocal flag is deprecated as of Spark 1.5.0+.
+   */
+  @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
+  def runJob[T, U: ClassTag](
+      rdd: RDD[T],
+      func: (TaskContext, Iterator[T]) => U,
+      partitions: Seq[Int],
+      allowLocal: Boolean,
+      resultHandler: (Int, U) => Unit): Unit = {
+    if (allowLocal) {
+      logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
+    }
+    runJob(rdd, func, partitions, resultHandler)
+  }
+
+  /**
+   * Run a function on a given set of partitions in an RDD and return the results as an array.
+   *
+   * The allowLocal flag is deprecated as of Spark 1.5.0+.
    */
+  @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
   def runJob[T, U: ClassTag](
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
       allowLocal: Boolean
       ): Array[U] = {
-    val results = new Array[U](partitions.size)
-    runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
-    results
+    if (allowLocal) {
+      logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
+    }
+    runJob(rdd, func, partitions)
   }
 
   /**
    * Run a job on a given set of partitions of an RDD, but take a function of type
    * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
+   *
+   * The allowLocal argument is deprecated as of Spark 1.5.0+.
    */
+  @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
   def runJob[T, U: ClassTag](
       rdd: RDD[T],
       func: Iterator[T] => U,
       partitions: Seq[Int],
       allowLocal: Boolean
       ): Array[U] = {
-    val cleanedFunc = clean(func)
-    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions, allowLocal)
+    if (allowLocal) {
+      logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
+    }
+    runJob(rdd, func, partitions)
   }
 
   /**
    * Run a job on all partitions in an RDD and return the results in an array.
    */
   def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
-    runJob(rdd, func, 0 until rdd.partitions.size, false)
+    runJob(rdd, func, 0 until rdd.partitions.length)
   }
 
   /**
   * Run a job on all partitions in an RDD and return the results in an array.
   */
   def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
-    runJob(rdd, func, 0 until rdd.partitions.size, false)
+    runJob(rdd, func, 0 until rdd.partitions.length)
  }
 
  /**
@@ -1835,7 +1882,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       processPartition: (TaskContext, Iterator[T]) => U,
       resultHandler: (Int, U) => Unit)
   {
-    runJob[T, U](rdd, processPartition, 0 until rdd.partitions.size, false, resultHandler)
+    runJob[T, U](rdd, processPartition, 0 until rdd.partitions.length, resultHandler)
   }
 
   /**
@@ -1847,7 +1894,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       resultHandler: (Int, U) => Unit)
   {
     val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
-    runJob[T, U](rdd, processFunc, 0 until rdd.partitions.size, false, resultHandler)
+    runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length, resultHandler)
   }
 
   /**
@@ -1892,7 +1939,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       (context: TaskContext, iter: Iterator[T]) => cleanF(iter),
       partitions,
       callSite,
-      allowLocal = false,
       resultHandler,
       localProperties.get)
     new SimpleFutureAction(waiter, resultFunc)
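
For illustration (not part of the commit), a call to the new runJob overload that drops the allowLocal flag looks roughly like the sketch below; it assumes a live SparkContext named sc, and the RDD and partition indices are placeholders:

  // Hedged sketch: sum only partitions 0 and 1 of a small RDD using the
  // three-argument runJob added above; the scheduler decides where tasks run.
  val data = sc.parallelize(1 to 100, numSlices = 4)
  val partialSums: Array[Int] = sc.runJob(data, (it: Iterator[Int]) => it.sum, Seq(0, 1))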

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 1 addition & 1 deletion
@@ -364,7 +364,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     // This is useful for implementing `take` from other language frontends
     // like Python where the data is serialized.
     import scala.collection.JavaConversions._
-    val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true)
+    val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds)
     res.map(x => new java.util.ArrayList(x.toSeq)).toArray
   }

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 2 additions & 3 deletions
@@ -358,12 +358,11 @@ private[spark] object PythonRDD extends Logging {
   def runJob(
       sc: SparkContext,
       rdd: JavaRDD[Array[Byte]],
-      partitions: JArrayList[Int],
-      allowLocal: Boolean): Int = {
+      partitions: JArrayList[Int]): Int = {
     type ByteArray = Array[Byte]
     type UnrolledPartition = Array[ByteArray]
     val allPartitions: Array[UnrolledPartition] =
-      sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
+      sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions)
     val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
     serveIterator(flattenedPartition.iterator,
       s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}")

core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.api.r
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
 
 import scala.collection.mutable.HashMap
+import scala.language.existentials
 
 import io.netty.channel.ChannelHandler.Sharable
 import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
