diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala index c31dc624b0611..8b23bed6d2788 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.util import java.time.{LocalDate, LocalDateTime, LocalTime, ZoneId} import java.time.temporal.ChronoField import java.util.{Calendar, TimeZone} +import java.util.Calendar.{DAY_OF_MONTH, DST_OFFSET, ERA, HOUR_OF_DAY, MINUTE, MONTH, SECOND, YEAR, ZONE_OFFSET} import scala.collection.mutable.AnyRefMap @@ -102,15 +103,15 @@ object RebaseDateTime { .setInstant(Math.multiplyExact(days, MILLIS_PER_DAY)) .build() val localDate = LocalDate.of( - utcCal.get(Calendar.YEAR), - utcCal.get(Calendar.MONTH) + 1, + utcCal.get(YEAR), + utcCal.get(MONTH) + 1, // The number of days will be added later to handle non-existing // Julian dates in Proleptic Gregorian calendar. // For example, 1000-02-29 exists in Julian calendar because 1000 // is a leap year but it is not a leap year in Gregorian calendar. 1) - .`with`(ChronoField.ERA, utcCal.get(Calendar.ERA)) - .plusDays(utcCal.get(Calendar.DAY_OF_MONTH) - 1) + .`with`(ChronoField.ERA, utcCal.get(ERA)) + .plusDays(utcCal.get(DAY_OF_MONTH) - 1) Math.toIntExact(localDate.toEpochDay) } @@ -350,9 +351,9 @@ object RebaseDateTime { // If so, we will take zone offsets from the previous day otherwise from the next day. // This assumes that transitions cannot happen often than once per 2 days. val shift = if (trans.getOffsetBefore == zonedDateTime.getOffset) -1 else 1 - cloned.add(Calendar.DAY_OF_MONTH, shift) - cal.set(Calendar.ZONE_OFFSET, cloned.get(Calendar.ZONE_OFFSET)) - cal.set(Calendar.DST_OFFSET, cloned.get(Calendar.DST_OFFSET)) + cloned.add(DAY_OF_MONTH, shift) + cal.set(ZONE_OFFSET, cloned.get(ZONE_OFFSET)) + cal.set(DST_OFFSET, cloned.get(DST_OFFSET)) } millisToMicros(cal.getTimeInMillis) + ldt.get(ChronoField.MICRO_OF_SECOND) } @@ -413,32 +414,36 @@ object RebaseDateTime { .setInstant(microsToMillis(micros)) .build() val localDateTime = LocalDateTime.of( - cal.get(Calendar.YEAR), - cal.get(Calendar.MONTH) + 1, + cal.get(YEAR), + cal.get(MONTH) + 1, // The number of days will be added later to handle non-existing // Julian dates in Proleptic Gregorian calendar. // For example, 1000-02-29 exists in Julian calendar because 1000 // is a leap year but it is not a leap year in Gregorian calendar. 1, - cal.get(Calendar.HOUR_OF_DAY), - cal.get(Calendar.MINUTE), - cal.get(Calendar.SECOND), + cal.get(HOUR_OF_DAY), + cal.get(MINUTE), + cal.get(SECOND), (Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt) - .`with`(ChronoField.ERA, cal.get(Calendar.ERA)) - .plusDays(cal.get(Calendar.DAY_OF_MONTH) - 1) + .`with`(ChronoField.ERA, cal.get(ERA)) + .plusDays(cal.get(DAY_OF_MONTH) - 1) val zonedDateTime = localDateTime.atZone(zoneId) - // Assuming the daylight saving switchover time is 2:00, the local clock will go back to - // 2:00 after hitting 2:59. This means the local time between [2:00, 3:00) appears twice, and - // can map to two different physical times (seconds from the UTC epoch). - // Java 8 time API resolves the ambiguity by picking the earlier physical time. This is the same - // as Java 7 time API, except for 2:00 where Java 7 picks the later physical time. - // Here we detect the "2:00" case and pick the latter physical time, to be compatible with the - // Java 7 date-time. - val adjustedZdt = if (cal.get(Calendar.DST_OFFSET) == 0) { - zonedDateTime.withLaterOffsetAtOverlap() - } else { - zonedDateTime - } + // In the case of local timestamp overlapping, we need to choose the correct time instant + // which is related to the original local timestamp. We look ahead of 1 day, and if the next + // date has the same standard zone and DST offsets, the current local timestamp belongs to + // the period after the transition. In that case, we take the later zoned date time otherwise + // earlier one. Here, we assume that transitions happen not often than once per day. + val trans = zoneId.getRules.getTransition(localDateTime) + val adjustedZdt = if (trans != null && trans.isOverlap) { + val dstOffset = cal.get(DST_OFFSET) + val zoneOffset = cal.get(ZONE_OFFSET) + cal.add(DAY_OF_MONTH, 1) + if (zoneOffset == cal.get(ZONE_OFFSET) && dstOffset == cal.get(DST_OFFSET)) { + zonedDateTime.withLaterOffsetAtOverlap() + } else { + zonedDateTime.withEarlierOffsetAtOverlap() + } + } else zonedDateTime instantToMicros(adjustedZdt.toInstant) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/RebaseDateTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/RebaseDateTimeSuite.scala index 8a51f158a9429..b3363169b391e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/RebaseDateTimeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/RebaseDateTimeSuite.scala @@ -445,6 +445,9 @@ class RebaseDateTimeSuite extends SparkFunSuite with Matchers with SQLHelper { // Check reverse rebasing assert(rebaseJulianToGregorianMicros(rebasedEarlierMicros) === earlierMicros) assert(rebaseJulianToGregorianMicros(rebasedLaterMicros) === laterMicros) + // Check reverse not-optimized rebasing + assert(rebaseJulianToGregorianMicros(hkZid, rebasedEarlierMicros) === earlierMicros) + assert(rebaseJulianToGregorianMicros(hkZid, rebasedLaterMicros) === laterMicros) } } }