Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.util

import java.lang.{Long => JLong}
import java.sql.{Date, Timestamp}
import java.text.{DateFormat, SimpleDateFormat}
import java.util.{TimeZone, Calendar}
Expand Down Expand Up @@ -56,6 +57,13 @@ object DateTimeUtils {

@transient lazy val defaultTimeZone = TimeZone.getDefault

// Constants defining the allowed ranges for timestamps that can be parsed by java.sql.Timestamp.
// Limits are calculated based on UTC.
private[spark] final val MIN_TIMESTAMP: Long =
Timestamp.valueOf("0001-01-01 00:00:00").getTime() + defaultTimeZone.getRawOffset()
private[spark] final val MAX_TIMESTAMP: Long =
Timestamp.valueOf("9999-12-31 23:59:59.999999").getTime() + defaultTimeZone.getRawOffset()

// Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
private val threadLocalLocalTimeZone = new ThreadLocal[TimeZone] {
override protected def initialValue: TimeZone = {
Expand All @@ -82,7 +90,9 @@ object DateTimeUtils {
// SPARK-6785: use Math.floor so negative number of days (dates before 1970)
// will correctly work as input for function toJavaDate(Int)
val millisLocal = millisUtc + threadLocalLocalTimeZone.get().getOffset(millisUtc)
Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt
val days = Math.floor(millisLocal.toDouble / MILLIS_PER_DAY)
require(days <= Integer.MAX_VALUE && days >= Integer.MIN_VALUE, "Date exceeeds allowed range.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd assume the millisUtc is a valid timestamp, then this check is not needed.

days.toInt
}

// reverse of millisToDays
Expand Down Expand Up @@ -172,6 +182,8 @@ object DateTimeUtils {
*/
def fromJavaTimestamp(t: Timestamp): SQLTimestamp = {
if (t != null) {
require(t.getTime() <= JLong.MAX_VALUE / 1000 && t.getTime() >= JLong.MIN_VALUE / 1000,
s"Timestamp exceeds allowed range (${t.getTime()}).")
t.getTime() * 1000L + (t.getNanos().toLong / 1000) % 1000L
} else {
0L
Expand All @@ -193,9 +205,15 @@ object DateTimeUtils {
*/
def toJulianDay(us: SQLTimestamp): (Int, Long) = {
val seconds = us / MICROS_PER_SECOND
val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
val secondsInDay = seconds % SECONDS_PER_DAY
val nanos = (us % MICROS_PER_SECOND) * 1000L
var day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
var secondsInDay = seconds % SECONDS_PER_DAY
var nanos = (us % MICROS_PER_SECOND) * 1000L
if (us < 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

day -= 1
secondsInDay += SECONDS_PER_DAY
nanos += (SECONDS_PER_DAY * MICROS_PER_SECOND * 1000L)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct, it should be:


val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROSECONDS_PER_DAY
val day = julian_us / MICROSECONDS_PER_DAY
val micros = julian_us % MICROSECONDS_PER_DAY
(day.toInt, micros * 1000L)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch; my code can return nanos > 999999999 which java.sql.Timestamp does not like...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created https://issues.apache.org/jira/browse/SPARK-10522 for this, then we could back port the fix for 1.5.

require(day >= 0, "Timestamp exceeds allowed range.")
}
(day.toInt, secondsInDay * NANOS_PER_SECOND + nanos)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.util.Random
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.CalendarInterval

/**
Expand Down Expand Up @@ -109,22 +110,11 @@ object RandomDataGenerator {
})
case BooleanType => Some(() => rand.nextBoolean())
case DateType => Some(() => new java.sql.Date(rand.nextInt()))
case TimestampType =>
val generator =
() => {
var milliseconds = rand.nextLong() % 253402329599999L
// -62135740800000L is the number of milliseconds before January 1, 1970, 00:00:00 GMT
// for "0001-01-01 00:00:00.000000". We need to find a
// number that is greater or equals to this number as a valid timestamp value.
while (milliseconds < -62135740800000L) {
// 253402329599999L is the the number of milliseconds since
// January 1, 1970, 00:00:00 GMT for "9999-12-31 23:59:59.999999".
milliseconds = rand.nextLong() % 253402329599999L
}
// DateTimeUtils.toJavaTimestamp takes microsecond.
DateTimeUtils.toJavaTimestamp(milliseconds * 1000)
}
Some(generator)
case TimestampType => Some { () =>
val range = DateTimeUtils.MAX_TIMESTAMP - DateTimeUtils.MIN_TIMESTAMP
val milliseconds = (range * rand.nextDouble()).toLong + DateTimeUtils.MIN_TIMESTAMP
DateTimeUtils.toJavaTimestamp(milliseconds * 1000)
}
case CalendarIntervalType => Some(() => {
val months = rand.nextInt(1000)
val ns = rand.nextLong()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.scalatest.Matchers
import org.scalatest.prop.GeneratorDrivenPropertyChecks

import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.CalendarInterval

/**
Expand Down Expand Up @@ -94,7 +95,9 @@ object LiteralGenerator {
for { d <- Arbitrary.arbInt.arbitrary } yield Literal.create(new Date(d), DateType)

lazy val timestampLiteralGen: Gen[Literal] =
for { t <- Arbitrary.arbLong.arbitrary } yield Literal.create(new Timestamp(t), TimestampType)
for { t <- Gen.chooseNum(DateTimeUtils.MIN_TIMESTAMP, DateTimeUtils.MAX_TIMESTAMP) } yield {
Literal.create(new Timestamp(t), TimestampType)
}

lazy val calendarIntervalLiterGen: Gen[Literal] =
for { m <- Arbitrary.arbInt.arbitrary; s <- Arbitrary.arbLong.arbitrary}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.util

import java.lang.{Long => JLong}
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.util.{Calendar, TimeZone}
Expand Down Expand Up @@ -425,4 +426,34 @@ class DateTimeUtilsSuite extends SparkFunSuite {
test("2011-12-25 01:00:00.123456", "PST", "2011-12-25 09:00:00.123456")
test("2011-12-25 17:00:00.123456", "Asia/Shanghai", "2011-12-25 09:00:00.123456")
}

test("SPARK-10439: bound checks") {
// Avoid truncation when converting from ms to us.
Seq(JLong.MIN_VALUE, JLong.MAX_VALUE).foreach { ts =>
intercept[IllegalArgumentException] {
fromJavaTimestamp(new Timestamp(ts))
}
}

// Avoid negative Julian day counts since the Hive/Impala timestamp spec does not
// expect them.
val julianDay = -(JULIAN_DAY_OF_EPOCH * SECONDS_PER_DAY * MICROS_PER_SECOND)
intercept[IllegalArgumentException] {
toJulianDay(julianDay - 1)
}

// Make sure calculated nanos are positive, since that's what the Hive/Impala timestamp
// spec expects.
val (_, nanos) = toJulianDay(julianDay + 1)
assert(nanos >= 0)

// Make sure timestamp in ms does not overflow number of days returned as int. millisToDays()
// considers the current time zone, so add 2 full days when checking to be sure.
val msPerDay = SECONDS_PER_DAY * 1000L
val tooManyDays = Integer.MAX_VALUE * msPerDay
intercept[IllegalArgumentException] {
millisToDays(tooManyDays + 2 * msPerDay)
}
}

}