Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.util
import java.time.{LocalDate, LocalDateTime, LocalTime, ZoneId}
import java.time.temporal.ChronoField
import java.util.{Calendar, TimeZone}
import java.util.Calendar.{DAY_OF_MONTH, DST_OFFSET, ERA, HOUR_OF_DAY, MINUTE, MONTH, SECOND, YEAR, ZONE_OFFSET}

import scala.collection.mutable.AnyRefMap

Expand Down Expand Up @@ -102,15 +103,15 @@ object RebaseDateTime {
.setInstant(Math.multiplyExact(days, MILLIS_PER_DAY))
.build()
val localDate = LocalDate.of(
utcCal.get(Calendar.YEAR),
utcCal.get(Calendar.MONTH) + 1,
utcCal.get(YEAR),
utcCal.get(MONTH) + 1,
// The number of days will be added later to handle non-existing
// Julian dates in Proleptic Gregorian calendar.
// For example, 1000-02-29 exists in Julian calendar because 1000
// is a leap year but it is not a leap year in Gregorian calendar.
1)
.`with`(ChronoField.ERA, utcCal.get(Calendar.ERA))
.plusDays(utcCal.get(Calendar.DAY_OF_MONTH) - 1)
.`with`(ChronoField.ERA, utcCal.get(ERA))
.plusDays(utcCal.get(DAY_OF_MONTH) - 1)
Math.toIntExact(localDate.toEpochDay)
}

Expand Down Expand Up @@ -350,9 +351,9 @@ object RebaseDateTime {
// If so, we will take zone offsets from the previous day otherwise from the next day.
// This assumes that transitions cannot happen often than once per 2 days.
val shift = if (trans.getOffsetBefore == zonedDateTime.getOffset) -1 else 1
cloned.add(Calendar.DAY_OF_MONTH, shift)
cal.set(Calendar.ZONE_OFFSET, cloned.get(Calendar.ZONE_OFFSET))
cal.set(Calendar.DST_OFFSET, cloned.get(Calendar.DST_OFFSET))
cloned.add(DAY_OF_MONTH, shift)
cal.set(ZONE_OFFSET, cloned.get(ZONE_OFFSET))
cal.set(DST_OFFSET, cloned.get(DST_OFFSET))
}
millisToMicros(cal.getTimeInMillis) + ldt.get(ChronoField.MICRO_OF_SECOND)
}
Expand Down Expand Up @@ -413,32 +414,36 @@ object RebaseDateTime {
.setInstant(microsToMillis(micros))
.build()
val localDateTime = LocalDateTime.of(
cal.get(Calendar.YEAR),
cal.get(Calendar.MONTH) + 1,
cal.get(YEAR),
cal.get(MONTH) + 1,
// The number of days will be added later to handle non-existing
// Julian dates in Proleptic Gregorian calendar.
// For example, 1000-02-29 exists in Julian calendar because 1000
// is a leap year but it is not a leap year in Gregorian calendar.
1,
cal.get(Calendar.HOUR_OF_DAY),
cal.get(Calendar.MINUTE),
cal.get(Calendar.SECOND),
cal.get(HOUR_OF_DAY),
cal.get(MINUTE),
cal.get(SECOND),
(Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
.`with`(ChronoField.ERA, cal.get(Calendar.ERA))
.plusDays(cal.get(Calendar.DAY_OF_MONTH) - 1)
.`with`(ChronoField.ERA, cal.get(ERA))
.plusDays(cal.get(DAY_OF_MONTH) - 1)
val zonedDateTime = localDateTime.atZone(zoneId)
// Assuming the daylight saving switchover time is 2:00, the local clock will go back to
// 2:00 after hitting 2:59. This means the local time between [2:00, 3:00) appears twice, and
// can map to two different physical times (seconds from the UTC epoch).
// Java 8 time API resolves the ambiguity by picking the earlier physical time. This is the same
// as Java 7 time API, except for 2:00 where Java 7 picks the later physical time.
// Here we detect the "2:00" case and pick the latter physical time, to be compatible with the
// Java 7 date-time.
val adjustedZdt = if (cal.get(Calendar.DST_OFFSET) == 0) {
zonedDateTime.withLaterOffsetAtOverlap()
} else {
zonedDateTime
}
// In the case of local timestamp overlapping, we need to choose the correct time instant
// which is related to the original local timestamp. We look ahead of 1 day, and if the next
// date has the same standard zone and DST offsets, the current local timestamp belongs to
// the period after the transition. In that case, we take the later zoned date time otherwise
// earlier one. Here, we assume that transitions happen not often than once per day.
val trans = zoneId.getRules.getTransition(localDateTime)
val adjustedZdt = if (trans != null && trans.isOverlap) {
val dstOffset = cal.get(DST_OFFSET)
val zoneOffset = cal.get(ZONE_OFFSET)
cal.add(DAY_OF_MONTH, 1)
if (zoneOffset == cal.get(ZONE_OFFSET) && dstOffset == cal.get(DST_OFFSET)) {
zonedDateTime.withLaterOffsetAtOverlap()
} else {
zonedDateTime.withEarlierOffsetAtOverlap()
}
} else zonedDateTime
instantToMicros(adjustedZdt.toInstant)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,9 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper {
// Check reverse rebasing
assert(rebaseJulianToGregorianMicros(rebasedEarlierMicros) === earlierMicros)
assert(rebaseJulianToGregorianMicros(rebasedLaterMicros) === laterMicros)
// Check reverse not-optimized rebasing
assert(rebaseJulianToGregorianMicros(hkZid, rebasedEarlierMicros) === earlierMicros)
assert(rebaseJulianToGregorianMicros(hkZid, rebasedLaterMicros) === laterMicros)
}
}
}