Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.util.control.Exception.allCatch
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.types._

Expand All @@ -32,7 +33,8 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
private val timestampParser = TimestampFormatter(
options.timestampFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)

private val decimalParser = if (options.locale == Locale.US) {
// Special handling of the default locale for backward compatibility
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.univocity.parsers.csv.CsvWriter

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.types._

class UnivocityGenerator(
Expand All @@ -44,7 +45,8 @@ class UnivocityGenerator(
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)
private val dateFormatter = DateFormatter(
options.dateFormat,
options.zoneId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -86,7 +87,8 @@ class UnivocityParser(
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)
private val dateFormatter = DateFormatter(
options.dateFormat,
options.zoneId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti

override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
val tf = if (formatter.isEmpty) {
TimestampFormatter(format.toString, zoneId)
TimestampFormatter.withStrongLegacy(format.toString, zoneId)
} else {
formatter.get
}
Expand All @@ -645,7 +645,7 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti
val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
defineCodeGen(ctx, ev, (timestamp, format) => {
s"""UTF8String.fromString($tf$$.MODULE$$.apply($format.toString(), $zid)
s"""UTF8String.fromString($tf$$.MODULE$$.withStrongLegacy($format.toString(), $zid)
.format($timestamp))"""
})
}
Expand Down Expand Up @@ -688,7 +688,7 @@ case class ToUnixTimestamp(
copy(timeZoneId = Option(timeZoneId))

def this(time: Expression) = {
this(time, Literal("uuuu-MM-dd HH:mm:ss"))
this(time, Literal(TimestampFormatter.defaultPattern))
}

override def prettyName: String = "to_unix_timestamp"
Expand Down Expand Up @@ -732,7 +732,7 @@ case class UnixTimestamp(timeExp: Expression, format: Expression, timeZoneId: Op
copy(timeZoneId = Option(timeZoneId))

def this(time: Expression) = {
this(time, Literal("uuuu-MM-dd HH:mm:ss"))
this(time, Literal(TimestampFormatter.defaultPattern))
}

def this() = {
Expand All @@ -758,7 +758,7 @@ abstract class ToTimestamp
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: TimestampFormatter =
try {
TimestampFormatter(constFormat.toString, zoneId)
TimestampFormatter.withStrongLegacy(constFormat.toString, zoneId)
} catch {
case NonFatal(_) => null
}
Expand Down Expand Up @@ -791,7 +791,7 @@ abstract class ToTimestamp
} else {
val formatString = f.asInstanceOf[UTF8String].toString
try {
TimestampFormatter(formatString, zoneId).parse(
TimestampFormatter.withStrongLegacy(formatString, zoneId).parse(
t.asInstanceOf[UTF8String].toString) / downScaleFactor
} catch {
case NonFatal(_) => null
Expand Down Expand Up @@ -831,12 +831,11 @@ abstract class ToTimestamp
}
case StringType =>
val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
val locale = ctx.addReferenceObj("locale", Locale.US)
val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
nullSafeCodeGen(ctx, ev, (string, format) => {
s"""
try {
${ev.value} = $tf$$.MODULE$$.apply($format.toString(), $zid, $locale)
${ev.value} = $tf$$.MODULE$$.withStrongLegacy($format.toString(), $zid)
.parse($string.toString()) / $downScaleFactor;
} catch (java.lang.IllegalArgumentException e) {
${ev.isNull} = true;
Expand Down Expand Up @@ -908,7 +907,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
override def prettyName: String = "from_unixtime"

def this(unix: Expression) = {
this(unix, Literal("uuuu-MM-dd HH:mm:ss"))
this(unix, Literal(TimestampFormatter.defaultPattern))
}

override def dataType: DataType = StringType
Expand All @@ -922,7 +921,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: TimestampFormatter =
try {
TimestampFormatter(constFormat.toString, zoneId)
TimestampFormatter.withStrongLegacy(constFormat.toString, zoneId)
} catch {
case NonFatal(_) => null
}
Expand All @@ -948,7 +947,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
null
} else {
try {
UTF8String.fromString(TimestampFormatter(f.toString, zoneId)
UTF8String.fromString(TimestampFormatter.withStrongLegacy(f.toString, zoneId)
.format(time.asInstanceOf[Long] * MICROS_PER_SECOND))
} catch {
case NonFatal(_) => null
Expand Down Expand Up @@ -980,12 +979,11 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
}
} else {
val zid = ctx.addReferenceObj("zoneId", zoneId, classOf[ZoneId].getName)
val locale = ctx.addReferenceObj("locale", Locale.US)
val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
nullSafeCodeGen(ctx, ev, (seconds, f) => {
s"""
try {
${ev.value} = UTF8String.fromString($tf$$.MODULE$$.apply($f.toString(), $zid, $locale).
${ev.value} = UTF8String.fromString($tf$$.MODULE$$.withStrongLegacy($f.toString(), $zid).
format($seconds * 1000000L));
} catch (java.lang.IllegalArgumentException e) {
${ev.isNull} = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.fasterxml.jackson.core._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.types._

/**
Expand Down Expand Up @@ -80,7 +81,8 @@ private[sql] class JacksonGenerator(
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)
private val dateFormatter = DateFormatter(
options.dateFormat,
options.zoneId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
Expand Down Expand Up @@ -58,7 +59,8 @@ class JacksonParser(
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)
private val dateFormatter = DateFormatter(
options.dateFormat,
options.zoneId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
Expand All @@ -40,7 +41,8 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.zoneId,
options.locale)
options.locale,
legacyFormat = FAST_DATE_FORMAT)

/**
* Infer the type of a collection of json records in three stages:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

package org.apache.spark.sql.catalyst.util

import java.text.ParseException
import java.sql.Timestamp
import java.text.{ParseException, SimpleDateFormat}
import java.time._
import java.time.format.DateTimeParseException
import java.time.temporal.ChronoField.MICRO_OF_SECOND
import java.time.temporal.TemporalQueries
import java.util.{Locale, TimeZone}
import java.util.{Date, Locale, TimeZone}
import java.util.concurrent.TimeUnit.SECONDS

import org.apache.commons.lang3.time.FastDateFormat
Expand Down Expand Up @@ -90,47 +91,105 @@ class FractionTimestampFormatter(zoneId: ZoneId)
override protected lazy val formatter = DateTimeFormatterHelper.fractionFormatter
}

class LegacyTimestampFormatter(
pattern: String,
zoneId: ZoneId,
locale: Locale) extends TimestampFormatter {
trait LegacyTimestampFormatter extends TimestampFormatter {
/** Parses `s` into a `java.util.Date`; implemented by each concrete legacy formatter. */
def parseToDate(s: String): Date
/** Renders the `java.sql.Timestamp` `t` as a string; implemented by each concrete legacy formatter. */
def formatTimestamp(t: Timestamp): String

@transient private lazy val format =
FastDateFormat.getInstance(pattern, TimeZone.getTimeZone(zoneId), locale)
/**
 * Parses `s` with `parseToDate` and scales the resulting `Date`'s epoch
 * milliseconds (`getTime`) by `MICROS_PER_MILLIS`.
 */
override def parse(s: String): Long = {
  val epochMillis = parseToDate(s).getTime
  epochMillis * MICROS_PER_MILLIS
}

protected def toMillis(s: String): Long = format.parse(s).getTime
/**
 * Converts `us` to a `java.sql.Timestamp` via `DateTimeUtils.toJavaTimestamp`
 * and renders it with `formatTimestamp`.
 */
override def format(us: Long): String =
  formatTimestamp(DateTimeUtils.toJavaTimestamp(us))
}

override def parse(s: String): Long = toMillis(s) * MICROS_PER_MILLIS
/**
 * Legacy timestamp formatter backed by Apache Commons Lang's `FastDateFormat`.
 *
 * @param pattern pattern string handed to `FastDateFormat.getInstance`
 * @param zoneId  zone converted to a `java.util.TimeZone` for the underlying format
 * @param locale  locale handed to `FastDateFormat.getInstance`
 */
class LegacyFastDateFormatter(
    pattern: String,
    zoneId: ZoneId,
    locale: Locale) extends LegacyTimestampFormatter {

  // Lazy and @transient: the underlying format is created on first use.
  @transient private lazy val fastDateFormat = {
    val timeZone = TimeZone.getTimeZone(zoneId)
    FastDateFormat.getInstance(pattern, timeZone, locale)
  }

  override def parseToDate(s: String): Date = fastDateFormat.parse(s)

  override def formatTimestamp(t: Timestamp): String = fastDateFormat.format(t)
}

override def format(us: Long): String = {
format.format(DateTimeUtils.toJavaTimestamp(us))
/**
 * Legacy timestamp formatter backed by `java.text.SimpleDateFormat`.
 *
 * @param pattern pattern string handed to the `SimpleDateFormat` constructor
 * @param zoneId  zone converted to a `java.util.TimeZone` and set on the format
 * @param locale  locale handed to the `SimpleDateFormat` constructor
 * @param lenient value passed to `SimpleDateFormat.setLenient`; defaults to `true`
 */
class LegacySimpleDateFormatter(
    pattern: String,
    zoneId: ZoneId,
    locale: Locale,
    lenient: Boolean = true) extends LegacyTimestampFormatter {

  // NOTE(review): the JDK documents SimpleDateFormat as not synchronized, and this single
  // instance serves every parse/format call on the formatter - confirm callers do not share
  // one formatter across threads.
  @transient private lazy val simpleDateFormat: SimpleDateFormat = {
    val sdf = new SimpleDateFormat(pattern, locale)
    sdf.setLenient(lenient)
    sdf.setTimeZone(TimeZone.getTimeZone(zoneId))
    sdf
  }

  override def parseToDate(s: String): Date = simpleDateFormat.parse(s)

  override def formatTimestamp(t: Timestamp): String = simpleDateFormat.format(t)
}

/**
 * Identifies which legacy formatter implementation `TimestampFormatter` should build
 * when the legacy time parser is enabled.
 */
object LegacyDateFormats extends Enumeration {
  type LegacyDateFormat = Value

  // Declaration order is significant: Enumeration assigns ids sequentially.
  val FAST_DATE_FORMAT: Value = Value
  val SIMPLE_DATE_FORMAT: Value = Value
  val LENIENT_SIMPLE_DATE_FORMAT: Value = Value
}

object TimestampFormatter {
import LegacyDateFormats._

val defaultLocale: Locale = Locale.US

def apply(format: String, zoneId: ZoneId, locale: Locale): TimestampFormatter = {
def defaultPattern(): String = {
if (SQLConf.get.legacyTimeParserEnabled) {
new LegacyTimestampFormatter(format, zoneId, locale)
"yyyy-MM-dd HH:mm:ss"
} else {
new Iso8601TimestampFormatter(format, zoneId, locale)
"uuuu-MM-dd HH:mm:ss"
}
}

def apply(format: String, zoneId: ZoneId): TimestampFormatter = {
apply(format, zoneId, defaultLocale)
}
private def getFormatter(
format: Option[String],
zoneId: ZoneId,
locale: Locale,
legacyFormat: LegacyDateFormat): TimestampFormatter = {

def apply(zoneId: ZoneId): TimestampFormatter = {
val pattern = format.getOrElse(defaultPattern)
if (SQLConf.get.legacyTimeParserEnabled) {
new LegacyTimestampFormatter("yyyy-MM-dd HH:mm:ss", zoneId, defaultLocale)
legacyFormat match {
case FAST_DATE_FORMAT =>
new LegacyFastDateFormatter(pattern, zoneId, locale)
case SIMPLE_DATE_FORMAT =>
new LegacySimpleDateFormatter(pattern, zoneId, locale, lenient = false)
case LENIENT_SIMPLE_DATE_FORMAT =>
new LegacySimpleDateFormatter(pattern, zoneId, locale, lenient = true)
}
} else {
new Iso8601TimestampFormatter("uuuu-MM-dd HH:mm:ss", zoneId, defaultLocale)
new Iso8601TimestampFormatter(pattern, zoneId, locale)
}
}

/**
 * Builds a formatter for an explicit pattern, zone and locale.
 *
 * @param legacyFormat legacy implementation that `getFormatter` selects when the legacy
 *                     time parser is enabled
 */
def apply(
    format: String,
    zoneId: ZoneId,
    locale: Locale,
    legacyFormat: LegacyDateFormat): TimestampFormatter = {
  val explicitPattern = Some(format)
  getFormatter(explicitPattern, zoneId, locale, legacyFormat)
}

/**
 * Builds a formatter for `format` in `zoneId` using `defaultLocale`, requesting
 * `LENIENT_SIMPLE_DATE_FORMAT` as the legacy implementation.
 */
def apply(format: String, zoneId: ZoneId): TimestampFormatter =
  getFormatter(Some(format), zoneId, defaultLocale, LENIENT_SIMPLE_DATE_FORMAT)

/**
 * Builds a formatter for `zoneId` with no explicit pattern, so `getFormatter` falls back
 * to `defaultPattern`; uses `defaultLocale` and `LENIENT_SIMPLE_DATE_FORMAT`.
 */
def apply(zoneId: ZoneId): TimestampFormatter =
  getFormatter(
    format = None,
    zoneId = zoneId,
    locale = defaultLocale,
    legacyFormat = LENIENT_SIMPLE_DATE_FORMAT)

/** Returns a new `FractionTimestampFormatter` for `zoneId`. */
def getFractionFormatter(zoneId: ZoneId): TimestampFormatter =
  new FractionTimestampFormatter(zoneId)

/**
 * Builds a formatter for `format` in `zoneId` using `defaultLocale`, requesting
 * `SIMPLE_DATE_FORMAT` as the legacy implementation, which `getFormatter` maps to a
 * non-lenient `LegacySimpleDateFormatter` when the legacy time parser is enabled.
 */
def withStrongLegacy(format: String, zoneId: ZoneId): TimestampFormatter =
  getFormatter(Some(format), zoneId, defaultLocale, SIMPLE_DATE_FORMAT)
}
Loading