4 changes: 2 additions & 2 deletions R/pkg/R/functions.R
@@ -1697,8 +1697,8 @@ setMethod("to_date",
})

#' @details
-#' \code{to_json}: Converts a column containing a \code{structType}, array of \code{structType},
-#' a \code{mapType} or array of \code{mapType} into a Column of JSON string.
+#' \code{to_json}: Converts a column containing a \code{structType}, a \code{mapType}
+#' or an \code{arrayType} into a Column of JSON string.
#' Resolving the Column can fail if an unsupported type is encountered.
#'
#' @rdname column_collection_functions
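The reworded roxygen entry mirrors the new Scala-side behavior: any of the three container types at the top level now serializes. A minimal Scala sketch of the documented conversions, for reference only (the local session, spark.implicits._, and the column names here are assumptions for illustration):

    // Sketch of the documented to_json behavior for the three supported roots.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{array, lit, map, struct, to_json}

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a")).toDF("id", "name")
    df.select(to_json(struct($"id", $"name"))).show(false)  // {"id":1,"name":"a"}
    df.select(to_json(map(lit("k"), $"id"))).show(false)    // {"k":1}
    df.select(to_json(array($"id", $"id" + 1))).show(false) // [1,2]  (newly supported)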
9 changes: 9 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1667,6 +1667,15 @@ test_that("column functions", {
expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 })))
}

+# Test to_json() supports arrays of primitive types and arrays of arrays
+df <- sql("SELECT array(19, 42, 70) as age")
+j <- collect(select(df, alias(to_json(df$age), "json")))
+expect_equal(j[order(j$json), ][1], "[19,42,70]")
+
+df <- sql("SELECT array(array(1, 2), array(3, 4)) as matrix")
+j <- collect(select(df, alias(to_json(df$matrix), "json")))
+expect_equal(j[order(j$json), ][1], "[[1,2],[3,4]]")

# passing option
df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
schema2 <- structType(structField("date", "date"))
12 changes: 7 additions & 5 deletions python/pyspark/sql/functions.py
@@ -2289,13 +2289,11 @@ def from_json(col, schema, options={}):
@since(2.1)
def to_json(col, options={}):
"""
-Converts a column containing a :class:`StructType`, :class:`ArrayType` of
-:class:`StructType`\\s, a :class:`MapType` or :class:`ArrayType` of :class:`MapType`\\s
+Converts a column containing a :class:`StructType`, :class:`ArrayType` or a :class:`MapType`
into a JSON string. Throws an exception, in the case of an unsupported type.

-:param col: name of column containing the struct, array of the structs, the map or
-    array of the maps.
-:param options: options to control converting. accepts the same options as the json datasource
+:param col: name of column containing a struct, an array or a map.
+:param options: options to control converting. accepts the same options as the JSON datasource

>>> from pyspark.sql import Row
>>> from pyspark.sql.types import *
@@ -2315,6 +2313,10 @@ def to_json(col, options={}):
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')]
+>>> data = [(1, ["Alice", "Bob"])]
+>>> df = spark.createDataFrame(data, ("key", "value"))
+>>> df.select(to_json(df.value).alias("json")).collect()
+[Row(json=u'["Alice","Bob"]')]
"""

sc = SparkContext._active_spark_context
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -613,12 +613,11 @@ case class JsonToStructs(
}

/**
-* Converts a [[StructType]], [[ArrayType]] of [[StructType]]s, [[MapType]]
-* or [[ArrayType]] of [[MapType]]s to a json output string.
+* Converts a [[StructType]], [[ArrayType]] or [[MapType]] to a JSON output string.
*/
// scalastyle:off line.size.limit
@ExpressionDescription(
-usage = "_FUNC_(expr[, options]) - Returns a json string with a given struct value",
+usage = "_FUNC_(expr[, options]) - Returns a JSON string with a given struct value",
examples = """
Examples:
> SELECT _FUNC_(named_struct('a', 1, 'b', 2));
@@ -660,15 +659,10 @@ case class StructsToJson(

@transient
lazy val gen = new JacksonGenerator(
-rowSchema, writer, new JSONOptions(options, timeZoneId.get))
+inputSchema, writer, new JSONOptions(options, timeZoneId.get))

@transient
-lazy val rowSchema = child.dataType match {
-case st: StructType => st
-case ArrayType(st: StructType, _) => st
-case mt: MapType => mt
-case ArrayType(mt: MapType, _) => mt
-}
+lazy val inputSchema = child.dataType

// This converts rows to the JSON output according to the given schema.
@transient
@@ -680,47 +674,51 @@
UTF8String.fromString(json)
}

-child.dataType match {
+inputSchema match {
case _: StructType =>
(row: Any) =>
[review comment, Member]: child.dataType -> inputSchema
gen.write(row.asInstanceOf[InternalRow])
getAndReset()
-case ArrayType(_: StructType, _) =>
+case _: ArrayType =>
(arr: Any) =>
gen.write(arr.asInstanceOf[ArrayData])
getAndReset()
case _: MapType =>
(map: Any) =>
gen.write(map.asInstanceOf[MapData])
getAndReset()
-case ArrayType(_: MapType, _) =>
-(arr: Any) =>
-gen.write(arr.asInstanceOf[ArrayData])
-getAndReset()
}
}

override def dataType: DataType = StringType

-override def checkInputDataTypes(): TypeCheckResult = child.dataType match {
-case _: StructType | ArrayType(_: StructType, _) =>
+override def checkInputDataTypes(): TypeCheckResult = inputSchema match {
+case struct: StructType =>
try {
-JacksonUtils.verifySchema(rowSchema.asInstanceOf[StructType])
+JacksonUtils.verifySchema(struct)
TypeCheckResult.TypeCheckSuccess
} catch {
case e: UnsupportedOperationException =>
TypeCheckResult.TypeCheckFailure(e.getMessage)
}
-case _: MapType | ArrayType(_: MapType, _) =>
+case map: MapType =>
// TODO: let `JacksonUtils.verifySchema` verify a `MapType`
try {
-val st = StructType(StructField("a", rowSchema.asInstanceOf[MapType]) :: Nil)
+val st = StructType(StructField("a", map) :: Nil)
JacksonUtils.verifySchema(st)
TypeCheckResult.TypeCheckSuccess
} catch {
case e: UnsupportedOperationException =>
TypeCheckResult.TypeCheckFailure(e.getMessage)
}
+case array: ArrayType =>
+try {
+JacksonUtils.verifyType(prettyName, array)
+TypeCheckResult.TypeCheckSuccess
+} catch {
+case e: UnsupportedOperationException =>
+TypeCheckResult.TypeCheckFailure(e.getMessage)
+}
case _ => TypeCheckResult.TypeCheckFailure(
s"Input type ${child.dataType.catalogString} must be a struct, array of structs or " +
"a map or array of map.")
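The heart of the change is visible above: the old rowSchema unwrapped ArrayType(StructType) and ArrayType(MapType) down to their element types, while the new inputSchema keeps child.dataType as-is and the converter dispatches on the top-level type alone, so arrays of primitives and nested arrays fall out for free. A stripped-down model of that dispatch, with toy types and string-returning stubs standing in for Catalyst types and JacksonGenerator:

    // Toy model of the single-level dispatch in the new converter.
    sealed trait ToyType
    case object ToyStruct extends ToyType
    case object ToyMap extends ToyType
    case class ToyArray(element: ToyType) extends ToyType

    def converter(inputSchema: ToyType): Any => String = inputSchema match {
      case ToyStruct   => row => s"<struct:$row>"
      case _: ToyArray => arr => s"<array:$arr>" // arrays of struct, map or primitive alike
      case ToyMap      => m   => s"<map:$m>"
    }

    converter(ToyArray(ToyStruct))(Seq(1, 2)) // "<array:List(1, 2)>"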
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.catalyst.json

import java.io.Writer
-import java.nio.charset.StandardCharsets

import com.fasterxml.jackson.core._

@@ -28,7 +27,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.types._

/**
-* `JackGenerator` can only be initialized with a `StructType` or a `MapType`.
+* `JacksonGenerator` can only be initialized with a `StructType`, a `MapType` or an `ArrayType`.
* Once it is initialized with `StructType`, it can be used to write out a struct or an array of
* struct. Once it is initialized with `MapType`, it can be used to write out a map or an array
* of map. An exception will be thrown if trying to write out a struct if it is initialized with
@@ -43,34 +42,32 @@ private[sql] class JacksonGenerator(
// we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
private type ValueWriter = (SpecializedGetters, Int) => Unit

-// `JackGenerator` can only be initialized with a `StructType` or a `MapType`.
-require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType],
-s"JacksonGenerator only supports to be initialized with a ${StructType.simpleString} " +
-s"or ${MapType.simpleString} but got ${dataType.catalogString}")
+// `JacksonGenerator` can only be initialized with a `StructType`, a `MapType` or an `ArrayType`.
+require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType]
+|| dataType.isInstanceOf[ArrayType],
+s"JacksonGenerator only supports to be initialized with a ${StructType.simpleString}, " +
+s"${MapType.simpleString} or ${ArrayType.simpleString} but got ${dataType.catalogString}")

// `ValueWriter`s for all fields of the schema
private lazy val rootFieldWriters: Array[ValueWriter] = dataType match {
case st: StructType => st.map(_.dataType).map(makeWriter).toArray
case _ => throw new UnsupportedOperationException(
s"Initial type ${dataType.catalogString} must be a struct")
s"Initial type ${dataType.catalogString} must be a ${StructType.simpleString}")
}

// `ValueWriter` for array data storing rows of the schema.
private lazy val arrElementWriter: ValueWriter = dataType match {
-case st: StructType =>
-(arr: SpecializedGetters, i: Int) => {
-writeObject(writeFields(arr.getStruct(i, st.length), st, rootFieldWriters))
-}
-case mt: MapType =>
-(arr: SpecializedGetters, i: Int) => {
-writeObject(writeMapData(arr.getMap(i), mt, mapElementWriter))
-}
+case at: ArrayType => makeWriter(at.elementType)
+case _: StructType | _: MapType => makeWriter(dataType)
+case _ => throw new UnsupportedOperationException(
+s"Initial type ${dataType.catalogString} must be " +
+s"an ${ArrayType.simpleString}, a ${StructType.simpleString} or a ${MapType.simpleString}")
}

private lazy val mapElementWriter: ValueWriter = dataType match {
case mt: MapType => makeWriter(mt.valueType)
case _ => throw new UnsupportedOperationException(
s"Initial type ${dataType.catalogString} must be a map")
s"Initial type ${dataType.catalogString} must be a ${MapType.simpleString}")
}

private val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
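With the relaxed require, construction is the only gate; the element writer then recurses through makeWriter, which is how nested arrays serialize. A spark-shell sketch of the widened guard (checkRoot is a hypothetical stand-in for the require in JacksonGenerator's constructor, with a shortened message):

    import org.apache.spark.sql.types._

    // Hypothetical stand-in for the constructor's require.
    def checkRoot(dataType: DataType): Unit =
      require(dataType.isInstanceOf[StructType] || dataType.isInstanceOf[MapType]
          || dataType.isInstanceOf[ArrayType],
        s"only struct, map or array roots are supported, got ${dataType.catalogString}")

    checkRoot(ArrayType(IntegerType))           // passes after this change
    checkRoot(MapType(StringType, IntegerType)) // passes
    // checkRoot(IntegerType)                   // would throw IllegalArgumentException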
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -32,11 +32,8 @@ object JacksonUtils {
}
}

-/**
-* Verify if the schema is supported in JSON parsing.
-*/
-def verifySchema(schema: StructType): Unit = {
-def verifyType(name: String, dataType: DataType): Unit = dataType match {
+def verifyType(name: String, dataType: DataType): Unit = {
+dataType match {
case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType =>

@@ -54,7 +51,12 @@
throw new UnsupportedOperationException(
s"Unable to convert column $name of type ${dataType.catalogString} to JSON.")
}
+}
+
+/**
+* Verify if the schema is supported in JSON parsing.
+*/
+def verifySchema(schema: StructType): Unit = {
schema.foreach(field => verifyType(field.name, field.dataType))
}
}
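Hoisting verifyType out of verifySchema is what lets the new ArrayType branch of checkInputDataTypes validate an arbitrary root type directly. A condensed model of the recursive walk (the leaf-type list is copied from the case above; UDT handling and other details of the real method are omitted):

    import org.apache.spark.sql.types._

    // Condensed model: leaves pass, containers recurse, the rest is rejected.
    def verifyType(name: String, dataType: DataType): Unit = dataType match {
      case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType |
           FloatType | DoubleType | StringType | TimestampType | DateType |
           BinaryType | _: DecimalType => // supported leaf types
      case ArrayType(elementType, _) => verifyType(name, elementType)
      case MapType(_, valueType, _)  => verifyType(name, valueType)
      case st: StructType => st.foreach(f => verifyType(f.name, f.dataType))
      case other => throw new UnsupportedOperationException(
        s"Unable to convert column $name of type ${other.catalogString} to JSON.")
    }

    verifyType("a", ArrayType(ArrayType(IntegerType))) // ok: nested int arrays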
18 changes: 9 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3493,11 +3493,11 @@
def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr))

/**
-* (Scala-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
-* a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
+* (Scala-specific) Converts a column containing a `StructType`, `ArrayType` or
+* a `MapType` into a JSON string with the specified schema.
* Throws an exception, in the case of an unsupported type.
*
-* @param e a column containing a struct or array of the structs.
+* @param e a column containing a struct, an array or a map.
* @param options options to control how the struct column is converted into a json string.
* accepts the same options and the json data source.
*
@@ -3509,11 +3509,11 @@
}

/**
-* (Java-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
-* a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
+* (Java-specific) Converts a column containing a `StructType`, `ArrayType` or
+* a `MapType` into a JSON string with the specified schema.
* Throws an exception, in the case of an unsupported type.
*
-* @param e a column containing a struct or array of the structs.
+* @param e a column containing a struct, an array or a map.
* @param options options to control how the struct column is converted into a json string.
* accepts the same options and the json data source.
*
@@ -3524,11 +3524,11 @@
to_json(e, options.asScala.toMap)

/**
-* Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
-* a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
+* Converts a column containing a `StructType`, `ArrayType` or
+* a `MapType` into a JSON string with the specified schema.
* Throws an exception, in the case of an unsupported type.
*
-* @param e a column containing a struct or array of the structs.
+* @param e a column containing a struct, an array or a map.
*
* @group collection_funcs
* @since 2.1.0
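Usage sketch for the updated overloads: the options map is forwarded to the JSON writer, so JSON datasource write options such as timestampFormat apply regardless of the container type (a spark-shell session with spark.implicits._ in scope is assumed):

    import java.sql.Timestamp
    import org.apache.spark.sql.functions.{struct, to_json}

    val df = Seq(Timestamp.valueOf("2018-07-01 00:00:00")).toDF("ts")
    // The options map reaches the underlying JacksonGenerator via JSONOptions.
    df.select(to_json(struct($"ts"), Map("timestampFormat" -> "yyyy/MM/dd")))
      .show(false) // {"ts":"2018/07/01"}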
sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -51,3 +51,8 @@ select from_json('[null, {"a":2}]', 'array<struct<a:int>>');

select from_json('[{"a": 1}, {"b":2}]', 'array<map<string,int>>');
select from_json('[{"a": 1}, 2]', 'array<map<string,int>>');

+-- to_json - array type
+select to_json(array('1', '2', '3'));
+select to_json(array(array(1, 2, 3), array(4)));

sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 38
+-- Number of queries: 40


-- !query 0
@@ -9,7 +9,7 @@ struct<function_desc:string>
-- !query 0 output
Class: org.apache.spark.sql.catalyst.expressions.StructsToJson
Function: to_json
-Usage: to_json(expr[, options]) - Returns a json string with a given struct value
+Usage: to_json(expr[, options]) - Returns a JSON string with a given struct value


-- !query 1
@@ -38,7 +38,7 @@ Extended Usage:
Since: 2.2.0

Function: to_json
-Usage: to_json(expr[, options]) - Returns a json string with a given struct value
+Usage: to_json(expr[, options]) - Returns a JSON string with a given struct value


-- !query 2
@@ -354,3 +354,19 @@ select from_json('[{"a": 1}, 2]', 'array<map<string,int>>')
struct<jsontostructs([{"a": 1}, 2]):array<map<string,int>>>
-- !query 37 output
NULL


+-- !query 38
+select to_json(array('1', '2', '3'))
+-- !query 38 schema
+struct<structstojson(array(1, 2, 3)):string>
+-- !query 38 output
+["1","2","3"]
+
+
+-- !query 39
+select to_json(array(array(1, 2, 3), array(4)))
+-- !query 39 schema
+struct<structstojson(array(array(1, 2, 3), array(4))):string>
+-- !query 39 output
+[[1,2,3],[4]]
sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -469,4 +469,53 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {

checkAnswer(sql("""select json[0] from jsonTable"""), Seq(Row(null)))
}

test("to_json - array of primitive types") {
val df = Seq(Array(1, 2, 3)).toDF("a")
checkAnswer(df.select(to_json($"a")), Seq(Row("[1,2,3]")))
}

test("roundtrip to_json -> from_json - array of primitive types") {
val arr = Array(1, 2, 3)
val df = Seq(arr).toDF("a")
checkAnswer(df.select(from_json(to_json($"a"), ArrayType(IntegerType))), Row(arr))
}

test("roundtrip from_json -> to_json - array of primitive types") {
val json = "[1,2,3]"
val df = Seq(json).toDF("a")
val schema = new ArrayType(IntegerType, false)

checkAnswer(df.select(to_json(from_json($"a", schema))), Seq(Row(json)))
}

test("roundtrip from_json -> to_json - array of arrays") {
val json = "[[1],[2,3],[4,5,6]]"
val jsonDF = Seq(json).toDF("a")
val schema = new ArrayType(ArrayType(IntegerType, false), false)

checkAnswer(
jsonDF.select(to_json(from_json($"a", schema))),
Seq(Row(json)))
}

test("roundtrip from_json -> to_json - array of maps") {
val json = """[{"a":1},{"b":2}]"""
val jsonDF = Seq(json).toDF("a")
val schema = new ArrayType(MapType(StringType, IntegerType, false), false)

checkAnswer(
jsonDF.select(to_json(from_json($"a", schema))),
Seq(Row(json)))
}

test("roundtrip from_json -> to_json - array of structs") {
val json = """[{"a":1},{"a":2},{"a":3}]"""
val jsonDF = Seq(json).toDF("a")
val schema = new ArrayType(new StructType().add("a", IntegerType), false)

checkAnswer(
jsonDF.select(to_json(from_json($"a", schema))),
Seq(Row(json)))
}
}
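The round-trip pattern used by these tests is easy to reproduce interactively; a minimal spark-shell version of the array-of-primitives case (session and implicits assumed):

    import org.apache.spark.sql.functions.{from_json, to_json}
    import org.apache.spark.sql.types.{ArrayType, IntegerType}

    val df = Seq("[1,2,3]").toDF("a")
    val schema = ArrayType(IntegerType, containsNull = false)
    df.select(to_json(from_json($"a", schema))).show(false) // [1,2,3]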