[SPARK-30960][SQL] add back the legacy date/timestamp format support in CSV/JSON parser #27710
@@ -175,10 +175,30 @@ class UnivocityParser(
     }

     case _: TimestampType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(timestampFormatter.parse)
+      nullSafeDatum(d, name, nullable, options) { datum =>
+        try {
+          timestampFormatter.parse(datum)
+        } catch {
+          case NonFatal(e) =>
+            // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
+            // compatibility.
+            val str = UTF8String.fromString(DateTimeUtils.cleanLegacyTimestampStr(datum))
+            DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
+        }
+      }

     case _: DateType => (d: String) =>
-      nullSafeDatum(d, name, nullable, options)(dateFormatter.parse)
+      nullSafeDatum(d, name, nullable, options) { datum =>
+        try {
+          dateFormatter.parse(datum)
+        } catch {
+          case NonFatal(e) =>
+            // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
+            // compatibility.
+            val str = UTF8String.fromString(DateTimeUtils.cleanLegacyTimestampStr(datum))
+            DateTimeUtils.stringToDate(str, options.zoneId).getOrElse(throw e)
+        }
+      }

     case _: StringType => (d: String) =>
       nullSafeDatum(d, name, nullable, options)(UTF8String.fromString)
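To make the control flow above easier to follow, here is a minimal standalone sketch of the same try-then-fall-back shape. The function name and the lenient stand-in parser are illustrative only and not part of the PR:

```scala
import java.time.LocalDate
import scala.util.control.NonFatal

// Try the configured formatter first; only if it throws, retry with the legacy
// parser, and if that also yields nothing, rethrow the original error.
def parseWithFallback[T](datum: String)(primary: String => T)(legacy: String => Option[T]): T = {
  try {
    primary(datum)
  } catch {
    case NonFatal(e) => legacy(datum).getOrElse(throw e)
  }
}

// Example with java.time parsers standing in for Spark's formatters:
val d = parseWithFallback("2020-1-12")(s => LocalDate.parse(s))(s =>
  // lenient stand-in: accepts one-digit month/day by parsing the pieces as plain Ints
  scala.util.Try {
    val parts = s.split("-")
    LocalDate.of(parts(0).toInt, parts(1).toInt, parts(2).toInt)
  }.toOption
)
// d == LocalDate.of(2020, 1, 12)
```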
@@ -21,7 +21,6 @@ import java.io.{ByteArrayOutputStream, CharConversionException}
 import java.nio.charset.MalformedInputException

 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 import scala.util.control.NonFatal

 import com.fasterxml.jackson.core._

@@ -230,7 +229,15 @@ class JacksonParser(
     case TimestampType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
         case VALUE_STRING if parser.getTextLength >= 1 =>
-          timestampFormatter.parse(parser.getText)
+          try {
+            timestampFormatter.parse(parser.getText)
+          } catch {
+            case NonFatal(e) =>
+              // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
+              // compatibility.
+              val str = UTF8String.fromString(DateTimeUtils.cleanLegacyTimestampStr(parser.getText))
+              DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
+          }

         case VALUE_NUMBER_INT =>
           parser.getLongValue * 1000000L

@@ -239,7 +246,23 @@ class JacksonParser(
     case DateType =>
       (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
         case VALUE_STRING if parser.getTextLength >= 1 =>
-          dateFormatter.parse(parser.getText)
+          try {
+            dateFormatter.parse(parser.getText)
+          } catch {
+            case NonFatal(e) =>
+              // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
+              // compatibility.
+              val str = UTF8String.fromString(DateTimeUtils.cleanLegacyTimestampStr(parser.getText))
+              DateTimeUtils.stringToDate(str, options.zoneId).getOrElse {
+                // In Spark 1.5.0, we store the data as number of days since epoch in string.
+                // So, we just convert it to Int.
+                try {
+                  parser.getText.toInt
+                } catch {
+                  case _: NumberFormatException => throw e
+                }
+              }.asInstanceOf[Integer]
+          }
       }

     case BinaryType =>

Review thread on `parser.getText.toInt`:

Member: Should we rebase this?

Contributor (author): good catch! I think we should.

Member: Here is the PR #28453
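As a usage-level illustration of the JacksonParser change above, here is a sketch assuming a local `SparkSession` named `spark`; the column names and sample record are made up, but the string forms match the cases covered by this PR:

```scala
import spark.implicits._

// "GMT" is stripped by the fallback and trailing text after the date is ignored,
// matching the 1.x/2.0 behavior this PR restores.
val ds = Seq("""{"ts": "2020-1-12 12:3:45GMT", "day": "2020-1-12 xyz"}""").toDS()
spark.read.schema("ts TIMESTAMP, day DATE").json(ds).show(false)
// Expect ts = 2020-01-12 12:03:45 and day = 2020-01-12 (assuming a UTC session time zone).
```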
@@ -163,6 +163,21 @@ object DateTimeUtils {
     instantToMicros(localDateTime.atZone(zoneId).toInstant)
   }

+  // A method called by JSON/CSV parser to clean up the legacy timestamp string by removing the
+  // "GMT" string.
+  def cleanLegacyTimestampStr(s: String): String = {
+    val indexOfGMT = s.indexOf("GMT")
+    if (indexOfGMT != -1) {
+      // ISO8601 with a weird time zone specifier (2000-01-01T00:00GMT+01:00)
+      val s0 = s.substring(0, indexOfGMT)
+      val s1 = s.substring(indexOfGMT + 3)
+      // Mapped to 2000-01-01T00:00+01:00
+      s0 + s1
+    } else {
+      s
+    }
+  }
+
   /**
    * Trim and parse a given UTF8 date string to the corresponding a corresponding [[Long]] value.
    * The return type is [[Option]] in order to distinguish between 0L and null. The following

Review thread on cleanLegacyTimestampStr:

Member: There are 3 cases for GMT formatting,

Contributor (author): This will be fixed by #27753

Member: Oh yeah, got it thanks.
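A standalone mirror of the new helper, for quick experimentation outside Spark (the function name `stripGmt` is mine, not the PR's):

```scala
// Drop the first "GMT" occurrence so the remainder is a plain ISO-8601-style string.
def stripGmt(s: String): String = {
  val i = s.indexOf("GMT")
  if (i == -1) s else s.substring(0, i) + s.substring(i + 3)
}

stripGmt("2000-01-01T00:00GMT+01:00")  // "2000-01-01T00:00+01:00"
stripGmt("GMT2020-1-12")               // "2020-1-12"
stripGmt("2020-01-12")                 // unchanged
```

As the review thread above notes, some GMT forms are not covered by this helper and are addressed separately in #27753.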
@@ -28,6 +28,7 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
+import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.sources.{EqualTo, Filter, StringStartsWith}
 import org.apache.spark.sql.types._

@@ -318,4 +319,44 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
     }.getMessage
     assert(errMsg2.contains("i does not exist"))
   }
+
+  test("SPARK-30960: parse date/timestamp string with legacy format") {
+    def check(parser: UnivocityParser): Unit = {
+      // The legacy format allows 1 or 2 chars for some fields.
+      assert(parser.makeConverter("t", TimestampType).apply("2020-1-12 12:3:45") ==
+        date(2020, 1, 12, 12, 3, 45, 0))
+      assert(parser.makeConverter("t", DateType).apply("2020-1-12") ==
+        days(2020, 1, 12, 0, 0, 0))
+      // The legacy format allows arbitrary length of second fraction.
+      assert(parser.makeConverter("t", TimestampType).apply("2020-1-12 12:3:45.1") ==
+        date(2020, 1, 12, 12, 3, 45, 100000))
+      assert(parser.makeConverter("t", TimestampType).apply("2020-1-12 12:3:45.1234") ==
+        date(2020, 1, 12, 12, 3, 45, 123400))
+      // The legacy format allow date string to end with T or space, with arbitrary string
+      assert(parser.makeConverter("t", DateType).apply("2020-1-12T") ==
+        days(2020, 1, 12, 0, 0, 0))
+      assert(parser.makeConverter("t", DateType).apply("2020-1-12Txyz") ==
+        days(2020, 1, 12, 0, 0, 0))
+      assert(parser.makeConverter("t", DateType).apply("2020-1-12 ") ==
+        days(2020, 1, 12, 0, 0, 0))
+      assert(parser.makeConverter("t", DateType).apply("2020-1-12 xyz") ==
+        days(2020, 1, 12, 0, 0, 0))
+      // The legacy format ignores the "GMT" from the string
+      assert(parser.makeConverter("t", TimestampType).apply("2020-1-12 12:3:45GMT") ==
+        date(2020, 1, 12, 12, 3, 45, 0))
+      assert(parser.makeConverter("t", TimestampType).apply("GMT2020-1-12 12:3:45") ==
+        date(2020, 1, 12, 12, 3, 45, 0))
+      assert(parser.makeConverter("t", DateType).apply("2020-1-12GMT") ==
+        days(2020, 1, 12, 0, 0, 0))
+      assert(parser.makeConverter("t", DateType).apply("GMT2020-1-12") ==
+        days(2020, 1, 12, 0, 0, 0))
+    }
+
+    val options = new CSVOptions(Map.empty[String, String], false, "UTC")
+    check(new UnivocityParser(StructType(Seq.empty), options))
+
+    val optionsWithPattern =
+      new CSVOptions(Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC")
+    check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern))
+  }
 }
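The same behavior can be exercised through the public CSV reader instead of the internal parser. A sketch assuming a local `SparkSession` named `spark`; the schema and sample rows are made up, but the values mirror the test cases above:

```scala
import spark.implicits._

val rows = Seq(
  "2020-1-12 12:3:45,2020-1-12T",        // one-digit fields; trailing 'T' on the date
  "2020-1-12 12:3:45.1234,GMT2020-1-12"  // long second fraction; stray "GMT"
).toDS()

spark.read.schema("t TIMESTAMP, d DATE").csv(rows).show(false)
// Both rows should parse rather than becoming null, thanks to the legacy fallback.
```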
@@ -21,6 +21,7 @@ import java.io._
 import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
 import java.nio.file.Files
 import java.sql.{Date, Timestamp}
+import java.time.{LocalDate, LocalDateTime, ZoneId}
 import java.util.Locale

 import com.fasterxml.jackson.core.JsonFactory

@@ -39,6 +40,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.StructType.fromDDL
+import org.apache.spark.sql.types.TestUDT.{MyDenseVector, MyDenseVectorUDT}
 import org.apache.spark.util.Utils

 class TestFileFilter extends PathFilter {
@@ -1447,6 +1449,107 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson
     })
   }

+  test("backward compatibility") {
+    // This test we make sure our JSON support can read JSON data generated by previous version
+    // of Spark generated through toJSON method and JSON data source.
+    // The data is generated by the following program.
+    // Here are a few notes:
+    //  - Spark 1.5.0 cannot save timestamp data. So, we manually added timestamp field (col13)
+    //    in the JSON object.
+    //  - For Spark before 1.5.1, we do not generate UDTs. So, we manually added the UDT value to
+    //    JSON objects generated by those Spark versions (col17).
+    //  - If the type is NullType, we do not write data out.
+
+    // Create the schema.
+    val struct =
+      StructType(
+        StructField("f1", FloatType, true) ::
+          StructField("f2", ArrayType(BooleanType), true) :: Nil)
+
+    val dataTypes =
+      Seq(
+        StringType, BinaryType, NullType, BooleanType,
+        ByteType, ShortType, IntegerType, LongType,
+        FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+        DateType, TimestampType,
+        ArrayType(IntegerType), MapType(StringType, LongType), struct,
+        new MyDenseVectorUDT())
+    val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
+      StructField(s"col$index", dataType, nullable = true)
+    }
+    val schema = StructType(fields)
+
+    val constantValues =
+      Seq(
+        "a string in binary".getBytes(StandardCharsets.UTF_8),
+        null,
+        true,
+        1.toByte,
+        2.toShort,
+        3,
+        Long.MaxValue,
+        0.25.toFloat,
+        0.75,
+        new java.math.BigDecimal(s"1234.23456"),
+        new java.math.BigDecimal(s"1.23456"),
+        java.sql.Date.valueOf("2015-01-01"),
+        java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
+        Seq(2, 3, 4),
+        Map("a string" -> 2000L),
+        Row(4.75.toFloat, Seq(false, true)),
+        new MyDenseVector(Array(0.25, 2.25, 4.25)))
+    val data =
+      Row.fromSeq(Seq("Spark " + spark.sparkContext.version) ++ constantValues) :: Nil
+
+    // Data generated by previous versions.
+    // scalastyle:off
+    val existingJSONData =
+      """{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
+      """{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
+    // scalastyle:on
+
+    // Generate data for the current version.
+    val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 1), schema)
+    withTempPath { path =>
+      df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
+
+      // df.toJSON will convert internal rows to external rows first and then generate
+      // JSON objects. While, df.write.format("json") will write internal rows directly.
+      val allJSON =
+        existingJSONData ++
+          df.toJSON.collect() ++
+          sparkContext.textFile(path.getCanonicalPath).collect()
+
+      Utils.deleteRecursively(path)
+      sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)
+
+      // Read data back with the schema specified.
+      val col0Values =
+        Seq(
+          "Spark 1.2.2",
+          "Spark 1.3.1",
+          "Spark 1.3.1",
+          "Spark 1.4.1",
+          "Spark 1.4.1",
+          "Spark 1.5.0",
+          "Spark 1.5.0",
+          "Spark " + spark.sparkContext.version,
+          "Spark " + spark.sparkContext.version)
+      val expectedResult = col0Values.map { v =>
+        Row.fromSeq(Seq(v) ++ constantValues)
+      }
+      checkAnswer(
+        spark.read.format("json").schema(schema).load(path.getCanonicalPath),
+        expectedResult
+      )
+    }
+  }
+
   test("SPARK-11544 test pathfilter") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
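Two of the legacy encodings in the fixture above can be decoded directly, which makes it easier to see why all expected rows are identical apart from col0. A quick REPL check, not part of the test:

```scala
import java.nio.charset.StandardCharsets
import java.time.LocalDate
import java.util.Base64

// col1 is base64-encoded binary; col12 in the second Spark 1.5.0 row is a date
// stored as "days since the epoch" serialized as a string.
new String(Base64.getDecoder.decode("YSBzdHJpbmcgaW4gYmluYXJ5"), StandardCharsets.UTF_8)  // "a string in binary"
LocalDate.ofEpochDay(16436)                                                               // 2015-01-01
```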
@@ -2557,6 +2660,15 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson
       checkAnswer(readBack, timestampsWithFormat)
     }
   }
+
+  test("SPARK-30960: parse date/timestamp string with legacy format") {
+    val ds = Seq("{'t': '2020-1-12 3:23:34.12', 'd': '2020-1-12 T', 'd2': '12345'}").toDS()
+    val json = spark.read.schema("t timestamp, d date, d2 date").json(ds)
+    checkAnswer(json, Row(
+      Timestamp.valueOf("2020-1-12 3:23:34.12"),
+      Date.valueOf("2020-1-12"),
+      Date.valueOf(LocalDate.ofEpochDay(12345))))
+  }
 }

 class JsonV1Suite extends JsonSuite {
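One small note on the test input above: the records use single-quoted field values, which the JSON reader accepts because the allowSingleQuotes option defaults to true. A sketch that reuses the test's data and spells the option out explicitly (assuming a local `SparkSession` named `spark`):

```scala
import spark.implicits._

val ds = Seq("{'t': '2020-1-12 3:23:34.12', 'd': '2020-1-12 T', 'd2': '12345'}").toDS()
spark.read
  .schema("t timestamp, d date, d2 date")
  .option("allowSingleQuotes", "true")  // the default, shown here only for clarity
  .json(ds)
  .show(false)
```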
Review discussion:

Comment: Should we also shadow this fallback behavior under a legacy config? Or will JSON/CSV not keep the same behavior as the SQL side? As it stands, this approach seems to break the rule we want to achieve in #27537: throw an exception when the result changes between old and new Spark versions.

Reply: I don't think it should be protected by a config. The fallback was there at the very beginning without any config, and I think it's reasonable to always support the legacy format, to make the parser more relaxed.

Reply: Copy, that makes sense. So this fallback logic is a kind of guard for the parser, no matter whether the parser is the new or the legacy one. After this is merged, #27537 needs to address the logic conflict.
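For context, my reading is that the SQL-side behavior discussed here is the one governed by Spark 3.0's spark.sql.legacy.timeParserPolicy configuration, whereas the JSON/CSV string fallback added by this PR is not gated by any config. A minimal sketch under that assumption:

```scala
// Assumed Spark 3.0 config; accepted values are EXCEPTION, CORRECTED, and LEGACY.
// LEGACY makes SQL datetime parsing fall back to the pre-3.0 behavior; the JSON/CSV
// string fallback in this PR applies regardless of this setting.
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
```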