Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ exportMethods("arrange",
"createOrReplaceTempView",
"crossJoin",
"crosstab",
"cube",
"dapply",
"dapplyCollect",
"describe",
Expand Down Expand Up @@ -143,6 +144,7 @@ exportMethods("arrange",
"registerTempTable",
"rename",
"repartition",
"rollup",
"sample",
"sample_frac",
"sampleBy",
Expand Down
73 changes: 72 additions & 1 deletion R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -1321,7 +1321,7 @@ setMethod("toRDD",
#' Groups the SparkDataFrame using the specified columns, so we can run aggregation on them.
#'
#' @param x a SparkDataFrame.
#' @param ... variable(s) (character names(s) or Column(s)) to group on.
#' @param ... character name(s) or Column(s) to group on.
#' @return A GroupedData.
#' @family SparkDataFrame functions
#' @aliases groupBy,SparkDataFrame-method
Expand All @@ -1337,6 +1337,7 @@ setMethod("toRDD",
#' agg(groupBy(df, "department", "gender"), salary="avg", "age" -> "max")
#' }
#' @note groupBy since 1.4.0
#' @seealso \link{agg}, \link{cube}, \link{rollup}
setMethod("groupBy",
signature(x = "SparkDataFrame"),
function(x, ...) {
Expand Down Expand Up @@ -3642,3 +3643,73 @@ setMethod("checkpoint",
df <- callJMethod(x@sdf, "checkpoint", as.logical(eager))
dataFrame(df)
})

#' cube
#'
#' Create a multi-dimensional cube for the SparkDataFrame using the specified columns.
#'
#' If grouping expression is missing \code{cube} creates a single global aggregate and is
#' equivalent to direct application of \link{agg}.
#'
#' @param x a SparkDataFrame.
#' @param ... character name(s) or Column(s) to group on.
#' @return A GroupedData.
#' @family SparkDataFrame functions
#' @aliases cube,SparkDataFrame-method
#' @rdname cube
#' @name cube
#' @export
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' mean(cube(df, "cyl", "gear", "am"), "mpg")
#'
#' # Following calls are equivalent
#' agg(cube(df), mean(df$mpg))
#' agg(df, mean(df$mpg))
#' }
#' @note cube since 2.3.0
#' @seealso \link{agg}, \link{groupBy}, \link{rollup}
setMethod("cube",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            cols <- list(...)
            # Accept either Column objects or character names. Use inherits()
            # (robust to S4 subclasses) instead of class() == "Column", and
            # name the lambda argument `col` so it does not shadow `x` (the
            # SparkDataFrame) from the enclosing scope.
            jcol <- lapply(cols, function(col) {
              if (inherits(col, "Column")) col@jc else column(col)@jc
            })
            sgd <- callJMethod(x@sdf, "cube", jcol)
            groupedData(sgd)
          })

#' rollup
#'
#' Create a multi-dimensional rollup for the SparkDataFrame using the specified columns.
#'
#' If grouping expression is missing \code{rollup} creates a single global aggregate and is
#' equivalent to direct application of \link{agg}.
#'
#' @param x a SparkDataFrame.
#' @param ... character name(s) or Column(s) to group on.
#' @return A GroupedData.
#' @family SparkDataFrame functions
#' @aliases rollup,SparkDataFrame-method
#' @rdname rollup
#' @name rollup
#' @export
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' mean(rollup(df, "cyl", "gear", "am"), "mpg")
#'
#' # Following calls are equivalent
#' agg(rollup(df), mean(df$mpg))
#' agg(df, mean(df$mpg))
#' }
#' @note rollup since 2.3.0
#' @seealso \link{agg}, \link{cube}, \link{groupBy}
setMethod("rollup",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            cols <- list(...)
            # Accept either Column objects or character names. Use inherits()
            # (robust to S4 subclasses) instead of class() == "Column", and
            # name the lambda argument `col` so it does not shadow `x` (the
            # SparkDataFrame) from the enclosing scope.
            jcol <- lapply(cols, function(col) {
              if (inherits(col, "Column")) col@jc else column(col)@jc
            })
            sgd <- callJMethod(x@sdf, "rollup", jcol)
            groupedData(sgd)
          })
8 changes: 8 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,10 @@ setGeneric("createOrReplaceTempView",
# @export
setGeneric("crossJoin", function(x, y) { standardGeneric("crossJoin") })

#' @rdname cube
#' @export
setGeneric("cube", function(x, ...) {
  standardGeneric("cube")
})

#' @rdname dapply
#' @export
setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") })
Expand Down Expand Up @@ -631,6 +635,10 @@ setGeneric("sample",
standardGeneric("sample")
})

#' @rdname rollup
#' @export
setGeneric("rollup", function(x, ...) {
  standardGeneric("rollup")
})

#' @rdname sample
#' @export
setGeneric("sample_frac",
Expand Down
17 changes: 2 additions & 15 deletions R/pkg/inst/tests/testthat/test_mllib_classification.R
Original file line number Diff line number Diff line change
Expand Up @@ -284,22 +284,11 @@ test_that("spark.mlp", {
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))

# test initialWeights
model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
model <- spark.mlp(df, label ~ features, layers = c(4, 3), initialWeights =
c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "2.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))

model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
c(0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 5.0, 5.0, 5.0, 5.0, 9.0, 9.0, 9.0, 9.0, 9.0))
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "2.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))

model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2)
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
expect_equal(head(mlpPredictions$prediction, 10),
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "0.0", "0.0", "1.0", "0.0"))
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))

