1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -295,6 +295,7 @@ export("as.DataFrame",
"read.json",
"read.parquet",
"read.text",
"spark.lapply",
"sql",
"str",
"tableToDF",
42 changes: 42 additions & 0 deletions R/pkg/R/context.R
@@ -226,6 +226,48 @@ setCheckpointDir <- function(sc, dirName) {
invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
}

#' @title Run a function over a list of elements, distributing the computations with Spark.
#'
#' @description
#' Applies a function to elements of a list in a manner similar to doParallel or lapply.
#' The computations are distributed using Spark. It is conceptually equivalent to:
#' lapply(list, func)
#'
#' Known limitations:
#' - variable scoping and capture: compared to R's rich support for variable resolution,
#' the distributed nature of SparkR limits how variables are resolved at runtime. All
#' variables that are available through lexical scoping are embedded in the closure of the
#' function and available as read-only variables within the function. Environment variables
#' should be stored into temporary variables outside the function rather than accessed
#' directly within it, as illustrated in the sketch below.
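#'
#' A minimal sketch of this pattern (assuming an existing SparkContext \code{sc}; the
#' variable name \code{base} is purely illustrative):
#'\dontrun{
#' base <- 10
#' # base is captured through lexical scoping and is read-only inside the closure
#' shifted <- spark.lapply(sc, 1:3, function(x) { x + base })
#'}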
#'
#' - loading external packages: in order to use a package, you need to load it inside the
#' closure. For example, if you rely on the MASS package, here is how you could use it:
#'\dontrun{
#' train <- function(hyperparam) {
#'   library(MASS)
#'   model <- lm.ridge(y ~ x + z, data, lambda = hyperparam)
#'   model
#' }
#'}
#'
#' @rdname spark.lapply
#' @param sc Spark Context to use
#' @param list the list of elements
#' @param func a function that takes one argument.
#' @return a list of results (the exact type being determined by the function)
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
#'}
spark.lapply <- function(sc, list, func) {
rdd <- parallelize(sc, list, length(list))
Member:
I'm guessing people could get confused about when to call this vs. when to call the newly proposed dapply (#12493). Perhaps we need to explain this more and check class(list) in case someone passes a Spark DataFrame to this function.

Contributor:
dapply and spark.lapply have different semantics. There is no need to check class(list) here, as a DataFrame can be treated as a list of columns. parallelize() will issue a warning for a DataFrame here: https://github.com/apache/spark/blob/master/R/pkg/R/context.R#L110

@felixcheung (Member), Apr 20, 2016:
It actually fails here instead: https://github.com/apache/spark/blob/master/R/pkg/R/context.R#L116
A Spark DataFrame does not satisfy is.data.frame.

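A hedged sketch of the type checks being discussed, not the actual parallelize() code; createDataFrame(sqlContext, faithful) follows the SparkR examples of the time and assumes an existing sqlContext:

df <- createDataFrame(sqlContext, faithful)   # a Spark DataFrame is an S4 object
is.data.frame(df)                             # FALSE: it is not a base R data.frame
is.list(df) || is.vector(df)                  # also FALSE: it is not a plain list or vector either
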
results <- map(rdd, func)
local <- collect(results)
local
}

#' Set new log level
#'
#' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"
6 changes: 6 additions & 0 deletions R/pkg/inst/tests/testthat/test_context.R
@@ -141,3 +141,9 @@ test_that("sparkJars sparkPackages as comma-separated strings", {
expect_that(processSparkJars(f), not(gives_warning()))
expect_match(processSparkJars(f), f)
})

test_that("spark.lapply should perform simple transforms", {
sc <- sparkR.init()
doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
expect_equal(doubled, as.list(2 * 1:10))
})
Contributor:
It would be good to add a test where we capture some environment variables and/or use a package; a minimal sketch of such a test is below. Also, we should update https://github.com/apache/spark/blob/master/docs/sparkr.md, but we can open another JIRA for that, I guess.
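A minimal sketch of the kind of test being suggested (not part of this PR; the variable name offset and the exact expectations are assumptions):

test_that("spark.lapply should capture variables from the enclosing environment", {
  sc <- sparkR.init()
  offset <- 100L                                       # captured read-only through lexical scoping
  shifted <- spark.lapply(sc, 1:5, function(x) { x + offset })
  expect_equal(shifted, as.list(101:105))
})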