From 53a6de5529976567ea96430ab6e010f80a0603a5 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 11 Feb 2020 11:38:51 +0800 Subject: [PATCH 01/14] Raise exception instead of silent change --- .../expressions/datetimeExpressions.scala | 17 ++++++- .../catalyst/util/TimestampFormatter.scala | 17 +++++++ .../apache/spark/sql/DateFunctionsSuite.scala | 49 +++++++++++-------- 3 files changed, 61 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 767dacfde073..9df423e86278 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import java.sql.Timestamp import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId} +import java.time.format.DateTimeParseException import java.time.temporal.IsoFields import java.util.{Locale, TimeZone} @@ -789,6 +790,10 @@ abstract class ToTimestamp formatter.parse( t.asInstanceOf[UTF8String].toString) / downScaleFactor } catch { + case e: DateTimeParseException => + TimestampFormatter.checkLegacyFormatter( + e, t.asInstanceOf[UTF8String].toString, constFormat.toString, zoneId) + null case NonFatal(_) => null } } @@ -802,6 +807,10 @@ abstract class ToTimestamp TimestampFormatter(formatString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) .parse(t.asInstanceOf[UTF8String].toString) / downScaleFactor } catch { + case e: DateTimeParseException => + TimestampFormatter.checkLegacyFormatter( + e, t.asInstanceOf[UTF8String].toString, formatString, zoneId) + null case NonFatal(_) => null } } @@ -811,6 +820,7 @@ abstract class ToTimestamp override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val javaType = CodeGenerator.javaType(dataType) + val tf = TimestampFormatter.getClass.getName.stripSuffix("$") left.dataType match { case StringType if right.foldable => val df = classOf[TimestampFormatter].getName @@ -818,7 +828,9 @@ abstract class ToTimestamp ExprCode.forNullValue(dataType) } else { val formatterName = ctx.addReferenceObj("formatter", formatter, df) + val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) val eval1 = left.genCode(ctx) + val formatString = right.genCode(ctx) ev.copy(code = code""" ${eval1.code} boolean ${ev.isNull} = ${eval1.isNull}; @@ -831,6 +843,8 @@ abstract class ToTimestamp } catch (java.text.ParseException e) { ${ev.isNull} = true; } catch (java.time.format.DateTimeParseException e) { + $tf$$.MODULE$$.checkLegacyFormatter( + e, ${eval1.value}.toString(), ${formatString.value}.toString(), $zid); ${ev.isNull} = true; } catch (java.time.DateTimeException e) { ${ev.isNull} = true; @@ -839,7 +853,6 @@ abstract class ToTimestamp } case StringType => val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) - val tf = TimestampFormatter.getClass.getName.stripSuffix("$") val ldf = LegacyDateFormats.getClass.getName.stripSuffix("$") nullSafeCodeGen(ctx, ev, (string, format) => { s""" @@ -854,6 +867,8 @@ abstract class ToTimestamp } catch (java.text.ParseException e) { ${ev.isNull} = true; } catch (java.time.format.DateTimeParseException e) { + $tf$$.MODULE$$.checkLegacyFormatter( + e, $string.toString(), $format.toString(), $zid)}; ${ev.isNull} = true; } catch 
(java.time.DateTimeException e) { ${ev.isNull} = true; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index b70a4edd5386..331e93f81ea9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -229,4 +229,21 @@ object TimestampFormatter { def getFractionFormatter(zoneId: ZoneId): TimestampFormatter = { new FractionTimestampFormatter(zoneId) } + + def checkLegacyFormatter( + e: DateTimeParseException, s: String, format: String, zoneId: ZoneId): Unit = { + assert(!SQLConf.get.legacyTimeParserEnabled, + "Only check legacy formatter while legacy parser disabled.") + val formatter = new LegacyTimestampFormatter(format, zoneId, defaultLocale) + val res = try { + Some(formatter.parse(s)) + } catch { + case _: Throwable => None + } + if (res.nonEmpty) { + throw new RuntimeException(e.getMessage + ", set " + + s"${SQLConf.LEGACY_TIME_PARSER_ENABLED.key} to true to restore the behavior before " + + "Spark 3.0.") + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index fd65f7513aa6..53875f04cad4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -377,6 +377,14 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { Seq(Row(Date.valueOf("2015-07-30")), Row(Date.valueOf("2015-07-30")))) } + def checkExceptionMessage(df: DataFrame): Unit = { + val message = intercept[Exception] { + df.collect() + }.getCause.getMessage + assert(message.contains(s"set ${SQLConf.LEGACY_TIME_PARSER_ENABLED.key} to true to restore " + + "the behavior before Spark 3.0")) + } + test("function to_date") { val d1 = Date.valueOf("2015-07-22") val d2 = Date.valueOf("2015-07-01") @@ -422,19 +430,10 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { df.select(to_date(col("d"), "yyyy-MM-dd")), Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01")), Row(Date.valueOf("2014-12-31")))) - checkAnswer( - df.select(to_date(col("s"), "yyyy-MM-dd")), - Seq(Row(null), Row(Date.valueOf("2014-12-31")), Row(null))) - - // now switch format - checkAnswer( - df.select(to_date(col("s"), "yyyy-dd-MM")), - Seq(Row(null), Row(null), Row(Date.valueOf("2014-12-31")))) + checkExceptionMessage(df.select(to_date(col("s"), "yyyy-MM-dd"))) // invalid format - checkAnswer( - df.select(to_date(col("s"), "yyyy-hh-MM")), - Seq(Row(null), Row(null), Row(null))) + checkExceptionMessage(df.select(to_date(col("s"), "yyyy-hh-MM"))) checkAnswer( df.select(to_date(col("s"), "yyyy-dd-aa")), Seq(Row(null), Row(null), Row(null))) @@ -562,6 +561,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { private def secs(millis: Long): Long = TimeUnit.MILLISECONDS.toSeconds(millis) test("unix_timestamp") { +<<<<<<< HEAD Seq(false, true).foreach { legacyParser => withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { val date1 = Date.valueOf("2015-07-24") @@ -819,16 +819,23 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { } test("SPARK-30668: use legacy timestamp parser in to_timestamp") { - def checkTimeZoneParsing(expected: Any): Unit = { - val df = 
Seq("2020-01-27T20:06:11.847-0800").toDF("ts") - checkAnswer(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")), - Row(expected)) - } - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "true") { - checkTimeZoneParsing(Timestamp.valueOf("2020-01-27 20:06:11.847")) - } - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> "false") { - checkTimeZoneParsing(null) + Seq(true, false).foreach { wholeStageCodegen => + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageCodegen.toString) { + val confKey = SQLConf.LEGACY_TIME_PARSER_ENABLED.key + withSQLConf(confKey -> "true") { + val expected = Timestamp.valueOf("2020-01-27 20:06:11.847") + val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts") + checkAnswer(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")), + Row(expected)) + } + withSQLConf(confKey -> "false") { + val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts") + val message = intercept[Exception] { + df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")).collect() + }.getCause.getMessage + assert(message.contains(s"set $confKey to true to restore the behavior before Spark 3.0")) + } + } } } From b983abef2e9b9f93d4a66935b9f840d8d582cac1 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 12 Feb 2020 10:13:27 +0800 Subject: [PATCH 02/14] fix --- .../spark/sql/catalyst/expressions/datetimeExpressions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 9df423e86278..db4ec1deb0f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -868,7 +868,7 @@ abstract class ToTimestamp ${ev.isNull} = true; } catch (java.time.format.DateTimeParseException e) { $tf$$.MODULE$$.checkLegacyFormatter( - e, $string.toString(), $format.toString(), $zid)}; + e, $string.toString(), $format.toString(), $zid); ${ev.isNull} = true; } catch (java.time.DateTimeException e) { ${ev.isNull} = true; From eab746a1037f41ba15bfeffbfe295ed604728909 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 20 Feb 2020 19:41:57 +0800 Subject: [PATCH 03/14] temp commit --- .../catalyst/util/TimestampFormatter.scala | 2 +- .../apache/spark/sql/DateFunctionsSuite.scala | 22 +++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 331e93f81ea9..7e6139e8ad65 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -234,7 +234,7 @@ object TimestampFormatter { e: DateTimeParseException, s: String, format: String, zoneId: ZoneId): Unit = { assert(!SQLConf.get.legacyTimeParserEnabled, "Only check legacy formatter while legacy parser disabled.") - val formatter = new LegacyTimestampFormatter(format, zoneId, defaultLocale) + val formatter = new LegacySimpleTimestampFormatter(s, zoneId, defaultLocale, lenient = false) val res = try { Some(formatter.parse(s)) } catch { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index 53875f04cad4..fc324b5a9520 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -377,14 +377,6 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { Seq(Row(Date.valueOf("2015-07-30")), Row(Date.valueOf("2015-07-30")))) } - def checkExceptionMessage(df: DataFrame): Unit = { - val message = intercept[Exception] { - df.collect() - }.getCause.getMessage - assert(message.contains(s"set ${SQLConf.LEGACY_TIME_PARSER_ENABLED.key} to true to restore " + - "the behavior before Spark 3.0")) - } - test("function to_date") { val d1 = Date.valueOf("2015-07-22") val d2 = Date.valueOf("2015-07-01") @@ -430,10 +422,19 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { df.select(to_date(col("d"), "yyyy-MM-dd")), Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01")), Row(Date.valueOf("2014-12-31")))) - checkExceptionMessage(df.select(to_date(col("s"), "yyyy-MM-dd"))) + checkAnswer( + df.select(to_date(col("s"), "yyyy-MM-dd")), + Seq(Row(null), Row(Date.valueOf("2014-12-31")), Row(null))) + + // now switch format + checkAnswer( + df.select(to_date(col("s"), "yyyy-dd-MM")), + Seq(Row(null), Row(null), Row(Date.valueOf("2014-12-31")))) // invalid format - checkExceptionMessage(df.select(to_date(col("s"), "yyyy-hh-MM"))) + checkAnswer( + df.select(to_date(col("s"), "yyyy-hh-MM")), + Seq(Row(null), Row(null), Row(null))) checkAnswer( df.select(to_date(col("s"), "yyyy-dd-aa")), Seq(Row(null), Row(null), Row(null))) @@ -561,7 +562,6 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { private def secs(millis: Long): Long = TimeUnit.MILLISECONDS.toSeconds(millis) test("unix_timestamp") { -<<<<<<< HEAD Seq(false, true).foreach { legacyParser => withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { val date1 = Date.valueOf("2015-07-24") From faa58822d4a5a76114ae9ae024806552af9f30d6 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 21 Feb 2020 13:18:48 +0800 Subject: [PATCH 04/14] Updates for 3-value config --- .../sql/catalyst/util/DateFormatter.scala | 1 + .../catalyst/util/TimestampFormatter.scala | 31 +++++---- .../apache/spark/sql/internal/SQLConf.scala | 16 ++++- .../expressions/DateExpressionsSuite.scala | 16 ++--- .../catalyst/json/JsonInferSchemaSuite.scala | 16 ++--- .../apache/spark/sql/DateFunctionsSuite.scala | 67 ++++++++++++------- .../execution/datasources/csv/CSVSuite.scala | 2 +- .../datasources/json/JsonSuite.scala | 2 +- 8 files changed, 94 insertions(+), 57 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index 941c8fcccd1a..958b554d5bf8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -26,6 +26,7 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy sealed trait DateFormatter extends Serializable { def parse(s: String): Int // returns days since epoch diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 7e6139e8ad65..99a6dbf0342f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -30,6 +30,7 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.types.Decimal sealed trait TimestampFormatter extends Serializable { @@ -232,18 +233,24 @@ object TimestampFormatter { def checkLegacyFormatter( e: DateTimeParseException, s: String, format: String, zoneId: ZoneId): Unit = { - assert(!SQLConf.get.legacyTimeParserEnabled, - "Only check legacy formatter while legacy parser disabled.") - val formatter = new LegacySimpleTimestampFormatter(s, zoneId, defaultLocale, lenient = false) - val res = try { - Some(formatter.parse(s)) - } catch { - case _: Throwable => None - } - if (res.nonEmpty) { - throw new RuntimeException(e.getMessage + ", set " + - s"${SQLConf.LEGACY_TIME_PARSER_ENABLED.key} to true to restore the behavior before " + - "Spark 3.0.") + // Only check legacy formatter while legacy time parser policy is exception. For legacy parser, + // DateTimeParseException will not be thrown. On the contrary, if the legacy policy set to + // corrected, Spark will return null. + if (LegacyBehaviorPolicy.withName( + SQLConf.get.getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) == LegacyBehaviorPolicy.EXCEPTION) { + val formatter = new LegacySimpleTimestampFormatter( + format, zoneId, defaultLocale, lenient = false) + val res = try { + Some(formatter.parse(s)) + } catch { + case _: Throwable => None + } + if (res.nonEmpty) { + throw new RuntimeException(e.getMessage + ", set " + + s"${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior before " + + "Spark 3.0. Set to CORRECTED to use the new approach, which would return null for this " + + "record. See more details in SPARK-30668.") + } } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7f55f2272bfb..cc4cf52d6624 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2352,6 +2352,18 @@ object SQLConf { .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + val LEGACY_TIME_PARSER_POLICY = buildConf("spark.sql.legacy.timeParserPolicy") + .internal() + .doc("When LEGACY, java.text.SimpleDateFormat is used for formatting and parsing " + + "dates/timestamps in a locale-sensitive manner, which is the approach before Spark 3.0. " + + "When set to CORRECTED, classes from java.time.* packages are used for the same purpose. 
" + + "The default value is EXCEPTION, RuntimeException is thrown when we will get different " + + "results.") + .stringConf + .transform(_.toUpperCase(Locale.ROOT)) + .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) + .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + val LEGACY_ARRAY_EXISTS_FOLLOWS_THREE_VALUED_LOGIC = buildConf("spark.sql.legacy.followThreeValuedLogicInArrayExists") .internal() @@ -2743,7 +2755,9 @@ class SQLConf extends Serializable with Logging { def legacyMsSqlServerNumericMappingEnabled: Boolean = getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED) - def legacyTimeParserEnabled: Boolean = getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED) + def legacyTimeParserEnabled: Boolean = + LegacyBehaviorPolicy.withName( + getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) == LegacyBehaviorPolicy.LEGACY /** * Returns the [[Resolver]] for the current configuration, which can be used to determine if two diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala index e43eb594286c..71966a6f774a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala @@ -241,8 +241,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("DateFormat") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { checkEvaluation( DateFormatClass(Literal.create(null, TimestampType), Literal("y"), gmtId), null) @@ -710,8 +710,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("from_unixtime") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val fmt1 = "yyyy-MM-dd HH:mm:ss" val sdf1 = new SimpleDateFormat(fmt1, Locale.US) val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" @@ -758,8 +758,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("unix_timestamp") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" val sdf2 = new SimpleDateFormat(fmt2, Locale.US) @@ -824,8 +824,8 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("to_unix_timestamp") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val fmt1 = "yyyy-MM-dd HH:mm:ss" val sdf1 = new SimpleDateFormat(fmt1, Locale.US) val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala index c2e03bd2c360..bce917c80f93 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala @@ -40,8 +40,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper { } test("inferring timestamp type") { - Seq(true, false).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { checkTimestampType("yyyy", """{"a": "2018"}""") checkTimestampType("yyyy=MM", """{"a": "2018=12"}""") checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""") @@ -56,8 +56,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper { } test("prefer decimals over timestamps") { - Seq(true, false).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParser) { checkType( options = Map( "prefersDecimal" -> "true", @@ -71,8 +71,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper { } test("skip decimal type inferring") { - Seq(true, false).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { checkType( options = Map( "prefersDecimal" -> "false", @@ -86,8 +86,8 @@ class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper { } test("fallback to string type") { - Seq(true, false).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { checkType( options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"), json = """{"a": "20181202.210400123"}""", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index fc324b5a9520..522381e41a43 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -96,8 +96,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { } test("date format") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val df = Seq((d, sdf.format(d), ts)).toDF("a", "b", "c") checkAnswer( @@ -377,6 +377,14 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { Seq(Row(Date.valueOf("2015-07-30")), Row(Date.valueOf("2015-07-30")))) } + def checkExceptionMessage(df: DataFrame): Unit = { + val message = intercept[Exception] { + df.collect() + }.getCause.getMessage + assert(message.contains(s"set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore " + + "the behavior before Spark 3.0")) + } + test("function to_date") { val d1 = Date.valueOf("2015-07-22") val d2 = 
Date.valueOf("2015-07-01") @@ -422,14 +430,22 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { df.select(to_date(col("d"), "yyyy-MM-dd")), Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2015-07-01")), Row(Date.valueOf("2014-12-31")))) - checkAnswer( - df.select(to_date(col("s"), "yyyy-MM-dd")), - Seq(Row(null), Row(Date.valueOf("2014-12-31")), Row(null))) + val confKey = SQLConf.LEGACY_TIME_PARSER_POLICY.key + withSQLConf(confKey -> "corrected") { + checkAnswer( + df.select(to_date(col("s"), "yyyy-MM-dd")), + Seq(Row(null), Row(Date.valueOf("2014-12-31")), Row(null))) + } + withSQLConf(confKey -> "exception") { + checkExceptionMessage(df.select(to_date(col("s"), "yyyy-MM-dd"))) + } // now switch format - checkAnswer( - df.select(to_date(col("s"), "yyyy-dd-MM")), - Seq(Row(null), Row(null), Row(Date.valueOf("2014-12-31")))) + withSQLConf(confKey -> "corrected") { + checkAnswer( + df.select(to_date(col("s"), "yyyy-dd-MM")), + Seq(Row(null), Row(null), Row(Date.valueOf("2014-12-31")))) + } // invalid format checkAnswer( @@ -529,8 +545,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { } test("from_unixtime") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("corrected", "legacy").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" val sdf2 = new SimpleDateFormat(fmt2, Locale.US) @@ -562,8 +578,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { private def secs(millis: Long): Long = TimeUnit.MILLISECONDS.toSeconds(millis) test("unix_timestamp") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("corrected", "legacy").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val date1 = Date.valueOf("2015-07-24") val date2 = Date.valueOf("2015-07-25") val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3") @@ -629,8 +645,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { } test("to_unix_timestamp") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("corrected", "legacy").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val date1 = Date.valueOf("2015-07-24") val date2 = Date.valueOf("2015-07-25") val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3") @@ -680,8 +696,8 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { test("to_timestamp") { - Seq(false, true).foreach { legacyParser => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + Seq("legacy", "corrected").foreach { legacyParserPolicy => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy.toString) { val date1 = Date.valueOf("2015-07-24") val date2 = Date.valueOf("2015-07-25") val ts_date1 = Timestamp.valueOf("2015-07-24 00:00:00") @@ -701,7 +717,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { df.select(unix_timestamp(col("ss")).cast("timestamp"))) checkAnswer(df.select(to_timestamp(col("ss"))), Seq( Row(ts1), Row(ts2))) - if (legacyParser) { + if (legacyParserPolicy == "legacy") { // In Spark 2.4 and earlier, to_timestamp() parses in 
seconds precision and cuts off // the fractional part of seconds. The behavior was changed by SPARK-27438. val legacyFmt = "yyyy/MM/dd HH:mm:ss" @@ -821,19 +837,18 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { test("SPARK-30668: use legacy timestamp parser in to_timestamp") { Seq(true, false).foreach { wholeStageCodegen => withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageCodegen.toString) { - val confKey = SQLConf.LEGACY_TIME_PARSER_ENABLED.key - withSQLConf(confKey -> "true") { + val confKey = SQLConf.LEGACY_TIME_PARSER_POLICY.key + val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts") + withSQLConf(confKey -> "legacy") { val expected = Timestamp.valueOf("2020-01-27 20:06:11.847") - val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts") checkAnswer(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")), Row(expected)) } - withSQLConf(confKey -> "false") { - val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts") - val message = intercept[Exception] { - df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")).collect() - }.getCause.getMessage - assert(message.contains(s"set $confKey to true to restore the behavior before Spark 3.0")) + withSQLConf(confKey -> "corrected") { + checkAnswer(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")), Row(null)) + } + withSQLConf(confKey -> "exception") { + checkExceptionMessage(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz"))) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 43553df29654..e2f79a6a84ad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2327,5 +2327,5 @@ class CSVLegacyTimeParserSuite extends CSVSuite { override protected def sparkConf: SparkConf = super .sparkConf - .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true) + .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 26d600e239b6..a0fd9d7e87df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2689,5 +2689,5 @@ class JsonLegacyTimeParserSuite extends JsonSuite { override protected def sparkConf: SparkConf = super .sparkConf - .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true) + .set(SQLConf.LEGACY_TIME_PARSER_POLICY, "legacy") } From cadd288b8d805bac4e9d2653f8faf6c3753e7a0a Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 24 Feb 2020 12:56:40 +0800 Subject: [PATCH 05/14] fix nits --- .../apache/spark/sql/catalyst/util/DateFormatter.scala | 1 - .../org/apache/spark/sql/DateFunctionsSuite.scala | 10 ++++------ 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index 958b554d5bf8..941c8fcccd1a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -26,7 +26,6 @@ 
import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy sealed trait DateFormatter extends Serializable { def parse(s: String): Int // returns days since epoch diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index 522381e41a43..5dd8f93c37a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -441,11 +441,9 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { } // now switch format - withSQLConf(confKey -> "corrected") { - checkAnswer( - df.select(to_date(col("s"), "yyyy-dd-MM")), - Seq(Row(null), Row(null), Row(Date.valueOf("2014-12-31")))) - } + checkAnswer( + df.select(to_date(col("s"), "yyyy-dd-MM")), + Seq(Row(null), Row(null), Row(Date.valueOf("2014-12-31")))) // invalid format checkAnswer( @@ -697,7 +695,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { test("to_timestamp") { Seq("legacy", "corrected").foreach { legacyParserPolicy => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy.toString) { + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> legacyParserPolicy) { val date1 = Date.valueOf("2015-07-24") val date2 = Date.valueOf("2015-07-25") val ts_date1 = Timestamp.valueOf("2015-07-24 00:00:00") From 3f716c1c6c665ea42c4a2881392c5c0f33cb401c Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 27 Feb 2020 17:34:13 +0800 Subject: [PATCH 06/14] Address comment and add check logic for incompatible pattern string --- .../expressions/datetimeExpressions.scala | 17 +---- .../sql/catalyst/util/DateFormatter.scala | 64 +++++++++++++++++-- .../catalyst/util/TimestampFormatter.scala | 44 +++++-------- .../apache/spark/sql/internal/SQLConf.scala | 5 +- .../spark/sql/util/DateFormatterSuite.scala | 12 ++++ 5 files changed, 92 insertions(+), 50 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index db4ec1deb0f8..767dacfde073 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions import java.sql.Timestamp import java.time.{DateTimeException, LocalDate, LocalDateTime, ZoneId} -import java.time.format.DateTimeParseException import java.time.temporal.IsoFields import java.util.{Locale, TimeZone} @@ -790,10 +789,6 @@ abstract class ToTimestamp formatter.parse( t.asInstanceOf[UTF8String].toString) / downScaleFactor } catch { - case e: DateTimeParseException => - TimestampFormatter.checkLegacyFormatter( - e, t.asInstanceOf[UTF8String].toString, constFormat.toString, zoneId) - null case NonFatal(_) => null } } @@ -807,10 +802,6 @@ abstract class ToTimestamp TimestampFormatter(formatString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) .parse(t.asInstanceOf[UTF8String].toString) / downScaleFactor } catch { - case e: DateTimeParseException => - TimestampFormatter.checkLegacyFormatter( - e, 
t.asInstanceOf[UTF8String].toString, formatString, zoneId) - null case NonFatal(_) => null } } @@ -820,7 +811,6 @@ abstract class ToTimestamp override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val javaType = CodeGenerator.javaType(dataType) - val tf = TimestampFormatter.getClass.getName.stripSuffix("$") left.dataType match { case StringType if right.foldable => val df = classOf[TimestampFormatter].getName @@ -828,9 +818,7 @@ abstract class ToTimestamp ExprCode.forNullValue(dataType) } else { val formatterName = ctx.addReferenceObj("formatter", formatter, df) - val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) val eval1 = left.genCode(ctx) - val formatString = right.genCode(ctx) ev.copy(code = code""" ${eval1.code} boolean ${ev.isNull} = ${eval1.isNull}; @@ -843,8 +831,6 @@ abstract class ToTimestamp } catch (java.text.ParseException e) { ${ev.isNull} = true; } catch (java.time.format.DateTimeParseException e) { - $tf$$.MODULE$$.checkLegacyFormatter( - e, ${eval1.value}.toString(), ${formatString.value}.toString(), $zid); ${ev.isNull} = true; } catch (java.time.DateTimeException e) { ${ev.isNull} = true; @@ -853,6 +839,7 @@ abstract class ToTimestamp } case StringType => val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) + val tf = TimestampFormatter.getClass.getName.stripSuffix("$") val ldf = LegacyDateFormats.getClass.getName.stripSuffix("$") nullSafeCodeGen(ctx, ev, (string, format) => { s""" @@ -867,8 +854,6 @@ abstract class ToTimestamp } catch (java.text.ParseException e) { ${ev.isNull} = true; } catch (java.time.format.DateTimeParseException e) { - $tf$$.MODULE$$.checkLegacyFormatter( - e, $string.toString(), $format.toString(), $zid); ${ev.isNull} = true; } catch (java.time.DateTimeException e) { ${ev.isNull} = true; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index 941c8fcccd1a..942e8ee1bf6d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.util import java.text.SimpleDateFormat import java.time.{LocalDate, ZoneId} +import java.time.format.DateTimeParseException import java.util.{Date, Locale} import org.apache.commons.lang3.time.FastDateFormat @@ -26,6 +27,8 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ sealed trait DateFormatter extends Serializable { def parse(s: String): Int // returns days since epoch @@ -43,7 +46,15 @@ class Iso8601DateFormatter( override def parse(s: String): Int = { val specialDate = convertSpecialDate(s.trim, zoneId) specialDate.getOrElse { - val localDate = LocalDate.parse(s, formatter) + val localDate = try { + LocalDate.parse(s, formatter) + } catch { + case e: DateTimeParseException if DateFormatter.hasDiffResult(s, pattern, zoneId) => + throw new RuntimeException(e.getMessage + ", set " + + s"${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior before " + + "Spark 3.0. 
Set to CORRECTED to use the new approach, which would return null for " + + "this record. See more details in SPARK-30668.") + } localDateToDays(localDate) } } @@ -88,7 +99,7 @@ object DateFormatter { val defaultLocale: Locale = Locale.US def defaultPattern(): String = { - if (SQLConf.get.legacyTimeParserEnabled) "yyyy-MM-dd" else "uuuu-MM-dd" + if (SQLConf.get.legacyTimeParserPolicy == LEGACY) "yyyy-MM-dd" else "uuuu-MM-dd" } private def getFormatter( @@ -97,8 +108,13 @@ object DateFormatter { locale: Locale = defaultLocale, legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = { - val pattern = format.getOrElse(defaultPattern) - if (SQLConf.get.legacyTimeParserEnabled) { + val pattern = if (format.nonEmpty) { + checkIncompatiblePattern(format.get) + format.get + } else { + defaultPattern() + } + if (SQLConf.get.legacyTimeParserPolicy == LEGACY) { legacyFormat match { case FAST_DATE_FORMAT => new LegacyFastDateFormatter(pattern, locale) @@ -125,4 +141,44 @@ object DateFormatter { def apply(zoneId: ZoneId): DateFormatter = { getFormatter(None, zoneId) } + + def hasDiffResult(s: String, format: String, zoneId: ZoneId): Boolean = { + // Only check whether we will get different results between legacy format and new format, while + // legacy time parser policy set to EXCEPTION. For legacy parser, DateTimeParseException will + // not be thrown. On the contrary, if the legacy policy set to CORRECTED, + // DateTimeParseException will address by the caller side. + if (LegacyBehaviorPolicy.withName( + SQLConf.get.getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) == EXCEPTION) { + val formatter = new LegacySimpleTimestampFormatter( + format, zoneId, defaultLocale, lenient = false) + val res = try { + Some(formatter.parse(s)) + } catch { + case _: Throwable => None + } + res.nonEmpty + } else { + false + } + } + + def checkIncompatiblePattern(pattern: String): Unit = { + // Only check whether we have incompatible pattern for user provided pattern string. + // Currently, the only incompatible pattern string is 'u', which represents + // 'Day number of week' in legacy parser but 'Year' in new parser. + if (LegacyBehaviorPolicy.withName( + SQLConf.get.getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) == EXCEPTION) { + // Text can be quoted using single quotes, we only check the non-quote parts. + val isIncompatible = pattern.split("'").zipWithIndex.exists { + case (patternPart, index) => + index % 2 == 0 && patternPart.contains("u") + } + if (isIncompatible) { + throw new RuntimeException(s"The pattern $pattern provided is incompatible between " + + "legacy parser and new parser after Spark 3.0. 
Please change the pattern or set " + + s"${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY or CORRECTED to explicitly choose " + + "the parser.") + } + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 99a6dbf0342f..4f39840a3a70 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -30,7 +30,7 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ import org.apache.spark.sql.types.Decimal sealed trait TimestampFormatter extends Serializable { @@ -60,7 +60,15 @@ class Iso8601TimestampFormatter( override def parse(s: String): Long = { val specialDate = convertSpecialTimestamp(s.trim, zoneId) specialDate.getOrElse { - val parsed = formatter.parse(s) + val parsed = try { + formatter.parse(s) + } catch { + case e: DateTimeParseException if DateFormatter.hasDiffResult(s, pattern, zoneId) => + throw new RuntimeException(e.getMessage + ", set " + + s"${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior before " + + "Spark 3.0. Set to CORRECTED to use the new approach, which would return null for " + + "this record. See more details in SPARK-30668.") + } val parsedZoneId = parsed.query(TemporalQueries.zone()) val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId val zonedDateTime = toZonedDateTime(parsed, timeZoneId) @@ -192,8 +200,13 @@ object TimestampFormatter { locale: Locale = defaultLocale, legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): TimestampFormatter = { - val pattern = format.getOrElse(defaultPattern) - if (SQLConf.get.legacyTimeParserEnabled) { + val pattern = if (format.nonEmpty) { + DateFormatter.checkIncompatiblePattern(format.get) + format.get + } else { + defaultPattern() + } + if (SQLConf.get.legacyTimeParserPolicy == LEGACY) { legacyFormat match { case FAST_DATE_FORMAT => new LegacyFastTimestampFormatter(pattern, zoneId, locale) @@ -230,27 +243,4 @@ object TimestampFormatter { def getFractionFormatter(zoneId: ZoneId): TimestampFormatter = { new FractionTimestampFormatter(zoneId) } - - def checkLegacyFormatter( - e: DateTimeParseException, s: String, format: String, zoneId: ZoneId): Unit = { - // Only check legacy formatter while legacy time parser policy is exception. For legacy parser, - // DateTimeParseException will not be thrown. On the contrary, if the legacy policy set to - // corrected, Spark will return null. - if (LegacyBehaviorPolicy.withName( - SQLConf.get.getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) == LegacyBehaviorPolicy.EXCEPTION) { - val formatter = new LegacySimpleTimestampFormatter( - format, zoneId, defaultLocale, lenient = false) - val res = try { - Some(formatter.parse(s)) - } catch { - case _: Throwable => None - } - if (res.nonEmpty) { - throw new RuntimeException(e.getMessage + ", set " + - s"${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior before " + - "Spark 3.0. Set to CORRECTED to use the new approach, which would return null for this " + - "record. 
See more details in SPARK-30668.") - } - } - } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index cc4cf52d6624..61f69b420e78 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2755,9 +2755,8 @@ class SQLConf extends Serializable with Logging { def legacyMsSqlServerNumericMappingEnabled: Boolean = getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED) - def legacyTimeParserEnabled: Boolean = - LegacyBehaviorPolicy.withName( - getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) == LegacyBehaviorPolicy.LEGACY + def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.withName( + getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) /** * Returns the [[Resolver]] for the current configuration, which can be used to determine if two diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala index d617b1c1d823..f998b0c20f1c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala @@ -114,4 +114,16 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper { assert(formatter.parse("tomorrow UTC") === today + 1) } } + + test("check incompatible pattern") { + assertThrows[RuntimeException](DateFormatter.checkIncompatiblePattern("MM-DD-u")) + assertThrows[RuntimeException]( + DateFormatter.checkIncompatiblePattern("uuuu-MM-dd'T'HH:mm:ss.SSSz")) + assertThrows[RuntimeException]( + DateFormatter.checkIncompatiblePattern("uuuu-MM'u contains in quoted text'HH:mm:ss")) + + // Pass the check + DateFormatter.checkIncompatiblePattern("yyyy-MM-dd'T'HH:mm:ss.SSSz") + DateFormatter.checkIncompatiblePattern("yyyy-MM'u contains in quoted text'HH:mm:ss") + } } From 169609c7bdf8a0452cc754c1fa2c09a697bade51 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 27 Feb 2020 21:44:13 +0800 Subject: [PATCH 07/14] split the incompatible pattern checking in other PR --- .../sql/catalyst/util/DateFormatter.scala | 30 ++----------------- .../catalyst/util/TimestampFormatter.scala | 7 +---- .../spark/sql/util/DateFormatterSuite.scala | 12 -------- 3 files changed, 4 insertions(+), 45 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index 942e8ee1bf6d..f4e5d55651a3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -99,7 +99,8 @@ object DateFormatter { val defaultLocale: Locale = Locale.US def defaultPattern(): String = { - if (SQLConf.get.legacyTimeParserPolicy == LEGACY) "yyyy-MM-dd" else "uuuu-MM-dd" + // if (SQLConf.get.legacyTimeParserPolicy == LEGACY) "yyyy-MM-dd" else "uuuu-MM-dd" + "yyyy-MM-dd" } private def getFormatter( @@ -108,12 +109,7 @@ object DateFormatter { locale: Locale = defaultLocale, legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = { - val pattern = if (format.nonEmpty) { - checkIncompatiblePattern(format.get) - format.get - } else { - defaultPattern() - } + val pattern = format.getOrElse(defaultPattern) if 
(SQLConf.get.legacyTimeParserPolicy == LEGACY) { legacyFormat match { case FAST_DATE_FORMAT => @@ -161,24 +157,4 @@ object DateFormatter { false } } - - def checkIncompatiblePattern(pattern: String): Unit = { - // Only check whether we have incompatible pattern for user provided pattern string. - // Currently, the only incompatible pattern string is 'u', which represents - // 'Day number of week' in legacy parser but 'Year' in new parser. - if (LegacyBehaviorPolicy.withName( - SQLConf.get.getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) == EXCEPTION) { - // Text can be quoted using single quotes, we only check the non-quote parts. - val isIncompatible = pattern.split("'").zipWithIndex.exists { - case (patternPart, index) => - index % 2 == 0 && patternPart.contains("u") - } - if (isIncompatible) { - throw new RuntimeException(s"The pattern $pattern provided is incompatible between " + - "legacy parser and new parser after Spark 3.0. Please change the pattern or set " + - s"${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY or CORRECTED to explicitly choose " + - "the parser.") - } - } - } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 4f39840a3a70..682021135cd9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -200,12 +200,7 @@ object TimestampFormatter { locale: Locale = defaultLocale, legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): TimestampFormatter = { - val pattern = if (format.nonEmpty) { - DateFormatter.checkIncompatiblePattern(format.get) - format.get - } else { - defaultPattern() - } + val pattern = format.getOrElse(defaultPattern) if (SQLConf.get.legacyTimeParserPolicy == LEGACY) { legacyFormat match { case FAST_DATE_FORMAT => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala index f998b0c20f1c..d617b1c1d823 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala @@ -114,16 +114,4 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper { assert(formatter.parse("tomorrow UTC") === today + 1) } } - - test("check incompatible pattern") { - assertThrows[RuntimeException](DateFormatter.checkIncompatiblePattern("MM-DD-u")) - assertThrows[RuntimeException]( - DateFormatter.checkIncompatiblePattern("uuuu-MM-dd'T'HH:mm:ss.SSSz")) - assertThrows[RuntimeException]( - DateFormatter.checkIncompatiblePattern("uuuu-MM'u contains in quoted text'HH:mm:ss")) - - // Pass the check - DateFormatter.checkIncompatiblePattern("yyyy-MM-dd'T'HH:mm:ss.SSSz") - DateFormatter.checkIncompatiblePattern("yyyy-MM'u contains in quoted text'HH:mm:ss") - } } From 6f151ca905c0b0d72d2e7b86f56d9e52d891aca6 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 27 Feb 2020 22:09:00 +0800 Subject: [PATCH 08/14] fix --- .../org/apache/spark/sql/catalyst/util/DateFormatter.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index f4e5d55651a3..775d1bef7ff4 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -99,8 +99,7 @@ object DateFormatter { val defaultLocale: Locale = Locale.US def defaultPattern(): String = { - // if (SQLConf.get.legacyTimeParserPolicy == LEGACY) "yyyy-MM-dd" else "uuuu-MM-dd" - "yyyy-MM-dd" + if (SQLConf.get.legacyTimeParserPolicy == LEGACY) "yyyy-MM-dd" else "uuuu-MM-dd" } private def getFormatter( From b2e7150bf0e99d88ade44a38b16ab8d1dd270474 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 2 Mar 2020 12:00:47 +0800 Subject: [PATCH 09/14] take legacy formatter during checking --- .../sql/catalyst/util/DateFormatter.scala | 67 ++++++++----------- .../util/DateTimeFormatterHelper.scala | 28 +++++++- .../catalyst/util/TimestampFormatter.scala | 62 ++++++++++------- 3 files changed, 90 insertions(+), 67 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index 775d1bef7ff4..ffa799468411 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.util import java.text.SimpleDateFormat import java.time.{LocalDate, ZoneId} -import java.time.format.DateTimeParseException import java.util.{Date, Locale} import org.apache.commons.lang3.time.FastDateFormat @@ -27,7 +26,6 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ sealed trait DateFormatter extends Serializable { @@ -38,24 +36,24 @@ sealed trait DateFormatter extends Serializable { class Iso8601DateFormatter( pattern: String, zoneId: ZoneId, - locale: Locale) extends DateFormatter with DateTimeFormatterHelper { + locale: Locale, + legacyFormat: LegacyDateFormats.LegacyDateFormat) + extends DateFormatter with DateTimeFormatterHelper { @transient private lazy val formatter = getOrCreateFormatter(pattern, locale) + @transient + private lazy val legacyFormatter = DateFormatter.getLegacyFormatter( + pattern, zoneId, locale, legacyFormat) + override def parse(s: String): Int = { val specialDate = convertSpecialDate(s.trim, zoneId) specialDate.getOrElse { - val localDate = try { - LocalDate.parse(s, formatter) - } catch { - case e: DateTimeParseException if DateFormatter.hasDiffResult(s, pattern, zoneId) => - throw new RuntimeException(e.getMessage + ", set " + - s"${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior before " + - "Spark 3.0. Set to CORRECTED to use the new approach, which would return null for " + - "this record. 
See more details in SPARK-30668.") - } - localDateToDays(localDate) + try { + val localDate = LocalDate.parse(s, formatter) + localDateToDays(localDate) + } catch checkDiffResult(s, legacyFormatter.parse) } } @@ -110,14 +108,23 @@ object DateFormatter { val pattern = format.getOrElse(defaultPattern) if (SQLConf.get.legacyTimeParserPolicy == LEGACY) { - legacyFormat match { - case FAST_DATE_FORMAT => - new LegacyFastDateFormatter(pattern, locale) - case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT => - new LegacySimpleDateFormatter(pattern, locale) - } + getLegacyFormatter(pattern, zoneId, locale, legacyFormat) } else { - new Iso8601DateFormatter(pattern, zoneId, locale) + new Iso8601DateFormatter(pattern, zoneId, locale, legacyFormat) + } + } + + def getLegacyFormatter( + pattern: String, + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = { + + legacyFormat match { + case FAST_DATE_FORMAT => + new LegacyFastDateFormatter(pattern, locale) + case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT => + new LegacySimpleDateFormatter(pattern, locale) } } @@ -136,24 +143,4 @@ object DateFormatter { def apply(zoneId: ZoneId): DateFormatter = { getFormatter(None, zoneId) } - - def hasDiffResult(s: String, format: String, zoneId: ZoneId): Boolean = { - // Only check whether we will get different results between legacy format and new format, while - // legacy time parser policy set to EXCEPTION. For legacy parser, DateTimeParseException will - // not be thrown. On the contrary, if the legacy policy set to CORRECTED, - // DateTimeParseException will address by the caller side. - if (LegacyBehaviorPolicy.withName( - SQLConf.get.getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) == EXCEPTION) { - val formatter = new LegacySimpleTimestampFormatter( - format, zoneId, defaultLocale, lenient = false) - val res = try { - Some(formatter.parse(s)) - } catch { - case _: Throwable => None - } - res.nonEmpty - } else { - false - } - } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala index a7b6309baf61..a3ce0d5096d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala @@ -19,13 +19,16 @@ package org.apache.spark.sql.catalyst.util import java.time._ import java.time.chrono.IsoChronology -import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverStyle} +import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, DateTimeParseException, ResolverStyle} import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries} import java.util.Locale import com.google.common.cache.CacheBuilder import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy +import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ trait DateTimeFormatterHelper { // Converts the parsed temporal object to ZonedDateTime. It sets time components to zeros @@ -57,6 +60,29 @@ trait DateTimeFormatterHelper { } formatter } + + // When legacy time parser policy set to EXCEPTION, check whether we will get different results + // between legacy format and new format. For legacy parser, DateTimeParseException will not be + // thrown. 
On the contrary, if the legacy policy set to CORRECTED, DateTimeParseException will + // address by the caller side. + protected def checkDiffResult[T]( + s: String, legacyParseFunc: String => T): PartialFunction[Throwable, T] = { + case e: DateTimeParseException if LegacyBehaviorPolicy.withName( + SQLConf.get.getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) == EXCEPTION => + val res = try { + Some(legacyParseFunc(s)) + } catch { + case _: Throwable => None + } + if (res.nonEmpty) { + throw new RuntimeException(e.getMessage + ", set " + + s"${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior before " + + "Spark 3.0. Set to CORRECTED to use the new approach, which would return null for " + + "this record. See more details in SPARK-30668.") + } else { + throw e + } + } } private object DateTimeFormatterHelper { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 682021135cd9..713e70c61ec0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -29,6 +29,7 @@ import org.apache.commons.lang3.time.FastDateFormat import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat, LENIENT_SIMPLE_DATE_FORMAT} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ import org.apache.spark.sql.types.Decimal @@ -53,29 +54,29 @@ sealed trait TimestampFormatter extends Serializable { class Iso8601TimestampFormatter( pattern: String, zoneId: ZoneId, - locale: Locale) extends TimestampFormatter with DateTimeFormatterHelper { + locale: Locale, + legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT) + extends TimestampFormatter with DateTimeFormatterHelper { @transient protected lazy val formatter = getOrCreateFormatter(pattern, locale) + @transient + protected lazy val legacyFormatter = TimestampFormatter.getLegacyFormatter( + pattern, zoneId, locale, legacyFormat) + override def parse(s: String): Long = { val specialDate = convertSpecialTimestamp(s.trim, zoneId) specialDate.getOrElse { - val parsed = try { - formatter.parse(s) - } catch { - case e: DateTimeParseException if DateFormatter.hasDiffResult(s, pattern, zoneId) => - throw new RuntimeException(e.getMessage + ", set " + - s"${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior before " + - "Spark 3.0. Set to CORRECTED to use the new approach, which would return null for " + - "this record. 
See more details in SPARK-30668.") - } - val parsedZoneId = parsed.query(TemporalQueries.zone()) - val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId - val zonedDateTime = toZonedDateTime(parsed, timeZoneId) - val epochSeconds = zonedDateTime.toEpochSecond - val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND) - - Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond) + try { + val parsed = formatter.parse(s) + val parsedZoneId = parsed.query(TemporalQueries.zone()) + val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId + val zonedDateTime = toZonedDateTime(parsed, timeZoneId) + val epochSeconds = zonedDateTime.toEpochSecond + val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND) + + Math.addExact(SECONDS.toMicros(epochSeconds), microsOfSecond) + } catch checkDiffResult(s, legacyFormatter.parse) } } @@ -202,16 +203,25 @@ object TimestampFormatter { val pattern = format.getOrElse(defaultPattern) if (SQLConf.get.legacyTimeParserPolicy == LEGACY) { - legacyFormat match { - case FAST_DATE_FORMAT => - new LegacyFastTimestampFormatter(pattern, zoneId, locale) - case SIMPLE_DATE_FORMAT => - new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = false) - case LENIENT_SIMPLE_DATE_FORMAT => - new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = true) - } + getLegacyFormatter(pattern, zoneId, locale, legacyFormat) } else { - new Iso8601TimestampFormatter(pattern, zoneId, locale) + new Iso8601TimestampFormatter(pattern, zoneId, locale, legacyFormat) + } + } + + def getLegacyFormatter( + pattern: String, + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat): TimestampFormatter = { + + legacyFormat match { + case FAST_DATE_FORMAT => + new LegacyFastTimestampFormatter(pattern, zoneId, locale) + case SIMPLE_DATE_FORMAT => + new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = false) + case LENIENT_SIMPLE_DATE_FORMAT => + new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = true) } } From 4a0113648166b75754820779b87f9e24863141da Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Mon, 2 Mar 2020 20:11:21 +0800 Subject: [PATCH 10/14] Fix for non-codegen --- .../org/apache/spark/SparkException.scala | 6 ++++ .../expressions/datetimeExpressions.scala | 3 ++ .../util/DateTimeFormatterHelper.scala | 3 +- .../expressions/DateExpressionsSuite.scala | 23 ++++++++++++- .../apache/spark/sql/DateFunctionsSuite.scala | 33 +++++++++---------- 5 files changed, 48 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 4ad9a0cc4b10..b3d257e42d2e 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -43,3 +43,9 @@ private[spark] case class SparkUserAppException(exitCode: Int) */ private[spark] case class ExecutorDeadException(message: String) extends SparkException(message) + +/** + * Exception thrown when Spark returns different result after upgrading to a new version. 
+ */ +private[spark] class SparkUpgradeException(version: String, message: String) + extends SparkException(s"Exception for upgrading to Spark $version: $message") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 767dacfde073..81815fc8b969 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -26,6 +26,7 @@ import scala.util.control.NonFatal import org.apache.commons.text.StringEscapeUtils +import org.apache.spark.SparkUpgradeException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen._ @@ -789,6 +790,7 @@ abstract class ToTimestamp formatter.parse( t.asInstanceOf[UTF8String].toString) / downScaleFactor } catch { + case e: SparkUpgradeException => throw e case NonFatal(_) => null } } @@ -802,6 +804,7 @@ abstract class ToTimestamp TimestampFormatter(formatString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) .parse(t.asInstanceOf[UTF8String].toString) / downScaleFactor } catch { + case e: SparkUpgradeException => throw e case NonFatal(_) => null } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala index a3ce0d5096d2..21f7ad0a181e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala @@ -25,6 +25,7 @@ import java.util.Locale import com.google.common.cache.CacheBuilder +import org.apache.spark.SparkUpgradeException import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy @@ -75,7 +76,7 @@ trait DateTimeFormatterHelper { case _: Throwable => None } if (res.nonEmpty) { - throw new RuntimeException(e.getMessage + ", set " + + throw new SparkUpgradeException("3.0", e.getMessage + ", set " + s"${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior before " + "Spark 3.0. Set to CORRECTED to use the new approach, which would return null for " + "this record. 
See more details in SPARK-30668.") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala index 71966a6f774a..afc1f38aa2d9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala @@ -23,7 +23,7 @@ import java.time.{Instant, LocalDate, LocalDateTime, ZoneId, ZoneOffset} import java.util.{Calendar, Locale, TimeZone} import java.util.concurrent.TimeUnit._ -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkFunSuite, SparkUpgradeException} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils, TimestampFormatter} @@ -1164,4 +1164,25 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { Literal(LocalDate.of(1, 1, 1))), IntervalUtils.stringToInterval(UTF8String.fromString("interval 9999 years"))) } + + test("to_timestamp exception mode") { + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") { + checkEvaluation( + GetTimestamp( + Literal("2020-01-27T20:06:11.847-0800"), + Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), 1580184371847000L) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") { + checkEvaluation( + GetTimestamp( + Literal("2020-01-27T20:06:11.847-0800"), + Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), null) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") { + checkExceptionInExpression[SparkUpgradeException]( + GetTimestamp( + Literal("2020-01-27T20:06:11.847-0800"), + Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), "Exception for upgrading to Spark 3.0") + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index 5dd8f93c37a1..a54b07b80625 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -23,7 +23,8 @@ import java.time.{Instant, LocalDateTime, ZoneId} import java.util.{Locale, TimeZone} import java.util.concurrent.TimeUnit -import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils} +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -378,7 +379,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { } def checkExceptionMessage(df: DataFrame): Unit = { - val message = intercept[Exception] { + val message = intercept[SparkException] { df.collect() }.getCause.getMessage assert(message.contains(s"set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore " + @@ -833,22 +834,18 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { } test("SPARK-30668: use legacy timestamp parser in to_timestamp") { - Seq(true, false).foreach { wholeStageCodegen => - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageCodegen.toString) { - val confKey = SQLConf.LEGACY_TIME_PARSER_POLICY.key - val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts") - withSQLConf(confKey -> "legacy") { - val expected = 
Timestamp.valueOf("2020-01-27 20:06:11.847") - checkAnswer(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")), - Row(expected)) - } - withSQLConf(confKey -> "corrected") { - checkAnswer(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")), Row(null)) - } - withSQLConf(confKey -> "exception") { - checkExceptionMessage(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz"))) - } - } + val confKey = SQLConf.LEGACY_TIME_PARSER_POLICY.key + val df = Seq("2020-01-27T20:06:11.847-0800").toDF("ts") + withSQLConf(confKey -> "legacy") { + val expected = Timestamp.valueOf("2020-01-27 20:06:11.847") + checkAnswer(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")), + Row(expected)) + } + withSQLConf(confKey -> "corrected") { + checkAnswer(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz")), Row(null)) + } + withSQLConf(confKey -> "exception") { + checkExceptionMessage(df.select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz"))) } } From f66a85939e0ef48a3ef90eda785231712a2f32be Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Tue, 3 Mar 2020 20:06:29 +0800 Subject: [PATCH 11/14] address comment; add UT for CSV/JSON; bug fix --- .../org/apache/spark/SparkException.scala | 4 +-- .../sql/catalyst/csv/UnivocityParser.scala | 2 ++ .../sql/catalyst/json/JacksonParser.scala | 2 ++ .../sql/catalyst/util/DateFormatter.scala | 27 +++++++++---------- .../util/DateTimeFormatterHelper.scala | 13 ++++----- .../catalyst/util/TimestampFormatter.scala | 26 +++++++++--------- .../apache/spark/sql/internal/SQLConf.scala | 5 ++-- .../apache/spark/sql/DateFunctionsSuite.scala | 2 +- .../execution/datasources/csv/CSVSuite.scala | 21 +++++++++++++++ .../datasources/json/JsonSuite.scala | 21 +++++++++++++++ 10 files changed, 81 insertions(+), 42 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index b3d257e42d2e..347a9b239961 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -47,5 +47,5 @@ private[spark] case class ExecutorDeadException(message: String) /** * Exception thrown when Spark returns different result after upgrading to a new version. 
*/ -private[spark] class SparkUpgradeException(version: String, message: String) - extends SparkException(s"Exception for upgrading to Spark $version: $message") +private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable) + extends SparkException(s"Exception for upgrading to Spark $version: $message", cause) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index f829e6b503ab..dd8537b02935 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -23,6 +23,7 @@ import scala.util.control.NonFatal import com.univocity.parsers.csv.CsvParser +import org.apache.spark.SparkUpgradeException import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow} @@ -285,6 +286,7 @@ class UnivocityParser( } } } catch { + case e: SparkUpgradeException => throw e case NonFatal(e) => badRecordException = badRecordException.orElse(Some(e)) row.setNullAt(i) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index da3b5013fe12..d0db06cae816 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -25,6 +25,7 @@ import scala.util.control.NonFatal import com.fasterxml.jackson.core._ +import org.apache.spark.SparkUpgradeException import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -382,6 +383,7 @@ class JacksonParser( try { row.update(index, fieldConverters(index).apply(parser)) } catch { + case e: SparkUpgradeException => throw e case NonFatal(e) => badRecordException = badRecordException.orElse(Some(e)) parser.skipChildren() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index ffa799468411..06ec918f3854 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -23,7 +23,6 @@ import java.util.{Date, Locale} import org.apache.commons.lang3.time.FastDateFormat -import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ @@ -101,11 +100,10 @@ object DateFormatter { } private def getFormatter( - format: Option[String], - zoneId: ZoneId, - locale: Locale = defaultLocale, - legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = { - + format: Option[String], + zoneId: ZoneId, + locale: Locale = defaultLocale, + legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = { val pattern = format.getOrElse(defaultPattern) if (SQLConf.get.legacyTimeParserPolicy == LEGACY) { getLegacyFormatter(pattern, zoneId, locale, legacyFormat) @@ -115,11 +113,10 @@ object DateFormatter { } def 
getLegacyFormatter( - pattern: String, - zoneId: ZoneId, - locale: Locale, - legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = { - + pattern: String, + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat): DateFormatter = { legacyFormat match { case FAST_DATE_FORMAT => new LegacyFastDateFormatter(pattern, locale) @@ -129,10 +126,10 @@ object DateFormatter { } def apply( - format: String, - zoneId: ZoneId, - locale: Locale, - legacyFormat: LegacyDateFormat): DateFormatter = { + format: String, + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat): DateFormatter = { getFormatter(Some(format), zoneId, locale, legacyFormat) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala index 21f7ad0a181e..f0d36ba04889 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala @@ -28,7 +28,6 @@ import com.google.common.cache.CacheBuilder import org.apache.spark.SparkUpgradeException import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ trait DateTimeFormatterHelper { @@ -67,19 +66,17 @@ trait DateTimeFormatterHelper { // thrown. On the contrary, if the legacy policy set to CORRECTED, DateTimeParseException will // address by the caller side. protected def checkDiffResult[T]( - s: String, legacyParseFunc: String => T): PartialFunction[Throwable, T] = { - case e: DateTimeParseException if LegacyBehaviorPolicy.withName( - SQLConf.get.getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) == EXCEPTION => + s: String, legacyParseFunc: String => T): PartialFunction[Throwable, T] = { + case e: DateTimeParseException if SQLConf.get.legacyTimeParserPolicy == EXCEPTION => val res = try { Some(legacyParseFunc(s)) } catch { case _: Throwable => None } if (res.nonEmpty) { - throw new SparkUpgradeException("3.0", e.getMessage + ", set " + - s"${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior before " + - "Spark 3.0. Set to CORRECTED to use the new approach, which would return null for " + - "this record. See more details in SPARK-30668.") + throw new SparkUpgradeException("3.0", s"Set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to " + + "LEGACY to restore the behavior before Spark 3.0. Set to CORRECTED to use the new " + + "approach, which would return null for this record. 
See more details in SPARK-30668.", e) } else { throw e } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 713e70c61ec0..5c1a16192315 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -196,11 +196,10 @@ object TimestampFormatter { def defaultPattern(): String = s"${DateFormatter.defaultPattern()} HH:mm:ss" private def getFormatter( - format: Option[String], - zoneId: ZoneId, - locale: Locale = defaultLocale, - legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): TimestampFormatter = { - + format: Option[String], + zoneId: ZoneId, + locale: Locale = defaultLocale, + legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): TimestampFormatter = { val pattern = format.getOrElse(defaultPattern) if (SQLConf.get.legacyTimeParserPolicy == LEGACY) { getLegacyFormatter(pattern, zoneId, locale, legacyFormat) @@ -210,11 +209,10 @@ object TimestampFormatter { } def getLegacyFormatter( - pattern: String, - zoneId: ZoneId, - locale: Locale, - legacyFormat: LegacyDateFormat): TimestampFormatter = { - + pattern: String, + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat): TimestampFormatter = { legacyFormat match { case FAST_DATE_FORMAT => new LegacyFastTimestampFormatter(pattern, zoneId, locale) @@ -226,10 +224,10 @@ object TimestampFormatter { } def apply( - format: String, - zoneId: ZoneId, - locale: Locale, - legacyFormat: LegacyDateFormat): TimestampFormatter = { + format: String, + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat): TimestampFormatter = { getFormatter(Some(format), zoneId, locale, legacyFormat) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 61f69b420e78..2d17fb950a44 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2755,8 +2755,9 @@ class SQLConf extends Serializable with Logging { def legacyMsSqlServerNumericMappingEnabled: Boolean = getConf(LEGACY_MSSQLSERVER_NUMERIC_MAPPING_ENABLED) - def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.withName( - getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) + def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = { + LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY)) + } /** * Returns the [[Resolver]] for the current configuration, which can be used to determine if two diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index a54b07b80625..6165c6038b12 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -382,7 +382,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { val message = intercept[SparkException] { df.collect() }.getCause.getMessage - assert(message.contains(s"set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore " + + assert(message.contains(s"Set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore " + "the behavior before Spark 3.0")) } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index e2f79a6a84ad..167412b72213 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2307,6 +2307,27 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa val csv = spark.read.option("header", false).schema("t timestamp, d date").csv(ds) checkAnswer(csv, Row(Timestamp.valueOf("2020-1-12 3:23:34.12"), Date.valueOf("2020-1-12"))) } + + test("exception mode for parsing date/timestamp string") { + val ds = Seq("2020-01-27T20:06:11.847-0800").toDS() + val csv = spark.read + .option("header", false) + .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSz") + .schema("t timestamp").csv(ds) + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") { + val msg = intercept[SparkException] { + csv.collect() + }.getCause.getMessage + assert(msg.contains(s"Set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore " + + "the behavior before Spark 3.0")) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") { + checkAnswer(csv, Row(Timestamp.valueOf("2020-01-27 20:06:11.847"))) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") { + checkAnswer(csv, Row(null)) + } + } } class CSVv1Suite extends CSVSuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index a0fd9d7e87df..6cd067814a12 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2669,6 +2669,27 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson Date.valueOf("2020-1-12"), Date.valueOf(LocalDate.ofEpochDay(12345)))) } + + test("exception mode for parsing date/timestamp string") { + val ds = Seq("{'t': '2020-01-27T20:06:11.847-0800'}").toDS() + val json = spark.read + .schema("t timestamp") + .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSz") + .json(ds) + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") { + val msg = intercept[SparkException] { + json.collect() + }.getCause.getMessage + assert(msg.contains(s"Set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore " + + "the behavior before Spark 3.0")) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") { + checkAnswer(json, Row(Timestamp.valueOf("2020-01-27 20:06:11.847"))) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") { + checkAnswer(json, Row(null)) + } + } } class JsonV1Suite extends JsonSuite { From 1a92e0f8176ac8a58594f3d7f9c977a49e50efdd Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 4 Mar 2020 09:28:39 +0800 Subject: [PATCH 12/14] fix --- .../apache/spark/sql/CsvFunctionsSuite.scala | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index 61f0e138cc35..d2b92c1cf828 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ 
-59,10 +59,23 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession { val df2 = df .select(from_csv($"value", schemaWithCorrField1, Map( "mode" -> "Permissive", "columnNameOfCorruptRecord" -> columnNameOfCorruptRecord))) - - checkAnswer(df2, Seq( - Row(Row(0, null, "0,2013-111-11 12:13:14")), - Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null)))) + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "corrected") { + checkAnswer(df2, Seq( + Row(Row(0, null, "0,2013-111-11 12:13:14")), + Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null)))) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") { + checkAnswer(df2, Seq( + Row(Row(0, java.sql.Date.valueOf("2022-03-11"), null)), + Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null)))) + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") { + val msg = intercept[SparkException] { + df2.collect() + }.getCause.getMessage + assert(msg.contains(s"Set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore " + + "the behavior before Spark 3.0")) + } } test("schema_of_csv - infers schemas") { From 0e543da404acbd0f89370be0d637a8c02bc54a0a Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 4 Mar 2020 21:30:20 +0800 Subject: [PATCH 13/14] address comment --- .../org/apache/spark/SparkException.scala | 3 ++- .../sql/catalyst/csv/UnivocityParser.scala | 2 ++ .../sql/catalyst/json/JacksonParser.scala | 2 ++ .../util/DateTimeFormatterHelper.scala | 12 +++++----- .../apache/spark/sql/CsvFunctionsSuite.scala | 3 +-- .../apache/spark/sql/DateFunctionsSuite.scala | 3 +-- .../execution/datasources/csv/CSVSuite.scala | 18 ++++++++++---- .../datasources/json/JsonSuite.scala | 24 +++++++++++++------ 8 files changed, 45 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 347a9b239961..81c087e314be 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -48,4 +48,5 @@ private[spark] case class ExecutorDeadException(message: String) * Exception thrown when Spark returns different result after upgrading to a new version. */ private[spark] class SparkUpgradeException(version: String, message: String, cause: Throwable) - extends SparkException(s"Exception for upgrading to Spark $version: $message", cause) + extends SparkException("You may get a different result due to the upgrading of Spark" + + s" $version: $message", cause) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index dd8537b02935..f86210750f08 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -180,6 +180,7 @@ class UnivocityParser( try { timestampFormatter.parse(datum) } catch { + case e: SparkUpgradeException => throw e case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards // compatibility. @@ -193,6 +194,7 @@ class UnivocityParser( try { dateFormatter.parse(datum) } catch { + case e: SparkUpgradeException => throw e case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards // compatibility. 
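The three values of the legacy time parser policy exercised by the suites above behave, end to end, like the spark-shell style sketch below. It only restates what the DateFunctionsSuite/CsvFunctionsSuite/CSVSuite cases assert and is not code from the patch; it assumes a running SparkSession `spark` with `import spark.implicits._` in scope.

import org.apache.spark.sql.functions.{col, to_timestamp}
import org.apache.spark.sql.internal.SQLConf

val parsed = Seq("2020-01-27T20:06:11.847-0800").toDF("ts")
  .select(to_timestamp(col("ts"), "yyyy-MM-dd'T'HH:mm:ss.SSSz"))

spark.conf.set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "legacy")
parsed.show()   // 2020-01-27 20:06:11.847 -- parsed by the legacy SimpleDateFormat-based path

spark.conf.set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "corrected")
parsed.show()   // null -- the new parser rejects the value and the record becomes null

spark.conf.set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "exception")
parsed.show()   // SparkException caused by SparkUpgradeException ("Fail to parse ...")
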
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index d0db06cae816..386540cbe4b7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -233,6 +233,7 @@ class JacksonParser( try { timestampFormatter.parse(parser.getText) } catch { + case e: SparkUpgradeException => throw e case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards // compatibility. @@ -250,6 +251,7 @@ class JacksonParser( try { dateFormatter.parse(parser.getText) } catch { + case e: SparkUpgradeException => throw e case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards // compatibility. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala index f0d36ba04889..33aa733f2645 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala @@ -62,9 +62,9 @@ trait DateTimeFormatterHelper { } // When legacy time parser policy set to EXCEPTION, check whether we will get different results - // between legacy format and new format. For legacy parser, DateTimeParseException will not be - // thrown. On the contrary, if the legacy policy set to CORRECTED, DateTimeParseException will - // address by the caller side. + // between legacy parser and new parser. If new parser fails but legacy parser works, throw a + // SparkUpgradeException. On the contrary, if the legacy policy set to CORRECTED, + // DateTimeParseException will address by the caller side. protected def checkDiffResult[T]( s: String, legacyParseFunc: String => T): PartialFunction[Throwable, T] = { case e: DateTimeParseException if SQLConf.get.legacyTimeParserPolicy == EXCEPTION => @@ -74,9 +74,9 @@ trait DateTimeFormatterHelper { case _: Throwable => None } if (res.nonEmpty) { - throw new SparkUpgradeException("3.0", s"Set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to " + - "LEGACY to restore the behavior before Spark 3.0. Set to CORRECTED to use the new " + - "approach, which would return null for this record. See more details in SPARK-30668.", e) + throw new SparkUpgradeException("3.0", s"Fail to parse '$s' in the new parser. 
You can " + + s"set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore the behavior " + + s"before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.", e) } else { throw e } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index d2b92c1cf828..89fb4d5151b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -73,8 +73,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession { val msg = intercept[SparkException] { df2.collect() }.getCause.getMessage - assert(msg.contains(s"Set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore " + - "the behavior before Spark 3.0")) + assert(msg.contains("Fail to parse")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index 6165c6038b12..3865012c97ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -382,8 +382,7 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { val message = intercept[SparkException] { df.collect() }.getCause.getMessage - assert(message.contains(s"Set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore " + - "the behavior before Spark 3.0")) + assert(message.contains("Fail to parse")) } test("function to_date") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 167412b72213..d50c05362c27 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2304,8 +2304,19 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa test("SPARK-30960: parse date/timestamp string with legacy format") { val ds = Seq("2020-1-12 3:23:34.12, 2020-1-12 T").toDS() - val csv = spark.read.option("header", false).schema("t timestamp, d date").csv(ds) - checkAnswer(csv, Row(Timestamp.valueOf("2020-1-12 3:23:34.12"), Date.valueOf("2020-1-12"))) + Seq("legacy", "corrected").foreach { config => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> config) { + val csv = spark.read.option("header", false).schema("t timestamp, d date").csv(ds) + checkAnswer(csv, Row(Timestamp.valueOf("2020-1-12 3:23:34.12"), Date.valueOf("2020-1-12"))) + } + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") { + val msg = intercept[SparkException] { + val csv = spark.read.option("header", false).schema("t timestamp, d date").csv(ds) + csv.collect() + }.getCause.getMessage + assert(msg.contains("Fail to parse")) + } } test("exception mode for parsing date/timestamp string") { @@ -2318,8 +2329,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa val msg = intercept[SparkException] { csv.collect() }.getCause.getMessage - assert(msg.contains(s"Set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore " + - "the behavior before Spark 3.0")) + assert(msg.contains("Fail to parse")) } withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") { checkAnswer(csv, Row(Timestamp.valueOf("2020-01-27 20:06:11.847"))) diff 
--git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 6cd067814a12..405fb8da0672 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2663,11 +2663,22 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson test("SPARK-30960: parse date/timestamp string with legacy format") { val ds = Seq("{'t': '2020-1-12 3:23:34.12', 'd': '2020-1-12 T', 'd2': '12345'}").toDS() - val json = spark.read.schema("t timestamp, d date, d2 date").json(ds) - checkAnswer(json, Row( - Timestamp.valueOf("2020-1-12 3:23:34.12"), - Date.valueOf("2020-1-12"), - Date.valueOf(LocalDate.ofEpochDay(12345)))) + Seq("legacy", "corrected").foreach { config => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> config) { + val json = spark.read.schema("t timestamp, d date, d2 date").json(ds) + checkAnswer(json, Row( + Timestamp.valueOf("2020-1-12 3:23:34.12"), + Date.valueOf("2020-1-12"), + Date.valueOf(LocalDate.ofEpochDay(12345)))) + } + } + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") { + val json = spark.read.schema("t timestamp, d date, d2 date").json(ds) + val msg = intercept[SparkException] { + json.collect() + }.getCause.getMessage + assert(msg.contains("Fail to parse")) + } } test("exception mode for parsing date/timestamp string") { @@ -2680,8 +2691,7 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson val msg = intercept[SparkException] { json.collect() }.getCause.getMessage - assert(msg.contains(s"Set ${SQLConf.LEGACY_TIME_PARSER_POLICY.key} to LEGACY to restore " + - "the behavior before Spark 3.0")) + assert(msg.contains("Fail to parse")) } withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") { checkAnswer(json, Row(Timestamp.valueOf("2020-01-27 20:06:11.847"))) From d01c7502ea7b4126bc6be362f1c2c20cebc5f288 Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 5 Mar 2020 10:00:37 +0800 Subject: [PATCH 14/14] don't throw upgrade exception before fallback logic --- .../sql/catalyst/csv/UnivocityParser.scala | 2 -- .../sql/catalyst/json/JacksonParser.scala | 2 -- .../expressions/DateExpressionsSuite.scala | 2 +- .../execution/datasources/csv/CSVSuite.scala | 15 ++----------- .../datasources/json/JsonSuite.scala | 21 +++++-------------- 5 files changed, 8 insertions(+), 34 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index f86210750f08..dd8537b02935 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -180,7 +180,6 @@ class UnivocityParser( try { timestampFormatter.parse(datum) } catch { - case e: SparkUpgradeException => throw e case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards // compatibility. @@ -194,7 +193,6 @@ class UnivocityParser( try { dateFormatter.parse(datum) } catch { - case e: SparkUpgradeException => throw e case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards // compatibility. 
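The `} catch checkDiffResult(...)` calls introduced in PATCH 09/14 rely on Scala accepting any PartialFunction[Throwable, T] as a catch handler. The standalone sketch below shows that idiom with illustrative names (diffCheck, parseDay); the real helper additionally guards on the EXCEPTION policy and raises SparkUpgradeException rather than RuntimeException. The hunk above removes the SparkUpgradeException rethrow from UnivocityParser because, per this patch's subject, the CSV/JSON paths should first attempt their own 2.0/1.x fallback before surfacing the upgrade error.

import java.time.LocalDate
import java.time.format.DateTimeParseException

def diffCheck[T](s: String, legacyParse: String => T): PartialFunction[Throwable, T] = {
  case e: DateTimeParseException =>
    // Probe whether the old behavior would have accepted the value.
    val legacyResult = try Some(legacyParse(s)) catch { case _: Throwable => None }
    if (legacyResult.nonEmpty) {
      throw new RuntimeException(s"'$s' only parses under the legacy format", e)
    } else {
      throw e // both parsers reject the value: keep the original error
    }
}

def parseDay(s: String): Int =
  try LocalDate.parse(s).getDayOfMonth
  catch diffCheck(s, (v: String) => v.split('-').last.toInt)

parseDay("2020-03-05")  // 5, via the new java.time parser
parseDay("2020-3-5")    // RuntimeException: '2020-3-5' only parses under the legacy format
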
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 386540cbe4b7..d0db06cae816 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -233,7 +233,6 @@ class JacksonParser( try { timestampFormatter.parse(parser.getText) } catch { - case e: SparkUpgradeException => throw e case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards // compatibility. @@ -251,7 +250,6 @@ class JacksonParser( try { dateFormatter.parse(parser.getText) } catch { - case e: SparkUpgradeException => throw e case NonFatal(e) => // If fails to parse, then tries the way used in 2.0 and 1.x for backwards // compatibility. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala index afc1f38aa2d9..7fced04fd588 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala @@ -1182,7 +1182,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { checkExceptionInExpression[SparkUpgradeException]( GetTimestamp( Literal("2020-01-27T20:06:11.847-0800"), - Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), "Exception for upgrading to Spark 3.0") + Literal("yyyy-MM-dd'T'HH:mm:ss.SSSz")), "Fail to parse") } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index d50c05362c27..30ae9dc67a1e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -2304,19 +2304,8 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa test("SPARK-30960: parse date/timestamp string with legacy format") { val ds = Seq("2020-1-12 3:23:34.12, 2020-1-12 T").toDS() - Seq("legacy", "corrected").foreach { config => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> config) { - val csv = spark.read.option("header", false).schema("t timestamp, d date").csv(ds) - checkAnswer(csv, Row(Timestamp.valueOf("2020-1-12 3:23:34.12"), Date.valueOf("2020-1-12"))) - } - } - withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") { - val msg = intercept[SparkException] { - val csv = spark.read.option("header", false).schema("t timestamp, d date").csv(ds) - csv.collect() - }.getCause.getMessage - assert(msg.contains("Fail to parse")) - } + val csv = spark.read.option("header", false).schema("t timestamp, d date").csv(ds) + checkAnswer(csv, Row(Timestamp.valueOf("2020-1-12 3:23:34.12"), Date.valueOf("2020-1-12"))) } test("exception mode for parsing date/timestamp string") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 405fb8da0672..917da5ecce7e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2663,22 +2663,11 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson test("SPARK-30960: parse date/timestamp string with legacy format") { val ds = Seq("{'t': '2020-1-12 3:23:34.12', 'd': '2020-1-12 T', 'd2': '12345'}").toDS() - Seq("legacy", "corrected").foreach { config => - withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> config) { - val json = spark.read.schema("t timestamp, d date, d2 date").json(ds) - checkAnswer(json, Row( - Timestamp.valueOf("2020-1-12 3:23:34.12"), - Date.valueOf("2020-1-12"), - Date.valueOf(LocalDate.ofEpochDay(12345)))) - } - } - withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "exception") { - val json = spark.read.schema("t timestamp, d date, d2 date").json(ds) - val msg = intercept[SparkException] { - json.collect() - }.getCause.getMessage - assert(msg.contains("Fail to parse")) - } + val json = spark.read.schema("t timestamp, d date, d2 date").json(ds) + checkAnswer(json, Row( + Timestamp.valueOf("2020-1-12 3:23:34.12"), + Date.valueOf("2020-1-12"), + Date.valueOf(LocalDate.ofEpochDay(12345)))) } test("exception mode for parsing date/timestamp string") {