# Test formula works well
df <- suppressWarnings(createDataFrame(iris))
Expand All @@ -310,8 +299,6 @@ test_that("spark.mlp", {
expect_equal(summary$numOfOutputs, 3)
expect_equal(summary$layers, c(4, 3))
expect_equal(length(summary$weights), 15)
expect_equal(head(summary$weights, 5), list(-0.5793153, -4.652961, 6.216155, -6.649478,
-10.51147), tolerance = 1e-3)
})

test_that("spark.naiveBayes", {
Expand Down
102 changes: 102 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1816,6 +1816,108 @@ test_that("pivot GroupedData column", {
expect_error(collect(sum(pivot(groupBy(df, "year"), "course", list("R", "R")), "earnings")))
})

test_that("test multi-dimensional aggregations with cube and rollup", {
  # Fixture: six employees across two years and three departments.
  df <- createDataFrame(data.frame(
    id = 1:6,
    year = c(2016, 2016, 2016, 2017, 2017, 2017),
    salary = c(10000, 15000, 20000, 22000, 32000, 21000),
    department = c("management", "rnd", "sales", "management", "rnd", "sales")
  ))

  # cube() should aggregate over every combination of the grouping columns,
  # including the grand total (row with both columns NA).
  actual_cube <- collect(
    orderBy(
      agg(
        cube(df, "year", "department"),
        expr("sum(salary) AS total_salary"), expr("avg(salary) AS average_salary")
      ),
      "year", "department"
    )
  )

  expected_cube <- data.frame(
    year = c(rep(NA, 4), rep(2016, 4), rep(2017, 4)),
    department = rep(c(NA, "management", "rnd", "sales"), times = 3),
    total_salary = c(
      120000, # Total
      10000 + 22000, 15000 + 32000, 20000 + 21000, # Department only
      20000 + 15000 + 10000, # 2016
      10000, 15000, 20000, # 2016 each department
      21000 + 32000 + 22000, # 2017
      22000, 32000, 21000 # 2017 each department
    ),
    average_salary = c(
      # Total
      mean(c(20000, 15000, 10000, 21000, 32000, 22000)),
      # Mean by department
      mean(c(10000, 22000)), mean(c(15000, 32000)), mean(c(20000, 21000)),
      mean(c(10000, 15000, 20000)), # 2016
      10000, 15000, 20000, # 2016 each department
      mean(c(21000, 32000, 22000)), # 2017
      22000, 32000, 21000 # 2017 each department
    ),
    stringsAsFactors = FALSE
  )

  expect_equal(actual_cube, expected_cube)

  # cube should accept column objects
  # (2 years + NA) * (3 departments + NA) = 12 grouping rows
  expect_equal(
    count(sum(cube(df, df$year, df$department), "salary")),
    12
  )

  # cube without columns should result in a single aggregate
  expect_equal(
    collect(agg(cube(df), expr("sum(salary) as total_salary"))),
    data.frame(total_salary = 120000)
  )

  # rollup() should aggregate only over the hierarchical groupings:
  # (year, department), (year), and the grand total.
  actual_rollup <- collect(
    orderBy(
      agg(
        rollup(df, "year", "department"),
        expr("sum(salary) AS total_salary"), expr("avg(salary) AS average_salary")
      ),
      "year", "department"
    )
  )

  expected_rollup <- data.frame(
    year = c(NA, rep(2016, 4), rep(2017, 4)),
    department = c(NA, rep(c(NA, "management", "rnd", "sales"), times = 2)),
    total_salary = c(
      120000, # Total
      20000 + 15000 + 10000, # 2016
      10000, 15000, 20000, # 2016 each department
      21000 + 32000 + 22000, # 2017
      22000, 32000, 21000 # 2017 each department
    ),
    average_salary = c(
      # Total
      mean(c(20000, 15000, 10000, 21000, 32000, 22000)),
      mean(c(10000, 15000, 20000)), # 2016
      10000, 15000, 20000, # 2016 each department
      mean(c(21000, 32000, 22000)), # 2017
      22000, 32000, 21000 # 2017 each department
    ),
    stringsAsFactors = FALSE
  )

  expect_equal(actual_rollup, expected_rollup)

  # rollup should accept column objects
  # 2 years * (3 departments + subtotal) + grand total = 9 grouping rows
  expect_equal(
    count(sum(rollup(df, df$year, df$department), "salary")),
    9
  )

  # rollup without columns should result in a single aggregate
  expect_equal(
    collect(agg(rollup(df), expr("sum(salary) as total_salary"))),
    data.frame(total_salary = 120000)
  )
})

test_that("arrange() and orderBy() on a DataFrame", {
df <- read.json(jsonPath)
sorted <- arrange(df, df$age)
Expand Down
15 changes: 15 additions & 0 deletions R/pkg/vignettes/sparkr-vignettes.Rmd
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,21 @@ numCyl <- summarize(groupBy(carsDF, carsDF$cyl), count = n(carsDF$cyl))
head(numCyl)
```

Use `cube` or `rollup` to compute subtotals across multiple dimensions.

```{r}
mean(cube(carsDF, "cyl", "gear", "am"), "mpg")
```

generates groupings for all possible combinations of grouping columns, while

```{r}
mean(rollup(carsDF, "cyl", "gear", "am"), "mpg")
```

generates groupings for {(`cyl`, `gear`, `am`), (`cyl`, `gear`), (`cyl`), ()}.


#### Operating on Columns

SparkR also provides a number of functions that can directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -340,17 +339,17 @@ protected Path getRecoveryPath(String fileName) {
* when it previously was not. If YARN NM recovery is enabled it uses that path, otherwise
* it will uses a YARN local dir.
*/
protected File initRecoveryDb(String dbFileName) {
protected File initRecoveryDb(String dbName) {
if (_recoveryPath != null) {
File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbFileName);
File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName);
if (recoveryFile.exists()) {
return recoveryFile;
}
}
// db doesn't exist in recovery path go check local dirs for it
String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
for (String dir : localDirs) {
File f = new File(new Path(dir).toUri().getPath(), dbFileName);
File f = new File(new Path(dir).toUri().getPath(), dbName);
if (f.exists()) {
if (_recoveryPath == null) {
// If NM recovery is not enabled, we should specify the recovery path using NM local
Expand All @@ -363,25 +362,29 @@ protected File initRecoveryDb(String dbFileName) {
// make sure to move all DBs to the recovery path from the old NM local dirs.
// If another DB was initialized first just make sure all the DBs are in the same
// location.
File newLoc = new File(_recoveryPath.toUri().getPath(), dbFileName);
if (!newLoc.equals(f)) {
Path newLoc = new Path(_recoveryPath, dbName);
Path copyFrom = new Path(f.toURI());
if (!newLoc.equals(copyFrom)) {
logger.info("Moving " + copyFrom + " to: " + newLoc);
try {
Files.move(f.toPath(), newLoc.toPath());
// The move here needs to handle moving non-empty directories across NFS mounts
FileSystem fs = FileSystem.getLocal(_conf);
fs.rename(copyFrom, newLoc);
} catch (Exception e) {
// Fail to move recovery file to new path, just continue on with new DB location
logger.error("Failed to move recovery file {} to the path {}",
dbFileName, _recoveryPath.toString(), e);
dbName, _recoveryPath.toString(), e);
}
}
return newLoc;
return new File(newLoc.toUri().getPath());
}
}
}
if (_recoveryPath == null) {
_recoveryPath = new Path(localDirs[0]);
}

return new File(_recoveryPath.toUri().getPath(), dbFileName);
return new File(_recoveryPath.toUri().getPath(), dbName);
}

/**
Expand Down
Loading