diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index cd69c21a0197..f829e6b503ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -175,10 +175,30 @@ class UnivocityParser(
     }
 
     case _: TimestampType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(timestampFormatter.parse)
+      nullSafeDatum(d, name, nullable, options) { datum =>
+        try {
+          timestampFormatter.parse(datum)
+        } catch {
+          case NonFatal(e) =>
+            // If parsing fails, fall back to the parser used in 2.0 and 1.x for backwards
+            // compatibility.
+            val str = UTF8String.fromString(DateTimeUtils.cleanLegacyTimestampStr(datum))
+            DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
+        }
+      }
 
     case _: DateType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(dateFormatter.parse)
+      nullSafeDatum(d, name, nullable, options) { datum =>
+        try {
+          dateFormatter.parse(datum)
+        } catch {
+          case NonFatal(e) =>
+            // If parsing fails, fall back to the parser used in 2.0 and 1.x for backwards
+            // compatibility.
+            val str = UTF8String.fromString(DateTimeUtils.cleanLegacyTimestampStr(datum))
+            DateTimeUtils.stringToDate(str, options.zoneId).getOrElse(throw e)
+        }
+      }
 
     case _: StringType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(UTF8String.fromString)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 1e408cdb126b..da3b5013fe12 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -21,7 +21,6 @@ import java.io.{ByteArrayOutputStream, CharConversionException}
 import java.nio.charset.MalformedInputException
 
 import scala.collection.mutable.ArrayBuffer
-import scala.util.Try
 import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.core._
@@ -230,7 +229,15 @@ class JacksonParser(
     case TimestampType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
         case VALUE_STRING if parser.getTextLength >= 1 =>
-          timestampFormatter.parse(parser.getText)
+          try {
+            timestampFormatter.parse(parser.getText)
+          } catch {
+            case NonFatal(e) =>
+              // If parsing fails, fall back to the parser used in 2.0 and 1.x for backwards
+              // compatibility.
+              val str = UTF8String.fromString(DateTimeUtils.cleanLegacyTimestampStr(parser.getText))
+              DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
+          }
 
         case VALUE_NUMBER_INT =>
           parser.getLongValue * 1000000L
@@ -239,7 +246,23 @@ class JacksonParser(
     case DateType =>
      (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
         case VALUE_STRING if parser.getTextLength >= 1 =>
-          dateFormatter.parse(parser.getText)
+          try {
+            dateFormatter.parse(parser.getText)
+          } catch {
+            case NonFatal(e) =>
+              // If parsing fails, fall back to the parser used in 2.0 and 1.x for backwards
+              // compatibility.
+              val str = UTF8String.fromString(DateTimeUtils.cleanLegacyTimestampStr(parser.getText))
+              DateTimeUtils.stringToDate(str, options.zoneId).getOrElse {
+                // In Spark 1.5.0, we stored the data as the number of days since the epoch,
+                // in a string. So, we just convert it to Int.
+                try {
+                  parser.getText.toInt
+                } catch {
+                  case _: NumberFormatException => throw e
+                }
+              }.asInstanceOf[Integer]
+          }
       }
 
     case BinaryType =>
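Both files above add the same try/fallback shape around the strict formatter. As a rough standalone sketch of that pattern (`strictParse` and `legacyParse` are invented stand-ins for `TimestampFormatter.parse` and the `DateTimeUtils` fallback, not Spark APIs):

```scala
import scala.util.control.NonFatal

object FallbackPatternSketch {
  // Stand-in for the configured TimestampFormatter: accepts strict ISO-8601 instants only.
  def strictParse(s: String): Long =
    java.time.Instant.parse(s).toEpochMilli * 1000L // approximate micros since epoch

  // Stand-in for the lenient 1.x/2.0-era path: tolerates inputs like "2020-1-12 12:3:45.1".
  def legacyParse(s: String): Option[Long] =
    try Some(java.sql.Timestamp.valueOf(s).getTime * 1000L)
    catch { case NonFatal(_) => None }

  // The shared shape: try the formatter first, fall back to the legacy parser,
  // and rethrow the formatter's original error if both attempts fail.
  def parseTimestamp(s: String): Long =
    try strictParse(s) catch {
      case NonFatal(e) => legacyParse(s).getOrElse(throw e)
    }
}
```

Rethrowing the original exception (rather than the fallback's) keeps the error message tied to the user-configured format, which is what the `getOrElse(throw e)` in the patch does.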
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 2e4daa20ad51..a72de5ac053c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -163,6 +163,21 @@ object DateTimeUtils {
     instantToMicros(localDateTime.atZone(zoneId).toInstant)
   }
 
+  // A method called by the JSON/CSV parsers to clean up a legacy timestamp string by removing
+  // the "GMT" substring.
+  def cleanLegacyTimestampStr(s: String): String = {
+    val indexOfGMT = s.indexOf("GMT")
+    if (indexOfGMT != -1) {
+      // ISO8601 with a weird time zone specifier (2000-01-01T00:00GMT+01:00)
+      val s0 = s.substring(0, indexOfGMT)
+      val s1 = s.substring(indexOfGMT + 3)
+      // Mapped to 2000-01-01T00:00+01:00
+      s0 + s1
+    } else {
+      s
+    }
+  }
+
   /**
    * Trims and parses a given UTF8 date string to the corresponding [[Long]] value.
    * The return type is [[Option]] in order to distinguish between 0L and null. The following
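A small self-contained check of what the new helper does; the copy below mirrors the hunk, and the expected values follow from its comments:

```scala
object CleanLegacyTimestampStrCheck {
  // Local copy of DateTimeUtils.cleanLegacyTimestampStr for experimenting outside Spark.
  def cleanLegacyTimestampStr(s: String): String = {
    val indexOfGMT = s.indexOf("GMT")
    if (indexOfGMT != -1) {
      s.substring(0, indexOfGMT) + s.substring(indexOfGMT + 3)
    } else {
      s
    }
  }

  def main(args: Array[String]): Unit = {
    // The legacy zone specifier collapses to a plain ISO-8601 offset.
    assert(cleanLegacyTimestampStr("2000-01-01T00:00GMT+01:00") == "2000-01-01T00:00+01:00")
    // Strings without "GMT" pass through unchanged.
    assert(cleanLegacyTimestampStr("2000-01-01T00:00+01:00") == "2000-01-01T00:00+01:00")
  }
}
```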
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 536c76f042d2..c8c71bcfeb7a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
+import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.sources.{EqualTo, Filter, StringStartsWith}
 import org.apache.spark.sql.types._
@@ -318,4 +319,44 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
     }.getMessage
     assert(errMsg2.contains("i does not exist"))
   }
+
+  test("SPARK-30960: parse date/timestamp string with legacy format") {
+    def check(parser: UnivocityParser): Unit = {
+      // The legacy format allows 1 or 2 chars for some fields.
+      assert(parser.makeConverter("t", TimestampType).apply("2020-1-12 12:3:45") ==
+        date(2020, 1, 12, 12, 3, 45, 0))
+      assert(parser.makeConverter("t", DateType).apply("2020-1-12") ==
+        days(2020, 1, 12, 0, 0, 0))
+      // The legacy format allows a second fraction of arbitrary length.
+      assert(parser.makeConverter("t", TimestampType).apply("2020-1-12 12:3:45.1") ==
+        date(2020, 1, 12, 12, 3, 45, 100000))
+      assert(parser.makeConverter("t", TimestampType).apply("2020-1-12 12:3:45.1234") ==
+        date(2020, 1, 12, 12, 3, 45, 123400))
+      // The legacy format allows the date string to end with T or space plus an arbitrary string.
+      assert(parser.makeConverter("t", DateType).apply("2020-1-12T") ==
+        days(2020, 1, 12, 0, 0, 0))
+      assert(parser.makeConverter("t", DateType).apply("2020-1-12Txyz") ==
+        days(2020, 1, 12, 0, 0, 0))
+      assert(parser.makeConverter("t", DateType).apply("2020-1-12 ") ==
+        days(2020, 1, 12, 0, 0, 0))
+      assert(parser.makeConverter("t", DateType).apply("2020-1-12 xyz") ==
+        days(2020, 1, 12, 0, 0, 0))
+      // The legacy format ignores the "GMT" in the string.
+      assert(parser.makeConverter("t", TimestampType).apply("2020-1-12 12:3:45GMT") ==
+        date(2020, 1, 12, 12, 3, 45, 0))
+      assert(parser.makeConverter("t", TimestampType).apply("GMT2020-1-12 12:3:45") ==
+        date(2020, 1, 12, 12, 3, 45, 0))
+      assert(parser.makeConverter("t", DateType).apply("2020-1-12GMT") ==
+        days(2020, 1, 12, 0, 0, 0))
+      assert(parser.makeConverter("t", DateType).apply("GMT2020-1-12") ==
+        days(2020, 1, 12, 0, 0, 0))
+    }
+
+    val options = new CSVOptions(Map.empty[String, String], false, "UTC")
+    check(new UnivocityParser(StructType(Seq.empty), options))
+
+    val optionsWithPattern =
+      new CSVOptions(Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC")
+    check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 0be0e1e3da3d..43553df29654 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2301,6 +2301,12 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa
     assert(csv.schema.fieldNames === Seq("a", "b", "0"))
     checkAnswer(csv, Row("a", "b", 1))
   }
+
+  test("SPARK-30960: parse date/timestamp string with legacy format") {
+    val ds = Seq("2020-1-12 3:23:34.12, 2020-1-12 T").toDS()
+    val csv = spark.read.option("header", false).schema("t timestamp, d date").csv(ds)
+    checkAnswer(csv, Row(Timestamp.valueOf("2020-1-12 3:23:34.12"), Date.valueOf("2020-1-12")))
+  }
 }
 
 class CSVv1Suite extends CSVSuite {
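End to end, the CSV behavior exercised by the tests above should look roughly like this in a spark-shell built with the patch (the `show` output is hand-derived from the test expectations, not captured from a run):

```scala
// In spark-shell, `spark` and its implicits are already in scope.
import spark.implicits._

val ds = Seq("2020-1-12 3:23:34.12,2020-1-12 T").toDS()
spark.read.schema("t timestamp, d date").csv(ds).show(false)
// +----------------------+----------+
// |t                     |d         |
// +----------------------+----------+
// |2020-01-12 03:23:34.12|2020-01-12|
// +----------------------+----------+
```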
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 7abe818a29d9..d2ce555c0454 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -21,6 +21,7 @@ import java.io._
 import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
 import java.nio.file.Files
 import java.sql.{Date, Timestamp}
+import java.time.{LocalDate, LocalDateTime, ZoneId}
 import java.util.Locale
 
 import com.fasterxml.jackson.core.JsonFactory
@@ -39,6 +40,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.StructType.fromDDL
+import org.apache.spark.sql.types.TestUDT.{MyDenseVector, MyDenseVectorUDT}
 import org.apache.spark.util.Utils
 
 class TestFileFilter extends PathFilter {
@@ -1447,6 +1449,107 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson
     })
   }
 
+  test("backward compatibility") {
+    // This test makes sure our JSON support can read JSON data generated by previous versions
+    // of Spark through the toJSON method and the JSON data source.
+    // The data is generated by the following program.
+    // Here are a few notes:
+    // - Spark 1.5.0 cannot save timestamp data. So, we manually added the timestamp field
+    //   (col13) in the JSON object.
+    // - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value
+    //   to the JSON objects generated by those Spark versions (col17).
+    // - If the type is NullType, we do not write data out.
+
+    // Create the schema.
+    val struct =
+      StructType(
+        StructField("f1", FloatType, true) ::
+        StructField("f2", ArrayType(BooleanType), true) :: Nil)
+
+    val dataTypes =
+      Seq(
+        StringType, BinaryType, NullType, BooleanType,
+        ByteType, ShortType, IntegerType, LongType,
+        FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+        DateType, TimestampType,
+        ArrayType(IntegerType), MapType(StringType, LongType), struct,
+        new MyDenseVectorUDT())
+    val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
+      StructField(s"col$index", dataType, nullable = true)
+    }
+    val schema = StructType(fields)
+
+    val constantValues =
+      Seq(
+        "a string in binary".getBytes(StandardCharsets.UTF_8),
+        null,
+        true,
+        1.toByte,
+        2.toShort,
+        3,
+        Long.MaxValue,
+        0.25.toFloat,
+        0.75,
+        new java.math.BigDecimal(s"1234.23456"),
+        new java.math.BigDecimal(s"1.23456"),
+        java.sql.Date.valueOf("2015-01-01"),
+        java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
+        Seq(2, 3, 4),
+        Map("a string" -> 2000L),
+        Row(4.75.toFloat, Seq(false, true)),
+        new MyDenseVector(Array(0.25, 2.25, 4.25)))
+    val data =
+      Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil
+
+    // Data generated by previous versions.
+    // scalastyle:off
+    val existingJSONData =
+      """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
+    // scalastyle:on
+
+    // Generate data for the current version.
+    val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema)
+    withTempPath { path =>
+      df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
+
+      // df.toJSON will convert internal rows to external rows first and then generate
+      // JSON objects. In contrast, df.write.format("json") writes internal rows directly.
+      val allJSON =
+        existingJSONData ++
+          df.toJSON.collect() ++
+          sparkContext.textFile(path.getCanonicalPath).collect()
+
+      Utils.deleteRecursively(path)
+      sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
+
+      // Read the data back with the specified schema.
+      val col0Values =
+        Seq(
+          "Spark 1.2.2",
+          "Spark 1.3.1",
+          "Spark 1.3.1",
+          "Spark 1.4.1",
+          "Spark 1.4.1",
+          "Spark 1.5.0",
+          "Spark 1.5.0",
+          "Spark " + spark.sparkContext.version,
+          "Spark " + spark.sparkContext.version)
+      val expectedResult = col0Values.map { v =>
+        Row.fromSeq(Seq(v) ++ constantValues)
+      }
+      checkAnswer(
+        spark.read.format("json").schema(schema).load(path.getCanonicalPath),
+        expectedResult
+      )
+    }
+  }
+
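Note that the last Spark 1.5.0 row above stores col12 as the string "16436", i.e. days since the epoch; that is exactly the encoding the JacksonParser fallback converts with `toInt`. A quick check that the day count denotes the same date the other rows spell out:

```scala
import java.time.LocalDate

// 16436 days after 1970-01-01 is 2015-01-01, so the epoch-day row decodes to the
// same value as the rows that store "2015-01-01" directly.
assert(LocalDate.ofEpochDay(16436) == LocalDate.parse("2015-01-01"))
```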
   test("SPARK-11544 test pathfilter") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
@@ -2557,6 +2660,15 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson
       checkAnswer(readBack, timestampsWithFormat)
     }
   }
+
+  test("SPARK-30960: parse date/timestamp string with legacy format") {
+    val ds = Seq("{'t': '2020-1-12 3:23:34.12', 'd': '2020-1-12 T', 'd2': '12345'}").toDS()
+    val json = spark.read.schema("t timestamp, d date, d2 date").json(ds)
+    checkAnswer(json, Row(
+      Timestamp.valueOf("2020-1-12 3:23:34.12"),
+      Date.valueOf("2020-1-12"),
+      Date.valueOf(LocalDate.ofEpochDay(12345))))
+  }
 }
 
 class JsonV1Suite extends JsonSuite {