diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md
index 8384f8332a6a..7b538528219a 100644
--- a/docs/sql-data-sources-csv.md
+++ b/docs/sql-data-sources-csv.md
@@ -174,6 +174,12 @@ Data source options of CSV can be set via:
     Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp without timezone type, note that zone-offset and time-zone components are not supported when writing or reading this data type.
     read/write

+
+    enableDateTimeParsingFallback
+    Enabled if the time parser policy has legacy settings or if no custom date or timestamp pattern was provided
+    Allows falling back to the backward compatible (Spark 1.x and 2.0) behavior of parsing dates and timestamps if values do not match the set patterns.
+    read
+
   maxColumns
   20480
diff --git a/docs/sql-data-sources-json.md b/docs/sql-data-sources-json.md
index 27d89875623c..500cd65b58b8 100644
--- a/docs/sql-data-sources-json.md
+++ b/docs/sql-data-sources-json.md
@@ -202,6 +202,12 @@ Data source options of JSON can be set via:
     Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp without timezone type, note that zone-offset and time-zone components are not supported when writing or reading this data type.
     read/write

+
+    enableDateTimeParsingFallback
+    Enabled if the time parser policy has legacy settings or if no custom date or timestamp pattern was provided
+    Allows falling back to the backward compatible (Spark 1.x and 2.0) behavior of parsing dates and timestamps if values do not match the set patterns.
+    read
+
   multiLine
   false
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index a033e3a3a8d7..27806ea1c403 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -190,6 +190,17 @@ class CSVOptions(
   val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat",
     s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")

+  // SPARK-39731: Enables the backward compatible parsing behavior.
+  // Generally, this config should be set to false to avoid producing potentially incorrect
+  // results, which is the current default (see UnivocityParser for how the default is derived).
+  //
+  // If enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`.
+  // If enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used.
+  // Otherwise, depending on the parser policy and a custom pattern, an exception may be thrown
+  // or the value will be parsed as null.
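+  //
+  // For example (hypothetical usage), reading with
+  //   spark.read.option("dateFormat", "yyyyMMdd")
+  //     .option("enableDateTimeParsingFallback", "true").csv(path)
+  // still parses a value such as "2020-01-01" through the legacy code path even though it
+  // does not match the custom pattern.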
+  val enableDateTimeParsingFallback: Option[Boolean] =
+    parameters.get("enableDateTimeParsingFallback").map(_.toBoolean)
+
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

   val maxColumns = getInt("maxColumns", 20480)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 0237b6c454d0..a6b4d7ea6679 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -120,6 +120,20 @@ class UnivocityParser(
     new NoopFilters
   }

+  // Flags to signal if we need to fall back to the backward compatible behavior of parsing
+  // dates and timestamps.
+  // For more information, see comments for "enableDateTimeParsingFallback" option in CSVOptions.
+  private val enableParsingFallbackForTimestampType =
+    options.enableDateTimeParsingFallback.getOrElse {
+      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+        options.timestampFormatInRead.isEmpty
+    }
+  private val enableParsingFallbackForDateType =
+    options.enableDateTimeParsingFallback.getOrElse {
+      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+        options.dateFormatInRead.isEmpty
+    }
+
   // Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
     val currentContent = tokenizer.getContext.currentParsedContent()
@@ -205,7 +219,10 @@ class UnivocityParser(
       } catch {
         case NonFatal(e) =>
           // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-          // compatibility.
+          // compatibility if enabled (this is the date converter, so check the date flag).
+          if (!enableParsingFallbackForDateType) {
+            throw e
+          }
           val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
           DateTimeUtils.stringToDate(str).getOrElse(throw e)
       }
@@ -217,16 +234,17 @@ class UnivocityParser(
         timestampFormatter.parse(datum)
       } catch {
         case NonFatal(e) =>
-          // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-          // compatibility.
-          val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
-          DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse {
-            // There may be date type entries in timestamp column due to schema inference
-            if (options.inferDate) {
-              daysToMicros(dateFormatter.parse(datum), options.zoneId)
-            } else {
-              throw(e)
+          // There may be date type entries in timestamp column due to schema inference
+          if (options.inferDate) {
+            daysToMicros(dateFormatter.parse(datum), options.zoneId)
+          } else {
+            // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
+            // compatibility if enabled (this is the timestamp converter, so check the
+            // timestamp flag).
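+            // For example (hypothetical value), "2020-01-01 12:34:56" would still parse
+            // through the DateTimeUtils.stringToTimestamp fallback even when
+            // timestampFormat is set to "yyyyMMdd".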
+            if (!enableParsingFallbackForTimestampType) {
+              throw e
             }
+            val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(datum))
+            DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
           }
       }
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 5f90dbc49c9d..66fd22894f93 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -111,6 +111,17 @@ private[sql] class JSONOptions(
   val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat",
     s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")

+  // SPARK-39731: Enables the backward compatible parsing behavior.
+  // Generally, this config should be set to false to avoid producing potentially incorrect
+  // results, which is the current default (see JacksonParser for how the default is derived).
+  //
+  // If enabled and the date cannot be parsed, we will fall back to `DateTimeUtils.stringToDate`.
+  // If enabled and the timestamp cannot be parsed, `DateTimeUtils.stringToTimestamp` will be used.
+  // Otherwise, depending on the parser policy and a custom pattern, an exception may be thrown
+  // or the value will be parsed as null.
+  val enableDateTimeParsingFallback: Option[Boolean] =
+    parameters.get("enableDateTimeParsingFallback").map(_.toBoolean)
+
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 7004d2a8f162..06133d44c13a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -78,6 +78,20 @@ class JacksonParser(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)

+  // Flags to signal if we need to fall back to the backward compatible behavior of parsing
+  // dates and timestamps.
+  // For more information, see comments for "enableDateTimeParsingFallback" option in JSONOptions.
+  private val enableParsingFallbackForTimestampType =
+    options.enableDateTimeParsingFallback.getOrElse {
+      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+        options.timestampFormatInRead.isEmpty
+    }
+  private val enableParsingFallbackForDateType =
+    options.enableDateTimeParsingFallback.getOrElse {
+      SQLConf.get.legacyTimeParserPolicy == SQLConf.LegacyBehaviorPolicy.LEGACY ||
+        options.dateFormatInRead.isEmpty
+    }
+
   /**
    * Create a converter which converts the JSON documents held by the `JsonParser`
    * to a value according to a desired schema. This is a wrapper for the method
@@ -257,7 +271,10 @@ class JacksonParser(
       } catch {
         case NonFatal(e) =>
           // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-          // compatibility.
+          // compatibility if enabled.
+          if (!enableParsingFallbackForTimestampType) {
+            throw e
+          }
           val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
           DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
       }
@@ -280,7 +297,10 @@ class JacksonParser(
       } catch {
         case NonFatal(e) =>
           // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
-          // compatibility.
+          // compatibility if enabled.
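+          // For example (hypothetical value), "2020-01-01" would still parse through the
+          // DateTimeUtils.stringToDate fallback even when dateFormat is set to "yyyyMMdd".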
+          if (!enableParsingFallbackForDateType) {
+            throw e
+          }
           val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
           DateTimeUtils.stringToDate(str).getOrElse {
             // In Spark 1.5.0, we store the data as number of days since epoch in string.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 2589376bc3dc..381ec57fcd13 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -355,9 +355,22 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
     val options = new CSVOptions(Map.empty[String, String], false, "UTC")
     check(new UnivocityParser(StructType(Seq.empty), options))

-    val optionsWithPattern = new CSVOptions(
-      Map("timestampFormat" -> "invalid", "dateFormat" -> "invalid"), false, "UTC")
-    check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern))
+    def optionsWithPattern(enableFallback: Boolean): CSVOptions = new CSVOptions(
+      Map(
+        "timestampFormat" -> "invalid",
+        "dateFormat" -> "invalid",
+        "enableDateTimeParsingFallback" -> s"$enableFallback"),
+      false,
+      "UTC")
+
+    // With the fallback enabled, we are still able to parse dates and timestamps.
+    check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern(true)))
+
+    // With the fallback to the legacy parser disabled, parsing results in an error.
+    val err = intercept[IllegalArgumentException] {
+      check(new UnivocityParser(StructType(Seq.empty), optionsWithPattern(false)))
+    }
+    assert(err.getMessage.contains("Illegal pattern character: n"))
   }

   test("SPARK-39469: dates should be parsed correctly in a timestamp column when inferDate=true") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 758f54306088..0e5718103902 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -36,7 +36,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.logging.log4j.Level

-import org.apache.spark.{SparkConf, SparkException, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, TestUtils}
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
@@ -2837,7 +2837,80 @@ abstract class CSVSuite
       )
       assert(results.collect().toSeq.map(_.toSeq) == expected)
     }
+  }
+
+  test("SPARK-39731: Correctly parse dates and timestamps with yyyyMMdd pattern") {
+    withTempPath { path =>
+      Seq(
+        "1,2020011,2020011",
+        "2,20201203,20201203").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+      val schema = new StructType()
+        .add("id", IntegerType)
+        .add("date", DateType)
+        .add("ts", TimestampType)
+      val output = spark.read
+        .schema(schema)
+        .option("dateFormat", "yyyyMMdd")
+        .option("timestampFormat", "yyyyMMdd")
+        .csv(path.getAbsolutePath)
+
+      def check(mode: String, res: Seq[Row]): Unit = {
+        withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> mode) {
+          checkAnswer(output, res)
+        }
+      }
+
+      check(
+        "legacy",
+        Seq(
+          Row(1, Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")),
+          Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      check(
+        "corrected",
+        Seq(
+          Row(1, null, null),
+          Row(2, Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      val err = intercept[SparkException] {
+        check("exception", Nil)
+      }.getCause
+      assert(err.isInstanceOf[SparkUpgradeException])
+    }
+  }
+
+  test("SPARK-39731: Handle date and timestamp parsing fallback") {
+    withTempPath { path =>
+      Seq("2020-01-01,2020-01-01").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+      val schema = new StructType()
+        .add("date", DateType)
+        .add("ts", TimestampType)
+
+      def output(enableFallback: Boolean): DataFrame = spark.read
+        .schema(schema)
+        .option("dateFormat", "invalid")
+        .option("timestampFormat", "invalid")
+        .option("enableDateTimeParsingFallback", enableFallback)
+        .csv(path.getAbsolutePath)
+
+      checkAnswer(
+        output(enableFallback = true),
+        Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")))
+      )
+
+      checkAnswer(
+        output(enableFallback = false),
+        Seq(Row(null, null))
+      )
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3fe9c58c957c..1ecaf748f5da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec

-import org.apache.spark.{SparkConf, SparkException, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, TestUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json._
@@ -3249,6 +3249,79 @@ abstract class JsonSuite
       }
     }
   }
+
+  test("SPARK-39731: Correctly parse dates and timestamps with yyyyMMdd pattern") {
+    withTempPath { path =>
+      Seq(
+        """{"date": "2020011", "ts": "2020011"}""",
+        """{"date": "20201203", "ts": "20201203"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+      val schema = new StructType()
+        .add("date", DateType)
+        .add("ts", TimestampType)
+      val output = spark.read
+        .schema(schema)
+        .option("dateFormat", "yyyyMMdd")
+        .option("timestampFormat", "yyyyMMdd")
+        .json(path.getAbsolutePath)
+
+      def check(mode: String, res: Seq[Row]): Unit = {
+        withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> mode) {
+          checkAnswer(output, res)
+        }
+      }
+
+      check(
+        "legacy",
+        Seq(
+          Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")),
+          Row(Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      check(
+        "corrected",
+        Seq(
+          Row(null, null),
+          Row(Date.valueOf("2020-12-03"), Timestamp.valueOf("2020-12-03 00:00:00"))
+        )
+      )
+
+      val err = intercept[SparkException] {
+        check("exception", Nil)
+      }.getCause
+      assert(err.isInstanceOf[SparkUpgradeException])
+    }
+  }
+
+  test("SPARK-39731: Handle date and timestamp parsing fallback") {
+    withTempPath { path =>
+      Seq("""{"date": "2020-01-01", "ts": "2020-01-01"}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+      val schema = new StructType()
+        .add("date", DateType)
+        .add("ts", TimestampType)
+
+      def output(enableFallback: Boolean): DataFrame = spark.read
+        .schema(schema)
+        .option("dateFormat", "invalid")
+        .option("timestampFormat", "invalid")
+        .option("enableDateTimeParsingFallback", enableFallback)
+        .json(path.getAbsolutePath)
+
+      checkAnswer(
+        output(enableFallback = true),
+        Seq(Row(Date.valueOf("2020-01-01"), Timestamp.valueOf("2020-01-01 00:00:00")))
+      )
+
+      checkAnswer(
+        output(enableFallback = false),
+        Seq(Row(null, null))
+      )
+    }
+  }
 }

 class JsonV1Suite extends JsonSuite {
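For context, a minimal end-to-end sketch of the new option (not part of the patch; the path, schema string, and data are hypothetical):

    // The custom patterns below do not match ISO-formatted data on disk, e.g. a file
    // containing "2020-01-01,2020-01-01 00:00:00". With the fallback explicitly enabled,
    // the values are still parsed via the legacy DateTimeUtils code path. With it set to
    // "false", the values are read as nulls or the query fails, depending on
    // spark.sql.legacy.timeParserPolicy (see the tests above).
    val df = spark.read
      .schema("date DATE, ts TIMESTAMP")
      .option("dateFormat", "yyyyMMdd")
      .option("timestampFormat", "yyyyMMdd")
      .option("enableDateTimeParsingFallback", "true")
      .csv("/tmp/fallback-example.csv")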