Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ exportMethods("%in%",
"floor",
"format_number",
"format_string",
"from_json",
"from_unixtime",
"from_utc_timestamp",
"getField",
Expand Down Expand Up @@ -327,6 +328,7 @@ exportMethods("%in%",
"toDegrees",
"toRadians",
"to_date",
"to_json",
"to_timestamp",
"to_utc_timestamp",
"translate",
Expand Down
57 changes: 57 additions & 0 deletions R/pkg/R/functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,33 @@ setMethod("to_date",
column(jc)
})

#' to_json
#'
#' Converts a column containing a \code{structType} into a Column of JSON string.
#' Resolving the Column can fail if an unsupported type is encountered.
#'
#' @param x Column containing the struct
#' @param ... additional named properties to control how it is converted, accepts the same options
#' as the JSON data source.
#'
#' @family normal_funcs
#' @rdname to_json
#' @name to_json
#' @aliases to_json,Column-method
#' @export
#' @examples
#' \dontrun{
#' to_json(df$t, dateFormat = 'dd/MM/yyyy')
#' select(df, to_json(df$t))
#'}
#' @note to_json since 2.2.0
setMethod("to_json", signature(x = "Column"),
function(x, ...) {
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions", "to_json", x@jc, options)
column(jc)
})

#' to_timestamp
#'
#' Converts the column into a TimestampType. You may optionally specify a format
Expand Down Expand Up @@ -2403,6 +2430,36 @@ setMethod("date_format", signature(y = "Column", x = "character"),
column(jc)
})

#' from_json
#'
#' Parses a column containing a JSON string into a Column of \code{structType} with the specified
#' \code{schema}. If the string is unparseable, the Column will contains the value NA.
#'
#' @param x Column containing the JSON string.
#' @param schema a structType object to use as the schema to use when parsing the JSON string.
#' @param ... additional named properties to control how the json is parsed, accepts the same
#' options as the JSON data source.
#'
#' @family normal_funcs
#' @rdname from_json
#' @name from_json
#' @aliases from_json,structType,Column-method
#' @export
#' @examples
#' \dontrun{
#' schema <- structType(structField("name", "string"),
#' select(df, from_json(df$value, schema, dateFormat = "dd/MM/yyyy"))
#'}
#' @note from_json since 2.2.0
setMethod("from_json", signature(x = "Column", schema = "structType"),
function(x, schema, ...) {
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
"from_json",
x@jc, schema$jobj, options)
column(jc)
})

#' from_utc_timestamp
#'
#' Given a timestamp, which corresponds to a certain time of day in UTC, returns another timestamp
Expand Down
8 changes: 8 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,10 @@ setGeneric("format_number", function(y, x) { standardGeneric("format_number") })
#' @export
setGeneric("format_string", function(format, x, ...) { standardGeneric("format_string") })

#' @rdname from_json
#' @export
setGeneric("from_json", function(x, schema, ...) { standardGeneric("from_json") })

#' @rdname from_unixtime
#' @export
setGeneric("from_unixtime", function(x, ...) { standardGeneric("from_unixtime") })
Expand Down Expand Up @@ -1265,6 +1269,10 @@ setGeneric("toRadians", function(x) { standardGeneric("toRadians") })
#' @export
setGeneric("to_date", function(x, format) { standardGeneric("to_date") })

#' @rdname to_json
#' @export
setGeneric("to_json", function(x, ...) { standardGeneric("to_json") })

#' @rdname to_timestamp
#' @export
setGeneric("to_timestamp", function(x, format) { standardGeneric("to_timestamp") })
Expand Down
42 changes: 35 additions & 7 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ mockLinesComplexType <-
complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesComplexType, complexTypeJsonPath)

# For test map type and struct type in DataFrame
mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
"{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
"{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesMapType, mapTypeJsonPath)

test_that("calling sparkRSQL.init returns existing SQL context", {
sqlContext <- suppressWarnings(sparkRSQL.init(sc))
expect_equal(suppressWarnings(sparkRSQL.init(sc)), sqlContext)
Expand Down Expand Up @@ -466,13 +473,6 @@ test_that("create DataFrame from a data.frame with complex types", {
expect_equal(ldf$an_envir, collected$an_envir)
})

# For test map type and struct type in DataFrame
mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
"{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
"{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesMapType, mapTypeJsonPath)

test_that("Collect DataFrame with complex types", {
# ArrayType
df <- read.json(complexTypeJsonPath)
Expand Down Expand Up @@ -1337,6 +1337,32 @@ test_that("column functions", {
df <- createDataFrame(data.frame(x = c(2.5, 3.5)))
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)

# Test to_json(), from_json()
df <- read.json(mapTypeJsonPath)
j <- collect(select(df, alias(to_json(df$info), "json")))
expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
df <- as.DataFrame(j)
schema <- structType(structField("age", "integer"),
structField("height", "double"))
s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
#order
Copy link
Member Author

@felixcheung felixcheung Mar 2, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't figured out how to sort/filter by the field of the struct in a data.frame in R yet, to make this results predictable..

expect_is(s[[1]][[1]], "struct")
expect_equal(s[[1]][[1]]$age, 16)

# passing option
df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
schema2 <- structType(structField("date", "date"))
expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))),
error = function(e) { stop(e) }),
paste0(".*(java.lang.NumberFormatException: For input string:).*"))
s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy")))
expect_is(s[[1]][[1]]$date, "Date")
expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21")

# check for unparseable
df <- as.DataFrame(list(list("a" = "")))
expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)
})

test_that("column binary mathfunctions", {
Expand Down Expand Up @@ -2867,5 +2893,7 @@ unlink(parquetPath)
unlink(orcPath)
unlink(jsonPath)
unlink(jsonPathNa)
unlink(complexTypeJsonPath)
unlink(mapTypeJsonPath)

sparkR.session.stop()