Skip to content
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.util
import java.time.{Instant, ZoneId}
import java.util.Locale

import org.apache.spark.sql.catalyst.util.DateTimeUtils.instantToDays

sealed trait DateFormatter extends Serializable {
def parse(s: String): Int // returns days since epoch
def format(days: Int): String
Expand All @@ -38,11 +40,7 @@ class Iso8601DateFormatter(
toInstantWithZoneId(temporalAccessor, UTC)
}

override def parse(s: String): Int = {
val seconds = toInstant(s).getEpochSecond
val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY)
days.toInt
}
override def parse(s: String): Int = instantToDays(toInstant(s))

override def format(days: Int): String = {
val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package org.apache.spark.sql.catalyst.util

import java.sql.{Date, Timestamp}
import java.text.{DateFormat, SimpleDateFormat}
import java.time._
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless there are around 10 classes to import, I'd list them explicitly. At the very least, the following line is currently redundant.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, IntelliJ IDEA collapses the imports automatically. I will try to revert them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`DateTimeUtils` uses many classes from `java.time`, so I will keep `import java.time._` if you don't mind.

import java.time.LocalDate
import java.time.temporal.IsoFields
import java.util.{Calendar, Locale, TimeZone}
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.function.{Function => JFunction}
import javax.xml.bind.DatatypeConverter

Expand Down Expand Up @@ -53,30 +56,12 @@ object DateTimeUtils {
final val NANOS_PER_MICROS = 1000L
final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L

// number of days in 400 years by Gregorian calendar
final val daysIn400Years: Int = 146097

// In the Julian calendar every year that is exactly divisible by 4 is a leap year without any
// exception. But in the Gregorian calendar every year that is exactly divisible by four
// is a leap year, except for years that are exactly divisible by 100, but these centurial years
// are leap years if they are exactly divisible by 400.
// So there are 3 extra days in the Julian calendar within a 400 years cycle compared to the
// Gregorian calendar.
final val extraLeapDaysIn400YearsJulian = 3

// number of days in 400 years by Julian calendar
final val daysIn400YearsInJulian: Int = daysIn400Years + extraLeapDaysIn400YearsJulian

// number of days between 1.1.1970 and 1.1.2001
final val to2001 = -11323

// this is year -17999, calculation: 50 * daysIn400Year
final val YearZero = -17999
final val toYearZero = to2001 + 7304850

// days to year -17999 in Julian calendar
final val toYearZeroInJulian = toYearZero + 49 * extraLeapDaysIn400YearsJulian

final val TimeZoneGMT = TimeZone.getTimeZone("GMT")
final val TimeZoneUTC = TimeZone.getTimeZone("UTC")
final val MonthOf31Days = Set(1, 3, 5, 7, 8, 10, 12)
Expand All @@ -85,13 +70,6 @@ object DateTimeUtils {

/** Returns the JVM's current default time zone, as reported by `TimeZone.getDefault`. */
def defaultTimeZone(): TimeZone = {
  TimeZone.getDefault()
}

// Reuse the Calendar object in each thread as it is expensive to create in each method call.
private val threadLocalGmtCalendar = new ThreadLocal[Calendar] {
override protected def initialValue: Calendar = {
Calendar.getInstance(TimeZoneGMT)
}
}

// `SimpleDateFormat` is not thread-safe.
private val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
override def initialValue(): SimpleDateFormat = {
Expand Down Expand Up @@ -427,23 +405,35 @@ object DateTimeUtils {
return None
}

val c = if (tz.isEmpty) {
Calendar.getInstance(timeZone)
val zoneId = if (tz.isEmpty) {
timeZone.toZoneId
} else {
Calendar.getInstance(
getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d"))
getTimeZone(f"GMT${tz.get.toChar}${segments(7)}%02d:${segments(8)}%02d").toZoneId
}
c.set(Calendar.MILLISECOND, 0)

if (justTime) {
c.set(Calendar.HOUR_OF_DAY, segments(3))
c.set(Calendar.MINUTE, segments(4))
c.set(Calendar.SECOND, segments(5))
val nanoseconds = TimeUnit.MICROSECONDS.toNanos(segments(6))
val localTime = LocalTime.of(segments(3), segments(4), segments(5), nanoseconds.toInt)
val localDate = if (justTime) {
LocalDate.now(zoneId)
} else {
c.set(segments(0), segments(1) - 1, segments(2), segments(3), segments(4), segments(5))
LocalDate.of(segments(0), segments(1), segments(2))
}
val localDateTime = LocalDateTime.of(localDate, localTime)
val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId)
val instant = Instant.from(zonedDateTime)

Some(instantToMicros(instant))
}

Some(c.getTimeInMillis * 1000 + segments(6))
/**
 * Converts an instant to a count of microseconds relative to the epoch.
 *
 * The whole-second part is scaled by `MICROS_PER_SECOND` and the nanosecond-of-second
 * part is divided by `NANOS_PER_MICROS`, so any sub-microsecond remainder is dropped.
 *
 * @param instant the point in time to convert
 * @return the sum of the scaled seconds and the microsecond fraction
 * @throws ArithmeticException if the multiplication or the addition overflows a `Long`
 */
def instantToMicros(instant: Instant): Long = {
  val wholeSecondsAsMicros = Math.multiplyExact(instant.getEpochSecond, MICROS_PER_SECOND)
  val fractionAsMicros = instant.getNano / NANOS_PER_MICROS
  Math.addExact(wholeSecondsAsMicros, fractionAsMicros)
}

/**
 * Converts an instant to a day number relative to the epoch.
 *
 * The epoch second is floor-divided by `DateTimeUtils.SECONDS_PER_DAY`, i.e. the
 * quotient is rounded toward negative infinity rather than toward zero, so instants
 * before the epoch are assigned to the day that actually contains them.
 *
 * @param instant the point in time to convert
 * @return the number of days since the epoch for the day containing `instant`
 * @throws ArithmeticException if the day number does not fit in an `Int`
 */
def instantToDays(instant: Instant): Int = {
  val seconds = instant.getEpochSecond
  val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY)
  // Fail loudly instead of silently wrapping around when the instant lies outside the
  // range of days representable as an Int (matches the checked math in instantToMicros).
  Math.toIntExact(days)
}

/**
Expand Down Expand Up @@ -496,11 +486,9 @@ object DateTimeUtils {
return None
}

val c = threadLocalGmtCalendar.get()
c.clear()
c.set(segments(0), segments(1) - 1, segments(2), 0, 0, 0)
c.set(Calendar.MILLISECOND, 0)
Some((c.getTimeInMillis / MILLIS_PER_DAY).toInt)
val localDate = LocalDate.of(segments(0), segments(1), segments(2))
val instant = localDate.atStartOfDay(TimeZoneUTC.toZoneId).toInstant
Some(instantToDays(instant))
}

/**
Expand Down Expand Up @@ -587,228 +575,53 @@ object DateTimeUtils {
(year % 4) == 0 && ((year % 100) != 0 || (year % 400) == 0)
}

/**
* Return the number of days since the start of 400 year period.
* The second year of a 400 year period (year 1) starts on day 365.
*/
private[this] def yearBoundary(year: Int, isGregorian: Boolean): Int = {
if (isGregorian) {
year * 365 + ((year / 4) - (year / 100) + (year / 400))
} else {
year * 365 + (year / 4)
}
}

/**
* Calculates the number of years for the given number of days. This depends
* on a 400 year period.
* @param days days since the beginning of the 400 year period
* @param isGregorian indicates whether leap years should be calculated according to Gregorian
* (or Julian) calendar
* @return (number of year, days in year)
*/
private[this] def numYears(days: Int, isGregorian: Boolean): (Int, Int) = {
val year = days / 365
val boundary = yearBoundary(year, isGregorian)
if (days > boundary) {
(year, days - boundary)
} else {
(year - 1, days - yearBoundary(year - 1, isGregorian))
}
}

/**
* Calculates the year and the number of the day in the year for the given
* number of days. The given days is the number of days since 1.1.1970.
*
* The calculation uses the fact that the period 1.1.2001 until 31.12.2400 is
* equals to the period 1.1.1601 until 31.12.2000.
*/
private[this] def getYearAndDayInYear(daysSince1970: SQLDate): (Int, Int) = {
// Since Julian calendar was replaced with the Gregorian calendar,
// the 10 days after Oct. 4 were skipped.
// (1582-10-04) -141428 days since 1970-01-01
if (daysSince1970 <= -141428) {
getYearAndDayInYear(daysSince1970 - 10, toYearZeroInJulian, daysIn400YearsInJulian, false)
} else {
getYearAndDayInYear(daysSince1970, toYearZero, daysIn400Years, true)
}
}

private def getYearAndDayInYear(
daysSince1970: SQLDate,
toYearZero: SQLDate,
daysIn400Years: SQLDate,
isGregorian: Boolean): (Int, Int) = {
// add the difference (in days) between 1.1.1970 and the artificial year 0 (-17999)
val daysNormalized = daysSince1970 + toYearZero
val numOfQuarterCenturies = daysNormalized / daysIn400Years
val daysInThis400 = daysNormalized % daysIn400Years + 1
val (years, dayInYear) = numYears(daysInThis400, isGregorian)
val year: Int = (2001 - 20000) + 400 * numOfQuarterCenturies + years
(year, dayInYear)
}

/**
* Returns the 'day in year' value for the given date. The date is expressed in days
* since 1.1.1970.
*/
def getDayInYear(date: SQLDate): Int = {
getYearAndDayInYear(date)._2
LocalDate.ofEpochDay(date).getDayOfYear
}

/**
* Returns the year value for the given date. The date is expressed in days
* since 1.1.1970.
*/
def getYear(date: SQLDate): Int = {
getYearAndDayInYear(date)._1
LocalDate.ofEpochDay(date).getYear
}

/**
* Returns the quarter for the given date. The date is expressed in days
* since 1.1.1970.
*/
def getQuarter(date: SQLDate): Int = {
var (year, dayInYear) = getYearAndDayInYear(date)
if (isLeapYear(year)) {
dayInYear = dayInYear - 1
}
if (dayInYear <= 90) {
1
} else if (dayInYear <= 181) {
2
} else if (dayInYear <= 273) {
3
} else {
4
}
LocalDate.ofEpochDay(date).get(IsoFields.QUARTER_OF_YEAR)
}

/**
* Split date (expressed in days since 1.1.1970) into four fields:
* year, month (Jan is Month 1), dayInMonth, daysToMonthEnd (0 if it's last day of month).
*/
def splitDate(date: SQLDate): (Int, Int, Int, Int) = {
var (year, dayInYear) = getYearAndDayInYear(date)
val isLeap = isLeapYear(year)
if (isLeap && dayInYear == 60) {
(year, 2, 29, 0)
} else {
if (isLeap && dayInYear > 60) dayInYear -= 1

if (dayInYear <= 181) {
if (dayInYear <= 31) {
(year, 1, dayInYear, 31 - dayInYear)
} else if (dayInYear <= 59) {
(year, 2, dayInYear - 31, if (isLeap) 60 - dayInYear else 59 - dayInYear)
} else if (dayInYear <= 90) {
(year, 3, dayInYear - 59, 90 - dayInYear)
} else if (dayInYear <= 120) {
(year, 4, dayInYear - 90, 120 - dayInYear)
} else if (dayInYear <= 151) {
(year, 5, dayInYear - 120, 151 - dayInYear)
} else {
(year, 6, dayInYear - 151, 181 - dayInYear)
}
} else {
if (dayInYear <= 212) {
(year, 7, dayInYear - 181, 212 - dayInYear)
} else if (dayInYear <= 243) {
(year, 8, dayInYear - 212, 243 - dayInYear)
} else if (dayInYear <= 273) {
(year, 9, dayInYear - 243, 273 - dayInYear)
} else if (dayInYear <= 304) {
(year, 10, dayInYear - 273, 304 - dayInYear)
} else if (dayInYear <= 334) {
(year, 11, dayInYear - 304, 334 - dayInYear)
} else {
(year, 12, dayInYear - 334, 365 - dayInYear)
}
}
}
val ld = LocalDate.ofEpochDay(date)
(ld.getYear, ld.getMonthValue, ld.getDayOfMonth, ld.lengthOfMonth() - ld.getDayOfMonth)
}

/**
* Returns the month value for the given date. The date is expressed in days
* since 1.1.1970. January is month 1.
*/
def getMonth(date: SQLDate): Int = {
var (year, dayInYear) = getYearAndDayInYear(date)
if (isLeapYear(year)) {
if (dayInYear == 60) {
return 2
} else if (dayInYear > 60) {
dayInYear = dayInYear - 1
}
}

if (dayInYear <= 31) {
1
} else if (dayInYear <= 59) {
2
} else if (dayInYear <= 90) {
3
} else if (dayInYear <= 120) {
4
} else if (dayInYear <= 151) {
5
} else if (dayInYear <= 181) {
6
} else if (dayInYear <= 212) {
7
} else if (dayInYear <= 243) {
8
} else if (dayInYear <= 273) {
9
} else if (dayInYear <= 304) {
10
} else if (dayInYear <= 334) {
11
} else {
12
}
LocalDate.ofEpochDay(date).getMonthValue
}

/**
* Returns the 'day of month' value for the given date. The date is expressed in days
* since 1.1.1970.
*/
def getDayOfMonth(date: SQLDate): Int = {
var (year, dayInYear) = getYearAndDayInYear(date)
if (isLeapYear(year)) {
if (dayInYear == 60) {
return 29
} else if (dayInYear > 60) {
dayInYear = dayInYear - 1
}
}

if (dayInYear <= 31) {
dayInYear
} else if (dayInYear <= 59) {
dayInYear - 31
} else if (dayInYear <= 90) {
dayInYear - 59
} else if (dayInYear <= 120) {
dayInYear - 90
} else if (dayInYear <= 151) {
dayInYear - 120
} else if (dayInYear <= 181) {
dayInYear - 151
} else if (dayInYear <= 212) {
dayInYear - 181
} else if (dayInYear <= 243) {
dayInYear - 212
} else if (dayInYear <= 273) {
dayInYear - 243
} else if (dayInYear <= 304) {
dayInYear - 273
} else if (dayInYear <= 334) {
dayInYear - 304
} else {
dayInYear - 334
}
LocalDate.ofEpochDay(date).getDayOfMonth
}

/**
Expand Down Expand Up @@ -1167,7 +980,6 @@ object DateTimeUtils {
* Re-initialize the current thread's thread locals. Exposed for testing.
*/
private[util] def resetThreadLocals(): Unit = {
threadLocalGmtCalendar.remove()
threadLocalTimestampFormat.remove()
threadLocalDateFormat.remove()
}
Expand Down
Loading