23 changes: 23 additions & 0 deletions python/pyspark/sql/functions.py
@@ -1744,6 +1744,29 @@ def from_json(col, schema, options={}):
return Column(jc)


@ignore_unicode_prefix
@since(2.1)
def to_json(col, options={}):
"""
Converts a column containing a [[StructType]] into a JSON string. Throws an exception,
in the case of an unsupported type.

:param col: name of column containing the struct
:param options: options to control converting. accepts the same options as the json datasource

>>> from pyspark.sql import Row
>>> from pyspark.sql.types import *
>>> data = [(1, Row(name='Alice', age=2))]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"age":2,"name":"Alice"}')]
"""

sc = SparkContext._active_spark_context
jc = sc._jvm.functions.to_json(_to_java_column(col), options)
Contributor:
This is super minor, but there is a pretty consistent pattern for all of the other functions here (including from_json); it might be good to follow that same pattern for consistency's sake, since there isn't an obvious reason why that wouldn't work here.

Member Author:
@holdenk Thank you for your comment. Could you elaborate on it a bit? I am not sure what to fix.

Contributor:
Actually, never mind my original comment; the more I look at this file, the less consistent the pattern seems, and the same pattern is used elsewhere in the file.

return Column(jc)


@since(1.5)
def size(col):
"""
2 changes: 1 addition & 1 deletion python/pyspark/sql/readwriter.py
@@ -161,7 +161,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None):
"""
Loads a JSON file (`JSON Lines text format or newline-delimited JSON
<[http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per
<http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per
record) and returns the result as a :class`DataFrame`.

If the ``schema`` parameter is not specified, this function goes
2 changes: 1 addition & 1 deletion python/pyspark/sql/streaming.py
@@ -641,7 +641,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
timestampFormat=None):
"""
Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON
<[http://jsonlines.org/>`_) and returns a :class`DataFrame`.
<http://jsonlines.org/>`_) and returns a :class`DataFrame`.

If the ``schema`` parameter is not specified, this function goes
through the input once to determine the input schema.
@@ -17,16 +17,17 @@

package org.apache.spark.sql.catalyst.expressions

import java.io.{ByteArrayOutputStream, StringWriter}
import java.io.{ByteArrayOutputStream, CharArrayWriter, StringWriter}

import scala.util.parsing.combinator.RegexParsers

import com.fasterxml.jackson.core._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.util.ParseModes
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -494,3 +495,46 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:

override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
}

/**
* Converts a [[StructType]] to a json output string.
*/
case class StructToJson(options: Map[String, String], child: Expression)
extends Expression with CodegenFallback with ExpectsInputTypes {
override def nullable: Boolean = true

@transient
lazy val writer = new CharArrayWriter()

@transient
lazy val gen =
new JacksonGenerator(child.dataType.asInstanceOf[StructType], writer)

override def dataType: DataType = StringType
override def children: Seq[Expression] = child :: Nil

override def checkInputDataTypes(): TypeCheckResult = {
if (StructType.acceptsType(child.dataType)) {
try {
Contributor:
Sorry, one final comment as I'm looking at this more closely. I don't think we should use exceptions for control flow in the common case. Specifically, verifySchema should work the same way as acceptsType above and return a boolean.

Member Author (@HyukjinKwon, Oct 29, 2016):
Ah, yes, that makes sense, but if verifySchema returns a boolean, we cannot tell which field and type are problematic.

Maybe I can do one of the following:

  • move this logic from verifySchema into checkInputDataTypes
  • make verifySchema return the unsupported fields and types
  • just fix the exception message, dropping the information about unsupported fields and types

If you pick one, I will follow it (or please let me know if there is a better way)!

Contributor:
Oh, I see. It is for a better message. I guess it's probably not worth the time to refactor in that case.

JacksonUtils.verifySchema(child.dataType.asInstanceOf[StructType])
TypeCheckResult.TypeCheckSuccess
} catch {
case e: UnsupportedOperationException =>
TypeCheckResult.TypeCheckFailure(e.getMessage)
}
} else {
TypeCheckResult.TypeCheckFailure(
s"$prettyName requires that the expression is a struct expression.")
}
}

override def eval(input: InternalRow): Any = {
gen.write(child.eval(input).asInstanceOf[InternalRow])
gen.flush()
val json = writer.toString
writer.reset()
UTF8String.fromString(json)
}

override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
}
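
An aside on the review thread above: the reviewer's first suggestion, a verifySchema that reports problems without throwing, could look roughly like the hypothetical sketch below, which collects the offending fields instead of raising on the first one. This is not what the patch does; the object and method names (SchemaCheck, unsupportedFields) are made up for illustration.

import org.apache.spark.sql.types._

object SchemaCheck {
  // Collects the names and types of fields that cannot be converted to JSON,
  // rather than throwing on the first offending field.
  def unsupportedFields(schema: StructType): Seq[(String, DataType)] = {
    def collect(name: String, dataType: DataType): Seq[(String, DataType)] = dataType match {
      case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
           DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType => Nil
      case st: StructType => st.flatMap(f => collect(f.name, f.dataType))
      case at: ArrayType => collect(name, at.elementType)
      case mt: MapType => collect(name, mt.keyType)
      case udt: UserDefinedType[_] => collect(name, udt.sqlType)
      case other => Seq(name -> other)
    }
    schema.flatMap(f => collect(f.name, f.dataType))
  }
}

A caller such as checkInputDataTypes could then build the failure message from the returned pairs, keeping exceptions out of the common path; as the thread concludes, though, the extra refactoring was judged not worth it for this patch.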
@@ -15,15 +15,14 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.json
package org.apache.spark.sql.catalyst.json

import java.io.Writer

import com.fasterxml.jackson.core._

import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.types._

@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.json

import com.fasterxml.jackson.core.{JsonParser, JsonToken}

import org.apache.spark.sql.types._

object JacksonUtils {
/**
* Advance the parser until a null or a specific token is found
@@ -29,4 +31,28 @@ object JacksonUtils {
case x => x != stopOn
}
}

/**
* Verify if the schema is supported in JSON parsing.
*/
def verifySchema(schema: StructType): Unit = {
def verifyType(name: String, dataType: DataType): Unit = dataType match {
case NullType | BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType |
DoubleType | StringType | TimestampType | DateType | BinaryType | _: DecimalType =>

case st: StructType => st.foreach(field => verifyType(field.name, field.dataType))

case at: ArrayType => verifyType(name, at.elementType)

case mt: MapType => verifyType(name, mt.keyType)

case udt: UserDefinedType[_] => verifyType(name, udt.sqlType)

case _ =>
throw new UnsupportedOperationException(
s"Unable to convert column $name of type ${dataType.simpleString} to JSON.")
}

schema.foreach(field => verifyType(field.name, field.dataType))
}
}
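
For a sense of how the new verifySchema behaves, here is a small hypothetical sketch (the error message mirrors the one asserted in the JsonFunctionsSuite test added at the end of this PR):

import org.apache.spark.sql.catalyst.json.JacksonUtils
import org.apache.spark.sql.types._

// A schema made only of supported types passes silently.
JacksonUtils.verifySchema(new StructType().add("a", IntegerType))

// A schema containing an unsupported type throws UnsupportedOperationException, e.g.
// "Unable to convert column i of type calendarinterval to JSON."
JacksonUtils.verifySchema(new StructType().add("i", CalendarIntervalType))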
@@ -343,4 +343,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
null
)
}

test("to_json") {
val schema = StructType(StructField("a", IntegerType) :: Nil)
val struct = Literal.create(create_row(1), schema)
checkEvaluation(
StructToJson(Map.empty, struct),
"""{"a":1}"""
)
}
}
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.json.JacksonGenerator
import org.apache.spark.sql.catalyst.optimizer.CombineUnions
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -45,7 +46,6 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types._
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextOutputWriter
44 changes: 43 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2883,10 +2883,10 @@ object functions {
* (Scala-specific) Parses a column containing a JSON string into a [[StructType]] with the
* specified schema. Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
* @param options options to control how the json is parsed. accepts the same options and the
* json data source.
* @param e a string column containing JSON data.
*
* @group collection_funcs
* @since 2.1.0
@@ -2936,6 +2936,48 @@ object functions {
def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)


/**
* (Scala-specific) Converts a column containing a [[StructType]] into a JSON string with the
* specified schema. Throws an exception, in the case of an unsupported type.
*
* @param e a struct column.
* @param options options to control how the struct column is converted into a json string.
* accepts the same options and the json data source.
*
* @group collection_funcs
* @since 2.1.0
*/
def to_json(e: Column, options: Map[String, String]): Column = withExpr {
StructToJson(options, e.expr)
}

/**
* (Java-specific) Converts a column containing a [[StructType]] into a JSON string with the
* specified schema. Throws an exception, in the case of an unsupported type.
*
* @param e a struct column.
* @param options options to control how the struct column is converted into a json string.
* accepts the same options and the json data source.
*
* @group collection_funcs
* @since 2.1.0
*/
def to_json(e: Column, options: java.util.Map[String, String]): Column =
to_json(e, options.asScala.toMap)

/**
* Converts a column containing a [[StructType]] into a JSON string with the
* specified schema. Throws an exception, in the case of an unsupported type.
*
* @param e a struct column.
*
* @group collection_funcs
* @since 2.1.0
*/
def to_json(e: Column): Column =
to_json(e, Map.empty[String, String])

/**
* Returns length of array or map.
*
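
A minimal usage sketch of the new to_json overloads in functions.scala, assuming a SparkSession with spark.implicits._ in scope (the option shown is only an example of a JSON datasource option, not a requirement):

import org.apache.spark.sql.functions.{struct, to_json}

val df = Seq((2, "Alice")).toDF("age", "name")

// Overload without options.
df.select(to_json(struct($"age", $"name")).as("json")).show(false)   // {"age":2,"name":"Alice"}

// Scala-specific overload with options; the Java-specific overload takes a java.util.Map
// and delegates to this one.
df.select(to_json(struct($"age", $"name"), Map("timestampFormat" -> "yyyy/MM/dd"))).show(false)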
@@ -17,9 +17,9 @@

package org.apache.spark.sql

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.functions.{from_json, struct, to_json}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.sql.types.{CalendarIntervalType, IntegerType, StructType}

class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -31,7 +31,6 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row("alice", "5"))
}


val tuples: Seq[(String, String)] =
("1", """{"f1": "value1", "f2": "value2", "f3": 3, "f5": 5.23}""") ::
("2", """{"f1": "value12", "f3": "value3", "f2": 2, "f4": 4.01}""") ::
@@ -97,7 +96,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
checkAnswer(expr, expected)
}

test("json_parser") {
test("from_json") {
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("a", IntegerType)

@@ -106,7 +105,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row(Row(1)) :: Nil)
}

test("json_parser missing columns") {
test("from_json missing columns") {
val df = Seq("""{"a": 1}""").toDS()
val schema = new StructType().add("b", IntegerType)

@@ -115,12 +114,31 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row(Row(null)) :: Nil)
}

test("json_parser invalid json") {
test("from_json invalid json") {
val df = Seq("""{"a" 1}""").toDS()
val schema = new StructType().add("a", IntegerType)

checkAnswer(
df.select(from_json($"value", schema)),
Row(null) :: Nil)
}

test("to_json") {
val df = Seq(Tuple1(Tuple1(1))).toDF("a")

checkAnswer(
df.select(to_json($"a")),
Row("""{"_1":1}""") :: Nil)
}

test("to_json unsupported type") {
val df = Seq(Tuple1(Tuple1("interval -3 month 7 hours"))).toDF("a")
.select(struct($"a._1".cast(CalendarIntervalType).as("a")).as("c"))
val e = intercept[AnalysisException]{
// Unsupported type throws an exception
df.select(to_json($"c")).collect()
}
assert(e.getMessage.contains(
"Unable to convert column a of type calendarinterval to JSON."))
}
}
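
One natural follow-up, not part of this patch, would be a round-trip check that to_json followed by from_json recovers the original struct for supported types. A hypothetical sketch in the style of the tests above, which would sit inside JsonFunctionsSuite:

  test("to_json followed by from_json round trip") {
    val schema = new StructType().add("_1", IntegerType)
    val df = Seq(Tuple1(Tuple1(1))).toDF("a")

    // to_json($"a") yields {"_1":1}; from_json parses it back into a struct.
    checkAnswer(
      df.select(from_json(to_json($"a"), schema)),
      Row(Row(1)) :: Nil)
  }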