diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 5576e71b57024..3ce284d5518a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -303,7 +303,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes) case DateType => buildCast[Int](_, d => UTF8String.fromString(dateFormatter.format(d))) case TimestampType => buildCast[Long](_, - t => UTF8String.fromString(DateTimeUtils.timestampToString(timestampFormatter, t))) + t => UTF8String.fromString(timestampFormatter.format(t))) case ArrayType(et, _) => buildCast[ArrayData](_, array => { val builder = new UTF8StringBuilder @@ -443,7 +443,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit case ByteType => buildCast[Byte](_, b => longToTimestamp(b.toLong)) case DateType => - buildCast[Int](_, d => epochDaysToMicros(d, zoneId)) + buildCast[Int](_, d => daysToMicros(d, zoneId)) // TimestampWritable.decimalToTimestamp case DecimalType() => buildCast[Decimal](_, d => decimalToTimestamp(d)) @@ -480,7 +480,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit case TimestampType => // throw valid precision more than seconds, according to Hive. // Timestamp.nanos is in 0 to 999,999,999, no more than a second. - buildCast[Long](_, t => microsToEpochDays(t, zoneId)) + buildCast[Long](_, t => microsToDays(t, zoneId)) } // IntervalConverter @@ -1034,8 +1034,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit val tf = JavaCode.global( ctx.addReferenceObj("timestampFormatter", timestampFormatter), timestampFormatter.getClass) - (c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString( - org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($tf, $c));""" + (c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString($tf.format($c));""" case CalendarIntervalType => (c, evPrim, _) => code"""$evPrim = UTF8String.fromString($c.toString());""" case ArrayType(et, _) => @@ -1120,7 +1119,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit val zid = getZoneId() (c, evPrim, evNull) => code"""$evPrim = - org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToEpochDays($c, $zid);""" + org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToDays($c, $zid);""" case _ => (c, evPrim, evNull) => code"$evNull = true;" } @@ -1247,7 +1246,7 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit zoneIdClass) (c, evPrim, evNull) => code"""$evPrim = - org.apache.spark.sql.catalyst.util.DateTimeUtils.epochDaysToMicros($c, $zid);""" + org.apache.spark.sql.catalyst.util.DateTimeUtils.daysToMicros($c, $zid);""" case DecimalType() => (c, evPrim, evNull) => code"$evPrim = ${decimalToTimestampCode(c)};" case DoubleType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index b46c3fb349ee1..b9ba32b8ee337 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -895,7 +895,7 @@ abstract class ToTimestamp } else { left.dataType match { case DateType => - epochDaysToMicros(t.asInstanceOf[Int], zoneId) / downScaleFactor + daysToMicros(t.asInstanceOf[Int], zoneId) / downScaleFactor case TimestampType => t.asInstanceOf[Long] / downScaleFactor case StringType => @@ -975,7 +975,7 @@ abstract class ToTimestamp boolean ${ev.isNull} = ${eval1.isNull}; $javaType ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; if (!${ev.isNull}) { - ${ev.value} = $dtu.epochDaysToMicros(${eval1.value}, $zid) / $downScaleFactor; + ${ev.value} = $dtu.daysToMicros(${eval1.value}, $zid) / $downScaleFactor; }""") } } @@ -1242,10 +1242,10 @@ case class DateAddInterval( if (ansiEnabled || itvl.microseconds == 0) { DateTimeUtils.dateAddInterval(start.asInstanceOf[Int], itvl) } else { - val startTs = DateTimeUtils.epochDaysToMicros(start.asInstanceOf[Int], zoneId) + val startTs = DateTimeUtils.daysToMicros(start.asInstanceOf[Int], zoneId) val resultTs = DateTimeUtils.timestampAddInterval( startTs, itvl.months, itvl.days, itvl.microseconds, zoneId) - DateTimeUtils.microsToEpochDays(resultTs, zoneId) + DateTimeUtils.microsToDays(resultTs, zoneId) } } @@ -1261,10 +1261,10 @@ case class DateAddInterval( |if ($i.microseconds == 0) { | ${ev.value} = $dtu.dateAddInterval($sd, $i); |} else { - | long $startTs = $dtu.epochDaysToMicros($sd, $zid); + | long $startTs = $dtu.daysToMicros($sd, $zid); | long $resultTs = | $dtu.timestampAddInterval($startTs, $i.months, $i.days, $i.microseconds, $zid); - | ${ev.value} = $dtu.microsToEpochDays($resultTs, $zid); + | ${ev.value} = $dtu.microsToDays($resultTs, $zid); |} |""".stripMargin }) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 41a271b95e83c..c466a60259c7f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -42,51 +42,32 @@ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} */ object DateTimeUtils { - // we use Int and Long internally to represent [[DateType]] and [[TimestampType]] - type SQLDate = Int - type SQLTimestamp = Long - - // see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian - // it's 2440587.5, rounding up to compatible with Hive + // See http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian + // It's 2440587.5, rounding up to be compatible with Hive. 
final val JULIAN_DAY_OF_EPOCH = 2440588 - final val julianCommonEraStart = Timestamp.valueOf("0001-01-01 00:00:00") - - final val TimeZoneGMT = TimeZone.getTimeZone("GMT") final val TimeZoneUTC = TimeZone.getTimeZone("UTC") val TIMEZONE_OPTION = "timeZone" - def defaultTimeZone(): TimeZone = TimeZone.getDefault() - def getZoneId(timeZoneId: String): ZoneId = ZoneId.of(timeZoneId, ZoneId.SHORT_IDS) - def getTimeZone(timeZoneId: String): TimeZone = { - TimeZone.getTimeZone(getZoneId(timeZoneId)) - } + def getTimeZone(timeZoneId: String): TimeZone = TimeZone.getTimeZone(getZoneId(timeZoneId)) - def microsToDays(timestamp: SQLTimestamp): SQLDate = { - microsToDays(timestamp, defaultTimeZone().toZoneId) - } - - def microsToDays(timestamp: SQLTimestamp, zoneId: ZoneId): SQLDate = { - val instant = microsToInstant(timestamp) - localDateToDays(LocalDateTime.ofInstant(instant, zoneId).toLocalDate) - } - - def daysToMicros(days: SQLDate): SQLTimestamp = { - daysToMicros(days, defaultTimeZone().toZoneId) + /** + * Converts microseconds since 1970-01-01 00:00:00Z to days since 1970-01-01 at the given zone ID. + */ + def microsToDays(micros: Long, zoneId: ZoneId): Int = { + localDateToDays(getLocalDateTime(micros, zoneId).toLocalDate) } - def daysToMicros(days: SQLDate, zoneId: ZoneId): SQLTimestamp = { + /** + * Converts days since 1970-01-01 at the given zone ID to microseconds since 1970-01-01 00:00:00Z. + */ + def daysToMicros(days: Int, zoneId: ZoneId): Long = { val instant = daysToLocalDate(days).atStartOfDay(zoneId).toInstant instantToMicros(instant) } - // Converts Timestamp to string according to Hive TimestampWritable convention. - def timestampToString(tf: TimestampFormatter, us: SQLTimestamp): String = { - tf.format(us) - } - /** * Converts a local date at the default JVM time zone to the number of days since 1970-01-01 * in the hybrid calendar (Julian + Gregorian) by discarding the time part. The resulted days are @@ -103,7 +84,7 @@ object DateTimeUtils { * combines Julian and Gregorian calendars. * @return The number of days since the epoch in Proleptic Gregorian calendar. */ - def fromJavaDate(date: Date): SQLDate = { + def fromJavaDate(date: Date): Int = { val millisUtc = date.getTime val millisLocal = millisUtc + TimeZone.getDefault.getOffset(millisUtc) val julianDays = Math.toIntExact(Math.floorDiv(millisLocal, MILLIS_PER_DAY)) @@ -121,11 +102,11 @@ object DateTimeUtils { * Note: The date is shifted by the offset of the default JVM time zone for backward compatibility * with Spark 2.4 and earlier versions. * - * @param daysSinceEpoch The number of days since 1970-01-01 in Proleptic Gregorian calendar. + * @param days The number of days since 1970-01-01 in Proleptic Gregorian calendar. * @return A local date in the hybrid calendar as `java.sql.Date` from number of days since epoch. */ - def toJavaDate(daysSinceEpoch: SQLDate): Date = { - val rebasedDays = rebaseGregorianToJulianDays(daysSinceEpoch) + def toJavaDate(days: Int): Date = { + val rebasedDays = rebaseGregorianToJulianDays(days) val localMillis = Math.multiplyExact(rebasedDays, MILLIS_PER_DAY) val timeZoneOffset = TimeZone.getDefault match { case zoneInfo: ZoneInfo => zoneInfo.getOffsetsByWall(localMillis, null) @@ -147,11 +128,11 @@ object DateTimeUtils { * representation as `year`, `month`, `day`, ..., `seconds` in the original calendar * and in the target calendar. * - * @param us The number of microseconds since 1970-01-01T00:00:00.000000Z. + * @param micros The number of microseconds since 1970-01-01T00:00:00.000000Z. 
* @return A `java.sql.Timestamp` from number of micros since epoch. */ - def toJavaTimestamp(us: SQLTimestamp): Timestamp = { - val rebasedMicros = rebaseGregorianToJulianMicros(us) + def toJavaTimestamp(micros: Long): Timestamp = { + val rebasedMicros = rebaseGregorianToJulianMicros(micros) val seconds = Math.floorDiv(rebasedMicros, MICROS_PER_SECOND) val ts = new Timestamp(seconds * MILLIS_PER_SECOND) val nanos = (rebasedMicros - seconds * MICROS_PER_SECOND) * NANOS_PER_MICROS @@ -178,21 +159,18 @@ object DateTimeUtils { * Gregorian calendars. * @return The number of micros since epoch from `java.sql.Timestamp`. */ - def fromJavaTimestamp(t: Timestamp): SQLTimestamp = { + def fromJavaTimestamp(t: Timestamp): Long = { val micros = millisToMicros(t.getTime) + (t.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS rebaseJulianToGregorianMicros(micros) } /** - * Returns the number of microseconds since epoch from Julian day - * and nanoseconds in a day + * Returns the number of microseconds since epoch from Julian day and nanoseconds in a day. */ - def fromJulianDay(day: Int, nanoseconds: Long): SQLTimestamp = { + def fromJulianDay(days: Int, nanos: Long): Long = { // use Long to avoid rounding errors - val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - val micros = SECONDS.toMicros(seconds) + NANOSECONDS.toMicros(nanoseconds) - val rebased = rebaseJulianToGregorianMicros(micros) - rebased + val micros = (days - JULIAN_DAY_OF_EPOCH).toLong * MICROS_PER_DAY + nanos / NANOS_PER_MICROS + rebaseJulianToGregorianMicros(micros) } /** @@ -200,44 +178,33 @@ object DateTimeUtils { * * Note: support timestamp since 4717 BC (without negative nanoseconds, compatible with Hive). */ - def toJulianDay(us: SQLTimestamp): (Int, Long) = { - val julian_us = rebaseGregorianToJulianMicros(us) + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY - val day = julian_us / MICROS_PER_DAY - val micros = julian_us % MICROS_PER_DAY - (day.toInt, MICROSECONDS.toNanos(micros)) + def toJulianDay(micros: Long): (Int, Long) = { + val julianUs = rebaseGregorianToJulianMicros(micros) + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY + val days = julianUs / MICROS_PER_DAY + val us = julianUs % MICROS_PER_DAY + (days.toInt, MICROSECONDS.toNanos(us)) } - /* - * Converts the timestamp to milliseconds since epoch. In spark timestamp values have microseconds + /** + * Converts the timestamp to milliseconds since epoch. In Spark timestamp values have microseconds * precision, so this conversion is lossy. */ - def microsToMillis(us: SQLTimestamp): Long = { + def microsToMillis(micros: Long): Long = { // When the timestamp is negative i.e before 1970, we need to adjust the millseconds portion. // Example - 1965-01-01 10:11:12.123456 is represented as (-157700927876544) in micro precision. // In millis precision the above needs to be represented as (-157700927877). - Math.floorDiv(us, MICROS_PER_MILLIS) + Math.floorDiv(micros, MICROS_PER_MILLIS) } - /* - * Converts milliseconds since epoch to SQLTimestamp. + /** + * Converts milliseconds since the epoch to microseconds. 
*/ - def millisToMicros(millis: Long): SQLTimestamp = { + def millisToMicros(millis: Long): Long = { Math.multiplyExact(millis, MICROS_PER_MILLIS) } - def microsToEpochDays(epochMicros: SQLTimestamp, zoneId: ZoneId): SQLDate = { - localDateToDays(microsToInstant(epochMicros).atZone(zoneId).toLocalDate) - } - - def epochDaysToMicros(epochDays: SQLDate, zoneId: ZoneId): SQLTimestamp = { - val localDate = LocalDate.ofEpochDay(epochDays) - val zeroLocalTime = LocalTime.MIDNIGHT - val localDateTime = LocalDateTime.of(localDate, zeroLocalTime) - instantToMicros(localDateTime.atZone(zoneId).toInstant) - } - - // A method called by JSON/CSV parser to clean up the legacy timestamp string by removing the - // "GMT" string. + // The method is called by the JSON/CSV parser to clean up the legacy timestamp string by removing + // the "GMT" string. def cleanLegacyTimestampStr(s: String): String = { val indexOfGMT = s.indexOf("GMT") if (indexOfGMT != -1) { @@ -252,8 +219,8 @@ } /** - * Trim and parse a given UTF8 date string to the corresponding a corresponding [[Long]] value. - * The return type is [[Option]] in order to distinguish between 0L and null. The following + * Trims and parses a given UTF8 timestamp string to a corresponding [[Long]] + * value. The return type is [[Option]] in order to distinguish between 0L and null. The following * formats are allowed: * * `yyyy` @@ -277,7 +244,7 @@ * - +|-hhmmss * - Region-based zone IDs in the form `area/city`, such as `Europe/Paris` */ - def stringToTimestamp(s: UTF8String, timeZoneId: ZoneId): Option[SQLTimestamp] = { + def stringToTimestamp(s: UTF8String, timeZoneId: ZoneId): Option[Long] = { if (s == null) { return None } @@ -408,34 +375,41 @@ } } + /** + * Gets the number of microseconds since the epoch of 1970-01-01 00:00:00Z from the given + * instance of `java.time.Instant`. The epoch microsecond count is a simple incrementing count of + * microseconds where microsecond 0 is 1970-01-01 00:00:00Z. + */ def instantToMicros(instant: Instant): Long = { val us = Math.multiplyExact(instant.getEpochSecond, MICROS_PER_SECOND) val result = Math.addExact(us, NANOSECONDS.toMicros(instant.getNano)) result } - def microsToInstant(us: Long): Instant = { - val secs = Math.floorDiv(us, MICROS_PER_SECOND) + /** + * Obtains an instance of `java.time.Instant` using microseconds from + * the epoch of 1970-01-01 00:00:00Z. + */ + def microsToInstant(micros: Long): Instant = { + val secs = Math.floorDiv(micros, MICROS_PER_SECOND) // Unfolded Math.floorMod(us, MICROS_PER_SECOND) to reuse the result of // the above calculation of `secs` via `floorDiv`. - val mos = us - secs * MICROS_PER_SECOND + val mos = micros - secs * MICROS_PER_SECOND Instant.ofEpochSecond(secs, mos * NANOS_PER_MICROS) } - def instantToDays(instant: Instant): Int = { - val seconds = instant.getEpochSecond - val days = Math.floorDiv(seconds, SECONDS_PER_DAY) - days.toInt - } - - def localDateToDays(localDate: LocalDate): Int = { - Math.toIntExact(localDate.toEpochDay) - } + /** + * Converts the local date to the number of days since 1970-01-01. + */ + def localDateToDays(localDate: LocalDate): Int = Math.toIntExact(localDate.toEpochDay) + /** + * Obtains an instance of `java.time.LocalDate` from the epoch day count. + */ def daysToLocalDate(days: Int): LocalDate = LocalDate.ofEpochDay(days) /** - * Trim and parse a given UTF8 date string to a corresponding [[Int]] value.
+ * Trims and parses a given UTF8 date string to a corresponding [[Int]] value. * The return type is [[Option]] in order to distinguish between 0 and null. The following * formats are allowed: * @@ -446,7 +420,7 @@ object DateTimeUtils { * `yyyy-[m]m-[d]d *` * `yyyy-[m]m-[d]dT*` */ - def stringToDate(s: UTF8String, zoneId: ZoneId): Option[SQLDate] = { + def stringToDate(s: UTF8String, zoneId: ZoneId): Option[Int] = { if (s == null) { return None } @@ -494,124 +468,102 @@ object DateTimeUtils { } } - private def localTimestamp(microsec: SQLTimestamp, zoneId: ZoneId): LocalDateTime = { - microsToInstant(microsec).atZone(zoneId).toLocalDateTime + // Gets the local date-time parts (year, month, day and time) of the instant expressed as the + // number of microseconds since the epoch at the given time zone ID. + private def getLocalDateTime(micros: Long, zoneId: ZoneId): LocalDateTime = { + microsToInstant(micros).atZone(zoneId).toLocalDateTime } /** * Returns the hour value of a given timestamp value. The timestamp is expressed in microseconds. */ - def getHours(microsec: SQLTimestamp, zoneId: ZoneId): Int = { - localTimestamp(microsec, zoneId).getHour + def getHours(micros: Long, zoneId: ZoneId): Int = { + getLocalDateTime(micros, zoneId).getHour } /** * Returns the minute value of a given timestamp value. The timestamp is expressed in - * microseconds. + * microseconds since the epoch. */ - def getMinutes(microsec: SQLTimestamp, zoneId: ZoneId): Int = { - localTimestamp(microsec, zoneId).getMinute + def getMinutes(micros: Long, zoneId: ZoneId): Int = { + getLocalDateTime(micros, zoneId).getMinute } /** * Returns the second value of a given timestamp value. The timestamp is expressed in - * microseconds. + * microseconds since the epoch. */ - def getSeconds(microsec: SQLTimestamp, zoneId: ZoneId): Int = { - localTimestamp(microsec, zoneId).getSecond + def getSeconds(micros: Long, zoneId: ZoneId): Int = { + getLocalDateTime(micros, zoneId).getSecond } /** * Returns the seconds part and its fractional part with microseconds. */ - def getSecondsWithFraction(microsec: SQLTimestamp, zoneId: ZoneId): Decimal = { - Decimal(getMicroseconds(microsec, zoneId), 8, 6) + def getSecondsWithFraction(micros: Long, zoneId: ZoneId): Decimal = { + Decimal(getMicroseconds(micros, zoneId), 8, 6) } /** - * Returns seconds, including fractional parts, multiplied by 1000000. The timestamp - * is expressed in microseconds since the epoch. + * Returns local seconds, including fractional parts, multiplied by 1000000. + * + * @param micros The number of microseconds since the epoch. + * @param zoneId The time zone id which milliseconds should be obtained in. */ - def getMicroseconds(timestamp: SQLTimestamp, zoneId: ZoneId): Int = { - val lt = localTimestamp(timestamp, zoneId) + def getMicroseconds(micros: Long, zoneId: ZoneId): Int = { + val lt = getLocalDateTime(micros, zoneId) (lt.getLong(ChronoField.MICRO_OF_SECOND) + lt.getSecond * MICROS_PER_SECOND).toInt } /** - * Returns the 'day in year' value for the given date. The date is expressed in days - * since 1.1.1970. + * Returns the 'day in year' value for the given number of days since 1970-01-01. */ - def getDayInYear(date: SQLDate): Int = { - LocalDate.ofEpochDay(date).getDayOfYear - } + def getDayInYear(days: Int): Int = daysToLocalDate(days).getDayOfYear /** - * Returns the year value for the given date. The date is expressed in days - * since 1.1.1970. + * Returns the year value for the given number of days since 1970-01-01. 
*/ - def getYear(date: SQLDate): Int = { - LocalDate.ofEpochDay(date).getYear - } + def getYear(days: Int): Int = daysToLocalDate(days).getYear /** * Returns the year which conforms to ISO 8601. Each ISO 8601 week-numbering * year begins with the Monday of the week containing the 4th of January. */ - def getWeekBasedYear(date: SQLDate): Int = { - daysToLocalDate(date).get(IsoFields.WEEK_BASED_YEAR) - } + def getWeekBasedYear(days: Int): Int = daysToLocalDate(days).get(IsoFields.WEEK_BASED_YEAR) - /** - * Returns the quarter for the given date. The date is expressed in days - * since 1.1.1970. - */ - def getQuarter(date: SQLDate): Int = { - LocalDate.ofEpochDay(date).get(IsoFields.QUARTER_OF_YEAR) - } + /** Returns the quarter for the given number of days since 1970-01-01. */ + def getQuarter(days: Int): Int = daysToLocalDate(days).get(IsoFields.QUARTER_OF_YEAR) /** - * Split date (expressed in days since 1.1.1970) into four fields: - * year, month (Jan is Month 1), dayInMonth, daysToMonthEnd (0 if it's last day of month). - */ - def splitDate(date: SQLDate): (Int, Int, Int, Int) = { - val ld = LocalDate.ofEpochDay(date) - (ld.getYear, ld.getMonthValue, ld.getDayOfMonth, ld.lengthOfMonth() - ld.getDayOfMonth) - } - - /** - * Returns the month value for the given date. The date is expressed in days - * since 1.1.1970. January is month 1. + * Returns the month value for the given number of days since 1970-01-01. + * January is month 1. */ - def getMonth(date: SQLDate): Int = { - LocalDate.ofEpochDay(date).getMonthValue - } + def getMonth(days: Int): Int = daysToLocalDate(days).getMonthValue /** - * Returns the 'day of month' value for the given date. The date is expressed in days - * since 1.1.1970. + * Returns the 'day of month' value for the given number of days since 1970-01-01. */ - def getDayOfMonth(date: SQLDate): Int = { - LocalDate.ofEpochDay(date).getDayOfMonth - } + def getDayOfMonth(days: Int): Int = daysToLocalDate(days).getDayOfMonth /** - * Add date and year-month interval. - * Returns a date value, expressed in days since 1.1.1970. + * Adds a year-month interval to a date represented as days since 1970-01-01. + * @return a date value, expressed in days since 1970-01-01. */ - def dateAddMonths(days: SQLDate, months: Int): SQLDate = { - LocalDate.ofEpochDay(days).plusMonths(months).toEpochDay.toInt + def dateAddMonths(days: Int, months: Int): Int = { + localDateToDays(daysToLocalDate(days).plusMonths(months)) } /** - * Add timestamp and full interval. - * Returns a timestamp value, expressed in microseconds since 1.1.1970 00:00:00. + * Adds a full interval (months, days, microseconds) to a timestamp represented as the number of + * microseconds since 1970-01-01 00:00:00Z. + * @return A timestamp value, expressed in microseconds since 1970-01-01 00:00:00Z. */ def timestampAddInterval( - start: SQLTimestamp, + start: Long, months: Int, days: Int, microseconds: Long, - zoneId: ZoneId): SQLTimestamp = { + zoneId: ZoneId): Long = { val resultTimestamp = microsToInstant(start) .atZone(zoneId) .plusMonths(months) @@ -621,38 +573,47 @@ } /** - * Add the date and the interval's months and days. - * Returns a date value, expressed in days since 1.1.1970. + * Adds the interval's months and days to a date expressed as days since the epoch. + * @return A date value, expressed in days since 1970-01-01.
* * @throws DateTimeException if the result exceeds the supported date range * @throws IllegalArgumentException if the interval has `microseconds` part */ def dateAddInterval( - start: SQLDate, - interval: CalendarInterval): SQLDate = { + start: Int, + interval: CalendarInterval): Int = { require(interval.microseconds == 0, "Cannot add hours, minutes or seconds, milliseconds, microseconds to a date") - val ld = LocalDate.ofEpochDay(start).plusMonths(interval.months).plusDays(interval.days) + val ld = daysToLocalDate(start).plusMonths(interval.months).plusDays(interval.days) localDateToDays(ld) } /** - * Returns number of months between time1 and time2. time1 and time2 are expressed in - * microseconds since 1.1.1970. If time1 is later than time2, the result is positive. + * Splits date (expressed in days since 1970-01-01) into four fields: + * year, month (Jan is Month 1), dayInMonth, daysToMonthEnd (0 if it's last day of month). + */ + private def splitDate(days: Int): (Int, Int, Int, Int) = { + val ld = daysToLocalDate(days) + (ld.getYear, ld.getMonthValue, ld.getDayOfMonth, ld.lengthOfMonth() - ld.getDayOfMonth) + } + + /** + * Returns number of months between micros1 and micros2. micros1 and micros2 are expressed in + * microseconds since 1970-01-01. If micros1 is later than micros2, the result is positive. * - * If time1 and time2 are on the same day of month, or both are the last day of month, + * If micros1 and micros2 are on the same day of month, or both are the last day of month, * returns, time of day will be ignored. * * Otherwise, the difference is calculated based on 31 days per month. * The result is rounded to 8 decimal places if `roundOff` is set to true. */ def monthsBetween( - time1: SQLTimestamp, - time2: SQLTimestamp, + micros1: Long, + micros2: Long, roundOff: Boolean, zoneId: ZoneId): Double = { - val date1 = microsToDays(time1, zoneId) - val date2 = microsToDays(time2, zoneId) + val date1 = microsToDays(micros1, zoneId) + val date2 = microsToDays(micros2, zoneId) val (year1, monthInYear1, dayInMonth1, daysToMonthEnd1) = splitDate(date1) val (year2, monthInYear2, dayInMonth2, daysToMonthEnd2) = splitDate(date2) @@ -666,8 +627,8 @@ object DateTimeUtils { } // using milliseconds can cause precision loss with more than 8 digits // we follow Hive's implementation which uses seconds - val secondsInDay1 = MICROSECONDS.toSeconds(time1 - daysToMicros(date1, zoneId)) - val secondsInDay2 = MICROSECONDS.toSeconds(time2 - daysToMicros(date2, zoneId)) + val secondsInDay1 = MICROSECONDS.toSeconds(micros1 - daysToMicros(date1, zoneId)) + val secondsInDay2 = MICROSECONDS.toSeconds(micros2 - daysToMicros(date2, zoneId)) val secondsDiff = (dayInMonth1 - dayInMonth2) * SECONDS_PER_DAY + secondsInDay1 - secondsInDay2 val secondsInMonth = DAYS.toSeconds(31) val diff = monthDiff + secondsDiff / secondsInMonth.toDouble @@ -710,17 +671,14 @@ object DateTimeUtils { * Returns the first date which is later than startDate and is of the given dayOfWeek. * dayOfWeek is an integer ranges in [0, 6], and 0 is Thu, 1 is Fri, etc,. */ - def getNextDateForDayOfWeek(startDate: SQLDate, dayOfWeek: Int): SQLDate = { - startDate + 1 + ((dayOfWeek - 1 - startDate) % 7 + 7) % 7 + def getNextDateForDayOfWeek(startDay: Int, dayOfWeek: Int): Int = { + startDay + 1 + ((dayOfWeek - 1 - startDay) % 7 + 7) % 7 } - /** - * Returns last day of the month for the given date. The date is expressed in days - * since 1.1.1970. 
- */ - def getLastDayOfMonth(date: SQLDate): SQLDate = { - val localDate = LocalDate.ofEpochDay(date) - (date - localDate.getDayOfMonth) + localDate.lengthOfMonth() + /** Returns last day of the month for the given number of days since 1970-01-01. */ + def getLastDayOfMonth(days: Int): Int = { + val localDate = daysToLocalDate(days) + (days - localDate.getDayOfMonth) + localDate.lengthOfMonth() } // The constants are visible for testing purpose only. @@ -746,21 +704,21 @@ object DateTimeUtils { * Returns the trunc date from original date and trunc level. * Trunc level should be generated using `parseTruncLevel()`, should be between 6 and 9. */ - def truncDate(d: SQLDate, level: Int): SQLDate = { + def truncDate(days: Int, level: Int): Int = { level match { - case TRUNC_TO_WEEK => getNextDateForDayOfWeek(d - 7, MONDAY) - case TRUNC_TO_MONTH => d - DateTimeUtils.getDayOfMonth(d) + 1 + case TRUNC_TO_WEEK => getNextDateForDayOfWeek(days - 7, MONDAY) + case TRUNC_TO_MONTH => days - getDayOfMonth(days) + 1 case TRUNC_TO_QUARTER => - localDateToDays(daysToLocalDate(d).`with`(IsoFields.DAY_OF_QUARTER, 1L)) - case TRUNC_TO_YEAR => d - DateTimeUtils.getDayInYear(d) + 1 + localDateToDays(daysToLocalDate(days).`with`(IsoFields.DAY_OF_QUARTER, 1L)) + case TRUNC_TO_YEAR => days - getDayInYear(days) + 1 case _ => // caller make sure that this should never be reached sys.error(s"Invalid trunc level: $level") } } - private def truncToUnit(t: SQLTimestamp, zoneId: ZoneId, unit: ChronoUnit): SQLTimestamp = { - val truncated = microsToInstant(t).atZone(zoneId).truncatedTo(unit) + private def truncToUnit(micros: Long, zoneId: ZoneId, unit: ChronoUnit): Long = { + val truncated = microsToInstant(micros).atZone(zoneId).truncatedTo(unit) instantToMicros(truncated.toInstant) } @@ -768,19 +726,19 @@ object DateTimeUtils { * Returns the trunc date time from original date time and trunc level. * Trunc level should be generated using `parseTruncLevel()`, should be between 0 and 9. */ - def truncTimestamp(t: SQLTimestamp, level: Int, zoneId: ZoneId): SQLTimestamp = { + def truncTimestamp(micros: Long, level: Int, zoneId: ZoneId): Long = { level match { - case TRUNC_TO_MICROSECOND => t + case TRUNC_TO_MICROSECOND => micros case TRUNC_TO_MILLISECOND => - t - Math.floorMod(t, MICROS_PER_MILLIS) + micros - Math.floorMod(micros, MICROS_PER_MILLIS) case TRUNC_TO_SECOND => - t - Math.floorMod(t, MICROS_PER_SECOND) + micros - Math.floorMod(micros, MICROS_PER_SECOND) case TRUNC_TO_MINUTE => - t - Math.floorMod(t, MICROS_PER_MINUTE) - case TRUNC_TO_HOUR => truncToUnit(t, zoneId, ChronoUnit.HOURS) - case TRUNC_TO_DAY => truncToUnit(t, zoneId, ChronoUnit.DAYS) + micros - Math.floorMod(micros, MICROS_PER_MINUTE) + case TRUNC_TO_HOUR => truncToUnit(micros, zoneId, ChronoUnit.HOURS) + case TRUNC_TO_DAY => truncToUnit(micros, zoneId, ChronoUnit.DAYS) case _ => // Try to truncate date levels - val dDays = microsToDays(t, zoneId) + val dDays = microsToDays(micros, zoneId) daysToMicros(truncDate(dDays, level), zoneId) } } @@ -810,36 +768,41 @@ object DateTimeUtils { } /** - * Convert the timestamp `ts` from one timezone to another. + * Converts the timestamp `micros` from one timezone to another. * - * TODO: Because of DST, the conversion between UTC and human time is not exactly one-to-one - * mapping, the conversion here may return wrong result, we should make the timestamp - * timezone-aware. 
+ * Time-zone rules, such as daylight savings, mean that not every local date-time + * is valid for the `toZone` time zone, thus the local date-time may be adjusted. */ - def convertTz(ts: SQLTimestamp, fromZone: ZoneId, toZone: ZoneId): SQLTimestamp = { - val rebasedDateTime = microsToInstant(ts).atZone(toZone).toLocalDateTime.atZone(fromZone) + def convertTz(micros: Long, fromZone: ZoneId, toZone: ZoneId): Long = { + val rebasedDateTime = getLocalDateTime(micros, toZone).atZone(fromZone) instantToMicros(rebasedDateTime.toInstant) } /** - * Returns a timestamp of given timezone from utc timestamp, with the same string + * Returns a timestamp of given timezone from UTC timestamp, with the same string * representation in their timezone. */ - def fromUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = { - convertTz(time, ZoneOffset.UTC, getZoneId(timeZone)) + def fromUTCTime(micros: Long, timeZone: String): Long = { + convertTz(micros, ZoneOffset.UTC, getZoneId(timeZone)) } /** * Returns a utc timestamp from a given timestamp from a given timezone, with the same * string representation in their timezone. */ - def toUTCTime(time: SQLTimestamp, timeZone: String): SQLTimestamp = { - convertTz(time, getZoneId(timeZone), ZoneOffset.UTC) + def toUTCTime(micros: Long, timeZone: String): Long = { + convertTz(micros, getZoneId(timeZone), ZoneOffset.UTC) } - def currentTimestamp(): SQLTimestamp = instantToMicros(Instant.now()) + /** + * Obtains the current instant as microseconds since the epoch at the UTC time zone. + */ + def currentTimestamp(): Long = instantToMicros(Instant.now()) - def currentDate(zoneId: ZoneId): SQLDate = localDateToDays(LocalDate.now(zoneId)) + /** + * Obtains the current date as days since the epoch in the specified time-zone. + */ + def currentDate(zoneId: ZoneId): Int = localDateToDays(LocalDate.now(zoneId)) private def today(zoneId: ZoneId): ZonedDateTime = { Instant.now().atZone(zoneId).`with`(LocalTime.MIDNIGHT) @@ -849,6 +812,7 @@ object DateTimeUtils { /** * Extracts special values from an input string ignoring case. + * * @param input A trimmed string * @param zoneId Zone identifier used to get the current date. * @return Some special value in lower case or None. @@ -878,12 +842,13 @@ object DateTimeUtils { /** * Converts notational shorthands that are converted to ordinary timestamps. + * * @param input A trimmed string * @param zoneId Zone identifier used to get the current date. * @return Some of microseconds since the epoch if the conversion completed * successfully otherwise None. */ - def convertSpecialTimestamp(input: String, zoneId: ZoneId): Option[SQLTimestamp] = { + def convertSpecialTimestamp(input: String, zoneId: ZoneId): Option[Long] = { extractSpecialValue(input, zoneId).flatMap { case "epoch" => Some(0) case "now" => Some(currentTimestamp()) @@ -894,7 +859,7 @@ object DateTimeUtils { } } - private def convertSpecialTimestamp(bytes: Array[Byte], zoneId: ZoneId): Option[SQLTimestamp] = { + private def convertSpecialTimestamp(bytes: Array[Byte], zoneId: ZoneId): Option[Long] = { if (bytes.length > 0 && Character.isAlphabetic(bytes(0))) { convertSpecialTimestamp(new String(bytes, StandardCharsets.UTF_8), zoneId) } else { @@ -904,11 +869,12 @@ object DateTimeUtils { /** * Converts notational shorthands that are converted to ordinary dates. + * * @param input A trimmed string * @param zoneId Zone identifier used to get the current date. * @return Some of days since the epoch if the conversion completed successfully otherwise None. 
*/ - def convertSpecialDate(input: String, zoneId: ZoneId): Option[SQLDate] = { + def convertSpecialDate(input: String, zoneId: ZoneId): Option[Int] = { extractSpecialValue(input, zoneId).flatMap { case "epoch" => Some(0) case "now" | "today" => Some(currentDate(zoneId)) @@ -918,7 +884,7 @@ object DateTimeUtils { } } - private def convertSpecialDate(bytes: Array[Byte], zoneId: ZoneId): Option[SQLDate] = { + private def convertSpecialDate(bytes: Array[Byte], zoneId: ZoneId): Option[Int] = { if (bytes.length > 0 && Character.isAlphabetic(bytes(0))) { convertSpecialDate(new String(bytes, StandardCharsets.UTF_8), zoneId) } else { @@ -927,17 +893,16 @@ object DateTimeUtils { } /** - * Subtracts two dates. - * @param endDate The end date, exclusive - * @param startDate The start date, inclusive + * Subtracts two dates expressed as days since 1970-01-01. + * + * @param endDay The end date, exclusive + * @param startDay The start date, inclusive * @return An interval between two dates. The interval can be negative * if the end date is before the start date. */ - def subtractDates(endDate: SQLDate, startDate: SQLDate): CalendarInterval = { - val period = Period.between( - LocalDate.ofEpochDay(startDate), - LocalDate.ofEpochDay(endDate)) - val months = period.getMonths + 12 * period.getYears + def subtractDates(endDay: Int, startDay: Int): CalendarInterval = { + val period = Period.between(daysToLocalDate(startDay), daysToLocalDate(endDay)) + val months = Math.toIntExact(period.toTotalMonths) val days = period.getDays new CalendarInterval(months, days, 0) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index f460404800264..8db95044359c3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -180,7 +180,7 @@ class MicrosCalendar(tz: TimeZone, digitsInFraction: Int) // Converts parsed `MILLISECOND` field to seconds fraction in microsecond precision. // For example if the fraction pattern is `SSSS` then `digitsInFraction` = 4, and // if the `MILLISECOND` field was parsed to `1234`. 
- def getMicros(): SQLTimestamp = { + def getMicros(): Long = { // Append 6 zeros to the field: 1234 -> 1234000000 val d = fields(Calendar.MILLISECOND) * MICROS_PER_SECOND // Take the first 6 digits from `d`: 1234000000 -> 123400 @@ -209,7 +209,7 @@ class LegacyFastTimestampFormatter( fastDateFormat.getTimeZone, fastDateFormat.getPattern.count(_ == 'S')) - override def parse(s: String): SQLTimestamp = { + override def parse(s: String): Long = { cal.clear() // Clear the calendar because it can be re-used many times if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) { throw new IllegalArgumentException(s"'$s' is an invalid timestamp") @@ -220,7 +220,7 @@ class LegacyFastTimestampFormatter( rebaseJulianToGregorianMicros(julianMicros) } - override def format(timestamp: SQLTimestamp): String = { + override def format(timestamp: Long): String = { val julianMicros = rebaseGregorianToJulianMicros(timestamp) cal.setTimeInMillis(Math.floorDiv(julianMicros, MICROS_PER_SECOND) * MILLIS_PER_SECOND) cal.setMicros(Math.floorMod(julianMicros, MICROS_PER_SECOND)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala index 35b4017980138..76ec450a4d7c6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{CollectList, Collect import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -271,13 +270,13 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper { for (tz <- ALL_TIMEZONES) { val timeZoneId = Option(tz.getId) - var c = Calendar.getInstance(TimeZoneGMT) + var c = Calendar.getInstance(TimeZoneUTC) c.set(2015, 2, 8, 2, 30, 0) checkEvaluation( cast(cast(new Timestamp(c.getTimeInMillis), StringType, timeZoneId), TimestampType, timeZoneId), millisToMicros(c.getTimeInMillis)) - c = Calendar.getInstance(TimeZoneGMT) + c = Calendar.getInstance(TimeZoneUTC) c.set(2015, 10, 1, 2, 30, 0) checkEvaluation( cast(cast(new Timestamp(c.getTimeInMillis), StringType, timeZoneId), @@ -294,7 +293,7 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(cast(cast(nts, TimestampType, UTC_OPT), StringType, UTC_OPT), nts) checkEvaluation( cast(cast(ts, StringType, UTC_OPT), TimestampType, UTC_OPT), - DateTimeUtils.fromJavaTimestamp(ts)) + fromJavaTimestamp(ts)) // all convert to string type to check checkEvaluation( @@ -386,11 +385,11 @@ abstract class CastSuiteBase extends SparkFunSuite with ExpressionEvalHelper { withSQLConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP.key -> "true") { checkEvaluation(cast(cast(tss, ShortType), TimestampType), - DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND) + fromJavaTimestamp(ts) * MILLIS_PER_SECOND) checkEvaluation(cast(cast(tss, IntegerType), TimestampType), - DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND) + fromJavaTimestamp(ts) * MILLIS_PER_SECOND) checkEvaluation(cast(cast(tss, LongType), TimestampType), - 
DateTimeUtils.fromJavaTimestamp(ts) * MILLIS_PER_SECOND) + fromJavaTimestamp(ts) * MILLIS_PER_SECOND) checkEvaluation( cast(cast(millis.toFloat / MILLIS_PER_SECOND, TimestampType), FloatType), millis.toFloat / MILLIS_PER_SECOND) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala index 16d78b0526503..4a19add23fc58 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CsvExpressionsSuite.scala @@ -73,7 +73,7 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P val schema = StructType(StructField("t", TimestampType) :: Nil) val csvData1 = "2016-01-01T00:00:00.123Z" - var c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) + var c = Calendar.getInstance(DateTimeUtils.TimeZoneUTC) c.set(2016, 0, 1, 0, 0, 0) c.set(Calendar.MILLISECOND, 123) checkEvaluation( @@ -184,7 +184,7 @@ class CsvExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with P test("to_csv with timestamp") { val schema = StructType(StructField("t", TimestampType) :: Nil) - val c = Calendar.getInstance(DateTimeUtils.TimeZoneGMT) + val c = Calendar.getInstance(DateTimeUtils.TimeZoneUTC) c.set(2016, 0, 1, 0, 0, 0) c.set(Calendar.MILLISECOND, 0) val struct = Literal.create(create_row(c.getTimeInMillis * 1000L), schema) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala index f248a3454f39a..4edf95d8f994b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils, TimestampFormatter} import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneUTC import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} @@ -803,7 +804,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val sdf2 = new SimpleDateFormat(fmt2, Locale.US) val fmt3 = "yy-MM-dd" val sdf3 = new SimpleDateFormat(fmt3, Locale.US) - sdf3.setTimeZone(TimeZone.getTimeZone(UTC)) + sdf3.setTimeZone(TimeZoneUTC) withDefaultTimeZone(UTC) { for (zid <- outstandingZoneIds) { @@ -872,7 +873,7 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val sdf2 = new SimpleDateFormat(fmt2, Locale.US) val fmt3 = "yy-MM-dd" val sdf3 = new SimpleDateFormat(fmt3, Locale.US) - sdf3.setTimeZone(TimeZone.getTimeZone(UTC)) + sdf3.setTimeZone(TimeZoneUTC) withDefaultTimeZone(UTC) { for (zid <- outstandingZoneIds) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index c40149368b055..0fea84bb183e0 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -156,10 +156,10 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { "fromPrimitiveArray", ObjectType(classOf[Array[Int]]), Array[Int](1, 2, 3), UnsafeArrayData.fromPrimitiveArray(Array[Int](1, 2, 3))), (DateTimeUtils.getClass, ObjectType(classOf[Date]), - "toJavaDate", ObjectType(classOf[DateTimeUtils.SQLDate]), 77777, + "toJavaDate", ObjectType(classOf[Int]), 77777, DateTimeUtils.toJavaDate(77777)), (DateTimeUtils.getClass, ObjectType(classOf[Timestamp]), - "toJavaTimestamp", ObjectType(classOf[DateTimeUtils.SQLTimestamp]), + "toJavaTimestamp", ObjectType(classOf[Long]), 88888888.toLong, DateTimeUtils.toJavaTimestamp(88888888)) ).foreach { case (cls, dataType, methodName, argType, arg, expected) => checkObjectExprEvaluation(StaticInvoke(cls, dataType, methodName, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala index 2bb948ec24fb3..4b8693cf7fd53 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala @@ -26,8 +26,8 @@ import org.apache.spark.sql.types._ class JacksonGeneratorSuite extends SparkFunSuite { - val gmtId = DateTimeUtils.TimeZoneGMT.getID - val option = new JSONOptions(Map.empty, gmtId) + val utcId = DateTimeUtils.TimeZoneUTC.getID + val option = new JSONOptions(Map.empty, utcId) test("initial with StructType and write out a row") { val dataType = StructType(StructField("a", IntegerType) :: Nil) @@ -45,7 +45,7 @@ class JacksonGeneratorSuite extends SparkFunSuite { val input = InternalRow(null) val writer = new CharArrayWriter() val allowNullOption = - new JSONOptions(Map("ignoreNullFields" -> "false"), gmtId) + new JSONOptions(Map("ignoreNullFields" -> "false"), utcId) val gen = new JacksonGenerator(dataType, writer, allowNullOption) gen.write(input) gen.flush() @@ -59,7 +59,7 @@ class JacksonGeneratorSuite extends SparkFunSuite { val input = InternalRow(InternalRow(null)) val writer = new CharArrayWriter() val allowNullOption = - new JSONOptions(Map("ignoreNullFields" -> "false"), gmtId) + new JSONOptions(Map("ignoreNullFields" -> "false"), utcId) val gen = new JacksonGenerator(dataType, writer, allowNullOption) gen.write(input) gen.flush() diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 4883bef8c0886..caf4b7e16f285 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -37,12 +37,12 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { private def defaultZoneId = ZoneId.systemDefault() test("nanoseconds truncation") { - val tf = TimestampFormatter.getFractionFormatter(DateTimeUtils.defaultTimeZone.toZoneId) + val tf = TimestampFormatter.getFractionFormatter(ZoneId.systemDefault()) def checkStringToTimestamp(originalTime: String, expectedParsedTime: String): Unit = { val parsedTimestampOp = DateTimeUtils.stringToTimestamp( 
UTF8String.fromString(originalTime), defaultZoneId) assert(parsedTimestampOp.isDefined, "timestamp with nanoseconds was not parsed correctly") - assert(DateTimeUtils.timestampToString(tf, parsedTimestampOp.get) === expectedParsedTime) + assert(tf.format(parsedTimestampOp.get) === expectedParsedTime) } checkStringToTimestamp("2015-01-02 00:00:00.123456789", "2015-01-02 00:00:00.123456") @@ -121,7 +121,7 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { checkFromToJavaDate(new Date(df2.parse("1776-07-04 18:30:00 UTC").getTime)) } - private def toDate(s: String, zoneId: ZoneId = UTC): Option[SQLDate] = { + private def toDate(s: String, zoneId: ZoneId = UTC): Option[Int] = { stringToDate(UTF8String.fromString(s), zoneId) } @@ -149,7 +149,7 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { assert(toDate("1999 08").isEmpty) } - private def toTimestamp(str: String, zoneId: ZoneId): Option[SQLTimestamp] = { + private def toTimestamp(str: String, zoneId: ZoneId): Option[Long] = { stringToTimestamp(UTF8String.fromString(str), zoneId) } @@ -520,7 +520,7 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { def testTrunc( level: Int, expected: String, - inputTS: SQLTimestamp, + inputTS: Long, zoneId: ZoneId = defaultZoneId): Unit = { val truncated = DateTimeUtils.truncTimestamp(inputTS, level, zoneId) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DatetimeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DatetimeFormatterSuite.scala index b78facd963338..3b9a4ae88d586 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DatetimeFormatterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DatetimeFormatterSuite.scala @@ -45,7 +45,7 @@ trait DatetimeFormatterSuite extends SparkFunSuite with SQLHelper with Matchers private def assertEqual(pattern: String, datetimeStr: String, expected: Long): Unit = { if (useDateFormatter) { assert(dateFormatter(pattern).parse(datetimeStr) === - DateTimeUtils.microsToEpochDays(expected, UTC)) + DateTimeUtils.microsToDays(expected, UTC)) } else { assert(timestampFormatter(pattern).parse(datetimeStr) === expected) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/RebaseDateTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/RebaseDateTimeSuite.scala index 254bf01c89b4f..fdfe029c65c11 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/RebaseDateTimeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/RebaseDateTimeSuite.scala @@ -77,7 +77,7 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper { Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt } private def fromJavaDateLegacy(date: Date): Int = { - millisToDaysLegacy(date.getTime, defaultTimeZone()) + millisToDaysLegacy(date.getTime, TimeZone.getTimeZone(ZoneId.systemDefault())) } test("rebase gregorian to/from julian days") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala index 0ffb492e702d1..e6565feebf25c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ArrowUtilsSuite.scala @@ -17,11 +17,12 @@ package org.apache.spark.sql.util +import java.time.ZoneId + import 
org.apache.arrow.vector.types.pojo.ArrowType import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.LA -import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ class ArrowUtilsSuite extends SparkFunSuite { @@ -63,7 +64,7 @@ class ArrowUtilsSuite extends SparkFunSuite { assert(ArrowUtils.fromArrowSchema(arrowSchema) === schema) } - roundtripWithTz(DateTimeUtils.defaultTimeZone().getID) + roundtripWithTz(ZoneId.systemDefault().getId) roundtripWithTz("Asia/Tokyo") roundtripWithTz("UTC") roundtripWithTz(LA.getId) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala index f5a474ddf3904..2f1ee0f23d45a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala @@ -203,7 +203,7 @@ private[sql] object JDBCRelation extends Logging { case TimestampType => val timestampFormatter = TimestampFormatter.getFractionFormatter( DateTimeUtils.getZoneId(timeZoneId)) - DateTimeUtils.timestampToString(timestampFormatter, value) + timestampFormatter.format(value) } s"'$dateTimeStr'" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 491977c61d3cb..39bbc60200b86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -34,7 +34,6 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} -import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate import org.apache.spark.sql.sources import org.apache.spark.unsafe.types.UTF8String @@ -124,7 +123,7 @@ class ParquetFilters( private val ParquetTimestampMicrosType = ParquetSchemaType(TIMESTAMP_MICROS, INT64, 0, null) private val ParquetTimestampMillisType = ParquetSchemaType(TIMESTAMP_MILLIS, INT64, 0, null) - private def dateToDays(date: Any): SQLDate = date match { + private def dateToDays(date: Any): Int = date match { case d: Date => DateTimeUtils.fromJavaDate(d) case ld: LocalDate => DateTimeUtils.localDateToDays(ld) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 201ee16faeb08..9a010d7192081 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -34,7 +34,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData} -import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy @@ -743,7 +742,7 @@ private[parquet] object ParquetRowConverter { unscaled } - def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = { + def binaryToSQLTimestamp(binary: Binary): Long = { assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" + s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.") val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index ac98d3f0c7095..cb11519497747 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import java.util.TimeZone import scala.util.Random @@ -28,8 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation -import org.apache.spark.sql.catalyst.util.DateTimeTestUtils -import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC +import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, UTC} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -1029,7 +1027,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { Timestamp.valueOf("2018-01-01 12:00:00"), Timestamp.valueOf("2018-01-02 00:00:00"))))) - DateTimeTestUtils.withDefaultTimeZone(UTC) { + withDefaultTimeZone(UTC) { checkAnswer( spark.sql("select sequence(" + " cast('2018-01-01' as date)" + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index 91ec1b5ab2937..18356a4de9ef4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -28,7 +28,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils} +import org.apache.spark.sql.catalyst.util.DateTimeTestUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneUTC import org.apache.spark.sql.functions.timestamp_seconds import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -484,11 +485,11 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared DateTimeTestUtils.outstandingZoneIds.foreach { zid => val timeZone = TimeZone.getTimeZone(zid) - checkTimestampStats(DateType, DateTimeUtils.TimeZoneUTC, timeZone) { stats => + checkTimestampStats(DateType, TimeZoneUTC, timeZone) { stats => assert(stats.min.get.asInstanceOf[Int] == TimeUnit.SECONDS.toDays(start)) assert(stats.max.get.asInstanceOf[Int] == TimeUnit.SECONDS.toDays(end - 1)) } - checkTimestampStats(TimestampType, DateTimeUtils.TimeZoneUTC, timeZone) { stats => + 
checkTimestampStats(TimestampType, TimeZoneUTC, timeZone) { stats => assert(stats.min.get.asInstanceOf[Long] == TimeUnit.SECONDS.toMicros(start)) assert(stats.max.get.asInstanceOf[Long] == TimeUnit.SECONDS.toMicros(end - 1)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index 3f8ee12f97776..6344ec6be4878 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -21,7 +21,7 @@ import java.io._ import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException} import java.nio.file.Files import java.sql.{Date, Timestamp} -import java.time.LocalDate +import java.time.{LocalDate, ZoneId} import java.util.Locale import com.fasterxml.jackson.core.JsonFactory @@ -125,7 +125,7 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ssXXX"))) val ISO8601Date = "1970-01-01" - checkTypePromotion(DateTimeUtils.microsToDays(32400000000L), + checkTypePromotion(DateTimeUtils.microsToDays(32400000000L, ZoneId.systemDefault), enforceCorrectType(ISO8601Date, DateType)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 32a9558e91f10..accd04592bec5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -21,7 +21,7 @@ import java.io.File import java.math.BigInteger import java.sql.{Date, Timestamp} import java.time.{ZoneId, ZoneOffset} -import java.util.{Calendar, Locale, TimeZone} +import java.util.{Calendar, Locale} import scala.collection.mutable.ArrayBuffer @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneUTC import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition} import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable} @@ -88,7 +89,7 @@ abstract class ParquetPartitionDiscoverySuite check("1990-02-24 12:00:30", Literal.create(Timestamp.valueOf("1990-02-24 12:00:30"), TimestampType)) - val c = Calendar.getInstance(TimeZone.getTimeZone("UTC")) + val c = Calendar.getInstance(TimeZoneUTC) c.set(1990, 1, 24, 12, 0, 30) c.set(Calendar.MILLISECOND, 0) check("1990-02-24 12:00:30", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala index 36fb418b09cb6..818a66eb436cc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.sources +import java.time.ZoneId + import 
org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.DataSource @@ -27,7 +29,7 @@ class ResolvedDataSourceSuite extends SharedSparkSession { DataSource( sparkSession = spark, className = name, - options = Map(DateTimeUtils.TIMEZONE_OPTION -> DateTimeUtils.defaultTimeZone().getID) + options = Map(DateTimeUtils.TIMEZONE_OPTION -> ZoneId.systemDefault().getId) ).providingClass test("jdbc") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index bbd0220a74f88..030009572deb3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming import java.io.{File, InterruptedIOException, IOException, UncheckedIOException} import java.nio.channels.ClosedByInterruptException +import java.time.ZoneId import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit} import scala.concurrent.TimeoutException @@ -1219,7 +1220,8 @@ class StreamSuite extends StreamTest { } var lastTimestamp = System.currentTimeMillis() - val currentDate = DateTimeUtils.microsToDays(DateTimeUtils.millisToMicros(lastTimestamp)) + val currentDate = DateTimeUtils.microsToDays( + DateTimeUtils.millisToMicros(lastTimestamp), ZoneId.systemDefault) testStream(df) ( AddData(input, 1), CheckLastBatch { rows: Seq[Row] =>