Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,6 @@ object DateTimeUtils {
// it's 2440587.5, rounding up to compatible with Hive
final val JULIAN_DAY_OF_EPOCH = 2440588

final val GREGORIAN_CUTOVER_DAY = LocalDate.of(1582, 10, 15).toEpochDay
final val GREGORIAN_CUTOVER_MICROS = instantToMicros(
LocalDateTime.of(1582, 10, 15, 0, 0, 0)
.atOffset(ZoneOffset.UTC)
.toInstant)
final val GREGORIAN_CUTOVER_MILLIS = microsToMillis(GREGORIAN_CUTOVER_MICROS)

final val julianCommonEraStart = Timestamp.valueOf("0001-01-01 00:00:00")

final val TimeZoneGMT = TimeZone.getTimeZone("GMT")
Expand Down Expand Up @@ -92,53 +85,103 @@ object DateTimeUtils {
}

/**
* Returns the number of days since epoch from java.sql.Date.
* Converts an instance of `java.sql.Date` to a number of days since the epoch
* 1970-01-01 via extracting date fields `year`, `month`, `days` from the input,
* creating a local date in Proleptic Gregorian calendar from the fields, and
* getting the number of days from the resulted local date.
*
* This approach was taken to have the same local date as the triple of `year`,
* `month`, `day` in the original hybrid calendar used by `java.sql.Date` and
* Proleptic Gregorian calendar used by Spark since version 3.0.0, see SPARK-26651.
*
* @param date It represents a specific instant in time based on
* the hybrid calendar which combines Julian and
* Gregorian calendars.
* @return The number of days since epoch from java.sql.Date.
*/
def fromJavaDate(date: Date): SQLDate = {
if (date.getTime < GREGORIAN_CUTOVER_MILLIS) {
val era = if (date.before(julianCommonEraStart)) 0 else 1
val localDate = date.toLocalDate.`with`(ChronoField.ERA, era)
localDateToDays(localDate)
} else {
microsToDays(millisToMicros(date.getTime))
}
val era = if (date.before(julianCommonEraStart)) 0 else 1
val localDate = LocalDate
.of(date.getYear + 1900, date.getMonth + 1, 1)
.`with`(ChronoField.ERA, era)
// Add days separately to convert dates existed in Julian calendar but not
// in Proleptic Gregorian calendar. For example, 1000-02-29 is valid date
// in Julian calendar because 1000 is a leap year but 1000 is not a leap
// year in Proleptic Gregorian calendar. And 1000-02-29 doesn't exist in it.
.plusDays(date.getDate - 1) // Returns the next valid date after `date.getDate - 1` days
localDateToDays(localDate)
}

/**
* Returns a java.sql.Date from number of days since epoch.
* The opposite to `fromJavaDate` method which converts a number of days to an
* instance of `java.sql.Date`. It builds a local date in Proleptic Gregorian
* calendar, extracts date fields `year`, `month`, `day`, and creates a local
* date in the hybrid calendar (Julian + Gregorian calendars) from the fields.
*
* The purpose of the conversion is to have the same local date as the triple
* of `year`, `month`, `day` in the original Proleptic Gregorian calendar and
* in the target calender.
*
* @param daysSinceEpoch The number of days since 1970-01-01.
* @return A `java.sql.Date` from number of days since epoch.
*/
def toJavaDate(daysSinceEpoch: SQLDate): Date = {
if (daysSinceEpoch < GREGORIAN_CUTOVER_DAY) {
Date.valueOf(LocalDate.ofEpochDay(daysSinceEpoch))
} else {
new Date(microsToMillis(daysToMicros(daysSinceEpoch)))
}
val localDate = LocalDate.ofEpochDay(daysSinceEpoch)
new Date(localDate.getYear - 1900, localDate.getMonthValue - 1, localDate.getDayOfMonth)
}

/**
* Returns a java.sql.Timestamp from number of micros since epoch.
* Converts microseconds since the epoch to an instance of `java.sql.Timestamp`
* via creating a local timestamp at the system time zone in Proleptic Gregorian
* calendar, extracting date and time fields like `year` and `hours`, and forming
* new timestamp in the hybrid calendar from the extracted fields.
*
* The conversion is based on the JVM system time zone because the `java.sql.Timestamp`
* uses the time zone internally.
*
* The method performs the conversion via local timestamp fields to have the same date-time
* representation as `year`, `month`, `day`, ..., `seconds` in the original calendar
* and in the target calendar.
*
* @param us The number of microseconds since 1970-01-01T00:00:00.000000Z.
* @return A `java.sql.Timestamp` from number of micros since epoch.
*/
def toJavaTimestamp(us: SQLTimestamp): Timestamp = {
if (us < GREGORIAN_CUTOVER_MICROS) {
val ldt = microsToInstant(us).atZone(ZoneId.systemDefault()).toLocalDateTime
Timestamp.valueOf(ldt)
} else {
Timestamp.from(microsToInstant(us))
}
val ldt = microsToInstant(us).atZone(ZoneId.systemDefault()).toLocalDateTime
Timestamp.valueOf(ldt)
}

/**
* Returns the number of micros since epoch from java.sql.Timestamp.
* Converts an instance of `java.sql.Timestamp` to the number of microseconds since
* 1970-01-01T00:00:00.000000Z. It extracts date-time fields from the input, builds
* a local timestamp in Proleptic Gregorian calendar from the fields, and binds
* the timestamp to the system time zone. The resulted instant is converted to
* microseconds since the epoch.
*
* The conversion is performed via the system time zone because it is used internally
* in `java.sql.Timestamp` while extracting date-time fields.
*
* The goal of the function is to have the same local date-time in the original calendar
* - the hybrid calendar (Julian + Gregorian) and in the target calendar which is
* Proleptic Gregorian calendar, see SPARK-26651.
*
* @param t It represents a specific instant in time based on
* the hybrid calendar which combines Julian and
* Gregorian calendars.
* @return The number of micros since epoch from `java.sql.Timestamp`.
*/
def fromJavaTimestamp(t: Timestamp): SQLTimestamp = {
if (t.getTime < GREGORIAN_CUTOVER_MILLIS) {
val era = if (t.before(julianCommonEraStart)) 0 else 1
val localDateTime = t.toLocalDateTime.`with`(ChronoField.ERA, era)
val instant = ZonedDateTime.of(localDateTime, ZoneId.systemDefault()).toInstant
instantToMicros(instant)
} else {
instantToMicros(t.toInstant)
}
val era = if (t.before(julianCommonEraStart)) 0 else 1
val localDateTime = LocalDateTime.of(
t.getYear + 1900, t.getMonth + 1, 1,
t.getHours, t.getMinutes, t.getSeconds, t.getNanos)
.`with`(ChronoField.ERA, era)
// Add days separately to convert dates existed in Julian calendar but not
// in Proleptic Gregorian calendar. For example, 1000-02-29 is valid date
// in Julian calendar because 1000 is a leap year but 1000 is not a leap
// year in Proleptic Gregorian calendar. And 1000-02-29 doesn't exist in it.
.plusDays(t.getDate - 1) // Returns the next valid date after `date.getDate - 1` days
instantToMicros(localDateTime.atZone(ZoneId.systemDefault).toInstant)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils
import org.apache.spark.sql.catalyst.util.IntervalUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -845,9 +845,11 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper

test("Sequence on DST boundaries") {
val timeZone = TimeZone.getTimeZone("Europe/Prague")
val dstOffset = timeZone.getDSTSavings

def noDST(t: Timestamp): Timestamp = new Timestamp(t.getTime - dstOffset)
def ts(s: String, noDST: Boolean = false): Long = {
val offset = if (noDST) timeZone.getDSTSavings else 0
DateTimeUtils.millisToMicros(Timestamp.valueOf(s).getTime - offset)
}

DateTimeTestUtils.withDefaultTimeZone(timeZone) {
// Spring time change
Expand All @@ -856,23 +858,23 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
Literal(Timestamp.valueOf("2018-03-25 03:30:00")),
Literal(stringToInterval("interval 30 minutes"))),
Seq(
Timestamp.valueOf("2018-03-25 01:30:00"),
Timestamp.valueOf("2018-03-25 03:00:00"),
Timestamp.valueOf("2018-03-25 03:30:00")))
ts("2018-03-25 01:30:00"),
ts("2018-03-25 03:00:00"),
ts("2018-03-25 03:30:00")))

