diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
index f2d06e793f9d..6d4c65573aac 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java
@@ -66,9 +66,12 @@ public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;
     CalendarInterval that = (CalendarInterval) o;
-    return months == that.months &&
-      days == that.days &&
-      microseconds == that.microseconds;
+    // Two intervals are equal when they span the same number of microseconds, with a month
+    // taken as MICROS_PER_MONTH and a day as MICROS_PER_DAY.
+    // NOTE(review): hashCode() lies outside this hunk - confirm it hashes the same total,
+    // otherwise equal intervals (e.g. 1 month vs. 30 days) get different hash codes.
+    return ((this.months * MICROS_PER_MONTH) + (this.days * MICROS_PER_DAY) + this.microseconds) ==
+      ((that.months * MICROS_PER_MONTH) + (that.days * MICROS_PER_DAY) + that.microseconds);
   }
 
   @Override
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 44601b4b8db9..07b090d6a886 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -2200,7 +2200,7 @@ case class DatePart(field: Expression, source: Expression, child: Expression)
  * between the given timestamps.
  */
 case class SubtractTimestamps(endTimestamp: Expression, startTimestamp: Expression)
-  extends BinaryExpression with ExpectsInputTypes {
+  extends BinaryExpression with ImplicitCastInputTypes {
 
   override def left: Expression = endTimestamp
   override def right: Expression = startTimestamp
@@ -2208,12 +2208,14 @@ case class SubtractTimestamps(endTimestamp: Expression, startTimestamp: Expressi
   override def dataType: DataType = CalendarIntervalType
 
   override def nullSafeEval(end: Any, start: Any): Any = {
-    new CalendarInterval(0, 0, end.asInstanceOf[Long] - start.asInstanceOf[Long])
+    DateTimeUtils.subtractTimestamps(end.asInstanceOf[Long], start.asInstanceOf[Long])
   }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    defineCodeGen(ctx, ev, (end, start) =>
-      s"new org.apache.spark.unsafe.types.CalendarInterval(0, 0, $end - $start)")
+    defineCodeGen(ctx, ev, (end, start) => {
+      val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+      s"$dtu.subtractTimestamps($end, $start)"
+    })
   }
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala
index c09350f33c7f..6d3de9a07278 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala
@@ -58,10 +58,10 @@ case class ExtractIntervalYears(child: Expression)
   extends ExtractIntervalPart(child, IntegerType, getYears, "getYears")
 
 case class ExtractIntervalQuarters(child: Expression)
-  extends ExtractIntervalPart(child, ByteType, getQuarters, "getQuarters")
+  extends ExtractIntervalPart(child, IntegerType, getQuarters, "getQuarters")
 
 case class ExtractIntervalMonths(child: Expression)
-  extends ExtractIntervalPart(child, ByteType, getMonths, "getMonths")
+  extends ExtractIntervalPart(child, IntegerType, getMonths, "getMonths")
 
 case class ExtractIntervalDays(child: Expression)
   extends ExtractIntervalPart(child, IntegerType, getDays, "getDays")
@@ -70,13 +70,13 @@ case class ExtractIntervalHours(child: Expression)
   extends ExtractIntervalPart(child, LongType, getHours, "getHours")
 
 case class ExtractIntervalMinutes(child: Expression)
-  extends ExtractIntervalPart(child, ByteType, getMinutes, "getMinutes")
+  extends ExtractIntervalPart(child, LongType, getMinutes, "getMinutes")
 
 case class ExtractIntervalSeconds(child: Expression)
-  extends ExtractIntervalPart(child, DecimalType(8, 6), getSeconds, "getSeconds")
+  extends ExtractIntervalPart(child, DecimalType(18, 6), getSeconds, "getSeconds")
 
 case class ExtractIntervalMilliseconds(child: Expression)
-  extends ExtractIntervalPart(child, DecimalType(8, 3), getMilliseconds, "getMilliseconds")
+  extends ExtractIntervalPart(child, DecimalType(18, 3), getMilliseconds, "getMilliseconds")
 
 case class ExtractIntervalMicroseconds(child: Expression)
   extends ExtractIntervalPart(child, LongType, getMicroseconds, "getMicroseconds")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 56259dfb114c..cde441c01cd4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -971,4 +971,37 @@ object DateTimeUtils {
     val days = period.getDays
     new CalendarInterval(months, days, 0)
   }
+
+  /**
+   * Subtracts two timestamps.
+   * @param endTimestamp The end timestamp
+   * @param startTimestamp The start timestamp
+   * @return An interval between two timestamps. The interval can be negative
+   *         if the end timestamp is before the start timestamp.
+   */
+  def subtractTimestamps(
+      endTimestamp: SQLTimestamp,
+      startTimestamp: SQLTimestamp): CalendarInterval = {
+    durationToCalendarInterval(endTimestamp - startTimestamp)
+  }
+
+  /**
+   * Splits a duration in microseconds into months, days and leftover microseconds,
+   * treating a month as `DAYS_PER_MONTH` days.
+   *
+   * Plain truncating integer arithmetic is used so that all three fields carry the
+   * sign of the input and sub-millisecond precision is preserved.
+   *
+   * @param microseconds The duration in microseconds
+   * @return A CalendarInterval whose fields add up to the given duration. The interval
+   *         is negative if the input is negative.
+   */
+  def durationToCalendarInterval(microseconds: Long): CalendarInterval = {
+    val totalDays = microseconds / MICROS_PER_DAY
+    val months = totalDays / DAYS_PER_MONTH
+    val days = totalDays % DAYS_PER_MONTH
+    val remainingMicros = microseconds % MICROS_PER_DAY
+
+    new CalendarInterval(months.toInt, days.toInt, remainingMicros)
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
index eeea0e2722a4..e17b6ed6ac9f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.util
 
 import java.math.BigDecimal
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeUnit._
 
 import scala.util.control.NonFatal
 
@@ -62,44 +63,51 @@ object IntervalUtils {
     getYears(interval) / YEARS_PER_DECADE
   }
 
-  def getMonths(interval: CalendarInterval): Byte = {
-    (interval.months % MONTHS_PER_YEAR).toByte
+  def getMonths(interval: CalendarInterval): Int = {
+    interval.months
   }
 
-  def getQuarters(interval: CalendarInterval): Byte = {
-    (getMonths(interval) / MONTHS_PER_QUARTER + 1).toByte
+  def getQuarters(interval: CalendarInterval): Int = {
+    (getMonths(interval) / MONTHS_PER_QUARTER).toInt
   }
 
+  // Days held in the `months` and `days` fields, with a month counted as DAYS_PER_MONTH days.
   def getDays(interval: CalendarInterval): Int = {
-    interval.days
+    interval.days + getMonths(interval) * DAYS_PER_MONTH.toInt
   }
 
+  // Whole hours in `microseconds` plus HOURS_PER_DAY hours for every day from getDays.
   def getHours(interval: CalendarInterval): Long = {
-    interval.microseconds / MICROS_PER_HOUR
+    interval.microseconds / MICROS_PER_HOUR + getDays(interval) * HOURS_PER_DAY
   }
 
-  def getMinutes(interval: CalendarInterval): Byte = {
-    ((interval.microseconds % MICROS_PER_HOUR) / MICROS_PER_MINUTE).toByte
+  def getMinutes(interval: CalendarInterval): Long = {
+    // getHours already accounts for the whole hours stored in `microseconds`, so only the
+    // sub-hour remainder may be added here; otherwise those hours are counted twice.
+    val minutesOfHour = (interval.microseconds % MICROS_PER_HOUR) / MICROS_PER_MINUTE
+    getHours(interval) * MINUTES_PER_HOUR + minutesOfHour
   }
 
   def getMicroseconds(interval: CalendarInterval): Long = {
-    interval.microseconds % MICROS_PER_MINUTE
+    // getMinutes already accounts for the whole minutes stored in `microseconds`, so only
+    // the sub-minute remainder may be added here.
+    val microsOfMinute = interval.microseconds % MICROS_PER_MINUTE
+    getMinutes(interval) * MICROS_PER_MINUTE + microsOfMinute
   }
 
   def getSeconds(interval: CalendarInterval): Decimal = {
-    Decimal(getMicroseconds(interval), 8, 6)
+    Decimal(getMicroseconds(interval), 18, 6)
   }
 
   def getMilliseconds(interval: CalendarInterval): Decimal = {
-    Decimal(getMicroseconds(interval), 8, 3)
+    Decimal(getMicroseconds(interval), 18, 3)
   }
 
   // Returns total number of seconds with microseconds fractional part in the given interval.
   def getEpoch(interval: CalendarInterval): Decimal = {
     var result = interval.microseconds
     result += MICROS_PER_DAY * interval.days
-    result += MICROS_PER_YEAR * (interval.months / MONTHS_PER_YEAR)
-    result += MICROS_PER_MONTH * (interval.months % MONTHS_PER_YEAR)
+    result += MICROS_PER_MONTH * interval.months
     Decimal(result, 18, 6)
   }
 
@@ -767,7 +775,8 @@ object IntervalUtils {
 
     val result = state match {
       case UNIT_SUFFIX | UNIT_END | TRIM_BEFORE_SIGN =>
-        new CalendarInterval(months, days, microseconds)
+        val microsecondsTotal = microseconds + DAYS.toMicros((months * DAYS_PER_MONTH) + days)
+        DateTimeUtils.durationToCalendarInterval(microsecondsTotal)
       case _ => null
     }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/IntervalExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/IntervalExpressionsSuite.scala
index e591c4984a7b..05df98e4b3cf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/IntervalExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/IntervalExpressionsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import java.time.Instant
+
 import scala.language.implicitConversions
 
 import org.apache.spark.SparkFunSuite
@@ -83,40 +85,43 @@ class IntervalExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("quarters") {
-    checkEvaluation(ExtractIntervalQuarters("0 months"), 1.toByte)
-    checkEvaluation(ExtractIntervalQuarters("1 months"), 1.toByte)
-    checkEvaluation(ExtractIntervalQuarters("-1 months"), 1.toByte)
-    checkEvaluation(ExtractIntervalQuarters("2 months"), 1.toByte)
-    checkEvaluation(ExtractIntervalQuarters("-2 months"), 1.toByte)
-    checkEvaluation(ExtractIntervalQuarters("1 years -1 months"), 4.toByte)
-    checkEvaluation(ExtractIntervalQuarters("-1 years 1 months"), -2.toByte)
-    checkEvaluation(ExtractIntervalQuarters("2 years 3 months"), 2.toByte)
-    checkEvaluation(ExtractIntervalQuarters("-2 years -3 months"), 0.toByte)
-    checkEvaluation(ExtractIntervalQuarters("9999 years"), 1.toByte)
+    checkEvaluation(ExtractIntervalQuarters("0 months"), 0)
+    checkEvaluation(ExtractIntervalQuarters("1 months"), 0)
+    checkEvaluation(ExtractIntervalQuarters("-1 months"), 0)
+    checkEvaluation(ExtractIntervalQuarters("2 months"), 0)
+    checkEvaluation(ExtractIntervalQuarters("-2 months"), 0)
+    checkEvaluation(ExtractIntervalQuarters("1 years -1 months"), 3)
+    checkEvaluation(ExtractIntervalQuarters("-1 years 1 months"), -3)
+    checkEvaluation(ExtractIntervalQuarters("2 years 3 months"), 9)
+    checkEvaluation(ExtractIntervalQuarters("-2 years -3 months"), -9)
+    checkEvaluation(ExtractIntervalQuarters("9999 years"), 39996)
   }
 
   test("months") {
-    checkEvaluation(ExtractIntervalMonths("0 year"), 0.toByte)
+    checkEvaluation(ExtractIntervalMonths("0 year"), 0)
     for (m <- -24 to 24) {
-      checkEvaluation(ExtractIntervalMonths(s"$m months"), (m % 12).toByte)
+      checkEvaluation(ExtractIntervalMonths(s"$m months"), m)
     }
-    checkEvaluation(ExtractIntervalMonths("1 year 10 months"), 10.toByte)
-    checkEvaluation(ExtractIntervalMonths("-2 year -10 months"), -10.toByte)
-    checkEvaluation(ExtractIntervalMonths("9999 years"), 0.toByte)
+    checkEvaluation(ExtractIntervalMonths("1 year 10 months"), 22)
+    checkEvaluation(ExtractIntervalMonths("-2 year -10 months"), -34)
+    checkEvaluation(ExtractIntervalMonths("9999 years"), 119988)
   }
 
   private val largeInterval: String = "9999 years 11 months " +
     "31 days 11 hours 59 minutes 59 seconds 999 milliseconds 999 microseconds"
 
   test("days") {
+    val start = Instant.parse("2019-01-01T00:00:00.000000Z")
+    val end = Instant.parse("2019-01-15T00:00:00.000000Z")
+
     checkEvaluation(ExtractIntervalDays("0 days"), 0)
     checkEvaluation(ExtractIntervalDays("1 days 100 seconds"), 1)
     checkEvaluation(ExtractIntervalDays("-1 days -100 seconds"), -1)
     checkEvaluation(ExtractIntervalDays("-365 days"), -365)
     checkEvaluation(ExtractIntervalDays("365 days"), 365)
-    // Years and months must not be taken into account
-    checkEvaluation(ExtractIntervalDays("100 year 10 months 5 days"), 5)
-    checkEvaluation(ExtractIntervalDays(largeInterval), 31)
+    checkEvaluation(ExtractIntervalDays("100 year 10 months 5 days"), 36305)
+    checkEvaluation(ExtractIntervalDays(largeInterval), 3600001)
+    checkEvaluation(ExtractIntervalDays(SubtractTimestamps(Literal(end), Literal(start))), 14)
   }
 
   test("hours") {
@@ -125,55 +130,53 @@ class IntervalExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(ExtractIntervalHours("-1 hour"), -1L)
     checkEvaluation(ExtractIntervalHours("23 hours"), 23L)
     checkEvaluation(ExtractIntervalHours("-23 hours"), -23L)
-    // Years, months and days must not be taken into account
-    checkEvaluation(ExtractIntervalHours("100 year 10 months 10 days 10 hours"), 10L)
-    // Minutes should be taken into account
+    checkEvaluation(ExtractIntervalHours("100 year 10 months 10 days 10 hours"), 871450L)
     checkEvaluation(ExtractIntervalHours("10 hours 100 minutes"), 11L)
-    checkEvaluation(ExtractIntervalHours(largeInterval), 11L)
+    checkEvaluation(ExtractIntervalHours(largeInterval), 86400035L)
   }
 
   test("minutes") {
-    checkEvaluation(ExtractIntervalMinutes("0 minute"), 0.toByte)
-    checkEvaluation(ExtractIntervalMinutes("1 minute"), 1.toByte)
-    checkEvaluation(ExtractIntervalMinutes("-1 minute"), -1.toByte)
-    checkEvaluation(ExtractIntervalMinutes("59 minute"), 59.toByte)
-    checkEvaluation(ExtractIntervalMinutes("-59 minute"), -59.toByte)
-    // Years and months must not be taken into account
-    checkEvaluation(ExtractIntervalMinutes("100 year 10 months 10 minutes"), 10.toByte)
-    checkEvaluation(ExtractIntervalMinutes(largeInterval), 59.toByte)
+    checkEvaluation(ExtractIntervalMinutes("0 minute"), 0L)
+    checkEvaluation(ExtractIntervalMinutes("1 minute"), 1L)
+    checkEvaluation(ExtractIntervalMinutes("-1 minute"), -1L)
+    checkEvaluation(ExtractIntervalMinutes("59 minute"), 59L)
+    checkEvaluation(ExtractIntervalMinutes("-59 minute"), -59L)
+    checkEvaluation(ExtractIntervalMinutes("10 hours 100 minutes"), 700L)
+    checkEvaluation(ExtractIntervalMinutes("100 year 10 months 10 minutes"), 52272010L)
+    checkEvaluation(ExtractIntervalMinutes(largeInterval), 5184002159L)
   }
 
   test("seconds") {
-    checkEvaluation(ExtractIntervalSeconds("0 second"), Decimal(0, 8, 6))
-    checkEvaluation(ExtractIntervalSeconds("1 second"), Decimal(1.0, 8, 6))
-    checkEvaluation(ExtractIntervalSeconds("-1 second"), Decimal(-1.0, 8, 6))
-    checkEvaluation(ExtractIntervalSeconds("1 minute 59 second"), Decimal(59.0, 8, 6))
-    checkEvaluation(ExtractIntervalSeconds("-59 minutes -59 seconds"), Decimal(-59.0, 8, 6))
-    // Years and months must not be taken into account
-    checkEvaluation(ExtractIntervalSeconds("100 year 10 months 10 seconds"), Decimal(10.0, 8, 6))
-    checkEvaluation(ExtractIntervalSeconds(largeInterval), Decimal(59.999999, 8, 6))
+    checkEvaluation(ExtractIntervalSeconds("0 second"), Decimal(0, 18, 6))
+    checkEvaluation(ExtractIntervalSeconds("1 second"), Decimal(1.0, 18, 6))
+    checkEvaluation(ExtractIntervalSeconds("-1 second"), Decimal(-1.0, 18, 6))
+    checkEvaluation(ExtractIntervalSeconds("1 minute 59 second"), Decimal(119.0, 18, 6))
+    checkEvaluation(ExtractIntervalSeconds("-59 minutes -59 seconds"), Decimal(-3599.0, 18, 6))
+    checkEvaluation(
+      ExtractIntervalSeconds("100 year 10 months 10 seconds"), Decimal(3136320010.0, 18, 6))
+    checkEvaluation(ExtractIntervalSeconds(largeInterval), Decimal(311040129599999999L, 18, 6))
     checkEvaluation(
       ExtractIntervalSeconds("10 seconds 1 milliseconds 1 microseconds"),
-      Decimal(10001001, 8, 6))
-    checkEvaluation(ExtractIntervalSeconds("61 seconds 1 microseconds"), Decimal(1000001, 8, 6))
+      Decimal(10001001, 18, 6))
+    checkEvaluation(ExtractIntervalSeconds("61 seconds 1 microseconds"), Decimal(61.000001, 18, 6))
   }
 
   test("milliseconds") {
-    checkEvaluation(ExtractIntervalMilliseconds("0 milliseconds"), Decimal(0, 8, 3))
-    checkEvaluation(ExtractIntervalMilliseconds("1 milliseconds"), Decimal(1.0, 8, 3))
-    checkEvaluation(ExtractIntervalMilliseconds("-1 milliseconds"), Decimal(-1.0, 8, 3))
+    checkEvaluation(ExtractIntervalMilliseconds("0 milliseconds"), Decimal(0, 18, 3))
+    checkEvaluation(ExtractIntervalMilliseconds("1 milliseconds"), Decimal(1.0, 18, 3))
+    checkEvaluation(ExtractIntervalMilliseconds("-1 milliseconds"), Decimal(-1.0, 18, 3))
     checkEvaluation(
       ExtractIntervalMilliseconds("1 second 999 milliseconds"),
-      Decimal(1999.0, 8, 3))
+      Decimal(1999.0, 18, 3))
     checkEvaluation(
       ExtractIntervalMilliseconds("999 milliseconds 1 microsecond"),
-      Decimal(999.001, 8, 3))
+      Decimal(999.001, 18, 3))
     checkEvaluation(
       ExtractIntervalMilliseconds("-1 second -999 milliseconds"),
-      Decimal(-1999.0, 8, 3))
-    // Years and months must not be taken into account
-    checkEvaluation(ExtractIntervalMilliseconds("100 year 1 millisecond"), Decimal(1.0, 8, 3))
-    checkEvaluation(ExtractIntervalMilliseconds(largeInterval), Decimal(59999.999, 8, 3))
+      Decimal(-1999.0, 18, 3))
+    checkEvaluation(
+      ExtractIntervalMilliseconds("100 year 1 millisecond"), Decimal(3110400000001.000, 18, 3))
+    checkEvaluation(ExtractIntervalMilliseconds(largeInterval), Decimal(311040129599999999L, 18, 3))
   }
 
   test("microseconds") {
@@ -183,16 +186,15 @@ class IntervalExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(ExtractIntervalMicroseconds("1 second 999 microseconds"), 1000999L)
     checkEvaluation(ExtractIntervalMicroseconds("999 milliseconds 1 microseconds"), 999001L)
     checkEvaluation(ExtractIntervalMicroseconds("-1 second -999 microseconds"), -1000999L)
-    // Years and months must not be taken into account
-    checkEvaluation(ExtractIntervalMicroseconds("11 year 1 microseconds"), 1L)
-    checkEvaluation(ExtractIntervalMicroseconds(largeInterval), 59999999L)
+    checkEvaluation(ExtractIntervalMicroseconds("11 year 1 microseconds"), 342144000000001L)
+    checkEvaluation(ExtractIntervalMicroseconds(largeInterval), 311040129599999999L)
   }
 
   test("epoch") {
     checkEvaluation(ExtractIntervalEpoch("0 months"), Decimal(0.0, 18, 6))
-    checkEvaluation(ExtractIntervalEpoch("10000 years"), Decimal(315576000000.0, 18, 6))
-    checkEvaluation(ExtractIntervalEpoch("1 year"), Decimal(31557600.0, 18, 6))
-    checkEvaluation(ExtractIntervalEpoch("-1 year"), Decimal(-31557600.0, 18, 6))
+    checkEvaluation(ExtractIntervalEpoch("10000 years"), Decimal(311040000000.0, 18, 6))
+    checkEvaluation(ExtractIntervalEpoch("1 year"), Decimal(31104000.0, 18, 6))
+    checkEvaluation(ExtractIntervalEpoch("-1 year"), Decimal(-31104000.0, 18, 6))
     checkEvaluation(
       ExtractIntervalEpoch("1 second 1 millisecond 1 microsecond"),
       Decimal(1.001001, 18, 6))