Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.unsafe.types;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -409,4 +410,25 @@ private void appendUnit(StringBuilder sb, long value, String unit) {
sb.append(' ').append(value).append(' ').append(unit).append('s');
}
}

/**
* Gets interval duration
* @param daysPerMonth the number of days per one month
* @param targetUnit time units of the result
* @return duration in the specified time units
*/
public long getDuration(int daysPerMonth, TimeUnit targetUnit) {
long monthsDuration = Math.multiplyExact(daysPerMonth * MICROS_PER_DAY, months);
long result = Math.addExact(microseconds, monthsDuration);
return targetUnit.convert(result, TimeUnit.MICROSECONDS);
}

/**
* Checks the interval is negative
* @param daysPerMonth the number of days per one month
* @return true if duration of the given interval is less than 0 otherwise false
*/
public boolean isNegative(int daysPerMonth) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to expose this vs just letting callers get the duration and compare?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it for code readability at caller side.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also it could be considered as a companion method for negate()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen Do you think it is not worth it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm neutral on it. It makes some sense; I'm just always reluctant to add API methods.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be static?

Yes, it can.

Can the methods be package private?

Probably, yes.

Why does the caller define how many days are in a month?

Because number of days per month is not constant. Structure Streaming follows conservative approach, and assumes 31 days per months (see #16304 (comment)). In another places, we can assume another numbers (see

).

would that assumption not be fixed?

For now, the methods are used by SS only. Apparently 31 can be hard coded inside.

More generally, do we really need to allow for negative intervals at all? semantically it's nonnegative. Is it allowed in SQL?

The SQL standard allows negative intervals. If we look at other DMBS, PostgreSQL allows as well. If you subtract 2 timestamp columns timestamp1 - timestamp2, what are you going to produce when timestamp1 < timstamp2?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, yes makes sense about negative intervals as the necessary result of subtracting timestamps.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CalendarInterval class has many static methods that can be move out of it to a separate class, for instance CalendarIntervalUtils. I would leave only milliseconds(), add(), subtract(), negate(), hashCode(), toString() in CalendarInterval, and put fromString(), fromCaseInsensitiveString(), fromYearMonthString() and etc. to CalendarIntervalUtils.

... or any kind of refactoring is impossible in Spark already because of commit history tracking problem? @cloud-fan WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't bother with a big refactoring here. Whatever is consistent. But yeah you can make it static at least?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will move those 2 methods to IntervalUtils

return getDuration(daysPerMonth, TimeUnit.MICROSECONDS) < 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.junit.Test;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;
import static org.apache.spark.unsafe.types.CalendarInterval.*;
Expand Down Expand Up @@ -297,4 +298,32 @@ public void fromStringCaseSensitivityTest() {
assertNull(fromString("INTERVAL"));
assertNull(fromString(" Interval "));
}

@Test
public void durationTest() {
assertEquals(fromString("0 seconds").getDuration(31, TimeUnit.MILLISECONDS), 0);
assertEquals(fromString("1 month").getDuration(31, TimeUnit.DAYS), 31);
assertEquals(fromString("1 microsecond").getDuration(30, TimeUnit.MICROSECONDS), 1);
assertEquals(fromString("1 month -30 days").getDuration(31, TimeUnit.DAYS), 1);

try {
fromString(Integer.MAX_VALUE + " month").getDuration(31, TimeUnit.SECONDS);
fail("Expected to throw an exception for the invalid input");
} catch (ArithmeticException e) {
assertTrue(e.getMessage().contains("overflow"));
}
}

@Test
public void negativeIntervalTest() {
assertTrue(fromString("-1 months").isNegative(28));
assertTrue(fromString("-1 microsecond").isNegative(30));
assertTrue(fromString("-1 month 30 days").isNegative(31));
assertTrue(fromString("2 months -61 days").isNegative(30));
assertTrue(fromString("-1 year -2 seconds").isNegative(30));

assertFalse(fromString("0 months").isNegative(28));
assertFalse(fromString("1 year -360 days").isNegative(31));
assertFalse(fromString("-1 year 380 days").isNegative(31));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ object StreamingJoinHelper extends PredicateHelper with Logging {
val castedLit = lit.dataType match {
case CalendarIntervalType =>
val calendarInterval = lit.value.asInstanceOf[CalendarInterval]
if (calendarInterval.months > 0) {
if (calendarInterval.months != 0) {
invalid = true
logWarning(
s"Failed to extract state value watermark from condition $exprToCollectFrom " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ object TimeWindow {
*/
private def getIntervalInMicroSeconds(interval: String): Long = {
val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
if (cal.months != 0) {
throw new IllegalArgumentException(
s"Intervals greater than a month is not supported ($interval).")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ object EventTimeWatermark {

def getDelayMs(delay: CalendarInterval): Long = {
// We define month as `31 days` to simplify calculation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment can be removed, as no 31 can be seen here.

val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31
delay.milliseconds + delay.months * millisPerMonth
delay.getDuration(31, TimeUnit.MILLISECONDS)
}
}

Expand Down
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ class Dataset[T] private[sql](
s"Unable to parse time delay '$delayThreshold'",
cause = Some(e))
}
require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
require(!parsedDelay.isNegative(31),
s"delay threshold ($delayThreshold) should not be negative.")
EliminateEventTimeWatermark(
EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,12 @@ private[sql] class GroupStateImpl[S] private(

private def parseDuration(duration: String): Long = {
val cal = CalendarInterval.fromCaseInsensitiveString(duration)
if (cal.milliseconds < 0 || cal.months < 0) {
throw new IllegalArgumentException(s"Provided duration ($duration) is not positive")
val daysPerMonth = 31
if (cal.isNegative(daysPerMonth)) {
throw new IllegalArgumentException(s"Provided duration ($duration) is negative")
}

val millisPerMonth = TimeUnit.MICROSECONDS.toMillis(CalendarInterval.MICROS_PER_DAY) * 31
cal.milliseconds + cal.months * millisPerMonth
cal.getDuration(daysPerMonth, TimeUnit.MILLISECONDS)
}

private def checkTimeoutTimestampAllowed(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private object Triggers {

def convert(interval: String): Long = {
val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
if (cal.months != 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like another way of converting interval to duration: make sure the months field is 0. Shall we also take it into account in the new getDuration method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can change getDuration() to:

  def getDuration(
      interval: CalendarInterval,
      targetUnit: TimeUnit,
      daysPerMonth: Option[Int] = Some(31)): Long = {
    val monthsDuration = daysPerMonth
      .map { days =>
        Math.multiplyExact(days * DateTimeUtils.MICROS_PER_DAY, interval.months)
      }.getOrElse {
        if (interval.months == 0) {
          0L
        } else {
          throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
        }
      }
    val result = Math.addExact(interval.microseconds, monthsDuration)
    targetUnit.convert(result, TimeUnit.MICROSECONDS)
  }

and call getDuration(cal, TimeUnit.MILLISECONDS, None)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but I am not sure that this check should be inside of getDuration()

throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}
TimeUnit.MICROSECONDS.toMillis(cal.microseconds)
Expand Down