// Autumn time change
checkEvaluation(new Sequence(
Literal(Timestamp.valueOf("2018-10-28 01:30:00")),
Literal(Timestamp.valueOf("2018-10-28 03:30:00")),
Literal(stringToInterval("interval 30 minutes"))),
Seq(
Timestamp.valueOf("2018-10-28 01:30:00"),
noDST(Timestamp.valueOf("2018-10-28 02:00:00")),
noDST(Timestamp.valueOf("2018-10-28 02:30:00")),
Timestamp.valueOf("2018-10-28 02:00:00"),
Timestamp.valueOf("2018-10-28 02:30:00"),
Timestamp.valueOf("2018-10-28 03:00:00"),
Timestamp.valueOf("2018-10-28 03:30:00")))
ts("2018-10-28 01:30:00"),
ts("2018-10-28 02:00:00", noDST = true),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to change the test because of the 2 lines. I wasn't able to build 2 timestamps via Timestamp.valueOf after rebasing. Timestamp class does normalization underneath, and replaces milliseconds since the epoch, and as a consequence of that checking of the results fails. Even textual representation, and year, month, ... nanos are the same.

ts("2018-10-28 02:30:00", noDST = true),
ts("2018-10-28 02:00:00"),
ts("2018-10-28 02:30:00"),
ts("2018-10-28 03:00:00"),
ts("2018-10-28 03:30:00")))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,9 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
}

