Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.catalog

import java.net.URI
import java.time.ZoneOffset
import java.util.Date

import scala.collection.mutable
Expand Down Expand Up @@ -477,7 +478,7 @@ object CatalogColumnStat extends Logging {
val VERSION = 2

private def getTimestampFormatter(): TimestampFormatter = {
TimestampFormatter(format = "yyyy-MM-dd HH:mm:ss.SSSSSS", timeZone = DateTimeUtils.TimeZoneUTC)
TimestampFormatter(format = "yyyy-MM-dd HH:mm:ss.SSSSSS", zoneId = ZoneOffset.UTC)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {

private val timestampParser = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.zoneId,
options.locale)

private val decimalParser = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.sql.catalyst.csv

import java.nio.charset.StandardCharsets
import java.util.{Locale, TimeZone}
import java.time.ZoneId
import java.util.Locale

import com.univocity.parsers.csv.{CsvParserSettings, CsvWriterSettings, UnescapedQuoteHandling}

Expand Down Expand Up @@ -139,7 +140,7 @@ class CSVOptions(
name.map(CompressionCodecs.getCodecClassName)
}

val timeZone: TimeZone = DateTimeUtils.getTimeZone(
val zoneId: ZoneId = DateTimeUtils.getZoneId(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

// A language tag in IETF BCP 47 format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class UnivocityGenerator(

private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.zoneId,
options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.locale)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class UnivocityParser(

private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.zoneId,
options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.locale)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
@inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T])

private lazy val dateFormatter = DateFormatter()
private lazy val timestampFormatter = TimestampFormatter.getFractionFormatter(timeZone)
private lazy val timestampFormatter = TimestampFormatter.getFractionFormatter(zoneId)

// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import java.sql.Timestamp
import java.time.{Instant, LocalDate}
import java.time.{Instant, LocalDate, ZoneId}
import java.time.temporal.IsoFields
import java.util.{Locale, TimeZone}

Expand Down Expand Up @@ -49,6 +49,7 @@ trait TimeZoneAwareExpression extends Expression {
def withTimeZone(timeZoneId: String): TimeZoneAwareExpression

@transient lazy val timeZone: TimeZone = DateTimeUtils.getTimeZone(timeZoneId.get)
@transient lazy val zoneId: ZoneId = DateTimeUtils.getZoneId(timeZoneId.get)
}

/**
Expand Down Expand Up @@ -532,16 +533,16 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti
copy(timeZoneId = Option(timeZoneId))

override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
val df = TimestampFormatter(format.toString, timeZone)
val df = TimestampFormatter(format.toString, zoneId)
UTF8String.fromString(df.format(timestamp.asInstanceOf[Long]))
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
val tz = ctx.addReferenceObj("timeZone", timeZone)
val zid = ctx.addReferenceObj("zoneId", zoneId, "java.time.ZoneId")
val locale = ctx.addReferenceObj("locale", Locale.US)
defineCodeGen(ctx, ev, (timestamp, format) => {
s"""UTF8String.fromString($tf$$.MODULE$$.apply($format.toString(), $tz, $locale)
s"""UTF8String.fromString($tf$$.MODULE$$.apply($format.toString(), $zid, $locale)
.format($timestamp))"""
})
}
Expand Down Expand Up @@ -635,7 +636,7 @@ abstract class UnixTime
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: TimestampFormatter =
try {
TimestampFormatter(constFormat.toString, timeZone)
TimestampFormatter(constFormat.toString, zoneId)
} catch {
case NonFatal(_) => null
}
Expand Down Expand Up @@ -668,7 +669,7 @@ abstract class UnixTime
} else {
val formatString = f.asInstanceOf[UTF8String].toString
try {
TimestampFormatter(formatString, timeZone).parse(
TimestampFormatter(formatString, zoneId).parse(
t.asInstanceOf[UTF8String].toString) / MICROS_PER_SECOND
} catch {
case NonFatal(_) => null
Expand Down Expand Up @@ -707,7 +708,7 @@ abstract class UnixTime
}""")
}
case StringType =>
val tz = ctx.addReferenceObj("timeZone", timeZone)
val tz = ctx.addReferenceObj("zoneId", zoneId)
val locale = ctx.addReferenceObj("locale", Locale.US)
val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
nullSafeCodeGen(ctx, ev, (string, format) => {
Expand Down Expand Up @@ -789,7 +790,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: TimestampFormatter =
try {
TimestampFormatter(constFormat.toString, timeZone)
TimestampFormatter(constFormat.toString, zoneId)
} catch {
case NonFatal(_) => null
}
Expand All @@ -815,7 +816,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
null
} else {
try {
UTF8String.fromString(TimestampFormatter(f.toString, timeZone)
UTF8String.fromString(TimestampFormatter(f.toString, zoneId)
.format(time.asInstanceOf[Long] * MICROS_PER_SECOND))
} catch {
case NonFatal(_) => null
Expand Down Expand Up @@ -846,7 +847,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
}""")
}
} else {
val tz = ctx.addReferenceObj("timeZone", timeZone)
val tz = ctx.addReferenceObj("zoneId", zoneId)
val locale = ctx.addReferenceObj("locale", Locale.US)
val tf = TimestampFormatter.getClass.getName.stripSuffix("$")
nullSafeCodeGen(ctx, ev, (seconds, f) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package org.apache.spark.sql.catalyst.json

import java.nio.charset.{Charset, StandardCharsets}
import java.util.{Locale, TimeZone}
import java.time.ZoneId
import java.util.Locale

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

Expand Down Expand Up @@ -78,7 +79,7 @@ private[sql] class JSONOptions(
// A language tag in IETF BCP 47 format
val locale: Locale = parameters.get("locale").map(Locale.forLanguageTag).getOrElse(Locale.US)

val timeZone: TimeZone = DateTimeUtils.getTimeZone(
val zoneId: ZoneId = DateTimeUtils.getZoneId(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ private[sql] class JacksonGenerator(

private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.zoneId,
options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.locale)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class JacksonParser(

private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.zoneId,
options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.locale)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {

private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.zoneId,
options.locale)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ object DateTimeUtils {

def defaultTimeZone(): TimeZone = TimeZone.getDefault()

def getZoneId(timeZoneId: String): ZoneId = ZoneId.of(timeZoneId, ZoneId.SHORT_IDS)
def getTimeZone(timeZoneId: String): TimeZone = {
val zoneId = ZoneId.of(timeZoneId, ZoneId.SHORT_IDS)
TimeZone.getTimeZone(zoneId)
TimeZone.getTimeZone(getZoneId(timeZoneId))
}

// we should use the exact day as Int, for example, (year, month, day) -> day
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,16 @@ sealed trait TimestampFormatter extends Serializable {

class Iso8601TimestampFormatter(
pattern: String,
timeZone: TimeZone,
zoneId: ZoneId,
locale: Locale) extends TimestampFormatter with DateTimeFormatterHelper {
@transient
protected lazy val formatter = getOrCreateFormatter(pattern, locale)
private val timeZoneId = timeZone.toZoneId

override def parse(s: String): Long = {
val parsed = formatter.parse(s)
val parsedZoneId = parsed.query(TemporalQueries.zone())
val zoneId = if (parsedZoneId == null) timeZoneId else parsedZoneId
val zonedDateTime = toZonedDateTime(parsed, zoneId)
val timeZoneId = if (parsedZoneId == null) zoneId else parsedZoneId
val zonedDateTime = toZonedDateTime(parsed, timeZoneId)
val epochSeconds = zonedDateTime.toEpochSecond
val microsOfSecond = zonedDateTime.get(MICRO_OF_SECOND)

Expand All @@ -63,7 +62,7 @@ class Iso8601TimestampFormatter(

override def format(us: Long): String = {
val instant = DateTimeUtils.microsToInstant(us)
formatter.withZone(timeZoneId).format(instant)
formatter.withZone(zoneId).format(instant)
}
}

Expand All @@ -73,10 +72,10 @@ class Iso8601TimestampFormatter(
* output trailing zeros in the fraction. For example, the timestamp `2019-03-05 15:00:01.123400` is
* formatted as the string `2019-03-05 15:00:01.1234`.
*
* @param timeZone the time zone in which the formatter parses or format timestamps
* @param zoneId the time zone identifier in which the formatter parses or formats timestamps
*/
class FractionTimestampFormatter(timeZone: TimeZone)
extends Iso8601TimestampFormatter("", timeZone, TimestampFormatter.defaultLocale) {
class FractionTimestampFormatter(zoneId: ZoneId)
extends Iso8601TimestampFormatter("", zoneId, TimestampFormatter.defaultLocale) {

@transient
override protected lazy val formatter = DateTimeFormatterHelper.fractionFormatter
Expand All @@ -86,19 +85,19 @@ object TimestampFormatter {
val defaultPattern: String = "yyyy-MM-dd HH:mm:ss"
val defaultLocale: Locale = Locale.US

def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
new Iso8601TimestampFormatter(format, timeZone, locale)
def apply(format: String, zoneId: ZoneId, locale: Locale): TimestampFormatter = {
new Iso8601TimestampFormatter(format, zoneId, locale)
}

def apply(format: String, timeZone: TimeZone): TimestampFormatter = {
apply(format, timeZone, defaultLocale)
def apply(format: String, zoneId: ZoneId): TimestampFormatter = {
apply(format, zoneId, defaultLocale)
}

def apply(timeZone: TimeZone): TimestampFormatter = {
apply(defaultPattern, timeZone, defaultLocale)
def apply(zoneId: ZoneId): TimestampFormatter = {
apply(defaultPattern, zoneId, defaultLocale)
}

def getFractionFormatter(timeZone: TimeZone): TimestampFormatter = {
new FractionTimestampFormatter(timeZone)
def getFractionFormatter(zoneId: ZoneId): TimestampFormatter = {
new FractionTimestampFormatter(zoneId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
parser = new UnivocityParser(StructType(Seq.empty), timestampsOptions)
val customTimestamp = "31/01/2015 00:00"
var format = FastDateFormat.getInstance(
timestampsOptions.timestampFormat, timestampsOptions.timeZone, timestampsOptions.locale)
timestampsOptions.timestampFormat,
TimeZone.getTimeZone(timestampsOptions.zoneId),
timestampsOptions.locale)
val expectedTime = format.parse(customTimestamp).getTime
val castedTimestamp = parser.makeConverter("_1", TimestampType, nullable = true)
.apply(customTimestamp)
Expand All @@ -126,7 +128,9 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
val dateOptions = new CSVOptions(Map("dateFormat" -> "dd/MM/yyyy"), false, "GMT")
parser = new UnivocityParser(StructType(Seq.empty), dateOptions)
format = FastDateFormat.getInstance(
dateOptions.dateFormat, dateOptions.timeZone, dateOptions.locale)
dateOptions.dateFormat,
TimeZone.getTimeZone(dateOptions.zoneId),
dateOptions.locale)
val expectedDate = format.parse(customDate).getTime
val castedDate = parser.makeConverter("_1", DateType, nullable = true)
.apply(customDate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.time.ZoneOffset
import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit._
Expand All @@ -43,7 +44,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val jstId = Option(TimeZoneJST.getID)

def toMillis(timestamp: String): Long = {
val tf = TimestampFormatter("yyyy-MM-dd HH:mm:ss", TimeZoneGMT)
val tf = TimestampFormatter("yyyy-MM-dd HH:mm:ss", ZoneOffset.UTC)
TimeUnit.MICROSECONDS.toMillis(tf.parse(timestamp))
}
val date = "2015-04-08 13:10:15"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.util

import java.time.{LocalDate, LocalDateTime, LocalTime}
import java.time.{LocalDate, LocalDateTime, LocalTime, ZoneId}
import java.util.TimeZone
import java.util.concurrent.TimeUnit

Expand All @@ -40,6 +40,7 @@ object DateTimeTestUtils {
"Asia/Hong_Kong",
"Europe/Amsterdam")
val outstandingTimezones: Seq[TimeZone] = outstandingTimezonesIds.map(TimeZone.getTimeZone)
val outstandingZoneIds: Seq[ZoneId] = outstandingTimezonesIds.map(DateTimeUtils.getZoneId)

def withDefaultTimeZone[T](newDefaultTimeZone: TimeZone)(block: => T): T = {
val originalDefaultTimeZone = TimeZone.getDefault
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class DateTimeUtilsSuite extends SparkFunSuite {
private def defaultTz = DateTimeUtils.defaultTimeZone()

test("nanoseconds truncation") {
val tf = TimestampFormatter.getFractionFormatter(DateTimeUtils.defaultTimeZone())
val tf = TimestampFormatter.getFractionFormatter(DateTimeUtils.defaultTimeZone.toZoneId)
def checkStringToTimestamp(originalTime: String, expectedParsedTime: String) {
val parsedTimestampOp = DateTimeUtils.stringToTimestamp(
UTF8String.fromString(originalTime), defaultTz)
Expand Down
Loading