[SPARK-7264][ML] Parallel lapply for sparkR #12426
Changes from 15 commits
@@ -226,6 +226,49 @@ setCheckpointDir <- function(sc, dirName) {
  invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
}

#' @title Run a function over a list of elements, distributing the computations with Spark.
#'
#' @description
#' Applies a function in a manner that is similar to doParallel or lapply to elements of a list.
#' The computations are distributed using Spark. It is conceptually the same as the following code:
#' lapply(list, func)
#'
#' Known limitations:
#' - variable scoping and capture: compared to R's rich support for variable resolution, the
#'   distributed nature of SparkR limits how variables are resolved at runtime. All the variables
#'   that are available through lexical scoping are embedded in the closure of the function and
#'   available as read-only variables within the function. Environment variables should be
#'   stored in temporary variables outside the function, and not accessed directly within the
#'   function.
#'
#' - loading external packages: in order to use a package, you need to load it inside the
#'   closure. For example, if you rely on the MASS package, here is how you would use it:
#'
#'\dontrun{
#' train <- function(hyperparam) {
#'   library(MASS)
#'   model <- lm.ridge("y ~ x+z", data, lambda = hyperparam)
#'   model
#' }
#'}
#'
#' @rdname spark.lapply
#' @param list the list of elements
#' @param func a function that takes one argument.
#' @return a list of results (the exact type being determined by the function)
#' @export
#' @examples
#'\dontrun{
#' doubled <- spark.lapply(1:10, function(x) { 2 * x })
#'}
spark.lapply <- function(list, func) {
  sc <- get(".sparkRjsc", envir = .sparkREnv)
  rdd <- parallelize(sc, list, length(list))
|
Member
I guess people could possibly get confused about when to call this vs. when to call the newly proposed dapply.
Contributor
dapply and spark.lapply have different semantics. No need to check class(list) here, as a DataFrame can be treated as a list of columns. parallelize() will issue a warning for a DataFrame here: https://github.com/apache/spark/blob/master/R/pkg/R/context.R#L110
Member
It actually fails here instead: https://github.com/apache/spark/blob/master/R/pkg/R/context.R#L116
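For readers following this exchange, a short contrast may help. The spark.lapply call is taken from this PR; the dapply signature shown, dapply(df, func, schema), is an assumption based on the API that later landed in SparkR, and the column names are illustrative.

# spark.lapply (this PR): plain R list in, plain R list out
doubled_list <- spark.lapply(1:10, function(x) { 2 * x })

# dapply (assumed later SparkR API): applies func to each partition of a
# SparkDataFrame, passed in as an R data.frame, and requires an output schema
df <- createDataFrame(data.frame(x = 1:10))
schema <- structType(structField("x", "integer"), structField("doubled", "integer"))
doubled_df <- dapply(df, function(part) {
  data.frame(x = part$x, doubled = part$x * 2L)
}, schema)
head(collect(doubled_df))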
  results <- map(rdd, func)
  local <- collect(results)
  local
}

#' Set new log level
#'
#' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"
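The two limitations documented in the roxygen comments above can be made concrete with a minimal sketch: the environment variable is read once on the driver and shipped read-only inside the closure, and the package is loaded inside the function so it is attached on the executors. The variable name BASE_LAMBDA, the multiplier grid, and the use of R's built-in cars data set are illustrative, not part of the PR.

base_lambda <- as.numeric(Sys.getenv("BASE_LAMBDA", "0.1"))  # read once, on the driver
multipliers <- c(1, 10, 100)

models <- spark.lapply(multipliers, function(m) {
  library(MASS)                       # load the package inside the closure
  # base_lambda is captured in the closure as a read-only value; avoid calling
  # Sys.getenv() inside this function, since it runs on the executors
  lm.ridge(dist ~ speed, data = cars, lambda = base_lambda * m)
})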
|
|
@@ -141,3 +141,8 @@ test_that("sparkJars sparkPackages as comma-separated strings", {
  expect_that(processSparkJars(f), not(gives_warning()))
  expect_match(processSparkJars(f), f)
})
|
|
test_that("sparkLapply should perform simple transforms", {
  doubled <- spark.lapply(1:10, function(x) { 2 * x })
  expect_equal(doubled, as.list(2 * 1:10))
})
|
Contributor
It would be good to add a test where we capture some environment variables and/or use a package. Also we should update https://github.com/apache/spark/blob/master/docs/sparkr.md but we can open another JIRA for that, I guess.
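A sketch of the kind of test suggested above, assuming the existing testthat setup in this file; it is not part of the PR. One case captures a driver-side variable in the closure, the other loads a package inside the function.

test_that("spark.lapply captures local variables and can load packages", {
  base <- 10L
  shifted <- spark.lapply(1:3, function(x) { x + base })   # base is captured in the closure
  expect_equal(shifted, as.list(11:13))

  versions <- spark.lapply(1:2, function(x) {
    library(MASS)                                          # loaded on the executor
    as.character(packageVersion("MASS"))
  })
  expect_equal(length(versions), 2)
})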
Member
One minor thing: all the existing functions like parallelize take in a Spark context as the first argument. We've discussed removing this in the past (see #9192) but we didn't reach a resolution on it. So, to be consistent, it'd be better to take in sc as the first argument here?

Contributor
Sure, I thought it was part of the design but I am happy to do that as it simplifies that piece of code.
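For illustration only, roughly what the reviewer's suggestion would look like. This variant is not in the 15-commit snapshot shown above, and whether it was ultimately adopted is not settled in this thread.

# Hypothetical variant that takes the Spark context explicitly, mirroring parallelize(sc, ...)
spark.lapply <- function(sc, list, func) {
  rdd <- parallelize(sc, list, length(list))   # one partition per element
  results <- map(rdd, func)
  collect(results)                             # bring the results back as an R list
}

# doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })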