test("SPARK-6785: java date conversion before and after epoch") {
def format(d: Date): String = {
TimestampFormatter("yyyy-MM-dd", defaultTimeZone().toZoneId)
.format(millisToMicros(d.getTime))
}
def checkFromToJavaDate(d1: Date): Unit = {
val d2 = toJavaDate(fromJavaDate(d1))
assert(format(d2) === format(d1))
assert(d2.toString === d1.toString)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

val df1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
Expand Down Expand Up @@ -500,17 +496,16 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
}
}

withDefaultTimeZone(TimeZone.getTimeZone("PST")) {
val tz = "America/Los_Angeles"
withDefaultTimeZone(TimeZone.getTimeZone(tz)) {
// Daylight Saving Time
test("2016-03-13 01:59:59", "PST", "2016-03-13 09:59:59.0")
// 2016-03-13 02:00:00 PST does not exists
test("2016-03-13 02:00:00", "PST", "2016-03-13 10:00:00.0")
test("2016-03-13 03:00:00", "PST", "2016-03-13 10:00:00.0")
test("2016-11-06 00:59:59", "PST", "2016-11-06 07:59:59.0")
// 2016-11-06 01:00:00 PST could be 2016-11-06 08:00:00 UTC or 2016-11-06 09:00:00 UTC
test("2016-11-06 01:00:00", "PST", "2016-11-06 09:00:00.0")
test("2016-11-06 01:59:59", "PST", "2016-11-06 09:59:59.0")
test("2016-11-06 02:00:00", "PST", "2016-11-06 10:00:00.0")
test("2016-03-13 01:59:59", tz, "2016-03-13 09:59:59.0")
test("2016-03-13 02:00:00", tz, "2016-03-13 10:00:00.0")
test("2016-03-13 03:00:00", tz, "2016-03-13 10:00:00.0")
test("2016-11-06 00:59:59", tz, "2016-11-06 07:59:59.0")
test("2016-11-06 01:00:00", tz, "2016-11-06 08:00:00.0")
test("2016-11-06 01:59:59", tz, "2016-11-06 08:59:59.0")
test("2016-11-06 02:00:00", tz, "2016-11-06 10:00:00.0")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.sql.Date
import org.apache.hadoop.hive.serde2.io.DateWritable
import org.apache.hadoop.io.WritableUtils

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{rebaseGregorianToJulianDays, rebaseJulianToGregorianDays}

/**
* The class accepts/returns days in Gregorian calendar and rebase them
Expand All @@ -41,13 +41,13 @@ private[hive] class DaysWritable(
extends DateWritable {

def this(gregorianDays: Int) =
this(gregorianDays, DaysWritable.rebaseGregorianToJulianDays(gregorianDays))
this(gregorianDays, rebaseGregorianToJulianDays(gregorianDays))
def this(dateWritable: DateWritable) = {
this(
gregorianDays = dateWritable match {
case daysWritable: DaysWritable => daysWritable.gregorianDays
case dateWritable: DateWritable =>
DaysWritable.rebaseJulianToGregorianDays(dateWritable.getDays)
rebaseJulianToGregorianDays(dateWritable.getDays)
},
julianDays = dateWritable.getDays)
}
Expand All @@ -63,37 +63,6 @@ private[hive] class DaysWritable(
@throws[IOException]
override def readFields(in: DataInput): Unit = {
julianDays = WritableUtils.readVInt(in)
gregorianDays = DaysWritable.rebaseJulianToGregorianDays(julianDays)
gregorianDays = rebaseJulianToGregorianDays(julianDays)
}
}

private[hive] object DaysWritable {
// Rebasing days since the epoch to store the same number of days
// as by Spark 2.4 and earlier versions. Spark 3.0 switched to
// Proleptic Gregorian calendar (see SPARK-26651), and as a consequence of that,
// this affects dates before 1582-10-15. Spark 2.4 and earlier versions use
// Julian calendar for dates before 1582-10-15. So, the same local date may
// be mapped to different number of days since the epoch in different calendars.
// For example:
// Proleptic Gregorian calendar: 1582-01-01 -> -141714
// Julian calendar: 1582-01-01 -> -141704
// The code below converts -141714 to -141704.
def rebaseGregorianToJulianDays(daysSinceEpoch: Int): Int = {
if (daysSinceEpoch < DateTimeUtils.GREGORIAN_CUTOVER_DAY) {
DateTimeUtils.rebaseGregorianToJulianDays(daysSinceEpoch)
} else {
daysSinceEpoch
}
}

def rebaseJulianToGregorianDays(daysSinceEpoch: Int): Int = {
if (daysSinceEpoch < JULIAN_CUTOVER_DAY) {
DateTimeUtils.rebaseJulianToGregorianDays(daysSinceEpoch)
} else {
daysSinceEpoch
}
}

final val JULIAN_CUTOVER_DAY =
rebaseGregorianToJulianDays(DateTimeUtils.GREGORIAN_CUTOVER_DAY.toInt)
}