From 2a26e2c680b517e9e89a0f4bc4cc31884020188d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 2 Dec 2018 21:06:05 +0100 Subject: [PATCH 01/15] Added a test for timestamp inferring --- .../catalyst/json/JsonInferSchemaSuite.scala | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala new file mode 100644 index 000000000000..561d0bbafccc --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.json + +import com.fasterxml.jackson.core.JsonFactory + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.types.TimestampType + +class JsonInferSchemaSuite extends SparkFunSuite{ + def checkTimestampType(pattern: String, json: String): Unit = { + val options = new JSONOptions(Map("timestampFormat" -> pattern), "GMT", "") + val inferSchema = new JsonInferSchema(options) + val factory = new JsonFactory() + options.setJacksonOptions(factory) + val parser = CreateJacksonParser.string(factory, json) + + assert(inferSchema.inferField(parser) === TimestampType) + } + + test("Timestamp field types are inferred correctly via custom data format") { + checkTimestampType("yyyy-MM-dd'T'HH:mm:ss.SSSXXX", "2018-12-02T21:04:00.123") + } +} From bd472072a39dbec2e1eec1396196c6c5e6a659dd Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 2 Dec 2018 21:43:48 +0100 Subject: [PATCH 02/15] Infer date and timestamp types --- .../spark/sql/catalyst/json/JsonInferSchema.scala | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 263e05de3207..a4a0a40ac494 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -28,7 +28,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.expressions.ExprUtils import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil -import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode} +import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ 
import org.apache.spark.util.Utils @@ -121,7 +121,18 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { DecimalType(bigDecimal.precision, bigDecimal.scale) } decimalTry.getOrElse(StringType) - case VALUE_STRING => StringType + case VALUE_STRING => + val stringValue = parser.getText + if ((allCatch opt options.timestampFormat.parse(stringValue)).isDefined) { + TimestampType + } else if ((allCatch opt options.dateFormat.parse(stringValue)).isDefined) { + DateType + } else if ((allCatch opt DateTimeUtils.stringToTime(stringValue)).isDefined) { + // We keep this for backwards compatibility. + TimestampType + } else { + StringType + } case START_OBJECT => val builder = Array.newBuilder[StructField] From 9dbdf0a764c998875932e50faf460f36216ef58d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 2 Dec 2018 21:44:08 +0100 Subject: [PATCH 03/15] Test for date type --- .../catalyst/json/JsonInferSchemaSuite.scala | 43 +++++++++++++++---- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala index 561d0bbafccc..cc669a694f58 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala @@ -20,20 +20,45 @@ package org.apache.spark.sql.catalyst.json import com.fasterxml.jackson.core.JsonFactory import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.types.TimestampType +import org.apache.spark.sql.types._ -class JsonInferSchemaSuite extends SparkFunSuite{ - def checkTimestampType(pattern: String, json: String): Unit = { - val options = new JSONOptions(Map("timestampFormat" -> pattern), "GMT", "") - val inferSchema = new JsonInferSchema(options) +class JsonInferSchemaSuite extends SparkFunSuite { + + def checkType(options: Map[String, String], json: String, `type`: DataType): Unit = { + val jsonOptions = new JSONOptions(options, "GMT", "") + val inferSchema = new JsonInferSchema(jsonOptions) val factory = new JsonFactory() - options.setJacksonOptions(factory) + jsonOptions.setJacksonOptions(factory) val parser = CreateJacksonParser.string(factory, json) + parser.nextToken() + val expectedType = StructType(Seq(StructField("a", `type`, true))) + + assert(inferSchema.inferField(parser) === expectedType) + } + + def checkTimestampType(pattern: String, json: String): Unit = { + checkType(Map("timestampFormat" -> pattern), json, TimestampType) + } + + test("inferring timestamp type") { + checkTimestampType("yyyy", """{"a": "2018"}""") + checkTimestampType("yyyy-MM", """{"a": "2018-12"}""") + checkTimestampType("yyyy-MM-dd", """{"a": "2018-12-02"}""") + checkTimestampType( + "yyyy-MM-dd'T'HH:mm:ss.SSS", + """{"a": "2018-12-02T21:04:00.123"}""") + checkTimestampType( + "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX", + """{"a": "2018-12-02T21:04:00.123567+01:00"}""") + } - assert(inferSchema.inferField(parser) === TimestampType) + def checkDateType(pattern: String, json: String): Unit = { + checkType(Map("dateFormat" -> pattern), json, DateType) } - test("Timestamp field types are inferred correctly via custom data format") { - checkTimestampType("yyyy-MM-dd'T'HH:mm:ss.SSSXXX", "2018-12-02T21:04:00.123") + test("inferring date type") { + checkDateType("yyyy", """{"a": "2018"}""") + checkDateType("yyyy-MM", """{"a": "2018-12"}""") + checkDateType("yyyy-MM-dd", 
"""{"a": "2018-12-02"}""") } } From 93768323778db1ace74246cde9b3dbc4ebee5f91 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 3 Dec 2018 13:36:59 +0100 Subject: [PATCH 04/15] Added a test to check that inferring of the date type is prioritised over timestamps --- .../catalyst/json/JsonInferSchemaSuite.scala | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala index cc669a694f58..76a191873818 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala @@ -42,8 +42,8 @@ class JsonInferSchemaSuite extends SparkFunSuite { test("inferring timestamp type") { checkTimestampType("yyyy", """{"a": "2018"}""") - checkTimestampType("yyyy-MM", """{"a": "2018-12"}""") - checkTimestampType("yyyy-MM-dd", """{"a": "2018-12-02"}""") + checkTimestampType("yyyy=MM", """{"a": "2018=12"}""") + checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""") checkTimestampType( "yyyy-MM-dd'T'HH:mm:ss.SSS", """{"a": "2018-12-02T21:04:00.123"}""") @@ -61,4 +61,23 @@ class JsonInferSchemaSuite extends SparkFunSuite { checkDateType("yyyy-MM", """{"a": "2018-12"}""") checkDateType("yyyy-MM-dd", """{"a": "2018-12-02"}""") } + + test("inferring the date type before timestamps") { + checkType( + options = Map( + "dateFormat" -> "yyyy-MM-dd", + "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSS" + ), + json = """{"a": "2018-12-02T21:04:00.123"}""", + `type` = TimestampType + ) + checkType( + options = Map( + "dateFormat" -> "yyyy-MM-dd", + "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSS" + ), + json = """{"a": "2018-12-02"}""", + `type` = DateType + ) + } } From 05bbfea96dc4cf12f25e87a5c7749629156a05ef Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 3 Dec 2018 13:43:42 +0100 Subject: [PATCH 05/15] Infer date type before timestamp type --- .../spark/sql/catalyst/json/JsonInferSchema.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index a4a0a40ac494..f19481b21395 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.json +import java.text.ParsePosition import java.util.Comparator import scala.util.control.Exception.allCatch @@ -123,10 +124,18 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { decimalTry.getOrElse(StringType) case VALUE_STRING => val stringValue = parser.getText - if ((allCatch opt options.timestampFormat.parse(stringValue)).isDefined) { - TimestampType - } else if ((allCatch opt options.dateFormat.parse(stringValue)).isDefined) { + val dateTry = allCatch opt { + val pos = new ParsePosition(0) + options.dateFormat.parse(stringValue, pos) + if (pos.getErrorIndex != -1 || pos.getIndex != stringValue.length) { + throw new IllegalArgumentException( + s"$stringValue cannot be parsed as ${DateType.simpleString}") + } + } + if (dateTry.isDefined) { DateType + } else if ((allCatch opt options.timestampFormat.parse(stringValue)).isDefined) 
{ + TimestampType } else if ((allCatch opt DateTimeUtils.stringToTime(stringValue)).isDefined) { // We keep this for backwards compatibility. TimestampType From 63ebf421154bb2d606740d404e0f66d39ce112ad Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 16 Dec 2018 10:39:38 +0100 Subject: [PATCH 06/15] Fix merges --- .../sql/catalyst/json/JsonInferSchema.scala | 25 ++++++++----------- .../catalyst/json/JsonInferSchemaSuite.scala | 2 +- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index f19481b21395..e8ebc8da69e4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -38,6 +38,14 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { private val decimalParser = ExprUtils.getDecimalParser(options.locale) + @transient + private lazy val timestampFormatter = TimestampFormatter( + options.timestampFormat, + options.timeZone, + options.locale) + @transient + private lazy val dateFormatter = DateFormatter(options.dateFormat, options.locale) + /** * Infer the type of a collection of json records in three stages: * 1. Infer the type of each record @@ -124,21 +132,10 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { decimalTry.getOrElse(StringType) case VALUE_STRING => val stringValue = parser.getText - val dateTry = allCatch opt { - val pos = new ParsePosition(0) - options.dateFormat.parse(stringValue, pos) - if (pos.getErrorIndex != -1 || pos.getIndex != stringValue.length) { - throw new IllegalArgumentException( - s"$stringValue cannot be parsed as ${DateType.simpleString}") - } - } - if (dateTry.isDefined) { - DateType - } else if ((allCatch opt options.timestampFormat.parse(stringValue)).isDefined) { - TimestampType - } else if ((allCatch opt DateTimeUtils.stringToTime(stringValue)).isDefined) { - // We keep this for backwards compatibility. 
+ if ((allCatch opt timestampFormatter.parse(stringValue)).isDefined) { TimestampType + } else if ((allCatch opt dateFormatter.parse(stringValue)).isDefined) { + DateType } else { StringType } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala index 76a191873818..5661fb348b33 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala @@ -62,7 +62,7 @@ class JsonInferSchemaSuite extends SparkFunSuite { checkDateType("yyyy-MM-dd", """{"a": "2018-12-02"}""") } - test("inferring the date type before timestamps") { + test("strict inferring of date and timestamps") { checkType( options = Map( "dateFormat" -> "yyyy-MM-dd", From e6fc43207eb7ebc331dd2af8c7e1c909b21b2919 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 17 Dec 2018 19:23:22 +0100 Subject: [PATCH 07/15] Inferring timestamp only --- .../sql/catalyst/json/JsonInferSchema.scala | 31 +++++++------------ 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index f19481b21395..3203e626ea40 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.json -import java.text.ParsePosition import java.util.Comparator import scala.util.control.Exception.allCatch @@ -38,6 +37,11 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { private val decimalParser = ExprUtils.getDecimalParser(options.locale) + private val timestampFormatter = TimestampFormatter( + options.timestampFormat, + options.timeZone, + options.locale) + /** * Infer the type of a collection of json records in three stages: * 1. Infer the type of each record @@ -116,28 +120,15 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { // record fields' types have been combined. NullType - case VALUE_STRING if options.prefersDecimal => + case VALUE_STRING => + val field = parser.getText val decimalTry = allCatch opt { - val bigDecimal = decimalParser(parser.getText) + val bigDecimal = decimalParser(field) DecimalType(bigDecimal.precision, bigDecimal.scale) } - decimalTry.getOrElse(StringType) - case VALUE_STRING => - val stringValue = parser.getText - val dateTry = allCatch opt { - val pos = new ParsePosition(0) - options.dateFormat.parse(stringValue, pos) - if (pos.getErrorIndex != -1 || pos.getIndex != stringValue.length) { - throw new IllegalArgumentException( - s"$stringValue cannot be parsed as ${DateType.simpleString}") - } - } - if (dateTry.isDefined) { - DateType - } else if ((allCatch opt options.timestampFormat.parse(stringValue)).isDefined) { - TimestampType - } else if ((allCatch opt DateTimeUtils.stringToTime(stringValue)).isDefined) { - // We keep this for backwards compatibility. 
+ if (options.prefersDecimal && decimalTry.isDefined) { + decimalTry.get + } else if ((allCatch opt timestampFormatter.parse(field)).isDefined) { TimestampType } else { StringType From b27d081f45b6a373809ac1bf0454916d35015f76 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 17 Dec 2018 19:23:49 +0100 Subject: [PATCH 08/15] Test for inferring timestamps and decimals --- .../catalyst/json/JsonInferSchemaSuite.scala | 28 ++++++------------- 1 file changed, 9 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala index 76a191873818..1683551cda91 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala @@ -52,32 +52,22 @@ class JsonInferSchemaSuite extends SparkFunSuite { """{"a": "2018-12-02T21:04:00.123567+01:00"}""") } - def checkDateType(pattern: String, json: String): Unit = { - checkType(Map("dateFormat" -> pattern), json, DateType) - } - - test("inferring date type") { - checkDateType("yyyy", """{"a": "2018"}""") - checkDateType("yyyy-MM", """{"a": "2018-12"}""") - checkDateType("yyyy-MM-dd", """{"a": "2018-12-02"}""") - } - - test("inferring the date type before timestamps") { + test("inferring decimals and timestamps") { checkType( options = Map( - "dateFormat" -> "yyyy-MM-dd", - "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSS" + "prefersDecimal" -> "true", + "timestampFormat" -> "yyyyMMdd.HHmmssSSS" ), - json = """{"a": "2018-12-02T21:04:00.123"}""", - `type` = TimestampType + json = """{"a": "20181202.210400123"}""", + `type` = DecimalType(17, 9) ) checkType( options = Map( - "dateFormat" -> "yyyy-MM-dd", - "timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss.SSS" + "prefersDecimal" -> "false", + "timestampFormat" -> "yyyyMMdd.HHmmssSSS" ), - json = """{"a": "2018-12-02"}""", - `type` = DateType + json = """{"a": "20181202.210400123"}""", + `type` = TimestampType ) } } From c59e3e830e359649848d4fcf0f34faccd5e4711c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 17 Dec 2018 19:24:43 +0100 Subject: [PATCH 09/15] `type` -> dt --- .../spark/sql/catalyst/json/JsonInferSchemaSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala index 1683551cda91..cde3656256a7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala @@ -24,14 +24,14 @@ import org.apache.spark.sql.types._ class JsonInferSchemaSuite extends SparkFunSuite { - def checkType(options: Map[String, String], json: String, `type`: DataType): Unit = { + def checkType(options: Map[String, String], json: String, dt: DataType): Unit = { val jsonOptions = new JSONOptions(options, "GMT", "") val inferSchema = new JsonInferSchema(jsonOptions) val factory = new JsonFactory() jsonOptions.setJacksonOptions(factory) val parser = CreateJacksonParser.string(factory, json) parser.nextToken() - val expectedType = StructType(Seq(StructField("a", `type`, true))) + val expectedType = StructType(Seq(StructField("a", dt, true))) assert(inferSchema.inferField(parser) === expectedType) } @@ -59,7 
+59,7 @@ class JsonInferSchemaSuite extends SparkFunSuite { "timestampFormat" -> "yyyyMMdd.HHmmssSSS" ), json = """{"a": "20181202.210400123"}""", - `type` = DecimalType(17, 9) + dt = DecimalType(17, 9) ) checkType( options = Map( @@ -67,7 +67,7 @@ class JsonInferSchemaSuite extends SparkFunSuite { "timestampFormat" -> "yyyyMMdd.HHmmssSSS" ), json = """{"a": "20181202.210400123"}""", - `type` = TimestampType + dt = TimestampType ) } } From e7471a70b04a6b01510ef1fec59009ffca083f9d Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 17 Dec 2018 19:25:08 +0100 Subject: [PATCH 10/15] GMT -> UTC --- .../apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala index cde3656256a7..06e5434ae05e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.types._ class JsonInferSchemaSuite extends SparkFunSuite { def checkType(options: Map[String, String], json: String, dt: DataType): Unit = { - val jsonOptions = new JSONOptions(options, "GMT", "") + val jsonOptions = new JSONOptions(options, "UTC", "") val inferSchema = new JsonInferSchema(jsonOptions) val factory = new JsonFactory() jsonOptions.setJacksonOptions(factory) From 82816ed983ed8eedbc6df8caab02eb723e658fca Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 17 Dec 2018 20:41:22 +0100 Subject: [PATCH 11/15] Test for fallback to string type --- .../sql/catalyst/json/JsonInferSchemaSuite.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala index 06e5434ae05e..6e7bd18f598f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala @@ -52,7 +52,7 @@ class JsonInferSchemaSuite extends SparkFunSuite { """{"a": "2018-12-02T21:04:00.123567+01:00"}""") } - test("inferring decimals and timestamps") { + test("prefer decimals over timestamps") { checkType( options = Map( "prefersDecimal" -> "true", @@ -61,6 +61,9 @@ class JsonInferSchemaSuite extends SparkFunSuite { json = """{"a": "20181202.210400123"}""", dt = DecimalType(17, 9) ) + } + + test("skip decimal type inferring") { checkType( options = Map( "prefersDecimal" -> "false", @@ -70,4 +73,12 @@ class JsonInferSchemaSuite extends SparkFunSuite { dt = TimestampType ) } + + test("fallback to string type") { + checkType( + options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"), + json = """{"a": "20181202.210400123"}""", + dt = StringType + ) + } } From b0d1374c8bcd1fb87d311cdf06dd3162a72cb2e4 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 17 Dec 2018 21:21:25 +0100 Subject: [PATCH 12/15] Fix task is not serializable --- .../org/apache/spark/sql/catalyst/json/JsonInferSchema.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 3203e626ea40..d1bc00c08c1c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -37,7 +37,8 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { private val decimalParser = ExprUtils.getDecimalParser(options.locale) - private val timestampFormatter = TimestampFormatter( + @transient + private lazy val timestampFormatter = TimestampFormatter( options.timestampFormat, options.timeZone, options.locale) From 5782de5f2e98f2828fc1edfa3c3eb9a2a472d90f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 17 Dec 2018 21:21:54 +0100 Subject: [PATCH 13/15] Added test for schema inferring --- .../execution/datasources/json/JsonSuite.scala | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 786335b42e3c..c3beb64aef9d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ +import org.apache.spark.sql.types.StructType.fromDDL import org.apache.spark.util.Utils class TestFileFilter extends PathFilter { @@ -2589,4 +2590,21 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { Row(null, Array(0, 1, 2), "abc", """{"a":"-","b":[0, 1, 2],"c":"abc"}""") :: Row(0.1, null, "def", """{"a":0.1,"b":{},"c":"def"}""") :: Nil) } + + test("inferring timestamp type") { + def schemaOf(jsons: String*): StructType = spark.read.json(jsons.toDS).schema + + assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", + """{"a":"2018-12-16T22:23:24.123-02:00"}""") === fromDDL("a timestamp")) + + assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":1}""") + === fromDDL("a string")) + assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":"123"}""") + === fromDDL("a string")) + + assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":null}""") + === fromDDL("a timestamp")) + assert(schemaOf("""{"a":null}""", """{"a":"2018-12-17T10:11:12.123-01:00"}""") + === fromDDL("a timestamp")) + } } From 63a656809e9218186844ed707a7f66a804d3948f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 17 Dec 2018 21:42:59 +0100 Subject: [PATCH 14/15] Roundtrip test for timestamp inferring --- .../datasources/json/JsonSuite.scala | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index c3beb64aef9d..4f50c4d27224 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2607,4 +2607,29 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { assert(schemaOf("""{"a":null}""", """{"a":"2018-12-17T10:11:12.123-01:00"}""") === 
fromDDL("a timestamp")) } + + test("roundtrip for timestamp type inferring") { + val customSchema = new StructType(Array(StructField("date", TimestampType, true))) + withTempDir { dir => + val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json" + val timestampsWithFormat = spark.read + .option("timestampFormat", "dd/MM/yyyy HH:mm") + .json(datesRecords) + assert(timestampsWithFormat.schema === customSchema) + + timestampsWithFormat.write + .format("json") + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") + .option(DateTimeUtils.TIMEZONE_OPTION, "UTC") + .save(timestampsWithFormatPath) + + val readBack = spark.read + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") + .option(DateTimeUtils.TIMEZONE_OPTION, "UTC") + .json(timestampsWithFormatPath) + + assert(readBack.schema === customSchema) + checkAnswer(readBack, timestampsWithFormat) + } + } } From 11daee31f6474528292a0485596f66fa7b955a04 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 17 Dec 2018 22:20:18 +0100 Subject: [PATCH 15/15] Testing for legacy and new timestamp parser --- .../catalyst/json/JsonInferSchemaSuite.scala | 80 ++++++++++++------- .../datasources/json/JsonSuite.scala | 79 ++++++++++-------- 2 files changed, 93 insertions(+), 66 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala index 6e7bd18f598f..9307f9b47b80 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala @@ -20,9 +20,11 @@ package org.apache.spark.sql.catalyst.json import com.fasterxml.jackson.core.JsonFactory import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.plans.SQLHelper +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -class JsonInferSchemaSuite extends SparkFunSuite { +class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper { def checkType(options: Map[String, String], json: String, dt: DataType): Unit = { val jsonOptions = new JSONOptions(options, "UTC", "") @@ -41,44 +43,60 @@ class JsonInferSchemaSuite extends SparkFunSuite { } test("inferring timestamp type") { - checkTimestampType("yyyy", """{"a": "2018"}""") - checkTimestampType("yyyy=MM", """{"a": "2018=12"}""") - checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""") - checkTimestampType( - "yyyy-MM-dd'T'HH:mm:ss.SSS", - """{"a": "2018-12-02T21:04:00.123"}""") - checkTimestampType( - "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX", - """{"a": "2018-12-02T21:04:00.123567+01:00"}""") + Seq(true, false).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + checkTimestampType("yyyy", """{"a": "2018"}""") + checkTimestampType("yyyy=MM", """{"a": "2018=12"}""") + checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""") + checkTimestampType( + "yyyy-MM-dd'T'HH:mm:ss.SSS", + """{"a": "2018-12-02T21:04:00.123"}""") + checkTimestampType( + "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX", + """{"a": "2018-12-02T21:04:00.123567+01:00"}""") + } + } } test("prefer decimals over timestamps") { - checkType( - options = Map( - "prefersDecimal" -> "true", - "timestampFormat" -> "yyyyMMdd.HHmmssSSS" - ), - json = """{"a": "20181202.210400123"}""", - dt = DecimalType(17, 9) - ) + Seq(true, false).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> 
legacyParser.toString) { + checkType( + options = Map( + "prefersDecimal" -> "true", + "timestampFormat" -> "yyyyMMdd.HHmmssSSS" + ), + json = """{"a": "20181202.210400123"}""", + dt = DecimalType(17, 9) + ) + } + } } test("skip decimal type inferring") { - checkType( - options = Map( - "prefersDecimal" -> "false", - "timestampFormat" -> "yyyyMMdd.HHmmssSSS" - ), - json = """{"a": "20181202.210400123"}""", - dt = TimestampType - ) + Seq(true, false).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + checkType( + options = Map( + "prefersDecimal" -> "false", + "timestampFormat" -> "yyyyMMdd.HHmmssSSS" + ), + json = """{"a": "20181202.210400123"}""", + dt = TimestampType + ) + } + } } test("fallback to string type") { - checkType( - options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"), - json = """{"a": "20181202.210400123"}""", - dt = StringType - ) + Seq(true, false).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + checkType( + options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"), + json = """{"a": "20181202.210400123"}""", + dt = StringType + ) + } + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 4f50c4d27224..8f575a371c98 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2592,44 +2592,53 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } test("inferring timestamp type") { - def schemaOf(jsons: String*): StructType = spark.read.json(jsons.toDS).schema - - assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", - """{"a":"2018-12-16T22:23:24.123-02:00"}""") === fromDDL("a timestamp")) - - assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":1}""") - === fromDDL("a string")) - assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":"123"}""") - === fromDDL("a string")) - - assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":null}""") - === fromDDL("a timestamp")) - assert(schemaOf("""{"a":null}""", """{"a":"2018-12-17T10:11:12.123-01:00"}""") - === fromDDL("a timestamp")) + Seq(true, false).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + def schemaOf(jsons: String*): StructType = spark.read.json(jsons.toDS).schema + + assert(schemaOf( + """{"a":"2018-12-17T10:11:12.123-01:00"}""", + """{"a":"2018-12-16T22:23:24.123-02:00"}""") === fromDDL("a timestamp")) + + assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":1}""") + === fromDDL("a string")) + assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":"123"}""") + === fromDDL("a string")) + + assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", """{"a":null}""") + === fromDDL("a timestamp")) + assert(schemaOf("""{"a":null}""", """{"a":"2018-12-17T10:11:12.123-01:00"}""") + === fromDDL("a timestamp")) + } + } } test("roundtrip for timestamp type inferring") { - val customSchema = new StructType(Array(StructField("date", TimestampType, true))) - withTempDir { dir => - val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json" - val timestampsWithFormat = spark.read - .option("timestampFormat", "dd/MM/yyyy HH:mm") - 
.json(datesRecords) - assert(timestampsWithFormat.schema === customSchema) - - timestampsWithFormat.write - .format("json") - .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") - .option(DateTimeUtils.TIMEZONE_OPTION, "UTC") - .save(timestampsWithFormatPath) - - val readBack = spark.read - .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") - .option(DateTimeUtils.TIMEZONE_OPTION, "UTC") - .json(timestampsWithFormatPath) - - assert(readBack.schema === customSchema) - checkAnswer(readBack, timestampsWithFormat) + Seq(true, false).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val customSchema = new StructType().add("date", TimestampType) + withTempDir { dir => + val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json" + val timestampsWithFormat = spark.read + .option("timestampFormat", "dd/MM/yyyy HH:mm") + .json(datesRecords) + assert(timestampsWithFormat.schema === customSchema) + + timestampsWithFormat.write + .format("json") + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") + .option(DateTimeUtils.TIMEZONE_OPTION, "UTC") + .save(timestampsWithFormatPath) + + val readBack = spark.read + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") + .option(DateTimeUtils.TIMEZONE_OPTION, "UTC") + .json(timestampsWithFormatPath) + + assert(readBack.schema === customSchema) + checkAnswer(readBack, timestampsWithFormat) + } + } } } }
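
---

A minimal usage sketch of the intended behavior after this series, from the
user-facing side (illustration only, not one of the patches): string fields
that fully match timestampFormat should be inferred as TimestampType,
prefersDecimal should win over timestamp inference, and values matching
neither format should fall back to StringType. The SparkSession setup, object
name, and local master below are assumptions for the example; the options,
sample values, and expected schemas are taken from the tests above.

    import org.apache.spark.sql.SparkSession

    object JsonTimestampInferenceSketch {
      def main(args: Array[String]): Unit = {
        // Local session purely for illustration.
        val spark = SparkSession.builder()
          .appName("json-timestamp-inference")
          .master("local[*]")
          .getOrCreate()
        import spark.implicits._

        // A string that fully matches timestampFormat is inferred as a timestamp.
        val ts = spark.read
          .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSS")
          .json(Seq("""{"a": "2018-12-02T21:04:00.123"}""").toDS())
        ts.printSchema() // root |-- a: timestamp

        // prefersDecimal takes precedence over timestamp inference.
        val dec = spark.read
          .option("prefersDecimal", "true")
          .option("timestampFormat", "yyyyMMdd.HHmmssSSS")
          .json(Seq("""{"a": "20181202.210400123"}""").toDS())
        dec.printSchema() // root |-- a: decimal(17,9)

        // A value matching neither decimal nor timestampFormat stays a string.
        val str = spark.read
          .option("timestampFormat", "yyyy,MM,dd.HHmmssSSS")
          .json(Seq("""{"a": "20181202.210400123"}""").toDS())
        str.printSchema() // root |-- a: string

        spark.stop()
      }
    }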