R/pkg/R/DataFrame.R (10 additions, 1 deletion)
@@ -2624,6 +2624,15 @@ setMethod("except",
setMethod("write.df",
signature(df = "SparkDataFrame"),
function(df, path = NULL, source = NULL, mode = "error", ...) {
if (!is.character(path) && !is.null(path)) {
stop("path should be charactor, null or omitted.")
Review comment (Member): "character"?

Review comment (Member): same below

Review comment (Member): minor point: is it more efficient to flip the checks, i.e. if (!is.null(path) && !is.character(path)), since path defaults to NULL?
}
if (!is.character(source) && !is.null(source)) {
stop("source should be charactor, null or omitted. It is 'parquet' by default.")
Review comment (Member): strictly speaking, it's the spark.sql.sources.default property, and when it is not set, then it is parquet.
}
if (!is.character(mode)) {
stop("mode should be charactor or omitted. It is 'error' by default.")
}
if (is.null(source)) {
source <- getDefaultSqlSource()
}
@@ -2636,7 +2645,7 @@ setMethod("write.df",
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
write <- callJMethod(write, "save")
write <- tryCatch(callJMethod(write, "save"), error = captureJVMException)
})

#' @rdname write.df
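To make the DataFrame.R change concrete, here is a small SparkR usage sketch (not part of the patch). It assumes an active SparkSession; the data frame and output path are invented for the example, and the error messages are the ones introduced above.

library(SparkR)
sparkR.session()
df <- createDataFrame(data.frame(name = c("a", "b"), age = c(1, 2)))

# The new R-side type checks fail fast, before any JVM call is made:
tryCatch(write.df(df, path = c(3)), error = function(e) message(conditionMessage(e)))
tryCatch(write.df(df, mode = TRUE), error = function(e) message(conditionMessage(e)))

# Omitting `path` still reaches DataFrameWriter.save(); for file-based sources the
# JVM's IllegalArgumentException is now surfaced as a plain R error via captureJVMException:
tryCatch(write.df(df, source = "csv"), error = function(e) message(conditionMessage(e)))

# A valid call is unchanged; with source = NULL the default comes from the
# spark.sql.sources.default property (parquet when it is not set):
write.df(df, path = tempfile(fileext = ".parquet"), mode = "overwrite")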
R/pkg/R/SQLContext.R (16 additions, 6 deletions)
@@ -771,6 +771,12 @@ dropTempView <- function(viewName) {
#' @method read.df default
#' @note read.df since 1.4.0
read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) {
if (!is.character(path) && !is.null(path)) {
stop("path should be charactor, null or omitted.")
}
if (!is.character(source) && !is.null(source)) {
stop("source should be charactor, null or omitted. It is 'parquet' by default.")
}
sparkSession <- getSparkSession()
options <- varargsToEnv(...)
if (!is.null(path)) {
@@ -784,16 +790,20 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string
}
if (!is.null(schema)) {
stopifnot(class(schema) == "structType")
- sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession, source,
- schema$jobj, options)
+ sdf <- tryCatch({
+ callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession, source,
+ schema$jobj, options)
+ }, error = captureJVMException)
} else {
- sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
- "loadDF", sparkSession, source, options)
+ sdf <- tryCatch({
+ callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+ "loadDF", sparkSession, source, options)
+ }, error = captureJVMException)
}
dataFrame(sdf)
}

- read.df <- function(x, ...) {
+ read.df <- function(x = NULL, ...) {
dispatchFunc("read.df(path = NULL, source = NULL, schema = NULL, ...)", x, ...)
}

@@ -805,7 +815,7 @@ loadDF.default <- function(path = NULL, source = NULL, schema = NULL, ...) {
read.df(path, source, schema, ...)
}

- loadDF <- function(x, ...) {
+ loadDF <- function(x = NULL, ...) {
dispatchFunc("loadDF(path = NULL, source = NULL, schema = NULL, ...)", x, ...)
}

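Correspondingly, a short illustration of the read path (again not part of the patch, assuming an active SparkSession; file names are placeholders). The x = NULL default on the dispatchers is what allows read.df() and loadDF() to be called with named arguments only.

# R-side type checks fire before any JVM call:
tryCatch(read.df(path = c(3)), error = function(e) message(conditionMessage(e)))

# With the x = NULL default, path can be omitted entirely; for JSON the JVM's
# AnalysisException ("Unable to infer schema ...") is re-thrown as a plain R error:
tryCatch(read.df(source = "json"), error = function(e) message(conditionMessage(e)))

# A normal read is unchanged ("people.json" is a placeholder path):
# people <- read.df("people.json", source = "json")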
R/pkg/R/utils.R (15 additions, 0 deletions)
@@ -698,6 +698,21 @@ isSparkRShell <- function() {
grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE)
}

captureJVMException <- function(e) {
Review comment (Member): It'll be great to add some tests that would trigger tryCatch and this function?

Review comment (felixcheung, Member, Sep 26, 2016): since there are 3 (or 2) cases here and their handling seems nontrivial, could you add a test for each of the main exception types?
stacktrace <- as.character(e)
if (any(grep("java.lang.IllegalArgumentException: ", stacktrace))) {
Review comment (Member): are there cases where the IllegalArgument should be checked on the R side first, to avoid the exception in the first place?

Review comment (Member, Author): Thanks! @felixcheung I will address all the other comments above. However, for this one, I was thinking hard but it seems not easy, because on the R side we won't know up front whether a given data source is valid or not.

I might be able to do this only for internal data sources or known Databricks data sources such as "redshift" or "xml", e.g. by creating a map of our internal data sources and then checking whether a path is given or not. However, I am not sure it is a good idea to manage another list of data sources.

Review comment (felixcheung, Member, Sep 26, 2016): I agree, I don't think we should couple the R code to the underlying data source implementations, and was not suggesting that :)

I guess I'm saying there are still many (other) cases where the parameters are unchecked, and it would be good to see whether this check that converts the JVM IllegalArgumentException is sufficient, or whether more checks should be added on the R side.

Review comment (Member, Author): Ah, I see. Yeap. This might be a best-effort thing. I think I tried (if I am right) all combinations of missing/wrong parameters in the APIs. One exceptional case for both APIs is that they throw a ClassCastException when the extra options are wrongly typed, which I think we should check on the R side; this will be handled in #15239.
I might open another PR for validating parameters across SparkR if you think it is okay.

Review comment (Member): great, thanks - generally I'd prefer having parameter checks in R; though in this case I think we need to balance the added code complexity against reduced usability (by checking more, it might fail where it didn't before).

So I'm not 100% sure we should add parameter checks all across the board.

Review comment (Member, Author): Yeap, I do understand and will investigate it, keeping this in mind :)
msg <- strsplit(stacktrace, "java.lang.IllegalArgumentException: ", fixed = TRUE)[[1]][2]
first <- strsplit(msg, "\r?\n\tat")[[1]][1]
stop(first)
} else if (any(grep("org.apache.spark.sql.AnalysisException: ", stacktrace))) {
msg <- strsplit(stacktrace, "org.apache.spark.sql.AnalysisException: ", fixed = TRUE)[[1]][2]
first <- strsplit(msg, "\r?\n\tat")[[1]][1]
stop(first)
} else {
stop(stacktrace)
}
}

# rbind a list of rows with raw (binary) columns
#
# @param inputData a list of rows, with each row a list
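To make the error handling concrete, here is a standalone sketch of the string parsing used in captureJVMException, run on a fabricated JVM stack-trace string (the trace text is invented for the example; only base R is needed).

stacktrace <- paste0(
  "Error in invokeJava(...): java.lang.IllegalArgumentException: ",
  "'path' is not specified\n\tat org.apache.spark.sql.execution.datasources.DataSource...")

if (any(grep("java.lang.IllegalArgumentException: ", stacktrace))) {
  # Keep only the human-readable message: drop everything before the exception
  # name and everything from the first "\n\tat ..." frame onwards.
  msg <- strsplit(stacktrace, "java.lang.IllegalArgumentException: ", fixed = TRUE)[[1]][2]
  first <- strsplit(msg, "\r?\n\tat")[[1]][1]
  print(first)  # prints "'path' is not specified"
}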
R/pkg/inst/tests/testthat/test_sparkSQL.R (27 additions, 4 deletions)
@@ -2544,13 +2544,36 @@ test_that("Spark version from SparkSession", {
expect_equal(ver, version)
})

test_that("Call DataFrameWriter.save() API in Java without path", {
test_that("Call DataFrameWriter.save() API in Java without path and check argument types", {
df <- read.df(jsonPath, "json")
- # This tests if the exception is thrown from Spark side not from SparkR side.
+ # This tests if the exception is thrown from JVM not from SparkR side.
# It makes sure that we can omit path argument in write.df API and then it calls
# DataFrameWriter.save() without path.
expect_error(write.df(df, source = "csv"),
"java.lang.IllegalArgumentException: 'path' is not specified")
expect_error(write.df(df, source = "csv"), "'path' is not specified")

# Arguments checking in R side.
expect_error(write.df(df, "data.tmp", source = c(1, 2)),
"source should be charactor, null or omitted. It is 'parquet' by default.")
expect_error(write.df(df, path = c(3)),
"path should be charactor, null or omitted.")
expect_error(write.df(df, mode = TRUE),
"mode should be charactor or omitted. It is 'error' by default.")
})

test_that("Call DataFrameWriter.load() API in Java without path and check argument types", {
df <- read.df(jsonPath, "json")
# This tests if the exception is thrown from JVM not from SparkR side.
# It makes sure that we can omit path argument in read.df API and then it calls
# DataFrameWriter.load() without path.
expect_error(read.df(source = "json"),
"Unable to infer schema for JSON at . It must be specified manually")
expect_error(read.df("arbitrary_path"), "Path does not exist:")

# Arguments checking in R side.
expect_error(read.df(path = c(3)),
"path should be charactor, null or omitted.")
expect_error(read.df(jsonPath, source = c(1, 2)),
"source should be charactor, null or omitted. It is 'parquet' by default.")
})

unlink(parquetPath)