diff --git a/R/WINDOWS.md b/R/WINDOWS.md
index cb2eebb9ffe6..9ca7e58e20cd 100644
--- a/R/WINDOWS.md
+++ b/R/WINDOWS.md
@@ -6,7 +6,7 @@ To build SparkR on Windows, the following steps are required
 include Rtools and R in `PATH`.
 
 2. Install
-[JDK7](http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html) and set
+[JDK8](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) and set
 `JAVA_HOME` in the system environment variables.
 
 3. Download and install [Maven](http://maven.apache.org/download.html). Also include the `bin`
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 81e19364ae7e..871f8e41a0f2 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -229,6 +229,7 @@ exportMethods("%in%",
               "floor",
               "format_number",
               "format_string",
+              "from_json",
               "from_unixtime",
               "from_utc_timestamp",
               "getField",
@@ -327,6 +328,7 @@ exportMethods("%in%",
               "toDegrees",
               "toRadians",
               "to_date",
+              "to_json",
               "to_timestamp",
               "to_utc_timestamp",
               "translate",
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 9e5084481fcd..edf2bcf8fdb3 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -1793,6 +1793,33 @@ setMethod("to_date",
             column(jc)
           })
 
+#' to_json
+#'
+#' Converts a column containing a \code{structType} into a Column of JSON strings.
+#' Resolving the Column can fail if an unsupported type is encountered.
+#'
+#' @param x Column containing the struct
+#' @param ... additional named properties to control how the struct is converted; accepts the
+#'            same options as the JSON data source.
+#'
+#' @family normal_funcs
+#' @rdname to_json
+#' @name to_json
+#' @aliases to_json,Column-method
+#' @export
+#' @examples
+#' \dontrun{
+#' to_json(df$t, dateFormat = 'dd/MM/yyyy')
+#' select(df, to_json(df$t))
+#'}
+#' @note to_json since 2.2.0
+setMethod("to_json", signature(x = "Column"),
+          function(x, ...) {
+            options <- varargsToStrEnv(...)
+            jc <- callJStatic("org.apache.spark.sql.functions", "to_json", x@jc, options)
+            column(jc)
+          })
+
 #' to_timestamp
 #'
 #' Converts the column into a TimestampType. You may optionally specify a format
@@ -2403,6 +2430,36 @@ setMethod("date_format", signature(y = "Column", x = "character"),
             column(jc)
           })
 
+#' from_json
+#'
+#' Parses a column containing a JSON string into a Column of \code{structType} with the
+#' specified \code{schema}. If the string is unparseable, the Column will contain the value NA.
+#'
+#' @param x Column containing the JSON string.
+#' @param schema a structType object to use as the schema when parsing the JSON string.
+#' @param ... additional named properties to control how the JSON is parsed; accepts the same
+#'            options as the JSON data source.
+#'
+#' @family normal_funcs
+#' @rdname from_json
+#' @name from_json
+#' @aliases from_json,Column,structType-method
+#' @export
+#' @examples
+#' \dontrun{
+#' schema <- structType(structField("name", "string"))
+#' select(df, from_json(df$value, schema, dateFormat = "dd/MM/yyyy"))
+#'}
+#' @note from_json since 2.2.0
+setMethod("from_json", signature(x = "Column", schema = "structType"),
+          function(x, schema, ...) {
+            options <- varargsToStrEnv(...)
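+            # schema$jobj is the JVM-side reference to the structType, handed to Scala's from_json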
+ jc <- callJStatic("org.apache.spark.sql.functions", + "from_json", + x@jc, schema$jobj, options) + column(jc) + }) + #' from_utc_timestamp #' #' Given a timestamp, which corresponds to a certain time of day in UTC, returns another timestamp diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 647cbbdd825e..45bc12746511 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -991,6 +991,10 @@ setGeneric("format_number", function(y, x) { standardGeneric("format_number") }) #' @export setGeneric("format_string", function(format, x, ...) { standardGeneric("format_string") }) +#' @rdname from_json +#' @export +setGeneric("from_json", function(x, schema, ...) { standardGeneric("from_json") }) + #' @rdname from_unixtime #' @export setGeneric("from_unixtime", function(x, ...) { standardGeneric("from_unixtime") }) @@ -1265,6 +1269,10 @@ setGeneric("toRadians", function(x) { standardGeneric("toRadians") }) #' @export setGeneric("to_date", function(x, format) { standardGeneric("to_date") }) +#' @rdname to_json +#' @export +setGeneric("to_json", function(x, ...) { standardGeneric("to_json") }) + #' @rdname to_timestamp #' @export setGeneric("to_timestamp", function(x, format) { standardGeneric("to_timestamp") }) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 1dd8c5ce6cb3..7c096597fea6 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -88,6 +88,13 @@ mockLinesComplexType <- complexTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") writeLines(mockLinesComplexType, complexTypeJsonPath) +# For test map type and struct type in DataFrame +mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}", + "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}", + "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}") +mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") +writeLines(mockLinesMapType, mapTypeJsonPath) + test_that("calling sparkRSQL.init returns existing SQL context", { sqlContext <- suppressWarnings(sparkRSQL.init(sc)) expect_equal(suppressWarnings(sparkRSQL.init(sc)), sqlContext) @@ -466,13 +473,6 @@ test_that("create DataFrame from a data.frame with complex types", { expect_equal(ldf$an_envir, collected$an_envir) }) -# For test map type and struct type in DataFrame -mockLinesMapType <- c("{\"name\":\"Bob\",\"info\":{\"age\":16,\"height\":176.5}}", - "{\"name\":\"Alice\",\"info\":{\"age\":20,\"height\":164.3}}", - "{\"name\":\"David\",\"info\":{\"age\":60,\"height\":180}}") -mapTypeJsonPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp") -writeLines(mockLinesMapType, mapTypeJsonPath) - test_that("Collect DataFrame with complex types", { # ArrayType df <- read.json(complexTypeJsonPath) @@ -1337,6 +1337,33 @@ test_that("column functions", { df <- createDataFrame(data.frame(x = c(2.5, 3.5))) expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2) expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4) + + # Test to_json(), from_json() + df <- read.json(mapTypeJsonPath) + j <- collect(select(df, alias(to_json(df$info), "json"))) + expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}") + df <- as.DataFrame(j) + schema <- structType(structField("age", "integer"), + structField("height", "double")) + s <- collect(select(df, alias(from_json(df$json, schema), "structcol"))) + expect_equal(ncol(s), 1) + expect_equal(nrow(s), 3) + expect_is(s[[1]][[1]], "struct") + 
+  expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 })))
+
+  # passing option
+  df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
+  schema2 <- structType(structField("date", "date"))
+  expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))),
+                        error = function(e) { stop(e) }),
+               paste0(".*(java.lang.NumberFormatException: For input string:).*"))
+  s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy")))
+  expect_is(s[[1]][[1]]$date, "Date")
+  expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21")
+
+  # check for unparseable
+  df <- as.DataFrame(list(list("a" = "")))
+  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)
 })
 
 test_that("column binary mathfunctions", {
@@ -2867,5 +2894,7 @@ unlink(parquetPath)
 unlink(orcPath)
 unlink(jsonPath)
 unlink(jsonPathNa)
+unlink(complexTypeJsonPath)
+unlink(mapTypeJsonPath)
 
 sparkR.session.stop()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 7dbe32975435..e722a24d4a89 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -76,7 +76,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     val aliveWorkers = state.workers.filter(_.state == WorkerState.ALIVE)
     val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)
 
-    val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Node", "Submitted Time",
+    val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Executor", "Submitted Time",
       "User", "State", "Duration")
     val activeApps = state.activeApps.sortBy(_.startTime).reverse
     val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index e66fb893394e..eaeaf08c37b4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -32,11 +32,13 @@ import java.util.Objects
 import javax.xml.bind.DatatypeConverter
 
 import scala.math.{BigDecimal, BigInt}
+import scala.reflect.runtime.universe.TypeTag
+import scala.util.Try
 
 import org.json4s.JsonAST._
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
@@ -153,6 +155,14 @@ object Literal {
     Literal(CatalystTypeConverters.convertToCatalyst(v), dataType)
   }
 
+  def create[T : TypeTag](v: T): Literal = Try {
+    val ScalaReflection.Schema(dataType, _) = ScalaReflection.schemaFor[T]
+    val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
+    Literal(convert(v), dataType)
+  }.getOrElse {
+    Literal(v)
+  }
+
   /**
    * Create a literal with default value for given DataType
    */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
index 15e8e6c057ba..a9e0eb0e377a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/LiteralExpressionSuite.scala
@@ -19,9 +19,11 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.nio.charset.StandardCharsets
 
+import scala.reflect.runtime.universe.{typeTag, TypeTag}
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection}
 import org.apache.spark.sql.catalyst.encoders.ExamplePointUDT
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
@@ -75,6 +77,9 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("boolean literals") {
     checkEvaluation(Literal(true), true)
     checkEvaluation(Literal(false), false)
+
+    checkEvaluation(Literal.create(true), true)
+    checkEvaluation(Literal.create(false), false)
   }
 
   test("int literals") {
@@ -83,36 +88,60 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
       checkEvaluation(Literal(d.toLong), d.toLong)
       checkEvaluation(Literal(d.toShort), d.toShort)
      checkEvaluation(Literal(d.toByte), d.toByte)
+
+      checkEvaluation(Literal.create(d), d)
+      checkEvaluation(Literal.create(d.toLong), d.toLong)
+      checkEvaluation(Literal.create(d.toShort), d.toShort)
+      checkEvaluation(Literal.create(d.toByte), d.toByte)
     }
     checkEvaluation(Literal(Long.MinValue), Long.MinValue)
     checkEvaluation(Literal(Long.MaxValue), Long.MaxValue)
+
+    checkEvaluation(Literal.create(Long.MinValue), Long.MinValue)
+    checkEvaluation(Literal.create(Long.MaxValue), Long.MaxValue)
   }
 
   test("double literals") {
     List(0.0, -0.0, Double.NegativeInfinity, Double.PositiveInfinity).foreach { d =>
       checkEvaluation(Literal(d), d)
       checkEvaluation(Literal(d.toFloat), d.toFloat)
+
+      checkEvaluation(Literal.create(d), d)
+      checkEvaluation(Literal.create(d.toFloat), d.toFloat)
     }
     checkEvaluation(Literal(Double.MinValue), Double.MinValue)
     checkEvaluation(Literal(Double.MaxValue), Double.MaxValue)
     checkEvaluation(Literal(Float.MinValue), Float.MinValue)
     checkEvaluation(Literal(Float.MaxValue), Float.MaxValue)
 
+    checkEvaluation(Literal.create(Double.MinValue), Double.MinValue)
+    checkEvaluation(Literal.create(Double.MaxValue), Double.MaxValue)
+    checkEvaluation(Literal.create(Float.MinValue), Float.MinValue)
+    checkEvaluation(Literal.create(Float.MaxValue), Float.MaxValue)
+
   }
 
   test("string literals") {
     checkEvaluation(Literal(""), "")
     checkEvaluation(Literal("test"), "test")
     checkEvaluation(Literal("\u0000"), "\u0000")
+
+    checkEvaluation(Literal.create(""), "")
+    checkEvaluation(Literal.create("test"), "test")
+    checkEvaluation(Literal.create("\u0000"), "\u0000")
   }
 
   test("sum two literals") {
     checkEvaluation(Add(Literal(1), Literal(1)), 2)
+    checkEvaluation(Add(Literal.create(1), Literal.create(1)), 2)
   }
 
   test("binary literals") {
     checkEvaluation(Literal.create(new Array[Byte](0), BinaryType), new Array[Byte](0))
     checkEvaluation(Literal.create(new Array[Byte](2), BinaryType), new Array[Byte](2))
+
+    checkEvaluation(Literal.create(new Array[Byte](0)), new Array[Byte](0))
+    checkEvaluation(Literal.create(new Array[Byte](2)), new Array[Byte](2))
   }
 
   test("decimal") {
@@ -124,24 +153,63 @@ class LiteralExpressionSuite extends SparkFunSuite with ExpressionEvalHelper {
         Decimal((d * 1000L).toLong, 10, 3))
       checkEvaluation(Literal(BigDecimal(d.toString)), Decimal(d))
       checkEvaluation(Literal(new java.math.BigDecimal(d.toString)), Decimal(d))
+
+      checkEvaluation(Literal.create(Decimal(d)), Decimal(d))
+      checkEvaluation(Literal.create(Decimal(d.toInt)), Decimal(d.toInt))
+      checkEvaluation(Literal.create(Decimal(d.toLong)), Decimal(d.toLong))
+      checkEvaluation(Literal.create(Decimal((d * 1000L).toLong, 10, 3)),
+        Decimal((d * 1000L).toLong, 10, 3))
+      checkEvaluation(Literal.create(BigDecimal(d.toString)), Decimal(d))
+      checkEvaluation(Literal.create(new java.math.BigDecimal(d.toString)), Decimal(d))
+
     }
   }
 
+  private def toCatalyst[T: TypeTag](value: T): Any = {
+    val ScalaReflection.Schema(dataType, _) = ScalaReflection.schemaFor[T]
+    CatalystTypeConverters.createToCatalystConverter(dataType)(value)
+  }
+
   test("array") {
-    def checkArrayLiteral(a: Array[_], elementType: DataType): Unit = {
-      val toCatalyst = (a: Array[_], elementType: DataType) => {
-        CatalystTypeConverters.createToCatalystConverter(ArrayType(elementType))(a)
-      }
-      checkEvaluation(Literal(a), toCatalyst(a, elementType))
+    def checkArrayLiteral[T: TypeTag](a: Array[T]): Unit = {
+      checkEvaluation(Literal(a), toCatalyst(a))
+      checkEvaluation(Literal.create(a), toCatalyst(a))
+    }
+    checkArrayLiteral(Array(1, 2, 3))
+    checkArrayLiteral(Array("a", "b", "c"))
+    checkArrayLiteral(Array(1.0, 4.0))
+    checkArrayLiteral(Array(CalendarInterval.MICROS_PER_DAY, CalendarInterval.MICROS_PER_HOUR))
+  }
+
+  test("seq") {
+    def checkSeqLiteral[T: TypeTag](a: Seq[T], elementType: DataType): Unit = {
+      checkEvaluation(Literal.create(a), toCatalyst(a))
     }
-    checkArrayLiteral(Array(1, 2, 3), IntegerType)
-    checkArrayLiteral(Array("a", "b", "c"), StringType)
-    checkArrayLiteral(Array(1.0, 4.0), DoubleType)
-    checkArrayLiteral(Array(CalendarInterval.MICROS_PER_DAY, CalendarInterval.MICROS_PER_HOUR),
+    checkSeqLiteral(Seq(1, 2, 3), IntegerType)
+    checkSeqLiteral(Seq("a", "b", "c"), StringType)
+    checkSeqLiteral(Seq(1.0, 4.0), DoubleType)
+    checkSeqLiteral(Seq(CalendarInterval.MICROS_PER_DAY, CalendarInterval.MICROS_PER_HOUR),
       CalendarIntervalType)
   }
 
-  test("unsupported types (map and struct) in literals") {
+  test("map") {
+    def checkMapLiteral[T: TypeTag](m: T): Unit = {
+      checkEvaluation(Literal.create(m), toCatalyst(m))
+    }
+    checkMapLiteral(Map("a" -> 1, "b" -> 2, "c" -> 3))
+    checkMapLiteral(Map("1" -> 1.0, "2" -> 2.0, "3" -> 3.0))
+  }
+
+  test("struct") {
+    def checkStructLiteral[T: TypeTag](s: T): Unit = {
+      checkEvaluation(Literal.create(s), toCatalyst(s))
+    }
+    checkStructLiteral((1, 3.0, "abcde"))
+    checkStructLiteral(("de", 1, 2.0f))
+    checkStructLiteral((1, ("fgh", 3.0)))
+  }
+
+  test("unsupported types (map and struct) in Literal.apply") {
     def checkUnsupportedTypeInLiteral(v: Any): Unit = {
       val errMsgMap = intercept[RuntimeException] {
         Literal(v)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 24ed906d3368..2247010ac3f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -91,15 +91,24 @@ object functions {
    * @group normal_funcs
    * @since 1.3.0
    */
-  def lit(literal: Any): Column = {
-    literal match {
-      case c: Column => return c
-      case s: Symbol => return new ColumnName(literal.asInstanceOf[Symbol].name)
-      case _ => // continue
-    }
+  def lit(literal: Any): Column = typedLit(literal)
 
-    val literalExpr = Literal(literal)
-    Column(literalExpr)
+  /**
+   * Creates a [[Column]] of literal value.
+   *
+   * The passed in object is returned directly if it is already a [[Column]].
+   * If the object is a Scala Symbol, it is also converted into a [[Column]].
+   * Otherwise, a new [[Column]] is created to represent the literal value.
+   * The difference between this function and [[lit]] is that this function
+   * can handle parameterized Scala types, e.g. List, Seq and Map.
+   *
+   * @group normal_funcs
+   * @since 2.2.0
+   */
+  def typedLit[T : TypeTag](literal: T): Column = literal match {
+    case c: Column => c
+    case s: Symbol => new ColumnName(s.name)
+    case _ => Column(Literal.create(literal))
   }
 
   //////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index ee280a313cc0..b0f398dab745 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -712,4 +712,18 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
       testData2.select($"a".bitwiseXOR($"b").bitwiseXOR(39)),
       testData2.collect().toSeq.map(r => Row(r.getInt(0) ^ r.getInt(1) ^ 39)))
   }
+
+  test("typedLit") {
+    val df = Seq(Tuple1(0)).toDF("a")
+    // Only check the types `lit` cannot handle
+    checkAnswer(
+      df.select(typedLit(Seq(1, 2, 3))),
+      Row(Seq(1, 2, 3)) :: Nil)
+    checkAnswer(
+      df.select(typedLit(Map("a" -> 1, "b" -> 2))),
+      Row(Map("a" -> 1, "b" -> 2)) :: Nil)
+    checkAnswer(
+      df.select(typedLit(("a", 2, 1.0))),
+      Row(Row("a", 2, 1.0)) :: Nil)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 34fa626e00e3..f9808834df4a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -312,13 +312,23 @@ object QueryTest {
       sparkAnswer: Seq[Row],
       isSorted: Boolean = false): Option[String] = {
     if (prepareAnswer(expectedAnswer, isSorted) != prepareAnswer(sparkAnswer, isSorted)) {
+      val getRowType: Option[Row] => String = row =>
+        row.map(r =>
+          if (r.schema == null) {
+            "struct<>"
+          } else {
+            r.schema.catalogString
+          }).getOrElse("struct<>")
+
       val errorMessage =
         s"""
         |== Results ==
         |${sideBySide(
           s"== Correct Answer - ${expectedAnswer.size} ==" +:
+          getRowType(expectedAnswer.headOption) +:
           prepareAnswer(expectedAnswer, isSorted).map(_.toString()),
           s"== Spark Answer - ${sparkAnswer.size} ==" +:
+          getRowType(sparkAnswer.headOption) +:
           prepareAnswer(sparkAnswer, isSorted).map(_.toString())).mkString("\n")}
       """.stripMargin
       return Some(errorMessage)
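
Reviewer note, not part of the patch: a minimal sketch of how the pieces above compose, assuming a Spark 2.2 spark-shell with a SparkSession named `spark`; the DataFrame and value names are illustrative only.

    import spark.implicits._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    val people = Seq(("Bob", 16), ("Alice", 20)).toDF("name", "age")

    // typedLit embeds parameterized Scala types that lit cannot, because
    // Literal.create derives the Catalyst DataType via ScalaReflection.schemaFor[T]
    people.select(typedLit(Seq(1, 2, 3)), typedLit(Map("a" -> 1))).show()

    // to_json serializes a struct column; from_json reverses it given a schema
    val asJson = people.select(to_json(struct($"name", $"age")).as("js"))
    val schema = new StructType().add("name", StringType).add("age", IntegerType)
    asJson.select(from_json($"js", schema).as("parsed")).show()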