2 changes: 1 addition & 1 deletion R/WINDOWS.md
@@ -6,7 +6,7 @@ To build SparkR on Windows, the following steps are required
include Rtools and R in `PATH`.

2. Install
- [JDK7](http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html) and set
+ [JDK8](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) and set
`JAVA_HOME` in the system environment variables.

3. Download and install [Maven](http://maven.apache.org/download.html). Also include the `bin`
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -229,6 +229,7 @@ exportMethods("%in%",
"floor",
"format_number",
"format_string",
"from_json",
"from_unixtime",
"from_utc_timestamp",
"getField",
@@ -327,6 +328,7 @@ exportMethods("%in%",
"toDegrees",
"toRadians",
"to_date",
"to_json",
"to_timestamp",
"to_utc_timestamp",
"translate",
57 changes: 57 additions & 0 deletions R/pkg/R/functions.R
@@ -1793,6 +1793,33 @@ setMethod("to_date",
column(jc)
})

#' to_json
#'
#' Converts a column containing a \code{structType} into a Column of JSON strings.
#' Resolving the Column can fail if an unsupported type is encountered.
#'
#' @param x Column containing the struct.
#' @param ... additional named properties to control how the struct is converted; accepts the
#'            same options as the JSON data source.
#'
#' @family normal_funcs
#' @rdname to_json
#' @name to_json
#' @aliases to_json,Column-method
#' @export
#' @examples
#' \dontrun{
#' to_json(df$t, dateFormat = 'dd/MM/yyyy')
#' select(df, to_json(df$t))
#'}
#' @note to_json since 2.2.0
setMethod("to_json", signature(x = "Column"),
function(x, ...) {
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions", "to_json", x@jc, options)
column(jc)
})
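Since the R wrapper above just forwards to `org.apache.spark.sql.functions.to_json` via `callJStatic`, the underlying behavior is easiest to see from the JVM side. A minimal Scala sketch, assuming a local session; the object name, sample data, and column names are illustrative and not part of this change:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{struct, to_json}

object ToJsonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("to_json-sketch").getOrCreate()
    import spark.implicits._

    // Pack two flat columns into a struct, then serialize the struct to a JSON string column.
    val df = Seq(("Bob", 16, 176.5), ("Alice", 20, 164.3))
      .toDF("name", "age", "height")
      .select($"name", struct($"age", $"height").as("info"))

    // Named options are forwarded to the JSON data source, e.g.
    // to_json($"info", Map("dateFormat" -> "dd/MM/yyyy")) when the struct has date fields.
    df.select($"name", to_json($"info").as("json")).show(truncate = false)

    spark.stop()
  }
}
```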

#' to_timestamp
#'
#' Converts the column into a TimestampType. You may optionally specify a format
@@ -2403,6 +2430,36 @@ setMethod("date_format", signature(y = "Column", x = "character"),
column(jc)
})

#' from_json
#'
#' Parses a column containing a JSON string into a Column of \code{structType} with the specified
#' \code{schema}. If the string is unparseable, the Column will contain the value NA.
#'
#' @param x Column containing the JSON string.
#' @param schema a structType object to use as the schema when parsing the JSON string.
#' @param ... additional named properties to control how the JSON is parsed; accepts the same
#'            options as the JSON data source.
#'
#' @family normal_funcs
#' @rdname from_json
#' @name from_json
#' @aliases from_json,Column,structType-method
#' @export
#' @examples
#' \dontrun{
#' schema <- structType(structField("name", "string"))
#' select(df, from_json(df$value, schema, dateFormat = "dd/MM/yyyy"))
#'}
#' @note from_json since 2.2.0
setMethod("from_json", signature(x = "Column", schema = "structType"),
function(x, schema, ...) {
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
"from_json",
x@jc, schema$jobj, options)
column(jc)
})
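Similarly, `from_json` forwards to `org.apache.spark.sql.functions.from_json` with the schema's underlying Java object. A hedged Scala sketch of the parse-with-schema behavior, including the null result for malformed input that SparkR surfaces as NA (sample data and names are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}

object FromJsonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("from_json-sketch").getOrCreate()
    import spark.implicits._

    val schema = StructType(Seq(
      StructField("age", IntegerType),
      StructField("height", DoubleType)))

    // The second row is unparseable, so from_json yields null for it;
    // that null is what SparkR reports as NA.
    val df = Seq("""{"age":16,"height":176.5}""", "not json").toDF("json")
    df.select(from_json($"json", schema).as("parsed")).show(truncate = false)

    // Options mirror the JSON data source, e.g.
    // from_json($"json", schema, Map("dateFormat" -> "dd/MM/yyyy")) for non-default date formats.
    spark.stop()
  }
}
```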

#' from_utc_timestamp
#'
#' Given a timestamp, which corresponds to a certain time of day in UTC, returns another timestamp
8 changes: 8 additions & 0 deletions R/pkg/R/generics.R
@@ -991,6 +991,10 @@ setGeneric("format_number", function(y, x) { standardGeneric("format_number") })
#' @export
setGeneric("format_string", function(format, x, ...) { standardGeneric("format_string") })

#' @rdname from_json
#' @export
setGeneric("from_json", function(x, schema, ...) { standardGeneric("from_json") })

#' @rdname from_unixtime
#' @export
setGeneric("from_unixtime", function(x, ...) { standardGeneric("from_unixtime") })
@@ -1265,6 +1269,10 @@ setGeneric("toRadians", function(x) { standardGeneric("toRadians") })
#' @export
setGeneric("to_date", function(x, format) { standardGeneric("to_date") })

#' @rdname to_json
#' @export
setGeneric("to_json", function(x, ...) { standardGeneric("to_json") })

#' @rdname to_timestamp
#' @export
setGeneric("to_timestamp", function(x, format) { standardGeneric("to_timestamp") })
43 changes: 36 additions & 7 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -88,6 +88,13 @@ mockLinesComplexType <-
complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
writeLines(mockLinesComplexType, complexTypeJsonPath)

+# For test map type and struct type in DataFrame
+mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
+                      "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
+                      "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
+mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
+writeLines(mockLinesMapType, mapTypeJsonPath)

test_that("calling sparkRSQL.init returns existing SQL context", {
sqlContext <- suppressWarnings(sparkRSQL.init(sc))
expect_equal(suppressWarnings(sparkRSQL.init(sc)), sqlContext)
@@ -466,13 +473,6 @@ test_that("create DataFrame from a data.frame with complex types", {
expect_equal(ldf$an_envir, collected$an_envir)
})

-# For test map type and struct type in DataFrame
-mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}",
-                      "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}",
-                      "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}")
-mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-writeLines(mockLinesMapType, mapTypeJsonPath)

test_that("Collect DataFrame with complex types", {
# ArrayType
df <- read.json(complexTypeJsonPath)
@@ -1337,6 +1337,33 @@ test_that("column functions", {
df <- createDataFrame(data.frame(x = c(2.5, 3.5)))
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)

# Test to_json(), from_json()
df <- read.json(mapTypeJsonPath)
j <- collect(select(df, alias(to_json(df$info), "json")))
expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
df <- as.DataFrame(j)
schema <- structType(structField("age", "integer"),
structField("height", "double"))
s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
expect_equal(ncol(s), 1)
expect_equal(nrow(s), 3)
expect_is(s[[1]][[1]], "struct")
expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 } )))

# passing option
df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
schema2 <- structType(structField("date", "date"))
expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))),
error = function(e) { stop(e) }),
paste0(".*(java.lang.NumberFormatException: For input string:).*"))
s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy")))
expect_is(s[[1]][[1]]$date, "Date")
expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21")

# check for unparseable
df <- as.DataFrame(list(list("a" = "")))
expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)
})

test_that("column binary mathfunctions", {
@@ -2867,5 +2894,7 @@ unlink(parquetPath)
unlink(orcPath)
unlink(jsonPath)
unlink(jsonPathNa)
unlink(complexTypeJsonPath)
unlink(mapTypeJsonPath)

sparkR.session.stop()
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -76,7 +76,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE)
val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)

-    val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Node", "Submitted Time",
+    val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Executor", "Submitted Time",
"User", "State", "Duration")
val activeApps = state.activeApps.sortBy(_.startTime).reverse
val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -32,11 +32,13 @@ import java.util.Objects
import javax.xml.bind.DatatypeConverter

import scala.math.{BigDecimal, BigInt}
import scala.reflect.runtime.universe.TypeTag
import scala.util.Try

import org.json4s.JsonAST._

import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
@@ -153,6 +155,14 @@ object Literal {
Literal(CatalystTypeConverters.convertToCatalyst(v), dataType)
}

def create[T : TypeTag](v: T): Literal = Try {
val ScalaReflection.Schema(dataType, _) = ScalaReflection.schemaFor[T]
val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
Literal(convert(v), dataType)
}.getOrElse {
Literal(v)
}

/**
* Create a literal with default value for given DataType
*/
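The new `Literal.create[T : TypeTag]` derives the Catalyst `DataType` from the static Scala type via `ScalaReflection.schemaFor[T]`, converts the value with a Catalyst converter, and only falls back to the runtime-inspecting `Literal.apply` when reflection fails. A short sketch of what the static path enables; catalyst is an internal API, so this is illustrative rather than a stable usage pattern:

```scala
import org.apache.spark.sql.catalyst.expressions.Literal

object LiteralCreateSketch {
  def main(args: Array[String]): Unit = {
    // The static type drives schema inference, so element and field types
    // survive even when the runtime value alone would not be enough:
    println(Literal.create(Seq(1, 2, 3)).dataType)    // ArrayType(IntegerType, containsNull = false)
    println(Literal.create((1, "abc", 2.0)).dataType) // a three-field StructType
    // Types that schemaFor cannot handle fall back to Literal.apply via the Try above.
  }
}
```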
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
@@ -19,9 +19,11 @@ package org.apache.spark.sql.catalyst.expressions

import java.nio.charset.StandardCharsets

import scala.reflect.runtime.universe.{typeTag, TypeTag}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection}
import org.apache.spark.sql.catalyst.encoders.ExamplePointUDT
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
@@ -75,6 +77,9 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
test("boolean literals") {
checkEvaluation(Literal(true), true)
checkEvaluation(Literal(false), false)

checkEvaluation(Literal.create(true), true)
checkEvaluation(Literal.create(false), false)
}

test("int literals") {
@@ -83,36 +88,60 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Literal(d.toLong), d.toLong)
checkEvaluation(Literal(d.toShort), d.toShort)
checkEvaluation(Literal(d.toByte), d.toByte)

checkEvaluation(Literal.create(d), d)
checkEvaluation(Literal.create(d.toLong), d.toLong)
checkEvaluation(Literal.create(d.toShort), d.toShort)
checkEvaluation(Literal.create(d.toByte), d.toByte)
}
checkEvaluation(Literal(Long.MinValue), Long.MinValue)
checkEvaluation(Literal(Long.MaxValue), Long.MaxValue)

checkEvaluation(Literal.create(Long.MinValue), Long.MinValue)
checkEvaluation(Literal.create(Long.MaxValue), Long.MaxValue)
}

test("double literals") {
List(0.0, -0.0, Double.NegativeInfinity, Double.PositiveInfinity).foreach { d =>
checkEvaluation(Literal(d), d)
checkEvaluation(Literal(d.toFloat), d.toFloat)

checkEvaluation(Literal.create(d), d)
checkEvaluation(Literal.create(d.toFloat), d.toFloat)
}
checkEvaluation(Literal(Double.MinValue), Double.MinValue)
checkEvaluation(Literal(Double.MaxValue), Double.MaxValue)
checkEvaluation(Literal(Float.MinValue), Float.MinValue)
checkEvaluation(Literal(Float.MaxValue), Float.MaxValue)

checkEvaluation(Literal.create(Double.MinValue), Double.MinValue)
checkEvaluation(Literal.create(Double.MaxValue), Double.MaxValue)
checkEvaluation(Literal.create(Float.MinValue), Float.MinValue)
checkEvaluation(Literal.create(Float.MaxValue), Float.MaxValue)

}

test("string literals") {
checkEvaluation(Literal(""), "")
checkEvaluation(Literal("test"), "test")
checkEvaluation(Literal("\u0000"), "\u0000")

checkEvaluation(Literal.create(""), "")
checkEvaluation(Literal.create("test"), "test")
checkEvaluation(Literal.create("\u0000"), "\u0000")
}

test("sum two literals") {
checkEvaluation(Add(Literal(1), Literal(1)), 2)
checkEvaluation(Add(Literal.create(1), Literal.create(1)), 2)
}

test("binary literals") {
checkEvaluation(Literal.create(new Array[Byte](0), BinaryType), new Array[Byte](0))
checkEvaluation(Literal.create(new Array[Byte](2), BinaryType), new Array[Byte](2))

checkEvaluation(Literal.create(new Array[Byte](0)), new Array[Byte](0))
checkEvaluation(Literal.create(new Array[Byte](2)), new Array[Byte](2))
}

test("decimal") {
@@ -124,24 +153,63 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
Decimal((d * 1000L).toLong, 10, 3))
checkEvaluation(Literal(BigDecimal(d.toString)), Decimal(d))
checkEvaluation(Literal(new java.math.BigDecimal(d.toString)), Decimal(d))

checkEvaluation(Literal.create(Decimal(d)), Decimal(d))
checkEvaluation(Literal.create(Decimal(d.toInt)), Decimal(d.toInt))
checkEvaluation(Literal.create(Decimal(d.toLong)), Decimal(d.toLong))
checkEvaluation(Literal.create(Decimal((d * 1000L).toLong, 10, 3)),
Decimal((d * 1000L).toLong, 10, 3))
checkEvaluation(Literal.create(BigDecimal(d.toString)), Decimal(d))
checkEvaluation(Literal.create(new java.math.BigDecimal(d.toString)), Decimal(d))

}
}

private def toCatalyst[T: TypeTag](value: T): Any = {
val ScalaReflection.Schema(dataType, _) = ScalaReflection.schemaFor[T]
CatalystTypeConverters.createToCatalystConverter(dataType)(value)
}

test("array") {
-    def checkArrayLiteral(a: Array[_], elementType: DataType): Unit = {
-      val toCatalyst = (a: Array[_], elementType: DataType) => {
-        CatalystTypeConverters.createToCatalystConverter(ArrayType(elementType))(a)
-      }
-      checkEvaluation(Literal(a), toCatalyst(a, elementType))
+    def checkArrayLiteral[T: TypeTag](a: Array[T]): Unit = {
+      checkEvaluation(Literal(a), toCatalyst(a))
+      checkEvaluation(Literal.create(a), toCatalyst(a))
     }
-    checkArrayLiteral(Array(1, 2, 3), IntegerType)
-    checkArrayLiteral(Array("a", "b", "c"), StringType)
-    checkArrayLiteral(Array(1.0, 4.0), DoubleType)
-    checkArrayLiteral(Array(CalendarInterval.MICROS_PER_DAY, CalendarInterval.MICROS_PER_HOUR),
+    checkArrayLiteral(Array(1, 2, 3))
+    checkArrayLiteral(Array("a", "b", "c"))
+    checkArrayLiteral(Array(1.0, 4.0))
+    checkArrayLiteral(Array(CalendarInterval.MICROS_PER_DAY, CalendarInterval.MICROS_PER_HOUR))
+  }
+
+  test("seq") {
+    def checkSeqLiteral[T: TypeTag](a: Seq[T], elementType: DataType): Unit = {
+      checkEvaluation(Literal.create(a), toCatalyst(a))
+    }
+    checkSeqLiteral(Seq(1, 2, 3), IntegerType)
+    checkSeqLiteral(Seq("a", "b", "c"), StringType)
+    checkSeqLiteral(Seq(1.0, 4.0), DoubleType)
+    checkSeqLiteral(Seq(CalendarInterval.MICROS_PER_DAY, CalendarInterval.MICROS_PER_HOUR),
CalendarIntervalType)
}

test("unsupported types (map and struct) in literals") {
test("map") {
def checkMapLiteral[T: TypeTag](m: T): Unit = {
checkEvaluation(Literal.create(m), toCatalyst(m))
}
checkMapLiteral(Map("a" -> 1, "b" -> 2, "c" -> 3))
checkMapLiteral(Map("1" -> 1.0, "2" -> 2.0, "3" -> 3.0))
}

test("struct") {
def checkStructLiteral[T: TypeTag](s: T): Unit = {
checkEvaluation(Literal.create(s), toCatalyst(s))
}
checkStructLiteral((1, 3.0, "abcde"))
checkStructLiteral(("de", 1, 2.0f))
checkStructLiteral((1, ("fgh", 3.0)))
}

test("unsupported types (map and struct) in Literal.apply") {
def checkUnsupportedTypeInLiteral(v: Any): Unit = {
val errMsgMap = intercept[RuntimeException] {
Literal(v)
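For context on the renamed test: `Literal.apply` still pattern-matches on the runtime value and rejects Scala maps and structs, which is why the old assertions survive under "unsupported types (map and struct) in Literal.apply" while the new `map` and `struct` tests exercise `Literal.create`. A hedged sketch of the contrast, again using the catalyst-internal API:

```scala
import org.apache.spark.sql.catalyst.expressions.Literal

object ApplyVsCreate {
  def main(args: Array[String]): Unit = {
    // The value-based factory cannot infer key/value types, so it throws...
    try Literal(Map("a" -> 1)) catch {
      case e: RuntimeException => println(s"Literal.apply rejected the map: ${e.getMessage}")
    }
    // ...whereas the TypeTag-based factory derives MapType(StringType, IntegerType).
    println(Literal.create(Map("a" -> 1)).dataType)
  }
}
```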