Closed
Changes from 14 commits
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -101,6 +101,7 @@ exportMethods("arrange",
"createOrReplaceTempView",
"crossJoin",
"crosstab",
"cube",
"dapply",
"dapplyCollect",
"describe",
@@ -143,6 +144,7 @@ exportMethods("arrange",
"registerTempTable",
"rename",
"repartition",
"rollup",
"sample",
"sample_frac",
"sampleBy",
74 changes: 73 additions & 1 deletion R/pkg/R/DataFrame.R
@@ -1321,7 +1321,7 @@ setMethod("toRDD",
#' Groups the SparkDataFrame using the specified columns, so we can run aggregation on them.
#'
#' @param x a SparkDataFrame.
#' @param ... variable(s) (character names(s) or Column(s)) to group on.
#' @param ... character name(s) or Column(s) to group on.

Member

extra space? ... character (two spaces)

#' @return A GroupedData.
#' @family SparkDataFrame functions
#' @aliases groupBy,SparkDataFrame-method
@@ -1337,6 +1337,7 @@ setMethod("toRDD",
#' agg(groupBy(df, "department", "gender"), salary="avg", "age" -> "max")
#' }
#' @note groupBy since 1.4.0
#' @seealso \link{agg}, \link{cube}, \link{rollup}
setMethod("groupBy",
signature(x = "SparkDataFrame"),
function(x, ...) {
@@ -3642,3 +3643,74 @@ setMethod("checkpoint",
df <- callJMethod(x@sdf, "checkpoint", as.logical(eager))
dataFrame(df)
})

Member

nit: extra newline


#' cube
#'
#' Create a multi-dimensional cube for the SparkDataFrame using the specified columns.
#'
#' If grouping expression is missing `cube` creates a single global aggregate and is equivalent to

Member

the backtick doesn't work with roxygen2 - if you want, use \code{cube} instead

#' direct application of \link{agg}.
#'
#' @param x a SparkDataFrame.
#' @param ... character name(s) or Column(s) to group on.
#' @return A GroupedData.
#' @family SparkDataFrame functions
#' @aliases cube,SparkDataFrame-method
#' @rdname cube
#' @name cube
#' @export
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' mean(cube(df, "cyl", "gear", "am"), "mpg")
#'
#' # Following calls are equivalent
#' agg(cube(df), mean(df$mpg))
#' agg(df, mean(df$mpg))
#' }
#' @note cube since 2.3.0
#' @seealso \link{agg}, \link{groupBy}, \link{rollup}
setMethod("cube",
signature(x = "SparkDataFrame"),
function(x, ...) {
cols <- list(...)

Member

check length of cols is > 0?

Member Author

I think we can skip that. rollup(df) and cube(df) are valid function calls equivalent to group_by(df) and arguably can be useful in some cases (like aggregations based on user input).

Member

hmm, it's a bit odd to call rollup or cube that way but ok if other languages leave that open too. but I'd say we should add a line to explain "rollup or cube without column is the same as group_by" (or something better)

Member

if you want to support empty parameter let's add some tests for it then?

jcol <- lapply(cols, function(x) if (class(x) == "Column") x@jc else column(x)@jc)
sgd <- callJMethod(x@sdf, "cube", jcol)
groupedData(sgd)
})
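
On the review thread above about calling `cube` with no columns: the method body collects `...` into a list and maps over it, and in base R `lapply` over an empty list simply returns an empty list. The sketch below shows only that R-side behaviour. `to_jcol` and `convert_cols` are hypothetical stand-ins for the `Column` to Java column conversion, not SparkR functions, and whether the JVM side accepts an empty list is a separate question that the tests in this PR cover.

```r
# Hypothetical stand-in for the Column -> Java column conversion in the method body.
# It only tags each input so the result is easy to inspect.
to_jcol <- function(x) paste0("jc:", x)

# Mirrors the shape of the method: collect varargs, then map over them.
convert_cols <- function(...) {
  cols <- list(...)
  lapply(cols, to_jcol)
}

with_cols <- convert_cols("cyl", "gear")
no_cols <- convert_cols()

length(with_cols)  # 2
length(no_cols)    # 0
```

Because the empty case falls out of `lapply` naturally, an explicit `length(cols) > 0` check would only be needed if empty calls were meant to be rejected.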

#' rollup
#'
#' Create a multi-dimensional rollup for the SparkDataFrame using the specified columns.
#'
#' If grouping expression is missing `rollup` creates a single global aggregate and is equivalent to

@felixcheung felixcheung Apr 25, 2017

Member

ditto on backtick rollup

#' direct application of \link{agg}.
#'
#' @param x a SparkDataFrame.
#' @param ... character name(s) or Column(s) to group on.
#' @return A GroupedData.
#' @family SparkDataFrame functions
#' @aliases rollup,SparkDataFrame-method
#' @rdname rollup
#' @name rollup
#' @export
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' mean(rollup(df, "cyl", "gear", "am"), "mpg")
#'
#' # Following calls are equivalent
#' agg(rollup(df), mean(df$mpg))
#' agg(df, mean(df$mpg))
#' }
#' @note rollup since 2.3.0
#' @seealso \link{agg}, \link{cube}, \link{groupBy}
setMethod("rollup",
signature(x = "SparkDataFrame"),
function(x, ...) {
cols <- list(...)

Member

check length of cols

jcol <- lapply(cols, function(x) if (class(x) == "Column") x@jc else column(x)@jc)
sgd <- callJMethod(x@sdf, "rollup", jcol)
groupedData(sgd)
})
8 changes: 8 additions & 0 deletions R/pkg/R/generics.R
@@ -483,6 +483,10 @@ setGeneric("createOrReplaceTempView",
# @export
setGeneric("crossJoin", function(x, y) { standardGeneric("crossJoin") })

#' @rdname cube
#' @export
setGeneric("cube", function(x, ...) { standardGeneric("cube") })

#' @rdname dapply
#' @export
setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") })
@@ -631,6 +635,10 @@ setGeneric("sample",
standardGeneric("sample")
})

#' @rdname rollup
#' @export
setGeneric("rollup", function(x, ...) { standardGeneric("rollup") })

#' @rdname sample
#' @export
setGeneric("sample_frac",
102 changes: 102 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1782,6 +1782,108 @@ test_that("pivot GroupedData column", {
expect_error(collect(sum(pivot(groupBy(df, "year"), "course", list("R", "R")), "earnings")))
})

test_that("test multi-dimensional aggregations with cube and rollup", {
df <- createDataFrame(data.frame(
id = 1:6,
year = c(2016, 2016, 2016, 2017, 2017, 2017),
salary = c(10000, 15000, 20000, 22000, 32000, 21000),
department = c("management", "rnd", "sales", "management", "rnd", "sales")
))

actual_cube <- collect(
orderBy(
agg(
cube(df, "year", "department"),
expr("sum(salary) AS total_salary"), expr("avg(salary) AS average_salary")
),
"year", "department"
)
)

expected_cube <- data.frame(
year = c(rep(NA, 4), rep(2016, 4), rep(2017, 4)),
department = rep(c(NA, "management", "rnd", "sales"), times = 3),
total_salary = c(
120000, # Total
10000 + 22000, 15000 + 32000, 20000 + 21000, # Department only
20000 + 15000 + 10000, # 2016
10000, 15000, 20000, # 2016 each department
21000 + 32000 + 22000, # 2017
22000, 32000, 21000 # 2017 each department
),
average_salary = c(
# Total
mean(c(20000, 15000, 10000, 21000, 32000, 22000)),
# Mean by department
mean(c(10000, 22000)), mean(c(15000, 32000)), mean(c(20000, 21000)),
mean(c(10000, 15000, 20000)), # 2016
10000, 15000, 20000, # 2016 each department
mean(c(21000, 32000, 22000)), # 2017
22000, 32000, 21000 # 2017 each department
),
stringsAsFactors = FALSE
)

expect_equal(actual_cube, expected_cube)

# cube should accept column objects
expect_equal(
count(sum(cube(df, df$year, df$department), "salary")),
12
)

# cube without columns should result in a single aggregate
expect_equal(
collect(agg(cube(df), expr("sum(salary) as total_salary"))),
data.frame(total_salary = 120000)
)

actual_rollup <- collect(
orderBy(
agg(
rollup(df, "year", "department"),
expr("sum(salary) AS total_salary"), expr("avg(salary) AS average_salary")
),
"year", "department"
)
)

expected_rollup <- data.frame(
year = c(NA, rep(2016, 4), rep(2017, 4)),
department = c(NA, rep(c(NA, "management", "rnd", "sales"), times = 2)),
total_salary = c(
120000, # Total
20000 + 15000 + 10000, # 2016
10000, 15000, 20000, # 2016 each department
21000 + 32000 + 22000, # 2017
22000, 32000, 21000 # 2017 each department
),
average_salary = c(
# Total
mean(c(20000, 15000, 10000, 21000, 32000, 22000)),
mean(c(10000, 15000, 20000)), # 2016
10000, 15000, 20000, # 2016 each department
mean(c(21000, 32000, 22000)), # 2017
22000, 32000, 21000 # 2017 each department
),
stringsAsFactors = FALSE
)

expect_equal(actual_rollup, expected_rollup)

# rollup should accept column objects
expect_equal(
count(sum(rollup(df, df$year, df$department), "salary")),
9
)

# rollup without columns should result in a single aggregate
expect_equal(
collect(agg(rollup(df), expr("sum(salary) as total_salary"))),
data.frame(total_salary = 120000)
)
})
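
The expected row counts in the test above (12 for `cube`, 9 for `rollup`) can be checked by hand: each grouping set contributes one row per distinct combination of its columns, and the empty set contributes the grand total. The sketch below redoes that arithmetic in base R on a plain `data.frame` copy of the test data. It does not call Spark, and `n_groups` is a hypothetical helper introduced only for this illustration.

```r
# Same year/department values as the test data, as a plain data.frame.
df <- data.frame(
  year = c(2016, 2016, 2016, 2017, 2017, 2017),
  department = c("management", "rnd", "sales", "management", "rnd", "sales")
)

# Number of distinct groups for a given set of grouping columns.
# An empty set is the single grand-total group.
n_groups <- function(data, cols) {
  if (length(cols) == 0) return(1L)
  nrow(unique(data[, cols, drop = FALSE]))
}

# rollup(year, department): (year, department), (year), ()
rollup_sets <- list(c("year", "department"), "year", character(0))
# cube adds the one remaining subset: (department)
cube_sets <- c(rollup_sets, list("department"))

rollup_rows <- sum(vapply(rollup_sets, n_groups, integer(1), data = df))
cube_rows <- sum(vapply(cube_sets, n_groups, integer(1), data = df))

rollup_rows  # 6 + 2 + 1 = 9
cube_rows    # 6 + 2 + 1 + 3 = 12
```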

test_that("arrange() and orderBy() on a DataFrame", {
df <- read.json(jsonPath)
sorted <- arrange(df, df$age)
15 changes: 15 additions & 0 deletions R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -308,6 +308,21 @@ numCyl <- summarize(groupBy(carsDF, carsDF$cyl), count = n(carsDF$cyl))
head(numCyl)
```

Use `cube` or `rollup` to compute subtotals across multiple dimensions.

```{r}
mean(cube(carsDF, "cyl", "gear", "am"), "mpg")
```

generates groupings for all possible combinations of grouping columns, while

```{r}
mean(rollup(carsDF, "cyl", "gear", "am"), "mpg")
```

generates groupings for {(`cyl`, `gear`, `am`), (`cyl`, `gear`), (`cyl`), ()}.
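
To make the difference concrete, the grouping sets for three columns can be enumerated in base R: `rollup` corresponds to every prefix of the column list (a hierarchy), while `cube` corresponds to every subset. This is a plain-R sketch of the set arithmetic only, under the assumption that the usual SQL `ROLLUP`/`CUBE` semantics apply. It does not call SparkR, and `rollup_sets` and `cube_sets` are illustrative names.

```r
cols <- c("cyl", "gear", "am")

# rollup: every prefix of the column list, from the full list down to the empty set.
rollup_sets <- lapply(rev(seq(0, length(cols))), function(n) cols[seq_len(n)])

# cube: every subset of the column list (2^n of them), starting with the empty set.
cube_sets <- c(
  list(character(0)),
  unlist(
    lapply(seq_along(cols), function(k) utils::combn(cols, k, simplify = FALSE)),
    recursive = FALSE
  )
)

length(rollup_sets)  # 4 grouping sets: n + 1
length(cube_sets)    # 8 grouping sets: 2^n
```

Every `rollup` grouping set also appears among the `cube` grouping sets, so for the same columns `cube` returns at least as many rows as `rollup`.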


#### Operating on Columns

SparkR also provides a number of functions that can be directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.
30 changes: 30 additions & 0 deletions docs/sparkr.md
@@ -264,6 +264,36 @@ head(arrange(waiting_counts, desc(waiting_counts$count)))
{% endhighlight %}
</div>

In addition to standard aggregations, SparkR supports [OLAP cube](https://en.wikipedia.org/wiki/OLAP_cube) operators `cube`:

<div data-lang="r" markdown="1">
{% highlight r %}
head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 NA 140.8 4 22.8
##2 4 75.7 4 30.4
##3 8 400.0 3 19.2
##4 8 318.0 3 15.5
##5 NA 351.0 NA 15.8
##6 NA 275.8 NA 16.3
{% endhighlight %}
</div>

and `rollup`:

<div data-lang="r" markdown="1">
{% highlight r %}
head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 4 75.7 4 30.4
##2 8 400.0 3 19.2
##3 8 318.0 3 15.5
##4 4 78.7 NA 32.4
##5 8 304.0 3 15.2
##6 4 79.0 NA 27.3
{% endhighlight %}
</div>

### Operating on Columns

SparkR also provides a number of functions that can be directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.