DateTimeUtils.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.util

import java.lang.{Long => JLong}
import java.sql.{Date, Timestamp}
import java.text.{DateFormat, SimpleDateFormat}
import java.util.{TimeZone, Calendar}
@@ -41,6 +42,7 @@ object DateTimeUtils {
final val JULIAN_DAY_OF_EPOCH = 2440588
final val SECONDS_PER_DAY = 60 * 60 * 24L
final val MICROS_PER_SECOND = 1000L * 1000L
final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY
final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L

final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L
@@ -56,6 +58,13 @@ object DateTimeUtils {

@transient lazy val defaultTimeZone = TimeZone.getDefault

// Constants defining the allowed range for timestamps that can be parsed by java.sql.Timestamp.
// The limits are calculated relative to UTC, using the default time zone's raw offset.
private[spark] final val MIN_TIMESTAMP: Long =
Timestamp.valueOf("0001-01-01 00:00:00").getTime() + defaultTimeZone.getRawOffset()
private[spark] final val MAX_TIMESTAMP: Long =
Timestamp.valueOf("9999-12-31 23:59:59.999999").getTime() + defaultTimeZone.getRawOffset()

// Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
private val threadLocalLocalTimeZone = new ThreadLocal[TimeZone] {
override protected def initialValue: TimeZone = {
@@ -139,6 +148,8 @@ object DateTimeUtils {
* Returns the number of days since epoch from java.sql.Date.
*/
def fromJavaDate(date: Date): SQLDate = {
require(date.getTime() <= MAX_TIMESTAMP && date.getTime() >= MIN_TIMESTAMP,
  s"Date ${date.getTime()} is outside the allowed timestamp range.")
millisToDays(date.getTime)
}

@@ -172,6 +183,8 @@
*/
def fromJavaTimestamp(t: Timestamp): SQLTimestamp = {
if (t != null) {
require(t.getTime() <= MAX_TIMESTAMP && t.getTime() >= MIN_TIMESTAMP,
  s"Timestamp ${t.getTime()} is outside the allowed range.")
t.getTime() * 1000L + (t.getNanos().toLong / 1000) % 1000L
} else {
0L
@@ -192,11 +205,10 @@
* Returns the Julian day and the nanoseconds within that day for the given number of microseconds since epoch.
*/
def toJulianDay(us: SQLTimestamp): (Int, Long) = {
-   val seconds = us / MICROS_PER_SECOND
-   val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
-   val secondsInDay = seconds % SECONDS_PER_DAY
-   val nanos = (us % MICROS_PER_SECOND) * 1000L
-   (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos)
+   val usFromJulianEpoch = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
+   val day = usFromJulianEpoch / MICROS_PER_DAY
+   val micros = usFromJulianEpoch % MICROS_PER_DAY
+   (day.toInt, micros * 1000L)
}
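
To make the rewrite above concrete, here is a self-contained sanity check (a sketch, not part of the diff; the constants mirror the definitions earlier in this file). It shows the behavioral difference the new formula buys: for a microsecond value just before the epoch, the old seconds-based arithmetic returned a negative nanosecond component on the wrong day, while the new version rolls over to the previous Julian day and keeps nanos non-negative.

object ToJulianDaySketch {
  val JULIAN_DAY_OF_EPOCH = 2440588
  val SECONDS_PER_DAY = 60 * 60 * 24L
  val MICROS_PER_SECOND = 1000L * 1000L
  val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L
  val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY

  // Old formula: truncating division lets nanos go negative for pre-epoch timestamps.
  def oldToJulianDay(us: Long): (Int, Long) = {
    val seconds = us / MICROS_PER_SECOND
    val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
    val secondsInDay = seconds % SECONDS_PER_DAY
    val nanos = (us % MICROS_PER_SECOND) * 1000L
    (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos)
  }

  // New formula: shift to the Julian epoch first, so both day and nanos stay well-formed.
  def newToJulianDay(us: Long): (Int, Long) = {
    val usFromJulianEpoch = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
    val day = usFromJulianEpoch / MICROS_PER_DAY
    val micros = usFromJulianEpoch % MICROS_PER_DAY
    (day.toInt, micros * 1000L)
  }

  def main(args: Array[String]): Unit = {
    println(oldToJulianDay(-1L))  // (2440588,-1000): negative nanos on the wrong day
    println(newToJulianDay(-1L))  // (2440587,86399999999000): previous day, non-negative nanos
    println(newToJulianDay(0L))   // (2440588,0): the epoch itself
  }
}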

/**
RandomDataGenerator.scala
@@ -26,6 +26,7 @@ import scala.util.Random
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval

/**
@@ -109,22 +110,11 @@ object RandomDataGenerator {
})
case BooleanType => Some(() => rand.nextBoolean())
case DateType => Some(() => new java.sql.Date(rand.nextInt()))
-     case TimestampType =>
-       val generator =
-         () => {
-           var milliseconds = rand.nextLong() % 253402329599999L
-           // -62135740800000L is the number of milliseconds before January 1, 1970, 00:00:00 GMT
-           // for "0001-01-01 00:00:00.000000". We need to find a
-           // number that is greater or equals to this number as a valid timestamp value.
-           while (milliseconds < -62135740800000L) {
-             // 253402329599999L is the the number of milliseconds since
-             // January 1, 1970, 00:00:00 GMT for "9999-12-31 23:59:59.999999".
-             milliseconds = rand.nextLong() % 253402329599999L
-           }
-           // DateTimeUtils.toJavaTimestamp takes microsecond.
-           DateTimeUtils.toJavaTimestamp(milliseconds * 1000)
-         }
-       Some(generator)
+     case TimestampType => Some { () =>
+       val range = DateTimeUtils.MAX_TIMESTAMP - DateTimeUtils.MIN_TIMESTAMP
+       val milliseconds = (range * rand.nextDouble()).toLong + DateTimeUtils.MIN_TIMESTAMP
+       DateTimeUtils.toJavaTimestamp(milliseconds * 1000)
+     }
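
The deleted comments above documented hard-coded millisecond bounds and a retry loop; the replacement instead draws uniformly across [MIN_TIMESTAMP, MAX_TIMESTAMP]. A standalone sketch of that sampling idea (not part of the diff; the bound values below are illustrative stand-ins, not the exact constants):

import scala.util.Random

object TimestampSamplingSketch {
  // Illustrative stand-ins for DateTimeUtils.MIN_TIMESTAMP / MAX_TIMESTAMP (ms since epoch).
  val MIN: Long = -62135596800000L
  val MAX: Long = 253402300799999L

  // Scale a uniform double in [0, 1) onto the millisecond range; no rejection loop needed.
  def sampleMillis(rand: Random): Long = ((MAX - MIN) * rand.nextDouble()).toLong + MIN

  def main(args: Array[String]): Unit = {
    val rand = new Random(42)
    val samples = Seq.fill(5)(sampleMillis(rand))
    samples.foreach(ms => assert(ms >= MIN && ms <= MAX))
    println(samples.mkString(", "))
  }
}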
case CalendarIntervalType => Some(() => {
val months = rand.nextInt(1000)
val ns = rand.nextLong()
LiteralGenerator.scala
@@ -24,6 +24,7 @@ import org.scalatest.Matchers
import org.scalatest.prop.GeneratorDrivenPropertyChecks

import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.CalendarInterval

/**
@@ -94,7 +95,9 @@ object LiteralGenerator {
for { d <- Arbitrary.arbInt.arbitrary } yield Literal.create(new Date(d), DateType)

lazy val timestampLiteralGen: Gen[Literal] =
-   for { t <- Arbitrary.arbLong.arbitrary } yield Literal.create(new Timestamp(t), TimestampType)
+   for { t <- Gen.chooseNum(DateTimeUtils.MIN_TIMESTAMP, DateTimeUtils.MAX_TIMESTAMP) } yield {
+     Literal.create(new Timestamp(t), TimestampType)
+   }

lazy val calendarIntervalLiterGen: Gen[Literal] =
for { m <- Arbitrary.arbInt.arbitrary; s <- Arbitrary.arbLong.arbitrary}
DateTimeUtilsSuite.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.util

import java.lang.{Long => JLong}
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.util.{Calendar, TimeZone}
@@ -425,4 +426,26 @@ class DateTimeUtilsSuite extends SparkFunSuite {
test("2011-12-25 01:00:00.123456", "PST", "2011-12-25 09:00:00.123456")
test("2011-12-25 17:00:00.123456", "Asia/Shanghai", "2011-12-25 09:00:00.123456")
}

test("SPARK-10439: bound checks") {
// Make sure conversions from ms to us do not overflow, and that dates are within the allowed range.
Seq(JLong.MIN_VALUE, JLong.MAX_VALUE).foreach { ts =>
intercept[IllegalArgumentException] {
fromJavaTimestamp(new Timestamp(ts))
}
intercept[IllegalArgumentException] {
fromJavaDate(new Date(ts))
}
}

// Make sure the calculated nanos are non-negative, since that's what the Hive/Impala timestamp
// spec expects. Also make sure they do not exceed 999999999, the limit imposed by java.sql.Timestamp.
val julianDay = -(JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY)
val nextDay = julianDay + MICROS_PER_DAY
val (day, nanos) = toJulianDay(nextDay + 1)
assert(day === 1)
assert(nanos >= 0)
assert(nanos <= 999999999)
}
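
The Long.MIN_VALUE / Long.MAX_VALUE cases above exist because an extreme getTime() value multiplied by 1000 wraps around a Long long before any later range check could notice. A standalone illustration of that wrap-around (a sketch, not part of the diff):

object OverflowSketch {
  def main(args: Array[String]): Unit = {
    val ms = Long.MaxValue / 100  // an absurd millisecond value, far outside years 1..9999
    val us = ms * 1000L           // the ms-to-us conversion silently wraps around
    println(s"ms = $ms, us = $us, wrapped negative = ${us < 0}")  // us comes out negative
  }
}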

}