6 changes: 3 additions & 3 deletions R/pkg/R/DataFrame.R

@@ -2625,10 +2625,10 @@ setMethod("write.df",
           signature(df = "SparkDataFrame"),
           function(df, path = NULL, source = NULL, mode = "error", ...) {
             if (!is.null(path) && !is.character(path)) {
-              stop("path should be charactor, null or omitted.")
+              stop("path should be charactor, NULL or omitted.")
             }
             if (!is.null(source) && !is.character(source)) {
-              stop("source should be character, null or omitted. It is the datasource specified ",
+              stop("source should be character, NULL or omitted. It is the datasource specified ",
                    "in 'spark.sql.sources.default' configuration by default.")
             }
             if (!is.character(mode)) {
@@ -2646,7 +2646,7 @@ setMethod("write.df",
             write <- callJMethod(write, "format", source)
             write <- callJMethod(write, "mode", jmode)
             write <- callJMethod(write, "options", options)
-            write <- tryCatch(callJMethod(write, "save"), error = captureJVMException)
+            write <- handledCallJMethod(write, "save")
           })
 
 #' @rdname write.df
14 changes: 6 additions & 8 deletions R/pkg/R/SQLContext.R

@@ -772,10 +772,10 @@ dropTempView <- function(viewName) {
 #' @note read.df since 1.4.0
 read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) {
   if (!is.null(path) && !is.character(path)) {
-    stop("path should be charactor, null or omitted.")
+    stop("path should be charactor, NULL or omitted.")
   }
   if (!is.null(source) && !is.character(source)) {
-    stop("source should be character, null or omitted. It is the datasource specified ",
+    stop("source should be character, NULL or omitted. It is the datasource specified ",
          "in 'spark.sql.sources.default' configuration by default.")
   }
   sparkSession <- getSparkSession()
@@ -791,13 +791,11 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string
   }
   if (!is.null(schema)) {
     stopifnot(class(schema) == "structType")
-    sdf <- tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession,
-                                source, schema$jobj, options),
-                    error = captureJVMException)
+    sdf <- handledCallJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession,
+                              source, schema$jobj, options)
   } else {
-    sdf <- tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession,
-                                source, options),
-                    error = captureJVMException)
+    sdf <- handledCallJStatic("org.apache.spark.sql.api.r.SQLUtils", "loadDF", sparkSession,
+                              source, options)
   }
   dataFrame(sdf)
 }
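For context, the user-visible effect of routing these calls through handledCallJStatic: a bad or missing path now surfaces as a short, prefixed R error instead of a raw Java stack trace. The messages below are taken from the updated tests further down; the exact suffix (for example, the offending path) depends on the input.

# Illustrative only; the error text follows the expectations in test_sparkSQL.R.
# read.df(source = "json")
#   Error in loadDF : analysis error - Unable to infer schema for JSON at . It must be specified manually
# read.df("arbitrary_path")
#   Error in loadDF : analysis error - Path does not exist: ...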
55 changes: 46 additions & 9 deletions R/pkg/R/utils.R

@@ -698,18 +698,55 @@ isSparkRShell <- function() {
   grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE)
 }
 
-captureJVMException <- function(e) {
-  stacktrace <- as.character(e)
+# Works identically with `callJStatic(...)` but throws a pretty formatted exception.
+handledCallJStatic <- function(cls, method, ...) {
+  result <- tryCatch(callJStatic(cls, method, ...),
+                     error = function(e) {
+                       captureJVMException(e, method)
+                     })
+  result
+}
+
+# Works identically with `callJMethod(...)` but throws a pretty formatted exception.
+handledCallJMethod <- function(obj, method, ...) {
+  result <- tryCatch(callJMethod(obj, method, ...),
+                     error = function(e) {
+                       captureJVMException(e, method)
+                     })
+  result
+}
+
+captureJVMException <- function(e, method) {
+  rawmsg <- as.character(e)
+  if (any(grep("^Error in .*?: ", rawmsg))) {
+    # If the exception message starts with "Error in ...", this is possibly
+    # "Error in invokeJava(...)". Here, it replaces the characters to
+    # `paste("Error in", method, ":")` in order to identify which function
+    # was called in JVM side.
+    stacktrace <- strsplit(rawmsg, "Error in .*?: ")[[1]]
Review comment (Member): very minor nit: you could probably replace the double pass with grep above and strsplit with just the result from strsplit.
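A minimal sketch of the single-pass variant the reviewer is hinting at (hypothetical, not part of this diff): strsplit() returns more than one piece only when the pattern matched, so the separate grep() guard above becomes unnecessary.

# Hypothetical helper illustrating the suggestion; rawmsg and method mean the
# same things as in captureJVMException above.
rewriteErrorPrefix <- function(rawmsg, method) {
  parts <- strsplit(rawmsg, "Error in .*?: ")[[1]]
  if (length(parts) > 1) {
    # The pattern matched, so rebuild the prefix with the JVM-side method name.
    paste(paste("Error in", method, ":"), parts[2])
  } else {
    # No match; keep the raw message untouched.
    rawmsg
  }
}
# rewriteErrorPrefix("Error in invokeJava(...): boom", "save")
# returns "Error in save : boom"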

+    rmsg <- paste("Error in", method, ":")
+    stacktrace <- paste(rmsg[1], stacktrace[2])
+  } else {
+    # Otherwise, do not convert the error message just in case.
+    stacktrace <- rawmsg
+  }
+
   if (any(grep("java.lang.IllegalArgumentException: ", stacktrace))) {
Review thread on this check:

felixcheung (Member): are there cases where the IllegalArgument should be checked on the R side first to avoid the exception in the first place?

Author: Thanks! @felixcheung, I will address all the other comments above. For this one, though, it does not seem easy, because on the R side we cannot tell up front whether a given data source is valid. I might be able to do this only for internal data sources or known Databricks data sources such as "redshift" or "xml", for example by keeping a map of our internal data sources and checking whether a path is given. However, I am not sure it is a good idea to maintain another list of data sources.

felixcheung (Member, Sep 26, 2016): I agree, I don't think we should couple the R code to the underlying data source implementations, and was not suggesting that :) I am saying there are still many (other) cases where the parameters are unchecked, and it would be good to see whether this conversion of the JVM IllegalArgumentException is sufficient or whether more checks should be added on the R side.

Author: Ah, I see. This is a best-effort thing; I think I tried (if I am right) all combinations of missing/wrong parameters in these APIs. One exceptional case for both APIs is that they throw a ClassCastException when the extra options are wrongly typed, which I think we should check on the R side; that will be handled in #15239. I could open another PR for validating parameters across SparkR if you think it is okay.

felixcheung (Member): great, thanks - generally I'd prefer having parameter checks in R, though in this case we need to balance the added code complexity against reduced usability (by checking more, it might fail where it didn't before). So I'm not 100% sure we should add parameter checks across the board.

Author: Yeap, I understand and will investigate it with this in mind :)

-    msg <- strsplit(stacktrace, "java.lang.IllegalArgumentException: ", fixed = TRUE)[[1]][2]
-    first <- strsplit(msg, "\r?\n\tat")[[1]][1]
-    stop(first)
+    msg <- strsplit(stacktrace, "java.lang.IllegalArgumentException: ", fixed = TRUE)[[1]]
+    # Extract "Error in ..." message.
+    rmsg <- msg[1]
+    # Extract the first message of JVM exception.
+    first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
+    stop(paste0(rmsg, "illegal argument - ", first), call. = FALSE)
   } else if (any(grep("org.apache.spark.sql.AnalysisException: ", stacktrace))) {
-    msg <- strsplit(stacktrace, "org.apache.spark.sql.AnalysisException: ", fixed = TRUE)[[1]][2]
-    first <- strsplit(msg, "\r?\n\tat")[[1]][1]
-    stop(first)
+    msg <- strsplit(stacktrace, "org.apache.spark.sql.AnalysisException: ", fixed = TRUE)[[1]]
+    # Extract "Error in ..." message.
+    rmsg <- msg[1]
+    # Extract the first message of JVM exception.
+    first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
+    stop(paste0(rmsg, "analysis error - ", first), call. = FALSE)
   } else {
-    stop(stacktrace)
+    stop(stacktrace, call. = FALSE)
   }
 }
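The end-to-end effect of the rewritten helper, roughly (the raw JVM error below is abbreviated and illustrative; the rewritten message matches the updated test expectations):

# Before: a failing JVM call surfaced the raw error from invokeJava, e.g.
#   Error in invokeJava(...) :
#     java.lang.IllegalArgumentException: 'path' is not specified
#       at org.apache.spark...  (full Java stack trace follows)
#
# After: handledCallJMethod(write, "save") catches it, and captureJVMException(e, "save")
# rethrows only the first exception message with a short prefix:
#   Error in save : illegal argument - 'path' is not specified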

16 changes: 9 additions & 7 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R

@@ -2549,14 +2549,15 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume
   # This tests if the exception is thrown from JVM not from SparkR side.
   # It makes sure that we can omit path argument in write.df API and then it calls
   # DataFrameWriter.save() without path.
-  expect_error(write.df(df, source = "csv"), "'path' is not specified")
+  expect_error(write.df(df, source = "csv"),
+               "Error in save : illegal argument - 'path' is not specified")
 
   # Arguments checking in R side.
   expect_error(write.df(df, "data.tmp", source = c(1, 2)),
-               paste("source should be character, null or omitted. It is the datasource specified",
+               paste("source should be character, NULL or omitted. It is the datasource specified",
                      "in 'spark.sql.sources.default' configuration by default."))
   expect_error(write.df(df, path = c(3)),
-               "path should be charactor, null or omitted.")
+               "path should be charactor, NULL or omitted.")
   expect_error(write.df(df, mode = TRUE),
                "mode should be charactor or omitted. It is 'error' by default.")
 })
@@ -2566,14 +2567,15 @@ test_that("Call DataFrameWriter.load() API in Java without path and check argume
   # It makes sure that we can omit path argument in read.df API and then it calls
   # DataFrameWriter.load() without path.
   expect_error(read.df(source = "json"),
-               "Unable to infer schema for JSON at . It must be specified manually")
-  expect_error(read.df("arbitrary_path"), "Path does not exist:")
+               paste("Error in loadDF : analysis error - Unable to infer schema for JSON at .",
+                     "It must be specified manually"))
+  expect_error(read.df("arbitrary_path"), "Error in loadDF : analysis error - Path does not exist")
 
   # Arguments checking in R side.
   expect_error(read.df(path = c(3)),
-               "path should be charactor, null or omitted.")
+               "path should be charactor, NULL or omitted.")
   expect_error(read.df(jsonPath, source = c(1, 2)),
-               paste("source should be character, null or omitted. It is the datasource specified",
+               paste("source should be character, NULL or omitted. It is the datasource specified",
                      "in 'spark.sql.sources.default' configuration by default."))
 })
9 changes: 6 additions & 3 deletions R/pkg/inst/tests/testthat/test_utils.R

@@ -167,10 +167,13 @@ test_that("convertToJSaveMode", {
 })
 
 test_that("captureJVMException", {
-  expect_error(tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getSQLDataType",
+  method <- "getSQLDataType"
+  expect_error(tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", method,
                                     "unknown"),
-                        error = captureJVMException),
-               "Invalid type unknown")
+                        error = function(e) {
+                          captureJVMException(e, method)
+                        }),
+               "Error in getSQLDataType : illegal argument - Invalid type unknown")
 })
 
 test_that("hashCode", {

Review comment on the callJStatic call above - felixcheung (Member, Oct 5, 2016): let's change this test to handledCallJStatic too?
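A sketch of what that follow-up might look like, using the handledCallJStatic wrapper added in utils.R above (hypothetical, not part of this diff; the expected message mirrors the test expectation already shown):

# Hypothetical revision of the test exercising the wrapper directly,
# instead of hand-rolling the tryCatch around callJStatic.
test_that("captureJVMException", {
  expect_error(handledCallJStatic("org.apache.spark.sql.api.r.SQLUtils", "getSQLDataType",
                                  "unknown"),
               "Error in getSQLDataType : illegal argument - Invalid type unknown")
})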