@@ -417,6 +417,12 @@ object StructType extends AbstractDataType {
}
}

+  /**
+   * Creates a StructType from a given DDL-formatted string, which is a comma-separated list of
+   * field definitions, e.g., "a INT, b STRING".
+   */
+  def fromDDL(ddl: String): StructType = CatalystSqlParser.parseTableSchema(ddl)
+
def apply(fields: Seq[StructField]): StructType = StructType(fields.toArray)

def apply(fields: java.util.List[StructField]): StructType = {
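For orientation, a minimal usage sketch of the new helper, not part of the diff (assumes only the public org.apache.spark.sql.types API):

import org.apache.spark.sql.types._

// Parse a comma-separated DDL field list into a schema. Fields parsed from
// DDL come back nullable, matching the default of StructType.add, so plain
// equality holds here (StructType overrides equals to compare fields).
val parsed = StructType.fromDDL("a INT, b STRING")
val expected = new StructType().add("a", IntegerType).add("b", StringType)
assert(parsed == expected)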
@@ -169,30 +169,72 @@ class DataTypeSuite extends SparkFunSuite {
assert(!arrayType.existsRecursively(_.isInstanceOf[IntegerType]))
}

-  def checkDataTypeJsonRepr(dataType: DataType): Unit = {
-    test(s"JSON - $dataType") {
+  def checkDataTypeFromJson(dataType: DataType): Unit = {
+    test(s"from Json - $dataType") {
       assert(DataType.fromJson(dataType.json) === dataType)
     }
   }

-  checkDataTypeJsonRepr(NullType)
-  checkDataTypeJsonRepr(BooleanType)
-  checkDataTypeJsonRepr(ByteType)
-  checkDataTypeJsonRepr(ShortType)
-  checkDataTypeJsonRepr(IntegerType)
-  checkDataTypeJsonRepr(LongType)
-  checkDataTypeJsonRepr(FloatType)
-  checkDataTypeJsonRepr(DoubleType)
-  checkDataTypeJsonRepr(DecimalType(10, 5))
-  checkDataTypeJsonRepr(DecimalType.SYSTEM_DEFAULT)
-  checkDataTypeJsonRepr(DateType)
-  checkDataTypeJsonRepr(TimestampType)
-  checkDataTypeJsonRepr(StringType)
-  checkDataTypeJsonRepr(BinaryType)
-  checkDataTypeJsonRepr(ArrayType(DoubleType, true))
-  checkDataTypeJsonRepr(ArrayType(StringType, false))
-  checkDataTypeJsonRepr(MapType(IntegerType, StringType, true))
-  checkDataTypeJsonRepr(MapType(IntegerType, ArrayType(DoubleType), false))
+  def checkDataTypeFromDDL(dataType: DataType): Unit = {
+    test(s"from DDL - $dataType") {
+      val parsed = StructType.fromDDL(s"a ${dataType.sql}")
+      val expected = new StructType().add("a", dataType)
+      assert(parsed.sameType(expected))
+    }
+  }

+  checkDataTypeFromJson(NullType)
+
+  checkDataTypeFromJson(BooleanType)
+  checkDataTypeFromDDL(BooleanType)
+
+  checkDataTypeFromJson(ByteType)
+  checkDataTypeFromDDL(ByteType)
+
+  checkDataTypeFromJson(ShortType)
+  checkDataTypeFromDDL(ShortType)
+
+  checkDataTypeFromJson(IntegerType)
+  checkDataTypeFromDDL(IntegerType)
+
+  checkDataTypeFromJson(LongType)
+  checkDataTypeFromDDL(LongType)
+
+  checkDataTypeFromJson(FloatType)
+  checkDataTypeFromDDL(FloatType)
+
+  checkDataTypeFromJson(DoubleType)
+  checkDataTypeFromDDL(DoubleType)
+
+  checkDataTypeFromJson(DecimalType(10, 5))
+  checkDataTypeFromDDL(DecimalType(10, 5))
+
+  checkDataTypeFromJson(DecimalType.SYSTEM_DEFAULT)
+  checkDataTypeFromDDL(DecimalType.SYSTEM_DEFAULT)
+
+  checkDataTypeFromJson(DateType)
+  checkDataTypeFromDDL(DateType)
+
+  checkDataTypeFromJson(TimestampType)
+  checkDataTypeFromDDL(TimestampType)
+
+  checkDataTypeFromJson(StringType)
+  checkDataTypeFromDDL(StringType)
+
+  checkDataTypeFromJson(BinaryType)
+  checkDataTypeFromDDL(BinaryType)
+
+  checkDataTypeFromJson(ArrayType(DoubleType, true))
+  checkDataTypeFromDDL(ArrayType(DoubleType, true))
+
+  checkDataTypeFromJson(ArrayType(StringType, false))
+  checkDataTypeFromDDL(ArrayType(StringType, false))
+
+  checkDataTypeFromJson(MapType(IntegerType, StringType, true))
+  checkDataTypeFromDDL(MapType(IntegerType, StringType, true))
+
+  checkDataTypeFromJson(MapType(IntegerType, ArrayType(DoubleType), false))
+  checkDataTypeFromDDL(MapType(IntegerType, ArrayType(DoubleType), false))

val metadata = new MetadataBuilder()
.putString("name", "age")
@@ -201,7 +243,8 @@ class DataTypeSuite extends SparkFunSuite {
StructField("a", IntegerType, nullable = true),
StructField("b", ArrayType(DoubleType), nullable = false),
StructField("c", DoubleType, nullable = false, metadata)))
-  checkDataTypeJsonRepr(structType)
+  checkDataTypeFromJson(structType)
Review comment (Member): The same here
+  checkDataTypeFromDDL(structType)

def checkDefaultSize(dataType: DataType, expectedDefaultSize: Int): Unit = {
test(s"Check the default size of $dataType") {
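An aside on why the new checkDataTypeFromDDL helper round-trips: DataType.sql renders each type as a DDL-compatible string that fromDDL can parse back. A few illustrative renderings (a sketch, not part of the diff; output strings are my understanding of the DDL grammar):

IntegerType.sql                             // "INT"
DecimalType(10, 5).sql                      // "DECIMAL(10,5)"
ArrayType(DoubleType, true).sql             // "ARRAY<DOUBLE>"
MapType(IntegerType, StringType, true).sql  // "MAP<INT, STRING>"

Nullability flags such as containsNull have no DDL representation, which is presumably why the helper compares schemas with the nullability-insensitive sameType rather than ==.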
15 changes: 12 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.{typeTag, TypeTag}
import scala.util.Try
+import scala.util.control.NonFatal

import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql.catalyst.ScalaReflection
@@ -3055,13 +3056,21 @@ object functions {
* with the specified schema. Returns `null`, in the case of an unparseable string.
*
* @param e a string column containing JSON data.
-   * @param schema the schema to use when parsing the json string as a json string
+   * @param schema the schema to use when parsing the json string as a json string. In Spark 2.1,
+   *               the user-provided schema has to be in JSON format. Since Spark 2.2, the DDL
+   *               format is also supported for the schema.
*
* @group collection_funcs
* @since 2.1.0
*/
-  def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
-    from_json(e, DataType.fromJson(schema), options)
+  def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column = {
+    val dataType = try {
Review comment (Member): A little concern: won't the error message from parsing JSON be shadowed?

Reply (Member): That is fine, right? cc @cloud-fan

Reply (Contributor): yea I think it's fine
+      DataType.fromJson(schema)
+    } catch {
+      case NonFatal(_) => StructType.fromDDL(schema)
+    }
Review thread on lines +3067 to +3071:

Member: I am wondering why parsing a schema from a string is implemented here and not inside the JsonToStructs expression. Calling from_json in SQL and via the Scala API therefore behaves differently, right? Did you do that intentionally?

Member (@HyukjinKwon, Oct 30, 2020): I think you came to the wrong PR. The schema parsing was started on the Scala side at maropu@fe33121#diff-b5e6d03d9c9afbfa925e039c48e31078608ea749c193e6af3087b79eb701bc7cR2877. I guess the reason is that the schema parameter was not intended to be used as an expression before. Now we take it in SQL as well.

Member (Author): I think I just missed it, so making the code common seems fine.

Member (Author): Thanks, @HyukjinKwon. Ah, I see, I totally forgot it...

Member: I moved the parsing to a common place. Please take a look at #30201.

+    from_json(e, dataType, options)
+  }

/**
* (Scala-specific) Converts a column containing a `StructType` or `ArrayType` of `StructType`s
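To make the fallback concrete, a small usage sketch, not part of the diff (assumes a SparkSession `spark` with import spark.implicits._ in scope):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val df = Seq("""{"a": 1, "b": "x"}""").toDF("value")
val options = new java.util.HashMap[String, String]()

// DDL-formatted schema: DataType.fromJson throws, the NonFatal case catches
// it, and StructType.fromDDL parses the string instead.
df.select(from_json($"value", "a INT, b STRING", options))

// JSON-formatted schema: DataType.fromJson succeeds and no fallback occurs.
val jsonSchema = new StructType().add("a", IntegerType).add("b", StringType).json
df.select(from_json($"value", jsonSchema, options))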
@@ -156,6 +156,13 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Row(Seq(Row(1, "a"), Row(2, null), Row(null, null))))
}

test("from_json uses DDL strings for defining a schema") {
val df = Seq("""{"a": 1, "b": "haa"}""").toDS()
checkAnswer(
df.select(from_json($"value", "a INT, b STRING", new java.util.HashMap[String, String]())),
Row(Row(1, "haa")) :: Nil)
}

test("to_json - struct") {
val df = Seq(Tuple1(Tuple1(1))).toDF("a")

@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

-import org.apache.spark.sql.{sources, Row, SparkSession}
+import org.apache.spark.sql.{sources, SparkSession}
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, InterpretedPredicate, InterpretedProjection, JoinedRow, Literal}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection