@@ -47,6 +47,15 @@ object DateTimeUtils {
// it's 2440587.5, rounding up to compatible with Hive
final val JULIAN_DAY_OF_EPOCH = 2440588

final val GREGORIAN_CUTOVER_DAY = LocalDate.of(1582, 10, 15).toEpochDay
final val GREGORIAN_CUTOVER_MICROS = instantToMicros(
LocalDateTime.of(1582, 10, 15, 0, 0, 0)
.atOffset(ZoneOffset.UTC)
.toInstant)
final val GREGORIAN_CUTOVER_MILLIS = microsToMillis(GREGORIAN_CUTOVER_MICROS)

final val julianCommonEraStart = Timestamp.valueOf("0001-01-01 00:00:00")

final val TimeZoneGMT = TimeZone.getTimeZone("GMT")
final val TimeZoneUTC = TimeZone.getTimeZone("UTC")

@@ -86,28 +95,50 @@ object DateTimeUtils {
* Returns the number of days since epoch from java.sql.Date.
*/
def fromJavaDate(date: Date): SQLDate = {
microsToDays(millisToMicros(date.getTime))
if (date.getTime < GREGORIAN_CUTOVER_MILLIS) {
val era = if (date.before(julianCommonEraStart)) 0 else 1
val localDate = date.toLocalDate.`with`(ChronoField.ERA, era)
localDateToDays(localDate)
} else {
microsToDays(millisToMicros(date.getTime))
}
}

/**
* Returns a java.sql.Date from number of days since epoch.
*/
def toJavaDate(daysSinceEpoch: SQLDate): Date = {
new Date(microsToMillis(daysToMicros(daysSinceEpoch)))
if (daysSinceEpoch < GREGORIAN_CUTOVER_DAY) {
Date.valueOf(LocalDate.ofEpochDay(daysSinceEpoch))
} else {
new Date(microsToMillis(daysToMicros(daysSinceEpoch)))
}
}

/**
* Returns a java.sql.Timestamp from number of micros since epoch.
*/
def toJavaTimestamp(us: SQLTimestamp): Timestamp = {
Timestamp.from(microsToInstant(us))
if (us < GREGORIAN_CUTOVER_MICROS) {
val ldt = microsToInstant(us).atZone(ZoneId.systemDefault()).toLocalDateTime
Timestamp.valueOf(ldt)
} else {
Timestamp.from(microsToInstant(us))
}
}

/**
* Returns the number of micros since epoch from java.sql.Timestamp.
*/
def fromJavaTimestamp(t: Timestamp): SQLTimestamp = {
instantToMicros(t.toInstant)
if (t.getTime < GREGORIAN_CUTOVER_MILLIS) {
val era = if (t.before(julianCommonEraStart)) 0 else 1
val localDateTime = t.toLocalDateTime.`with`(ChronoField.ERA, era)
val instant = ZonedDateTime.of(localDateTime, ZoneId.systemDefault()).toInstant
instantToMicros(instant)
} else {
instantToMicros(t.toInstant)
}
}

/**
@@ -23,7 +23,7 @@ class HiveResultSuite extends SharedSparkSession {
import testImplicits._

test("date formatting in hive result") {
val dates = Seq("2018-12-28", "1582-10-13", "1582-10-14", "1582-10-15")
val dates = Seq("2018-12-28", "1582-10-03", "1582-10-04", "1582-10-15")
val df = dates.toDF("a").selectExpr("cast(a as date) as b")
val executedPlan1 = df.queryExecution.executedPlan
val result = HiveResult.hiveResultString(executedPlan1)
@@ -36,8 +36,8 @@ class HiveResultSuite extends SharedSparkSession {
test("timestamp formatting in hive result") {
val timestamps = Seq(
"2018-12-28 01:02:03",
"1582-10-13 01:02:03",
"1582-10-14 01:02:03",
"1582-10-03 01:02:03",
Member Author:

Conversions of timestamps in the range 1582-10-04 to 1582-10-15 are implementation-specific because of calendar switching.
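For illustration only (not part of this PR), a minimal sketch of why timestamps in the cutover gap are implementation-specific: java.sql.Timestamp resolves such local date-times through the lenient hybrid Julian/Gregorian calendar in the default time zone, while java.time treats them as ordinary Proleptic Gregorian values.

import java.sql.Timestamp
import java.time.LocalDateTime

// 1582-10-10 does not exist in the hybrid calendar (1582-10-04 is followed by
// 1582-10-15), so the lenient GregorianCalendar behind java.sql shifts it into
// the Gregorian side of the cutover; the exact result depends on the JDK and
// the default time zone.
println(Timestamp.valueOf("1582-10-10 01:02:03"))

// The Proleptic Gregorian calendar used by java.time accepts 1582-10-10 as-is.
println(LocalDateTime.of(1582, 10, 10, 1, 2, 3))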

"1582-10-04 01:02:03",
"1582-10-15 01:02:03")
val df = timestamps.toDF("a").selectExpr("cast(a as timestamp) as b")
val executedPlan1 = df.queryExecution.executedPlan
@@ -21,6 +21,7 @@

import org.apache.orc.storage.ql.exec.vector.*;

import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
@@ -136,7 +137,7 @@ public int getInt(int rowId) {
public long getLong(int rowId) {
int index = getRowIndex(rowId);
if (isTimestamp) {
return timestampData.time[index] * 1000 + timestampData.nanos[index] / 1000 % 1000;
return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
} else {
return longData.vector[index];
}
@@ -21,6 +21,7 @@

import org.apache.hadoop.hive.ql.exec.vector.*;

import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.types.TimestampType;
@@ -136,7 +137,7 @@ public int getInt(int rowId) {
public long getLong(int rowId) {
int index = getRowIndex(rowId);
if (isTimestamp) {
return timestampData.time[index] * 1000 + timestampData.nanos[index] / 1000 % 1000;
Contributor:

I took a look at the ORC type spec, but it doesn't mention the calendar. The physical timestamp type looks very similar to Timestamp, so this looks correct to me.

How about the write side?

Member Author:

The write side uses toJavaTimestamp already:

case TimestampType => (getter, ordinal) =>
  val ts = DateTimeUtils.toJavaTimestamp(getter.getLong(ordinal))
  val result = new OrcTimestamp(ts.getTime)
  result.setNanos(ts.getNanos)
  result
So after this PR, it will do the rebasing automatically.

return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
} else {
return longData.vector[index];
}
@@ -18,7 +18,8 @@
package org.apache.spark.sql.hive

import java.lang.reflect.{ParameterizedType, Type, WildcardType}
import java.util.concurrent.TimeUnit._
import java.time.LocalDate
import java.util.Calendar

import scala.collection.JavaConverters._

@@ -181,6 +182,33 @@ import org.apache.spark.unsafe.types.UTF8String
*/
private[hive] trait HiveInspectors {

private final val JULIAN_CUTOVER_DAY =
rebaseGregorianToJulianDays(DateTimeUtils.GREGORIAN_CUTOVER_DAY.toInt)

private def rebaseJulianToGregorianDays(daysSinceEpoch: Int): Int = {
val millis = Math.multiplyExact(daysSinceEpoch, DateTimeConstants.MILLIS_PER_DAY)
val utcCal = new Calendar.Builder()
.setCalendarType("gregory")
.setTimeZone(DateTimeUtils.TimeZoneUTC)
.setInstant(millis)
.build()
val localDate = LocalDate.of(
utcCal.get(Calendar.YEAR),
utcCal.get(Calendar.MONTH) + 1,
utcCal.get(Calendar.DAY_OF_MONTH))
Math.toIntExact(localDate.toEpochDay)
}

private def rebaseGregorianToJulianDays(daysSinceEpoch: Int): Int = {
val localDate = LocalDate.ofEpochDay(daysSinceEpoch)
val utcCal = new Calendar.Builder()
.setCalendarType("gregory")
.setTimeZone(DateTimeUtils.TimeZoneUTC)
.setDate(localDate.getYear, localDate.getMonthValue - 1, localDate.getDayOfMonth)
.build()
Math.toIntExact(Math.floorDiv(utcCal.getTimeInMillis, DateTimeConstants.MILLIS_PER_DAY))
}

def javaTypeToDataType(clz: Type): DataType = clz match {
// writable
case c: Class[_] if c == classOf[hadoopIo.DoubleWritable] => DoubleType
@@ -466,7 +494,7 @@ private[hive] trait HiveInspectors {
_ => constant
case poi: WritableConstantTimestampObjectInspector =>
val t = poi.getWritableConstantValue
val constant = SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
val constant = DateTimeUtils.fromJavaTimestamp(t.getTimestamp)
Member Author:

The bug fix should be made independently of the PR, I think.

_ => constant
case poi: WritableConstantIntObjectInspector =>
val constant = poi.getWritableConstantValue.get()
@@ -618,7 +646,14 @@ private[hive] trait HiveInspectors {
case x: DateObjectInspector if x.preferWritable() =>
data: Any => {
if (data != null) {
DateTimeUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get())
// Rebasing written days via conversion to local dates.
// See the comment for `getDateWritable()`.
val daysSinceEpoch = x.getPrimitiveWritableObject(data).getDays
if (daysSinceEpoch < JULIAN_CUTOVER_DAY) {
rebaseJulianToGregorianDays(daysSinceEpoch)
} else {
daysSinceEpoch
}
} else {
null
}
@@ -634,8 +669,7 @@
case x: TimestampObjectInspector if x.preferWritable() =>
data: Any => {
if (data != null) {
val t = x.getPrimitiveWritableObject(data)
SECONDS.toMicros(t.getSeconds) + NANOSECONDS.toMicros(t.getNanos)
DateTimeUtils.fromJavaTimestamp(x.getPrimitiveWritableObject(data).getTimestamp)
} else {
null
}
@@ -1012,7 +1046,27 @@ private[hive] trait HiveInspectors {
}

private def getDateWritable(value: Any): hiveIo.DateWritable =
if (value == null) null else new hiveIo.DateWritable(value.asInstanceOf[Int])
if (value == null) {
null
} else {
// Rebasing days since the epoch to store the same number of days
// as by Spark 2.4 and earlier versions. Spark 3.0 switched to
// Proleptic Gregorian calendar (see SPARK-26651), and as a consequence of that,
// this affects dates before 1582-10-15. Spark 2.4 and earlier versions use
// Julian calendar for dates before 1582-10-15. So, the same local date may
// be mapped to different number of days since the epoch in different calendars.
// For example:
// Proleptic Gregorian calendar: 1582-01-01 -> -141714
// Julian calendar: 1582-01-01 -> -141704
// The code below converts -141714 to -141704.
Member Author (@MaxGekk, Mar 10, 2020):

The Gregorian year is shorter than the Julian year (365.2425 days vs. 365.25 days), so counting back from the epoch, the same local date corresponds to fewer days in the Julian calendar.
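A quick sanity check of the -141714 / -141704 example (an illustrative sketch, not part of the PR): the Proleptic Gregorian day number comes from java.time, and the hybrid Julian/Gregorian day number is computed via java.util.Calendar in UTC, the same approach the rebasing helpers above use.

import java.time.LocalDate
import java.util.{Calendar, TimeZone}

// Proleptic Gregorian calendar: 1582-01-01 -> -141714
val prolepticDays = LocalDate.of(1582, 1, 1).toEpochDay

// Hybrid Julian/Gregorian calendar: 1582-01-01 -> -141704
val hybridCal = new Calendar.Builder()
  .setCalendarType("gregory")
  .setTimeZone(TimeZone.getTimeZone("UTC"))
  .setDate(1582, 0, 1) // months are 0-based in java.util.Calendar
  .build()
val hybridDays = Math.floorDiv(hybridCal.getTimeInMillis, 86400000L) // millis per day

println(s"$prolepticDays vs $hybridDays") // -141714 vs -141704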

val daysSinceEpoch = value.asInstanceOf[Int]
val rebasedDays = if (daysSinceEpoch < DateTimeUtils.GREGORIAN_CUTOVER_DAY) {
rebaseGregorianToJulianDays(daysSinceEpoch)
} else {
daysSinceEpoch
}
new hiveIo.DateWritable(rebasedDays)
}

private def getTimestampWritable(value: Any): hiveIo.TimestampWritable =
if (value == null) {