From 38d90d8ca27361f3dddb858fcc4843d518bd6911 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 10 Feb 2020 19:07:31 +0300 Subject: [PATCH 01/11] Support SimpleDateFormat as a legacy parser --- .../sql/catalyst/csv/CSVInferSchema.scala | 4 +- .../sql/catalyst/csv/UnivocityGenerator.scala | 4 +- .../sql/catalyst/csv/UnivocityParser.scala | 4 +- .../expressions/datetimeExpressions.scala | 24 +- .../sql/catalyst/json/JacksonGenerator.scala | 4 +- .../sql/catalyst/json/JacksonParser.scala | 4 +- .../sql/catalyst/json/JsonInferSchema.scala | 4 +- .../catalyst/util/TimestampFormatter.scala | 101 ++++- .../expressions/DateExpressionsSuite.scala | 390 ++++++++++-------- .../org/apache/spark/sql/functions.scala | 7 +- .../apache/spark/sql/DateFunctionsSuite.scala | 346 +++++++++------- 11 files changed, 511 insertions(+), 381 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 03cc3cbdf790..c6a03183ab45 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -24,6 +24,7 @@ import scala.util.control.Exception.allCatch import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.expressions.ExprUtils +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.catalyst.util.TimestampFormatter import org.apache.spark.sql.types._ @@ -32,7 +33,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { private val timestampParser = TimestampFormatter( options.timestampFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private val decimalParser = if (options.locale == Locale.US) { // Special handling the default locale for backward 
compatibility diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index 05cb91d10868..efe0d42d57b0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -23,6 +23,7 @@ import com.univocity.parsers.csv.CsvWriter import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.types._ class UnivocityGenerator( @@ -44,7 +45,8 @@ class UnivocityGenerator( private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private val dateFormatter = DateFormatter( options.dateFormat, options.zoneId, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 551095380402..049faa12740d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow} import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -86,7 +87,8 @@ class UnivocityParser( private val timestampFormatter = TimestampFormatter( 
options.timestampFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private val dateFormatter = DateFormatter( options.dateFormat, options.zoneId, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index aa2bd5a1273e..24662562fcec 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -628,7 +628,7 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti override protected def nullSafeEval(timestamp: Any, format: Any): Any = { val tf = if (formatter.isEmpty) { - TimestampFormatter(format.toString, zoneId) + TimestampFormatter.withStrongLegacy(format.toString, zoneId) } else { formatter.get } @@ -645,7 +645,7 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti val tf = TimestampFormatter.getClass.getName.stripSuffix("$") val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) defineCodeGen(ctx, ev, (timestamp, format) => { - s"""UTF8String.fromString($tf$$.MODULE$$.apply($format.toString(), $zid) + s"""UTF8String.fromString($tf$$.MODULE$$.withStrongLegacy($format.toString(), $zid) .format($timestamp))""" }) } @@ -688,7 +688,7 @@ case class ToUnixTimestamp( copy(timeZoneId = Option(timeZoneId)) def this(time: Expression) = { - this(time, Literal("uuuu-MM-dd HH:mm:ss")) + this(time, Literal(TimestampFormatter.defaultPattern)) } override def prettyName: String = "to_unix_timestamp" @@ -732,7 +732,7 @@ case class UnixTimestamp(timeExp: Expression, format: Expression, timeZoneId: Op copy(timeZoneId = Option(timeZoneId)) def this(time: Expression) = { - this(time, Literal("uuuu-MM-dd HH:mm:ss")) + this(time, 
Literal(TimestampFormatter.defaultPattern)) } def this() = { @@ -758,7 +758,7 @@ abstract class ToTimestamp private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String] private lazy val formatter: TimestampFormatter = try { - TimestampFormatter(constFormat.toString, zoneId) + TimestampFormatter.withStrongLegacy(constFormat.toString, zoneId) } catch { case NonFatal(_) => null } @@ -791,7 +791,7 @@ abstract class ToTimestamp } else { val formatString = f.asInstanceOf[UTF8String].toString try { - TimestampFormatter(formatString, zoneId).parse( + TimestampFormatter.withStrongLegacy(formatString, zoneId).parse( t.asInstanceOf[UTF8String].toString) / downScaleFactor } catch { case NonFatal(_) => null @@ -831,12 +831,11 @@ abstract class ToTimestamp } case StringType => val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) - val locale = ctx.addReferenceObj("locale", Locale.US) val tf = TimestampFormatter.getClass.getName.stripSuffix("$") nullSafeCodeGen(ctx, ev, (string, format) => { s""" try { - ${ev.value} = $tf$$.MODULE$$.apply($format.toString(), $zid, $locale) + ${ev.value} = $tf$$.MODULE$$.withStrongLegacy($format.toString(), $zid) .parse($string.toString()) / $downScaleFactor; } catch (java.lang.IllegalArgumentException e) { ${ev.isNull} = true; @@ -908,7 +907,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[ override def prettyName: String = "from_unixtime" def this(unix: Expression) = { - this(unix, Literal("uuuu-MM-dd HH:mm:ss")) + this(unix, Literal(TimestampFormatter.defaultPattern)) } override def dataType: DataType = StringType @@ -922,7 +921,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[ private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String] private lazy val formatter: TimestampFormatter = try { - TimestampFormatter(constFormat.toString, zoneId) + TimestampFormatter.withStrongLegacy(constFormat.toString, zoneId) } 
catch { case NonFatal(_) => null } @@ -948,7 +947,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[ null } else { try { - UTF8String.fromString(TimestampFormatter(f.toString, zoneId) + UTF8String.fromString(TimestampFormatter.withStrongLegacy(f.toString, zoneId) .format(time.asInstanceOf[Long] * MICROS_PER_SECOND)) } catch { case NonFatal(_) => null @@ -980,12 +979,11 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[ } } else { val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) - val locale = ctx.addReferenceObj("locale", Locale.US) val tf = TimestampFormatter.getClass.getName.stripSuffix("$") nullSafeCodeGen(ctx, ev, (seconds, f) => { s""" try { - ${ev.value} = UTF8String.fromString($tf$$.MODULE$$.apply($f.toString(), $zid, $locale). + ${ev.value} = UTF8String.fromString($tf$$.MODULE$$.withStrongLegacy($f.toString(), $zid). format($seconds * 1000000L)); } catch (java.lang.IllegalArgumentException e) { ${ev.isNull} = true; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 9c63593ea175..33fd3e040f91 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -24,6 +24,7 @@ import com.fasterxml.jackson.core._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.types._ /** @@ -80,7 +81,8 @@ private[sql] class JacksonGenerator( private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) 
private val dateFormatter = DateFormatter( options.dateFormat, options.zoneId, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index 76efa574a99f..d5cd67da48b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -58,7 +59,8 @@ class JacksonParser( private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private val dateFormatter = DateFormatter( options.dateFormat, options.zoneId, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index f030955ee6e7..82dd6d0da263 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.expressions.ExprUtils import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -40,7 +41,8 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { private val timestampFormatter = TimestampFormatter( options.timestampFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) /** * Infer the type of a collection of json records in three stages: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index fe1a4fe710c2..89ff71919589 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -17,12 +17,13 @@ package org.apache.spark.sql.catalyst.util -import java.text.ParseException +import java.sql.Timestamp +import java.text.{ParseException, SimpleDateFormat} import java.time._ import java.time.format.DateTimeParseException import java.time.temporal.ChronoField.MICRO_OF_SECOND import java.time.temporal.TemporalQueries -import java.util.{Locale, TimeZone} +import java.util.{Date, Locale, TimeZone} import java.util.concurrent.TimeUnit.SECONDS import org.apache.commons.lang3.time.FastDateFormat @@ -90,47 +91,105 @@ class FractionTimestampFormatter(zoneId: ZoneId) override protected lazy val formatter = DateTimeFormatterHelper.fractionFormatter } -class LegacyTimestampFormatter( - pattern: String, - zoneId: ZoneId, - locale: Locale) extends TimestampFormatter { +trait LegacyTimestampFormatter extends TimestampFormatter { + def parseToDate(s: String): Date + def formatTimestamp(t: Timestamp): String - @transient private lazy val format = - FastDateFormat.getInstance(pattern, TimeZone.getTimeZone(zoneId), locale) + override def parse(s: String): Long = { + parseToDate(s).getTime * MICROS_PER_MILLIS + } - protected def toMillis(s: String): Long = 
format.parse(s).getTime + override def format(us: Long): String = { + val timestamp = DateTimeUtils.toJavaTimestamp(us) + formatTimestamp(timestamp) + } +} - override def parse(s: String): Long = toMillis(s) * MICROS_PER_MILLIS +class LegacyFastDateFormatter( + pattern: String, + zoneId: ZoneId, + locale: Locale) extends LegacyTimestampFormatter { + @transient private lazy val fdf = + FastDateFormat.getInstance(pattern, TimeZone.getTimeZone(zoneId), locale) + override def parseToDate(s: String): Date = fdf.parse(s) + override def formatTimestamp(t: Timestamp): String = fdf.format(t) +} - override def format(us: Long): String = { - format.format(DateTimeUtils.toJavaTimestamp(us)) +class LegacySimpleDateFormatter( + pattern: String, + zoneId: ZoneId, + locale: Locale, + lenient: Boolean = true) extends LegacyTimestampFormatter { + @transient private lazy val sdf = { + val formatter = new SimpleDateFormat(pattern, locale) + formatter.setTimeZone(TimeZone.getTimeZone(zoneId)) + formatter.setLenient(lenient) + formatter } + override def parseToDate(s: String): Date = sdf.parse(s) + override def formatTimestamp(t: Timestamp): String = sdf.format(t) +} + +object LegacyDateFormats extends Enumeration { + type LegacyDateFormat = Value + val FAST_DATE_FORMAT, SIMPLE_DATE_FORMAT, LENIENT_SIMPLE_DATE_FORMAT = Value } object TimestampFormatter { + import LegacyDateFormats._ + val defaultLocale: Locale = Locale.US - def apply(format: String, zoneId: ZoneId, locale: Locale): TimestampFormatter = { + def defaultPattern(): String = { if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyTimestampFormatter(format, zoneId, locale) + "yyyy-MM-dd HH:mm:ss" } else { - new Iso8601TimestampFormatter(format, zoneId, locale) + "uuuu-MM-dd HH:mm:ss" } } - def apply(format: String, zoneId: ZoneId): TimestampFormatter = { - apply(format, zoneId, defaultLocale) - } + private def getFormatter( + format: Option[String], + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat): 
TimestampFormatter = { - def apply(zoneId: ZoneId): TimestampFormatter = { + val pattern = format.getOrElse(defaultPattern) if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyTimestampFormatter("yyyy-MM-dd HH:mm:ss", zoneId, defaultLocale) + legacyFormat match { + case FAST_DATE_FORMAT => + new LegacyFastDateFormatter(pattern, zoneId, locale) + case SIMPLE_DATE_FORMAT => + new LegacySimpleDateFormatter(pattern, zoneId, locale, lenient = false) + case LENIENT_SIMPLE_DATE_FORMAT => + new LegacySimpleDateFormatter(pattern, zoneId, locale, lenient = true) + } } else { - new Iso8601TimestampFormatter("uuuu-MM-dd HH:mm:ss", zoneId, defaultLocale) + new Iso8601TimestampFormatter(pattern, zoneId, locale) } } + def apply( + format: String, + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat): TimestampFormatter = { + getFormatter(Some(format), zoneId, locale, legacyFormat) + } + + def apply(format: String, zoneId: ZoneId): TimestampFormatter = { + apply(format, zoneId, defaultLocale, LENIENT_SIMPLE_DATE_FORMAT) + } + + def apply(zoneId: ZoneId): TimestampFormatter = { + getFormatter(None, zoneId, defaultLocale, LENIENT_SIMPLE_DATE_FORMAT) + } + def getFractionFormatter(zoneId: ZoneId): TimestampFormatter = { new FractionTimestampFormatter(zoneId) } + + def withStrongLegacy(format: String, zoneId: ZoneId): TimestampFormatter = { + apply(format, zoneId, defaultLocale, SIMPLE_DATE_FORMAT) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala index 274d0beebd30..f04149ab7eb2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils, Timesta import 
org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -241,41 +242,45 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("DateFormat") { - checkEvaluation( - DateFormatClass(Literal.create(null, TimestampType), Literal("y"), gmtId), - null) - checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId), - Literal.create(null, StringType), gmtId), null) - - checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId), - Literal("y"), gmtId), "2015") - checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), gmtId), "2013") - checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId), - Literal("H"), gmtId), "0") - checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), gmtId), "13") - - checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, pstId), - Literal("y"), pstId), "2015") - checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), pstId), "2013") - checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, pstId), - Literal("H"), pstId), "0") - checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), pstId), "5") - - checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, jstId), - Literal("y"), jstId), "2015") - checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), jstId), "2013") - checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, jstId), - Literal("H"), jstId), "0") - checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), jstId), "22") - - // SPARK-28072 The codegen path should work - checkEvaluation( - expression = DateFormatClass( - BoundReference(ordinal = 0, dataType = TimestampType, nullable = true), - BoundReference(ordinal = 1, dataType 
= StringType, nullable = true), - jstId), - expected = "22", - inputRow = InternalRow(DateTimeUtils.fromJavaTimestamp(ts), UTF8String.fromString("H"))) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + checkEvaluation( + DateFormatClass(Literal.create(null, TimestampType), Literal("y"), gmtId), + null) + checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId), + Literal.create(null, StringType), gmtId), null) + + checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId), + Literal("y"), gmtId), "2015") + checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), gmtId), "2013") + checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, gmtId), + Literal("H"), gmtId), "0") + checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), gmtId), "13") + + checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, pstId), + Literal("y"), pstId), "2015") + checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), pstId), "2013") + checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, pstId), + Literal("H"), pstId), "0") + checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), pstId), "5") + + checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, jstId), + Literal("y"), jstId), "2015") + checkEvaluation(DateFormatClass(Literal(ts), Literal("y"), jstId), "2013") + checkEvaluation(DateFormatClass(Cast(Literal(d), TimestampType, jstId), + Literal("H"), jstId), "0") + checkEvaluation(DateFormatClass(Literal(ts), Literal("H"), jstId), "22") + + // SPARK-28072 The codegen path should work + checkEvaluation( + expression = DateFormatClass( + BoundReference(ordinal = 0, dataType = TimestampType, nullable = true), + BoundReference(ordinal = 1, dataType = StringType, nullable = true), + jstId), + expected = "22", + inputRow = InternalRow(DateTimeUtils.fromJavaTimestamp(ts), UTF8String.fromString("H"))) + } + } } test("Hour") { @@ -705,162 
+710,189 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } test("from_unixtime") { - val fmt1 = "yyyy-MM-dd HH:mm:ss" - val sdf1 = new SimpleDateFormat(fmt1, Locale.US) - val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" - val sdf2 = new SimpleDateFormat(fmt2, Locale.US) - for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) { - val timeZoneId = Option(tz.getID) - sdf1.setTimeZone(tz) - sdf2.setTimeZone(tz) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val fmt1 = "yyyy-MM-dd HH:mm:ss" + val sdf1 = new SimpleDateFormat(fmt1, Locale.US) + val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" + val sdf2 = new SimpleDateFormat(fmt2, Locale.US) + for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) { + val timeZoneId = Option(tz.getID) + sdf1.setTimeZone(tz) + sdf2.setTimeZone(tz) - checkEvaluation( - FromUnixTime(Literal(0L), Literal(fmt1), timeZoneId), - sdf1.format(new Timestamp(0))) - checkEvaluation(FromUnixTime( - Literal(1000L), Literal(fmt1), timeZoneId), - sdf1.format(new Timestamp(1000000))) - checkEvaluation( - FromUnixTime(Literal(-1000L), Literal(fmt2), timeZoneId), - sdf2.format(new Timestamp(-1000000))) - checkEvaluation( - FromUnixTime(Literal.create(null, LongType), Literal.create(null, StringType), timeZoneId), - null) - checkEvaluation( - FromUnixTime(Literal.create(null, LongType), Literal(fmt1), timeZoneId), - null) - checkEvaluation( - FromUnixTime(Literal(1000L), Literal.create(null, StringType), timeZoneId), - null) - checkEvaluation( - FromUnixTime(Literal(0L), Literal("not a valid format"), timeZoneId), null) + checkEvaluation( + FromUnixTime(Literal(0L), Literal(fmt1), timeZoneId), + sdf1.format(new Timestamp(0))) + checkEvaluation(FromUnixTime( + Literal(1000L), Literal(fmt1), timeZoneId), + sdf1.format(new Timestamp(1000000))) + checkEvaluation( + FromUnixTime(Literal(-1000L), Literal(fmt2), timeZoneId), + sdf2.format(new Timestamp(-1000000))) + 
checkEvaluation( + FromUnixTime( + Literal.create(null, LongType), + Literal.create(null, StringType), timeZoneId), + null) + checkEvaluation( + FromUnixTime(Literal.create(null, LongType), Literal(fmt1), timeZoneId), + null) + checkEvaluation( + FromUnixTime(Literal(1000L), Literal.create(null, StringType), timeZoneId), + null) + checkEvaluation( + FromUnixTime(Literal(0L), Literal("not a valid format"), timeZoneId), null) - // SPARK-28072 The codegen path for non-literal input should also work - checkEvaluation( - expression = FromUnixTime( - BoundReference(ordinal = 0, dataType = LongType, nullable = true), - BoundReference(ordinal = 1, dataType = StringType, nullable = true), - timeZoneId), - expected = UTF8String.fromString(sdf1.format(new Timestamp(0))), - inputRow = InternalRow(0L, UTF8String.fromString(fmt1))) + // SPARK-28072 The codegen path for non-literal input should also work + checkEvaluation( + expression = FromUnixTime( + BoundReference(ordinal = 0, dataType = LongType, nullable = true), + BoundReference(ordinal = 1, dataType = StringType, nullable = true), + timeZoneId), + expected = UTF8String.fromString(sdf1.format(new Timestamp(0))), + inputRow = InternalRow(0L, UTF8String.fromString(fmt1))) + } + } } } test("unix_timestamp") { - val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) - val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" - val sdf2 = new SimpleDateFormat(fmt2, Locale.US) - val fmt3 = "yy-MM-dd" - val sdf3 = new SimpleDateFormat(fmt3, Locale.US) - sdf3.setTimeZone(TimeZoneGMT) - - withDefaultTimeZone(TimeZoneGMT) { - for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) { - val timeZoneId = Option(tz.getID) - sdf1.setTimeZone(tz) - sdf2.setTimeZone(tz) - - val date1 = Date.valueOf("2015-07-24") - checkEvaluation(UnixTimestamp( - Literal(sdf1.format(new Timestamp(0))), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), 0L) - checkEvaluation(UnixTimestamp( - Literal(sdf1.format(new Timestamp(1000000))), Literal("yyyy-MM-dd HH:mm:ss"), 
timeZoneId), - 1000L) - checkEvaluation( - UnixTimestamp( - Literal(new Timestamp(1000000)), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), - 1000L) - checkEvaluation( - UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), - MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) - checkEvaluation( - UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2), timeZoneId), - -1000L) - checkEvaluation(UnixTimestamp( - Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId), - MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis( - DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz))) - val t1 = UnixTimestamp( - CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long] - val t2 = UnixTimestamp( - CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long] - assert(t2 - t1 <= 1) - checkEvaluation( - UnixTimestamp( - Literal.create(null, DateType), Literal.create(null, StringType), timeZoneId), - null) - checkEvaluation( - UnixTimestamp(Literal.create(null, DateType), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), - null) - checkEvaluation( - UnixTimestamp(Literal(date1), Literal.create(null, StringType), timeZoneId), - MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) - checkEvaluation( - UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) + val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" + val sdf2 = new SimpleDateFormat(fmt2, Locale.US) + val fmt3 = "yy-MM-dd" + val sdf3 = new SimpleDateFormat(fmt3, Locale.US) + sdf3.setTimeZone(TimeZoneGMT) + + withDefaultTimeZone(TimeZoneGMT) { + for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) { + val timeZoneId = Option(tz.getID) + 
sdf1.setTimeZone(tz) + sdf2.setTimeZone(tz) + + val date1 = Date.valueOf("2015-07-24") + checkEvaluation(UnixTimestamp( + Literal(sdf1.format(new Timestamp(0))), + Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), 0L) + checkEvaluation(UnixTimestamp( + Literal(sdf1.format(new Timestamp(1000000))), + Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), + 1000L) + checkEvaluation( + UnixTimestamp( + Literal(new Timestamp(1000000)), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), + 1000L) + checkEvaluation( + UnixTimestamp(Literal(date1), Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), + MILLISECONDS.toSeconds( + DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) + checkEvaluation( + UnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), + Literal(fmt2), timeZoneId), + -1000L) + checkEvaluation(UnixTimestamp( + Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId), + MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis( + DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz))) + val t1 = UnixTimestamp( + CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long] + val t2 = UnixTimestamp( + CurrentTimestamp(), Literal("yyyy-MM-dd HH:mm:ss")).eval().asInstanceOf[Long] + assert(t2 - t1 <= 1) + checkEvaluation( + UnixTimestamp( + Literal.create(null, DateType), Literal.create(null, StringType), timeZoneId), + null) + checkEvaluation( + UnixTimestamp( + Literal.create(null, DateType), + Literal("yyyy-MM-dd HH:mm:ss"), timeZoneId), + null) + checkEvaluation( + UnixTimestamp(Literal(date1), Literal.create(null, StringType), timeZoneId), + MILLISECONDS.toSeconds( + DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) + checkEvaluation( + UnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null) + } + } } } } test("to_unix_timestamp") { - val fmt1 = "yyyy-MM-dd HH:mm:ss" - val sdf1 = new SimpleDateFormat(fmt1, Locale.US) - val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" - val sdf2 = new 
SimpleDateFormat(fmt2, Locale.US) - val fmt3 = "yy-MM-dd" - val sdf3 = new SimpleDateFormat(fmt3, Locale.US) - sdf3.setTimeZone(TimeZoneGMT) - - withDefaultTimeZone(TimeZoneGMT) { - for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) { - val timeZoneId = Option(tz.getID) - sdf1.setTimeZone(tz) - sdf2.setTimeZone(tz) - - val date1 = Date.valueOf("2015-07-24") - checkEvaluation(ToUnixTimestamp( - Literal(sdf1.format(new Timestamp(0))), Literal(fmt1), timeZoneId), 0L) - checkEvaluation(ToUnixTimestamp( - Literal(sdf1.format(new Timestamp(1000000))), Literal(fmt1), timeZoneId), - 1000L) - checkEvaluation(ToUnixTimestamp( - Literal(new Timestamp(1000000)), Literal(fmt1)), - 1000L) - checkEvaluation( - ToUnixTimestamp(Literal(date1), Literal(fmt1), timeZoneId), - MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) - checkEvaluation( - ToUnixTimestamp(Literal(sdf2.format(new Timestamp(-1000000))), Literal(fmt2), timeZoneId), - -1000L) - checkEvaluation(ToUnixTimestamp( - Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId), - MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis( - DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz))) - val t1 = ToUnixTimestamp( - CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long] - val t2 = ToUnixTimestamp( - CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long] - assert(t2 - t1 <= 1) - checkEvaluation(ToUnixTimestamp( - Literal.create(null, DateType), Literal.create(null, StringType), timeZoneId), null) - checkEvaluation( - ToUnixTimestamp( - Literal.create(null, DateType), Literal(fmt1), timeZoneId), - null) - checkEvaluation(ToUnixTimestamp( - Literal(date1), Literal.create(null, StringType), timeZoneId), - MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) - checkEvaluation( - ToUnixTimestamp(Literal("2015-07-24"), Literal("not a valid format"), timeZoneId), null) + Seq(false, true).foreach { legacyParser => 
+ withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val fmt1 = "yyyy-MM-dd HH:mm:ss" + val sdf1 = new SimpleDateFormat(fmt1, Locale.US) + val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" + val sdf2 = new SimpleDateFormat(fmt2, Locale.US) + val fmt3 = "yy-MM-dd" + val sdf3 = new SimpleDateFormat(fmt3, Locale.US) + sdf3.setTimeZone(TimeZoneGMT) + + withDefaultTimeZone(TimeZoneGMT) { + for (tz <- Seq(TimeZoneGMT, TimeZonePST, TimeZoneJST)) { + val timeZoneId = Option(tz.getID) + sdf1.setTimeZone(tz) + sdf2.setTimeZone(tz) + + val date1 = Date.valueOf("2015-07-24") + checkEvaluation(ToUnixTimestamp( + Literal(sdf1.format(new Timestamp(0))), Literal(fmt1), timeZoneId), 0L) + checkEvaluation(ToUnixTimestamp( + Literal(sdf1.format(new Timestamp(1000000))), Literal(fmt1), timeZoneId), + 1000L) + checkEvaluation(ToUnixTimestamp( + Literal(new Timestamp(1000000)), Literal(fmt1)), + 1000L) + checkEvaluation( + ToUnixTimestamp(Literal(date1), Literal(fmt1), timeZoneId), + MILLISECONDS.toSeconds( + DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) + checkEvaluation( + ToUnixTimestamp( + Literal(sdf2.format(new Timestamp(-1000000))), + Literal(fmt2), timeZoneId), + -1000L) + checkEvaluation(ToUnixTimestamp( + Literal(sdf3.format(Date.valueOf("2015-07-24"))), Literal(fmt3), timeZoneId), + MILLISECONDS.toSeconds(DateTimeUtils.daysToMillis( + DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-24")), tz))) + val t1 = ToUnixTimestamp( + CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long] + val t2 = ToUnixTimestamp( + CurrentTimestamp(), Literal(fmt1)).eval().asInstanceOf[Long] + assert(t2 - t1 <= 1) + checkEvaluation(ToUnixTimestamp( + Literal.create(null, DateType), Literal.create(null, StringType), timeZoneId), null) + checkEvaluation( + ToUnixTimestamp( + Literal.create(null, DateType), Literal(fmt1), timeZoneId), + null) + checkEvaluation(ToUnixTimestamp( + Literal(date1), Literal.create(null, StringType), timeZoneId), + 
MILLISECONDS.toSeconds( + DateTimeUtils.daysToMillis(DateTimeUtils.fromJavaDate(date1), tz))) + checkEvaluation( + ToUnixTimestamp( + Literal("2015-07-24"), + Literal("not a valid format"), timeZoneId), null) - // SPARK-28072 The codegen path for non-literal input should also work - checkEvaluation( - expression = ToUnixTimestamp( - BoundReference(ordinal = 0, dataType = StringType, nullable = true), - BoundReference(ordinal = 1, dataType = StringType, nullable = true), - timeZoneId), - expected = 0L, - inputRow = InternalRow( - UTF8String.fromString(sdf1.format(new Timestamp(0))), UTF8String.fromString(fmt1))) + // SPARK-28072 The codegen path for non-literal input should also work + checkEvaluation( + expression = ToUnixTimestamp( + BoundReference(ordinal = 0, dataType = StringType, nullable = true), + BoundReference(ordinal = 1, dataType = StringType, nullable = true), + timeZoneId), + expected = 0L, + inputRow = InternalRow( + UTF8String.fromString(sdf1.format(new Timestamp(0))), UTF8String.fromString(fmt1))) + } + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index d125581857e0..2d5504ac00ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, ResolvedHint} +import org.apache.spark.sql.catalyst.util.TimestampFormatter import org.apache.spark.sql.execution.SparkSqlParser import org.apache.spark.sql.expressions.{Aggregator, SparkUserDefinedFunction, UserDefinedAggregator, UserDefinedFunction} import org.apache.spark.sql.internal.SQLConf @@ -2881,7 +2882,7 @@ object functions { * @since 1.5.0 */ def from_unixtime(ut: 
Column): Column = withExpr { - FromUnixTime(ut.expr, Literal("uuuu-MM-dd HH:mm:ss")) + FromUnixTime(ut.expr, Literal(TimestampFormatter.defaultPattern)) } /** @@ -2913,7 +2914,7 @@ object functions { * @since 1.5.0 */ def unix_timestamp(): Column = withExpr { - UnixTimestamp(CurrentTimestamp(), Literal("uuuu-MM-dd HH:mm:ss")) + UnixTimestamp(CurrentTimestamp(), Literal(TimestampFormatter.defaultPattern)) } /** @@ -2927,7 +2928,7 @@ object functions { * @since 1.5.0 */ def unix_timestamp(s: Column): Column = withExpr { - UnixTimestamp(s.expr, Literal("uuuu-MM-dd HH:mm:ss")) + UnixTimestamp(s.expr, Literal(TimestampFormatter.defaultPattern)) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index bb8cdf3cb6de..41d53c959ef9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -96,15 +96,19 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession { } test("date format") { - val df = Seq((d, sdf.format(d), ts)).toDF("a", "b", "c") + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val df = Seq((d, sdf.format(d), ts)).toDF("a", "b", "c") - checkAnswer( - df.select(date_format($"a", "y"), date_format($"b", "y"), date_format($"c", "y")), - Row("2015", "2015", "2013")) + checkAnswer( + df.select(date_format($"a", "y"), date_format($"b", "y"), date_format($"c", "y")), + Row("2015", "2015", "2013")) - checkAnswer( - df.selectExpr("date_format(a, 'y')", "date_format(b, 'y')", "date_format(c, 'y')"), - Row("2015", "2015", "2013")) + checkAnswer( + df.selectExpr("date_format(a, 'y')", "date_format(b, 'y')", "date_format(c, 'y')"), + Row("2015", "2015", "2013")) + } + } } test("year") { @@ -525,170 +529,194 @@ class DateFunctionsSuite extends QueryTest with SharedSparkSession 
{ } test("from_unixtime") { - val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) - val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" - val sdf2 = new SimpleDateFormat(fmt2, Locale.US) - val fmt3 = "yy-MM-dd HH-mm-ss" - val sdf3 = new SimpleDateFormat(fmt3, Locale.US) - val df = Seq((1000, "yyyy-MM-dd HH:mm:ss.SSS"), (-1000, "yy-MM-dd HH-mm-ss")).toDF("a", "b") - checkAnswer( - df.select(from_unixtime(col("a"))), - Seq(Row(sdf1.format(new Timestamp(1000000))), Row(sdf1.format(new Timestamp(-1000000))))) - checkAnswer( - df.select(from_unixtime(col("a"), fmt2)), - Seq(Row(sdf2.format(new Timestamp(1000000))), Row(sdf2.format(new Timestamp(-1000000))))) - checkAnswer( - df.select(from_unixtime(col("a"), fmt3)), - Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new Timestamp(-1000000))))) - checkAnswer( - df.selectExpr("from_unixtime(a)"), - Seq(Row(sdf1.format(new Timestamp(1000000))), Row(sdf1.format(new Timestamp(-1000000))))) - checkAnswer( - df.selectExpr(s"from_unixtime(a, '$fmt2')"), - Seq(Row(sdf2.format(new Timestamp(1000000))), Row(sdf2.format(new Timestamp(-1000000))))) - checkAnswer( - df.selectExpr(s"from_unixtime(a, '$fmt3')"), - Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new Timestamp(-1000000))))) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US) + val fmt2 = "yyyy-MM-dd HH:mm:ss.SSS" + val sdf2 = new SimpleDateFormat(fmt2, Locale.US) + val fmt3 = "yy-MM-dd HH-mm-ss" + val sdf3 = new SimpleDateFormat(fmt3, Locale.US) + val df = Seq((1000, "yyyy-MM-dd HH:mm:ss.SSS"), (-1000, "yy-MM-dd HH-mm-ss")).toDF("a", "b") + checkAnswer( + df.select(from_unixtime(col("a"))), + Seq(Row(sdf1.format(new Timestamp(1000000))), Row(sdf1.format(new Timestamp(-1000000))))) + checkAnswer( + df.select(from_unixtime(col("a"), fmt2)), + Seq(Row(sdf2.format(new Timestamp(1000000))), Row(sdf2.format(new 
Timestamp(-1000000))))) + checkAnswer( + df.select(from_unixtime(col("a"), fmt3)), + Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new Timestamp(-1000000))))) + checkAnswer( + df.selectExpr("from_unixtime(a)"), + Seq(Row(sdf1.format(new Timestamp(1000000))), Row(sdf1.format(new Timestamp(-1000000))))) + checkAnswer( + df.selectExpr(s"from_unixtime(a, '$fmt2')"), + Seq(Row(sdf2.format(new Timestamp(1000000))), Row(sdf2.format(new Timestamp(-1000000))))) + checkAnswer( + df.selectExpr(s"from_unixtime(a, '$fmt3')"), + Seq(Row(sdf3.format(new Timestamp(1000000))), Row(sdf3.format(new Timestamp(-1000000))))) + } + } } private def secs(millis: Long): Long = TimeUnit.MILLISECONDS.toSeconds(millis) test("unix_timestamp") { - val date1 = Date.valueOf("2015-07-24") - val date2 = Date.valueOf("2015-07-25") - val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3") - val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2") - val s1 = "2015/07/24 10:00:00.5" - val s2 = "2015/07/25 02:02:02.6" - val ss1 = "2015-07-24 10:00:00" - val ss2 = "2015-07-25 02:02:02" - val fmt = "yyyy/MM/dd HH:mm:ss.S" - val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") - checkAnswer(df.select(unix_timestamp(col("ts"))), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - checkAnswer(df.select(unix_timestamp(col("ss"))), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - checkAnswer(df.select(unix_timestamp(col("d"), fmt)), Seq( - Row(secs(date1.getTime)), Row(secs(date2.getTime)))) - checkAnswer(df.select(unix_timestamp(col("s"), fmt)), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - checkAnswer(df.selectExpr("unix_timestamp(ts)"), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - checkAnswer(df.selectExpr("unix_timestamp(ss)"), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - checkAnswer(df.selectExpr(s"unix_timestamp(d, '$fmt')"), Seq( - Row(secs(date1.getTime)), Row(secs(date2.getTime)))) - 
checkAnswer(df.selectExpr(s"unix_timestamp(s, '$fmt')"), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - - val x1 = "2015-07-24 10:00:00" - val x2 = "2015-25-07 02:02:02" - val x3 = "2015-07-24 25:02:02" - val x4 = "2015-24-07 26:02:02" - val ts3 = Timestamp.valueOf("2015-07-24 02:25:02") - val ts4 = Timestamp.valueOf("2015-07-24 00:10:00") - - val df1 = Seq(x1, x2, x3, x4).toDF("x") - checkAnswer(df1.select(unix_timestamp(col("x"))), Seq( - Row(secs(ts1.getTime)), Row(null), Row(null), Row(null))) - checkAnswer(df1.selectExpr("unix_timestamp(x)"), Seq( - Row(secs(ts1.getTime)), Row(null), Row(null), Row(null))) - checkAnswer(df1.select(unix_timestamp(col("x"), "yyyy-dd-MM HH:mm:ss")), Seq( - Row(null), Row(secs(ts2.getTime)), Row(null), Row(null))) - checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq( - Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null))) - - // invalid format - checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd aa:HH:ss')"), Seq( - Row(null), Row(null), Row(null), Row(null))) - - // february - val y1 = "2016-02-29" - val y2 = "2017-02-29" - val ts5 = Timestamp.valueOf("2016-02-29 00:00:00") - val df2 = Seq(y1, y2).toDF("y") - checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq( - Row(secs(ts5.getTime)), Row(null))) - - val now = sql("select unix_timestamp()").collect().head.getLong(0) - checkAnswer( - sql(s"select cast ($now as timestamp)"), - Row(new java.util.Date(TimeUnit.SECONDS.toMillis(now)))) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val date1 = Date.valueOf("2015-07-24") + val date2 = Date.valueOf("2015-07-25") + val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3") + val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2") + val s1 = "2015/07/24 10:00:00.5" + val s2 = "2015/07/25 02:02:02.6" + val ss1 = "2015-07-24 10:00:00" + val ss2 = "2015-07-25 02:02:02" + val fmt = "yyyy/MM/dd 
HH:mm:ss.S" + val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") + checkAnswer(df.select(unix_timestamp(col("ts"))), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.select(unix_timestamp(col("ss"))), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.select(unix_timestamp(col("d"), fmt)), Seq( + Row(secs(date1.getTime)), Row(secs(date2.getTime)))) + checkAnswer(df.select(unix_timestamp(col("s"), fmt)), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.selectExpr("unix_timestamp(ts)"), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.selectExpr("unix_timestamp(ss)"), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.selectExpr(s"unix_timestamp(d, '$fmt')"), Seq( + Row(secs(date1.getTime)), Row(secs(date2.getTime)))) + checkAnswer(df.selectExpr(s"unix_timestamp(s, '$fmt')"), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + + val x1 = "2015-07-24 10:00:00" + val x2 = "2015-25-07 02:02:02" + val x3 = "2015-07-24 25:02:02" + val x4 = "2015-24-07 26:02:02" + val ts3 = Timestamp.valueOf("2015-07-24 02:25:02") + val ts4 = Timestamp.valueOf("2015-07-24 00:10:00") + + val df1 = Seq(x1, x2, x3, x4).toDF("x") + checkAnswer(df1.select(unix_timestamp(col("x"))), Seq( + Row(secs(ts1.getTime)), Row(null), Row(null), Row(null))) + checkAnswer(df1.selectExpr("unix_timestamp(x)"), Seq( + Row(secs(ts1.getTime)), Row(null), Row(null), Row(null))) + checkAnswer(df1.select(unix_timestamp(col("x"), "yyyy-dd-MM HH:mm:ss")), Seq( + Row(null), Row(secs(ts2.getTime)), Row(null), Row(null))) + checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq( + Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null))) + + // invalid format + checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd aa:HH:ss')"), Seq( + Row(null), Row(null), Row(null), Row(null))) + + // february + val y1 = "2016-02-29" + val y2 
= "2017-02-29" + val ts5 = Timestamp.valueOf("2016-02-29 00:00:00") + val df2 = Seq(y1, y2).toDF("y") + checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq( + Row(secs(ts5.getTime)), Row(null))) + + val now = sql("select unix_timestamp()").collect().head.getLong(0) + checkAnswer( + sql(s"select cast ($now as timestamp)"), + Row(new java.util.Date(TimeUnit.SECONDS.toMillis(now)))) + } + } } test("to_unix_timestamp") { - val date1 = Date.valueOf("2015-07-24") - val date2 = Date.valueOf("2015-07-25") - val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3") - val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2") - val s1 = "2015/07/24 10:00:00.5" - val s2 = "2015/07/25 02:02:02.6" - val ss1 = "2015-07-24 10:00:00" - val ss2 = "2015-07-25 02:02:02" - val fmt = "yyyy/MM/dd HH:mm:ss.S" - val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") - checkAnswer(df.selectExpr("to_unix_timestamp(ts)"), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - checkAnswer(df.selectExpr("to_unix_timestamp(ss)"), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - checkAnswer(df.selectExpr(s"to_unix_timestamp(d, '$fmt')"), Seq( - Row(secs(date1.getTime)), Row(secs(date2.getTime)))) - checkAnswer(df.selectExpr(s"to_unix_timestamp(s, '$fmt')"), Seq( - Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) - - val x1 = "2015-07-24 10:00:00" - val x2 = "2015-25-07 02:02:02" - val x3 = "2015-07-24 25:02:02" - val x4 = "2015-24-07 26:02:02" - val ts3 = Timestamp.valueOf("2015-07-24 02:25:02") - val ts4 = Timestamp.valueOf("2015-07-24 00:10:00") - - val df1 = Seq(x1, x2, x3, x4).toDF("x") - checkAnswer(df1.selectExpr("to_unix_timestamp(x)"), Seq( - Row(secs(ts1.getTime)), Row(null), Row(null), Row(null))) - checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq( - Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null))) - - // february - val y1 = "2016-02-29" - val y2 = "2017-02-29" - val ts5 = 
Timestamp.valueOf("2016-02-29 00:00:00") - val df2 = Seq(y1, y2).toDF("y") - checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq( - Row(secs(ts5.getTime)), Row(null))) - - // invalid format - checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd bb:HH:ss')"), Seq( - Row(null), Row(null), Row(null), Row(null))) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val date1 = Date.valueOf("2015-07-24") + val date2 = Date.valueOf("2015-07-25") + val ts1 = Timestamp.valueOf("2015-07-24 10:00:00.3") + val ts2 = Timestamp.valueOf("2015-07-25 02:02:02.2") + val s1 = "2015/07/24 10:00:00.5" + val s2 = "2015/07/25 02:02:02.6" + val ss1 = "2015-07-24 10:00:00" + val ss2 = "2015-07-25 02:02:02" + val fmt = "yyyy/MM/dd HH:mm:ss.S" + val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") + checkAnswer(df.selectExpr("to_unix_timestamp(ts)"), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.selectExpr("to_unix_timestamp(ss)"), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + checkAnswer(df.selectExpr(s"to_unix_timestamp(d, '$fmt')"), Seq( + Row(secs(date1.getTime)), Row(secs(date2.getTime)))) + checkAnswer(df.selectExpr(s"to_unix_timestamp(s, '$fmt')"), Seq( + Row(secs(ts1.getTime)), Row(secs(ts2.getTime)))) + + val x1 = "2015-07-24 10:00:00" + val x2 = "2015-25-07 02:02:02" + val x3 = "2015-07-24 25:02:02" + val x4 = "2015-24-07 26:02:02" + val ts3 = Timestamp.valueOf("2015-07-24 02:25:02") + val ts4 = Timestamp.valueOf("2015-07-24 00:10:00") + + val df1 = Seq(x1, x2, x3, x4).toDF("x") + checkAnswer(df1.selectExpr("to_unix_timestamp(x)"), Seq( + Row(secs(ts1.getTime)), Row(null), Row(null), Row(null))) + checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq( + Row(secs(ts4.getTime)), Row(null), Row(secs(ts3.getTime)), Row(null))) + + // february + val y1 = "2016-02-29" + val y2 = "2017-02-29" 
+ val ts5 = Timestamp.valueOf("2016-02-29 00:00:00") + val df2 = Seq(y1, y2).toDF("y") + checkAnswer(df2.select(unix_timestamp(col("y"), "yyyy-MM-dd")), Seq( + Row(secs(ts5.getTime)), Row(null))) + + // invalid format + checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd bb:HH:ss')"), Seq( + Row(null), Row(null), Row(null), Row(null))) + } + } } test("to_timestamp") { - val date1 = Date.valueOf("2015-07-24") - val date2 = Date.valueOf("2015-07-25") - val ts_date1 = Timestamp.valueOf("2015-07-24 00:00:00") - val ts_date2 = Timestamp.valueOf("2015-07-25 00:00:00") - val ts1 = Timestamp.valueOf("2015-07-24 10:00:00") - val ts2 = Timestamp.valueOf("2015-07-25 02:02:02") - val s1 = "2015/07/24 10:00:00.5" - val s2 = "2015/07/25 02:02:02.6" - val ts1m = Timestamp.valueOf("2015-07-24 10:00:00.5") - val ts2m = Timestamp.valueOf("2015-07-25 02:02:02.6") - val ss1 = "2015-07-24 10:00:00" - val ss2 = "2015-07-25 02:02:02" - val fmt = "yyyy/MM/dd HH:mm:ss.S" - val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") - - checkAnswer(df.select(to_timestamp(col("ss"))), - df.select(unix_timestamp(col("ss")).cast("timestamp"))) - checkAnswer(df.select(to_timestamp(col("ss"))), Seq( - Row(ts1), Row(ts2))) - checkAnswer(df.select(to_timestamp(col("s"), fmt)), Seq( - Row(ts1m), Row(ts2m))) - checkAnswer(df.select(to_timestamp(col("ts"), fmt)), Seq( - Row(ts1), Row(ts2))) - checkAnswer(df.select(to_timestamp(col("d"), "yyyy-MM-dd")), Seq( - Row(ts_date1), Row(ts_date2))) + Seq(false, true).foreach { legacyParser => + withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) { + val date1 = Date.valueOf("2015-07-24") + val date2 = Date.valueOf("2015-07-25") + val ts_date1 = Timestamp.valueOf("2015-07-24 00:00:00") + val ts_date2 = Timestamp.valueOf("2015-07-25 00:00:00") + val ts1 = Timestamp.valueOf("2015-07-24 10:00:00") + val ts2 = Timestamp.valueOf("2015-07-25 02:02:02") + val s1 = "2015/07/24 10:00:00.5" + val s2 = 
"2015/07/25 02:02:02.6" + val ts1m = Timestamp.valueOf("2015-07-24 10:00:00.5") + val ts2m = Timestamp.valueOf("2015-07-25 02:02:02.6") + val ss1 = "2015-07-24 10:00:00" + val ss2 = "2015-07-25 02:02:02" + val fmt = "yyyy/MM/dd HH:mm:ss.S" + val df = Seq((date1, ts1, s1, ss1), (date2, ts2, s2, ss2)).toDF("d", "ts", "s", "ss") + + checkAnswer(df.select(to_timestamp(col("ss"))), + df.select(unix_timestamp(col("ss")).cast("timestamp"))) + checkAnswer(df.select(to_timestamp(col("ss"))), Seq( + Row(ts1), Row(ts2))) + if (legacyParser) { + // In Spark 2.4 and earlier, to_timestamp() parses in seconds precision and cuts off + // the fractional part of seconds. The behavior was changed by SPARK-27438. + val legacyFmt = "yyyy/MM/dd HH:mm:ss" + checkAnswer(df.select(to_timestamp(col("s"), legacyFmt)), Seq( + Row(ts1), Row(ts2))) + } else { + checkAnswer(df.select(to_timestamp(col("s"), fmt)), Seq( + Row(ts1m), Row(ts2m))) + } + checkAnswer(df.select(to_timestamp(col("ts"), fmt)), Seq( + Row(ts1), Row(ts2))) + checkAnswer(df.select(to_timestamp(col("d"), "yyyy-MM-dd")), Seq( + Row(ts_date1), Row(ts_date2))) + } + } } test("datediff") { From ab1d57f1b131db20c32ac1169730d3e8f45f4698 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 10 Feb 2020 21:51:29 +0300 Subject: [PATCH 02/11] Support SimpleDateFormat as a legacy date parser --- .../spark/sql/catalyst/csv/CSVOptions.scala | 4 +- .../sql/catalyst/csv/UnivocityGenerator.scala | 3 +- .../sql/catalyst/csv/UnivocityParser.scala | 3 +- .../spark/sql/catalyst/json/JSONOptions.scala | 4 +- .../sql/catalyst/json/JacksonGenerator.scala | 3 +- .../sql/catalyst/json/JacksonParser.scala | 3 +- .../sql/catalyst/util/DateFormatter.scala | 66 ++++++++++++++----- .../catalyst/util/TimestampFormatter.scala | 26 +++----- 8 files changed, 73 insertions(+), 39 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 5e40d74e54f1..8892037e03a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -146,10 +146,10 @@ class CSVOptions( // A language tag in IETF BCP 47 format val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US) - val dateFormat: String = parameters.getOrElse("dateFormat", "uuuu-MM-dd") + val dateFormat: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern) val timestampFormat: String = - parameters.getOrElse("timestampFormat", "uuuu-MM-dd'T'HH:mm:ss.SSSXXX") + parameters.getOrElse("timestampFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX") val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index efe0d42d57b0..00e3d49787db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -50,7 +50,8 @@ class UnivocityGenerator( private val dateFormatter = DateFormatter( options.dateFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private def makeConverter(dataType: DataType): ValueConverter = dataType match { case DateType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 049faa12740d..cd69c21a0197 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -92,7 +92,8 @@ class UnivocityParser( private val dateFormatter = DateFormatter( options.dateFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private val csvFilters = new CSVFilters(filters, requiredSchema) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala index cdf4b4689e82..45c4edff4707 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala @@ -88,10 +88,10 @@ private[sql] class JSONOptions( val zoneId: ZoneId = DateTimeUtils.getZoneId( parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)) - val dateFormat: String = parameters.getOrElse("dateFormat", "uuuu-MM-dd") + val dateFormat: String = parameters.getOrElse("dateFormat", DateFormatter.defaultPattern) val timestampFormat: String = - parameters.getOrElse("timestampFormat", "uuuu-MM-dd'T'HH:mm:ss.SSSXXX") + parameters.getOrElse("timestampFormat", s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX") val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala index 33fd3e040f91..141360ff0211 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala @@ -86,7 +86,8 @@ private[sql] class JacksonGenerator( private val dateFormatter = DateFormatter( options.dateFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) private def makeWriter(dataType: 
DataType): ValueWriter = dataType match { case NullType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index d5cd67da48b1..1e408cdb126b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -64,7 +64,8 @@ class JacksonParser( private val dateFormatter = DateFormatter( options.dateFormat, options.zoneId, - options.locale) + options.locale, + legacyFormat = FAST_DATE_FORMAT) /** * Create a converter which converts the JSON documents held by the `JsonParser` diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index 28189b65dee9..09e7fe569230 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -17,8 +17,9 @@ package org.apache.spark.sql.catalyst.util +import java.text.SimpleDateFormat import java.time.{LocalDate, ZoneId} -import java.util.Locale +import java.util.{Date, Locale} import org.apache.commons.lang3.time.FastDateFormat @@ -51,41 +52,76 @@ class Iso8601DateFormatter( } } -class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter { - @transient - private lazy val format = FastDateFormat.getInstance(pattern, locale) +trait LegacyDateFormatter extends DateFormatter { + def parseToDate(s: String): Date + def formatDate(d: Date): String override def parse(s: String): Int = { - val milliseconds = format.parse(s).getTime + val milliseconds = parseToDate(s).getTime DateTimeUtils.millisToDays(milliseconds) } override def format(days: Int): String = { val date = DateTimeUtils.toJavaDate(days) - format.format(date) + 
formatDate(date) } } +class LegacyFastDateFormatter(pattern: String, locale: Locale) extends LegacyDateFormatter { + @transient + private lazy val fdf = FastDateFormat.getInstance(pattern, locale) + override def parseToDate(s: String): Date = fdf.parse(s) + def formatDate(d: Date): String = fdf.format(d) +} + +class LegacySimpleDateFormatter(pattern: String, locale: Locale) extends LegacyDateFormatter { + @transient + private lazy val sdf = new SimpleDateFormat(pattern, locale) + override def parseToDate(s: String): Date = sdf.parse(s) + def formatDate(d: Date): String = sdf.format(d) +} + object DateFormatter { + import LegacyDateFormats._ + val defaultLocale: Locale = Locale.US - def apply(format: String, zoneId: ZoneId, locale: Locale): DateFormatter = { + def defaultPattern(): String = { + if (SQLConf.get.legacyTimeParserEnabled) "yyyy-MM-dd" else "uuuu-MM-dd" + } + + private def getFormatter( + format: Option[String], + zoneId: ZoneId, + locale: Locale = defaultLocale, + legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): DateFormatter = { + + val pattern = format.getOrElse(defaultPattern) if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyDateFormatter(format, locale) + legacyFormat match { + case FAST_DATE_FORMAT => + new LegacyFastDateFormatter(pattern, locale) + case SIMPLE_DATE_FORMAT | LENIENT_SIMPLE_DATE_FORMAT => + new LegacySimpleDateFormatter(pattern, locale) + } } else { - new Iso8601DateFormatter(format, zoneId, locale) + new Iso8601DateFormatter(pattern, zoneId, locale) } } + def apply( + format: String, + zoneId: ZoneId, + locale: Locale, + legacyFormat: LegacyDateFormat): DateFormatter = { + getFormatter(Some(format), zoneId, defaultLocale, legacyFormat) + } + def apply(format: String, zoneId: ZoneId): DateFormatter = { - apply(format, zoneId, defaultLocale) + getFormatter(Some(format), zoneId) } def apply(zoneId: ZoneId): DateFormatter = { - if (SQLConf.get.legacyTimeParserEnabled) { - new LegacyDateFormatter("yyyy-MM-dd", 
defaultLocale) - } else { - new Iso8601DateFormatter("uuuu-MM-dd", zoneId, defaultLocale) - } + getFormatter(None, zoneId) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 89ff71919589..d396d8b4cb69 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -105,7 +105,7 @@ trait LegacyTimestampFormatter extends TimestampFormatter { } } -class LegacyFastDateFormatter( +class LegacyFastTimestampFormatter( pattern: String, zoneId: ZoneId, locale: Locale) extends LegacyTimestampFormatter { @@ -115,7 +115,7 @@ class LegacyFastDateFormatter( override def formatTimestamp(t: Timestamp): String = fdf.format(t) } -class LegacySimpleDateFormatter( +class LegacySimpleTimestampFormatter( pattern: String, zoneId: ZoneId, locale: Locale, @@ -140,29 +140,23 @@ object TimestampFormatter { val defaultLocale: Locale = Locale.US - def defaultPattern(): String = { - if (SQLConf.get.legacyTimeParserEnabled) { - "yyyy-MM-dd HH:mm:ss" - } else { - "uuuu-MM-dd HH:mm:ss" - } - } + def defaultPattern(): String = s"${DateFormatter.defaultPattern()} HH:mm:ss" private def getFormatter( format: Option[String], zoneId: ZoneId, - locale: Locale, - legacyFormat: LegacyDateFormat): TimestampFormatter = { + locale: Locale = defaultLocale, + legacyFormat: LegacyDateFormat = LENIENT_SIMPLE_DATE_FORMAT): TimestampFormatter = { val pattern = format.getOrElse(defaultPattern) if (SQLConf.get.legacyTimeParserEnabled) { legacyFormat match { case FAST_DATE_FORMAT => - new LegacyFastDateFormatter(pattern, zoneId, locale) + new LegacyFastTimestampFormatter(pattern, zoneId, locale) case SIMPLE_DATE_FORMAT => - new LegacySimpleDateFormatter(pattern, zoneId, locale, lenient = false) + new LegacySimpleTimestampFormatter(pattern, 
zoneId, locale, lenient = false) case LENIENT_SIMPLE_DATE_FORMAT => - new LegacySimpleDateFormatter(pattern, zoneId, locale, lenient = true) + new LegacySimpleTimestampFormatter(pattern, zoneId, locale, lenient = true) } } else { new Iso8601TimestampFormatter(pattern, zoneId, locale) @@ -178,11 +172,11 @@ object TimestampFormatter { } def apply(format: String, zoneId: ZoneId): TimestampFormatter = { - apply(format, zoneId, defaultLocale, LENIENT_SIMPLE_DATE_FORMAT) + getFormatter(Some(format), zoneId) } def apply(zoneId: ZoneId): TimestampFormatter = { - getFormatter(None, zoneId, defaultLocale, LENIENT_SIMPLE_DATE_FORMAT) + getFormatter(None, zoneId) } def getFractionFormatter(zoneId: ZoneId): TimestampFormatter = { From d8ddc20e228f810b5c089b5399cb940eeb17bb5b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 11 Feb 2020 01:18:16 +0300 Subject: [PATCH 03/11] Add CSVLegacyTimeParserSuite --- .../catalyst/util/TimestampFormatter.scala | 109 ++++++++++++++---- .../org/apache/spark/sql/types/Decimal.scala | 2 +- .../resources/test-data/bad_after_good.csv | 2 +- .../resources/test-data/value-malformed.csv | 2 +- .../execution/datasources/csv/CSVSuite.scala | 23 ++-- 5 files changed, 107 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index d396d8b4cb69..2490b6f7c6d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -17,20 +17,20 @@ package org.apache.spark.sql.catalyst.util -import java.sql.Timestamp -import java.text.{ParseException, SimpleDateFormat} +import java.text.{ParseException, ParsePosition, SimpleDateFormat} import java.time._ import java.time.format.DateTimeParseException import java.time.temporal.ChronoField.MICRO_OF_SECOND import 
java.time.temporal.TemporalQueries -import java.util.{Date, Locale, TimeZone} +import java.util.{Calendar, GregorianCalendar, Locale, TimeZone} import java.util.concurrent.TimeUnit.SECONDS import org.apache.commons.lang3.time.FastDateFormat -import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_MILLIS -import org.apache.spark.sql.catalyst.util.DateTimeUtils.convertSpecialTimestamp +import org.apache.spark.sql.catalyst.util.DateTimeConstants._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils.{ convertSpecialTimestamp, SQLTimestamp} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.Decimal sealed trait TimestampFormatter extends Serializable { /** @@ -91,43 +91,112 @@ class FractionTimestampFormatter(zoneId: ZoneId) override protected lazy val formatter = DateTimeFormatterHelper.fractionFormatter } -trait LegacyTimestampFormatter extends TimestampFormatter { - def parseToDate(s: String): Date - def formatTimestamp(t: Timestamp): String +/** + * The custom sub-class of `GregorianCalendar` is needed to get access to + * protected `fields` immediately after parsing. We cannot use + * the `get()` method because it performs normalization of the fraction + * part. Accordingly, the `MILLISECOND` field doesn't contain original value. + * + * Also this class allows to set raw value to the `MILLISECOND` field + * directly before formatting. + */ +class MicrosCalendar(tz: TimeZone, digitsInFraction: Int) + extends GregorianCalendar(tz, Locale.US) { + // Converts parsed `MILLISECOND` field to seconds fraction in microsecond precision. + // For example if the fraction pattern is `SSSS` then `digitsInFraction` = 4, and + // if the `MILLISECOND` field was parsed to `1234`. 
+ def getMicros(): SQLTimestamp = { + // Append 6 zeros to the field: 1234 -> 1234000000 + val d = fields(Calendar.MILLISECOND) * MICROS_PER_SECOND + // Take the first 6 digits from `d`: 1234000000 -> 123400 + // The rest contains exactly `digitsInFraction` digits: `0000` = 10 ^ digitsInFraction + // So, the result is `(1234 * 1000000) / (10 ^ digitsInFraction)` + d / Decimal.POW_10(digitsInFraction) + } - override def parse(s: String): Long = { - parseToDate(s).getTime * MICROS_PER_MILLIS + // Converts the seconds fraction in microsecond precision to a value + // that can be correctly formatted according to the specified fraction pattern. + // The method performs operations opposite to `getMicros()`. + def setMicros(micros: Long): Unit = { + val d = micros * Decimal.POW_10(digitsInFraction) + fields(Calendar.MILLISECOND) = (d / MICROS_PER_SECOND).toInt } +} - override def format(us: Long): String = { - val timestamp = DateTimeUtils.toJavaTimestamp(us) - formatTimestamp(timestamp) +/** + * An instance of the class is aimed to re-use many times. It contains helper objects + * `cal` which is reused between `parse()` and `format` invokes.
+ */ +class LegacyFastDateFormat(fastDateFormat: FastDateFormat) { + private val cal = new MicrosCalendar( + fastDateFormat.getTimeZone, + fastDateFormat.getPattern.count(_ == 'S')) + + def parse(s: String): SQLTimestamp = { + cal.clear() // Clear the calendar because it can be re-used many times + if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) { + throw new IllegalArgumentException(s"'$s' is an invalid timestamp") + } + val micros = cal.getMicros() + cal.set(Calendar.MILLISECOND, 0) + cal.getTimeInMillis * MICROS_PER_MILLIS + micros + } + + def format(timestamp: SQLTimestamp): String = { + cal.setTimeInMillis(Math.floorDiv(timestamp, MICROS_PER_SECOND) * MILLIS_PER_SECOND) + cal.setMicros(Math.floorMod(timestamp, MICROS_PER_SECOND)) + fastDateFormat.format(cal) } } class LegacyFastTimestampFormatter( pattern: String, zoneId: ZoneId, - locale: Locale) extends LegacyTimestampFormatter { - @transient private lazy val fdf = + locale: Locale) extends TimestampFormatter { + + @transient private lazy val fastDateFormat = FastDateFormat.getInstance(pattern, TimeZone.getTimeZone(zoneId), locale) - override def parseToDate(s: String): Date = fdf.parse(s) - override def formatTimestamp(t: Timestamp): String = fdf.format(t) + @transient private lazy val cal = new MicrosCalendar( + fastDateFormat.getTimeZone, + fastDateFormat.getPattern.count(_ == 'S')) + + def parse(s: String): SQLTimestamp = { + cal.clear() // Clear the calendar because it can be re-used many times + if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) { + throw new IllegalArgumentException(s"'$s' is an invalid timestamp") + } + val micros = cal.getMicros() + cal.set(Calendar.MILLISECOND, 0) + cal.getTimeInMillis * MICROS_PER_MILLIS + micros + } + + def format(timestamp: SQLTimestamp): String = { + cal.setTimeInMillis(Math.floorDiv(timestamp, MICROS_PER_SECOND) * MILLIS_PER_SECOND) + cal.setMicros(Math.floorMod(timestamp, MICROS_PER_SECOND)) + fastDateFormat.format(cal) + } } class 
LegacySimpleTimestampFormatter( pattern: String, zoneId: ZoneId, locale: Locale, - lenient: Boolean = true) extends LegacyTimestampFormatter { + lenient: Boolean = true) extends TimestampFormatter { @transient private lazy val sdf = { val formatter = new SimpleDateFormat(pattern, locale) formatter.setTimeZone(TimeZone.getTimeZone(zoneId)) formatter.setLenient(lenient) formatter } - override def parseToDate(s: String): Date = sdf.parse(s) - override def formatTimestamp(t: Timestamp): String = sdf.format(t) + + override def parse(s: String): Long = { + sdf.parse(s).getTime * MICROS_PER_MILLIS + } + + override def format(us: Long): String = { + val timestamp = DateTimeUtils.toJavaTimestamp(us) + sdf.format(timestamp) + } } object LegacyDateFormats extends Enumeration { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala index 9ce64b09f787..f32e48e1cc12 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala @@ -541,7 +541,7 @@ object Decimal { /** Maximum number of decimal digits a Long can represent */ val MAX_LONG_DIGITS = 18 - private val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, i).toLong) + val POW_10 = Array.tabulate[Long](MAX_LONG_DIGITS + 1)(i => math.pow(10, i).toLong) private val BIG_DEC_ZERO = BigDecimal(0) diff --git a/sql/core/src/test/resources/test-data/bad_after_good.csv b/sql/core/src/test/resources/test-data/bad_after_good.csv index 4621a7d23714..1a7c2651a11a 100644 --- a/sql/core/src/test/resources/test-data/bad_after_good.csv +++ b/sql/core/src/test/resources/test-data/bad_after_good.csv @@ -1,2 +1,2 @@ "good record",1999-08-01 -"bad record",1999-088-01 +"bad record",1999-088_01 diff --git a/sql/core/src/test/resources/test-data/value-malformed.csv b/sql/core/src/test/resources/test-data/value-malformed.csv index 
8945ed73d2e8..6e6f08fca6df 100644 --- a/sql/core/src/test/resources/test-data/value-malformed.csv +++ b/sql/core/src/test/resources/test-data/value-malformed.csv @@ -1,2 +1,2 @@ -0,2013-111-11 12:13:14 +0,2013-111_11 12:13:14 1,1983-08-04 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 97dfbbdb7fd2..b1105b4a63bb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1182,7 +1182,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa .schema(schemaWithCorrField1) .csv(testFile(valueMalformedFile)) checkAnswer(df2, - Row(0, null, "0,2013-111-11 12:13:14") :: + Row(0, null, "0,2013-111_11 12:13:14") :: Row(1, java.sql.Date.valueOf("1983-08-04"), null) :: Nil) @@ -1199,7 +1199,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa .schema(schemaWithCorrField2) .csv(testFile(valueMalformedFile)) checkAnswer(df3, - Row(0, "0,2013-111-11 12:13:14", null) :: + Row(0, "0,2013-111_11 12:13:14", null) :: Row(1, null, java.sql.Date.valueOf("1983-08-04")) :: Nil) @@ -1435,7 +1435,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa assert(df.filter($"_corrupt_record".isNull).count() == 1) checkAnswer( df.select(columnNameOfCorruptRecord), - Row("0,2013-111-11 12:13:14") :: Row(null) :: Nil + Row("0,2013-111_11 12:13:14") :: Row(null) :: Nil ) } @@ -2093,7 +2093,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa Seq("csv", "").foreach { reader => withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> reader) { withTempPath { path => - val df = Seq(("0", "2013-111-11")).toDF("a", "b") + val df = Seq(("0", "2013-111_11")).toDF("a", "b") df.write .option("header", "true") 
.csv(path.getAbsolutePath) @@ -2109,7 +2109,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa .option("columnNameOfCorruptRecord", columnNameOfCorruptRecord) .schema(schemaWithCorrField) .csv(path.getAbsoluteFile.toString) - checkAnswer(readDF, Row(0, null, "0,2013-111-11") :: Nil) + checkAnswer(readDF, Row(0, null, "0,2013-111_11") :: Nil) } } } @@ -2216,7 +2216,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa val readback = spark.read .option("mode", mode) .option("header", true) - .option("timestampFormat", "uuuu-MM-dd HH:mm:ss") + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") .option("multiLine", multiLine) .schema("c0 string, c1 integer, c2 timestamp") .csv(path.getAbsolutePath) @@ -2235,7 +2235,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa } test("filters push down - malformed input in PERMISSIVE mode") { - val invalidTs = "2019-123-14 20:35:30" + val invalidTs = "2019-123_14 20:35:30" val invalidRow = s"0,$invalidTs,999" val validTs = "2019-12-14 20:35:30" Seq(true, false).foreach { filterPushdown => @@ -2252,7 +2252,7 @@ abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvDa .option("mode", "PERMISSIVE") .option("columnNameOfCorruptRecord", "c3") .option("header", true) - .option("timestampFormat", "uuuu-MM-dd HH:mm:ss") + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") .schema("c0 integer, c1 timestamp, c2 integer, c3 string") .csv(path.getAbsolutePath) .where(condition) @@ -2309,3 +2309,10 @@ class CSVv2Suite extends CSVSuite { .sparkConf .set(SQLConf.USE_V1_SOURCE_LIST, "") } + +class CSVLegacyTimeParserSuite extends CSVSuite { + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true) +} From d993d1a6ad93b991035d77ac4d6f38bb86b194cf Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 11 Feb 2020 01:20:05 +0300 Subject: [PATCH 04/11] Add 
JsonLegacyTimeParserSuite --- .../spark/sql/execution/datasources/json/JsonSuite.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index b20da2266b0f..8dcb4f631c7f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2572,3 +2572,10 @@ class JsonV2Suite extends JsonSuite { .sparkConf .set(SQLConf.USE_V1_SOURCE_LIST, "") } + +class JsonLegacyTimeParserSuite extends JsonSuite { + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true) +} \ No newline at end of file From 566170a6d7581f013142b19fcfdd2d9ea4ebaf9f Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 11 Feb 2020 01:25:05 +0300 Subject: [PATCH 05/11] Bug fix: set correct locale --- .../org/apache/spark/sql/catalyst/util/DateFormatter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index 09e7fe569230..c35f26c73be9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -114,7 +114,7 @@ object DateFormatter { zoneId: ZoneId, locale: Locale, legacyFormat: LegacyDateFormat): DateFormatter = { - getFormatter(Some(format), zoneId, defaultLocale, legacyFormat) + getFormatter(Some(format), zoneId, locale, legacyFormat) } def apply(format: String, zoneId: ZoneId): DateFormatter = { From ff52d499a6330962cf6a979a4ad5dfb5cf58229b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 11 Feb 2020 09:32:49 +0300 
Subject: [PATCH 06/11] Add override --- .../org/apache/spark/sql/catalyst/util/DateFormatter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index c35f26c73be9..20d2dd94124a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -78,7 +78,7 @@ class LegacySimpleDateFormatter(pattern: String, locale: Locale) extends LegacyD @transient private lazy val sdf = new SimpleDateFormat(pattern, locale) override def parseToDate(s: String): Date = sdf.parse(s) - def formatDate(d: Date): String = sdf.format(d) + override def formatDate(d: Date): String = sdf.format(d) } object DateFormatter { From b14ee41551f1cb4f61addecf560917b265066da5 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 11 Feb 2020 09:33:42 +0300 Subject: [PATCH 07/11] Add empty line at the end of JsonSuite --- .../apache/spark/sql/execution/datasources/json/JsonSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 8dcb4f631c7f..7abe818a29d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -2578,4 +2578,4 @@ class JsonLegacyTimeParserSuite extends JsonSuite { super .sparkConf .set(SQLConf.LEGACY_TIME_PARSER_ENABLED, true) -} \ No newline at end of file +} From ddf127dd148c445d7f8910af1c3aef9f3976d075 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 11 Feb 2020 10:18:38 +0300 Subject: [PATCH 08/11] Remove withStrongLegacy --- 
.../expressions/datetimeExpressions.scala | 46 ++++++++++++------- .../catalyst/util/TimestampFormatter.scala | 8 ++-- 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 24662562fcec..6352a5e43718 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -23,14 +23,13 @@ import java.time.temporal.IsoFields import java.util.{Locale, TimeZone} import scala.util.control.NonFatal - import org.apache.commons.text.StringEscapeUtils - import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ -import org.apache.spark.sql.catalyst.util.{DateTimeUtils, TimestampFormatter} +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.SIMPLE_DATE_FORMAT +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, LegacyDateFormats, TimestampFormatter} import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.types._ @@ -622,13 +621,15 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti @transient private lazy val formatter: Option[TimestampFormatter] = { if (right.foldable) { - Option(right.eval()).map(format => TimestampFormatter(format.toString, zoneId)) + Option(right.eval()).map { format => + TimestampFormatter(format.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) + } } else None } override protected def nullSafeEval(timestamp: Any, format: Any): Any = { val tf = if (formatter.isEmpty) { - 
TimestampFormatter.withStrongLegacy(format.toString, zoneId) + TimestampFormatter(format.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) } else { formatter.get } @@ -643,10 +644,14 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti }) }.getOrElse { val tf = TimestampFormatter.getClass.getName.stripSuffix("$") + val ldf = LegacyDateFormats.getClass.getName.stripSuffix("$") val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) defineCodeGen(ctx, ev, (timestamp, format) => { - s"""UTF8String.fromString($tf$$.MODULE$$.withStrongLegacy($format.toString(), $zid) - .format($timestamp))""" + s"""|UTF8String.fromString($tf$$.MODULE$$.apply( + | $format.toString(), + | $zid, + | $ldf$$.MODULE$$.SIMPLE_DATE_FORMAT()) + |.format($timestamp))""".stripMargin }) } } @@ -758,7 +763,7 @@ abstract class ToTimestamp private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String] private lazy val formatter: TimestampFormatter = try { - TimestampFormatter.withStrongLegacy(constFormat.toString, zoneId) + TimestampFormatter(constFormat.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) } catch { case NonFatal(_) => null } @@ -791,8 +796,8 @@ abstract class ToTimestamp } else { val formatString = f.asInstanceOf[UTF8String].toString try { - TimestampFormatter.withStrongLegacy(formatString, zoneId).parse( - t.asInstanceOf[UTF8String].toString) / downScaleFactor + TimestampFormatter(formatString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) + .parse(t.asInstanceOf[UTF8String].toString) / downScaleFactor } catch { case NonFatal(_) => null } @@ -832,11 +837,15 @@ abstract class ToTimestamp case StringType => val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) val tf = TimestampFormatter.getClass.getName.stripSuffix("$") + val ldf = LegacyDateFormats.getClass.getName.stripSuffix("$") nullSafeCodeGen(ctx, ev, (string, format) => { s""" try { - ${ev.value} = 
$tf$$.MODULE$$.withStrongLegacy($format.toString(), $zid) - .parse($string.toString()) / $downScaleFactor; + ${ev.value} = $tf$$.MODULE$$.apply( + $format.toString(), + $zid, + $ldf$$.MODULE$$.SIMPLE_DATE_FORMAT()) + .parse($string.toString()) / $downScaleFactor; } catch (java.lang.IllegalArgumentException e) { ${ev.isNull} = true; } catch (java.text.ParseException e) { @@ -921,7 +930,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[ private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String] private lazy val formatter: TimestampFormatter = try { - TimestampFormatter.withStrongLegacy(constFormat.toString, zoneId) + TimestampFormatter(constFormat.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) } catch { case NonFatal(_) => null } @@ -947,8 +956,9 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[ null } else { try { - UTF8String.fromString(TimestampFormatter.withStrongLegacy(f.toString, zoneId) - .format(time.asInstanceOf[Long] * MICROS_PER_SECOND)) + UTF8String.fromString( + TimestampFormatter(f.toString, zoneId, legacyFormat = SIMPLE_DATE_FORMAT) + .format(time.asInstanceOf[Long] * MICROS_PER_SECOND)) } catch { case NonFatal(_) => null } @@ -980,11 +990,13 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[ } else { val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName) val tf = TimestampFormatter.getClass.getName.stripSuffix("$") + val ldf = LegacyDateFormats.getClass.getName.stripSuffix("$") nullSafeCodeGen(ctx, ev, (seconds, f) => { s""" try { - ${ev.value} = UTF8String.fromString($tf$$.MODULE$$.withStrongLegacy($f.toString(), $zid). 
- format($seconds * 1000000L)); + ${ev.value} = UTF8String.fromString( + $tf$$.MODULE$$.apply($f.toString(), $zid, $ldf$$.MODULE$$.SIMPLE_DATE_FORMAT()) + .format($seconds * 1000000L)); } catch (java.lang.IllegalArgumentException e) { ${ev.isNull} = true; }""" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 2490b6f7c6d3..23a9de675494 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -240,6 +240,10 @@ object TimestampFormatter { getFormatter(Some(format), zoneId, locale, legacyFormat) } + def apply(format: String, zoneId: ZoneId, legacyFormat: LegacyDateFormat): TimestampFormatter = { + getFormatter(Some(format), zoneId, defaultLocale, legacyFormat) + } + def apply(format: String, zoneId: ZoneId): TimestampFormatter = { getFormatter(Some(format), zoneId) } @@ -251,8 +255,4 @@ object TimestampFormatter { def getFractionFormatter(zoneId: ZoneId): TimestampFormatter = { new FractionTimestampFormatter(zoneId) } - - def withStrongLegacy(format: String, zoneId: ZoneId): TimestampFormatter = { - apply(format, zoneId, defaultLocale, SIMPLE_DATE_FORMAT) - } } From ca739255a397c04b73c40c5a31638f096ce8c89c Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 11 Feb 2020 10:22:03 +0300 Subject: [PATCH 09/11] Add override --- .../org/apache/spark/sql/catalyst/util/DateFormatter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index 20d2dd94124a..2cf82d1cfa17 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -71,7 +71,7 @@ class LegacyFastDateFormatter(pattern: String, locale: Locale) extends LegacyDat @transient private lazy val fdf = FastDateFormat.getInstance(pattern, locale) override def parseToDate(s: String): Date = fdf.parse(s) - def formatDate(d: Date): String = fdf.format(d) + override def formatDate(d: Date): String = fdf.format(d) } class LegacySimpleDateFormatter(pattern: String, locale: Locale) extends LegacyDateFormatter { From 11394cd62e4fb54e0deddc69487a29ae28a18158 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 11 Feb 2020 10:29:05 +0300 Subject: [PATCH 10/11] Remove unused class LegacyFastDateFormat --- .../catalyst/util/TimestampFormatter.scala | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 23a9de675494..4893a7ec91cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -123,32 +123,6 @@ class MicrosCalendar(tz: TimeZone, digitsInFraction: Int) } } -/** - * An instance of the class is aimed to re-use many times. It contains helper objects - * `cal` which is reused between `parse()` and `format` invokes. 
- */ -class LegacyFastDateFormat(fastDateFormat: FastDateFormat) { - private val cal = new MicrosCalendar( - fastDateFormat.getTimeZone, - fastDateFormat.getPattern.count(_ == 'S')) - - def parse(s: String): SQLTimestamp = { - cal.clear() // Clear the calendar because it can be re-used many times - if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) { - throw new IllegalArgumentException(s"'$s' is an invalid timestamp") - } - val micros = cal.getMicros() - cal.set(Calendar.MILLISECOND, 0) - cal.getTimeInMillis * MICROS_PER_MILLIS + micros - } - - def format(timestamp: SQLTimestamp): String = { - cal.setTimeInMillis(Math.floorDiv(timestamp, MICROS_PER_SECOND) * MILLIS_PER_SECOND) - cal.setMicros(Math.floorMod(timestamp, MICROS_PER_SECOND)) - fastDateFormat.format(cal) - } -} - class LegacyFastTimestampFormatter( pattern: String, zoneId: ZoneId, From 93f3ae1c32f893cc3cc22ae075753098440e76b5 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 11 Feb 2020 10:33:27 +0300 Subject: [PATCH 11/11] Make Scala code style checker happy --- .../spark/sql/catalyst/expressions/datetimeExpressions.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 6352a5e43718..1f4c8c041f8b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -23,15 +23,17 @@ import java.time.temporal.IsoFields import java.util.{Locale, TimeZone} import scala.util.control.NonFatal + import org.apache.commons.text.StringEscapeUtils + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen._ import 
org.apache.spark.sql.catalyst.expressions.codegen.Block._ -import org.apache.spark.sql.catalyst.util.LegacyDateFormats.SIMPLE_DATE_FORMAT import org.apache.spark.sql.catalyst.util.{DateTimeUtils, LegacyDateFormats, TimestampFormatter} import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ +import org.apache.spark.sql.catalyst.util.LegacyDateFormats.SIMPLE_DATE_FORMAT import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}