Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2474,6 +2474,7 @@ case class Flatten(child: Expression) extends UnaryExpression with NullIntoleran
copy(child = newChild)
}

// scalastyle:off line.size.limit line.contains.tab
@ExpressionDescription(
usage = """
_FUNC_(start, stop, step) - Generates an array of elements from start to stop (inclusive),
Expand Down Expand Up @@ -2502,12 +2503,17 @@ case class Flatten(child: Expression) extends UnaryExpression with NullIntoleran
[1,2,3,4,5]
> SELECT _FUNC_(5, 1);
[5,4,3,2,1]
> SET spark.sql.legacy.interval.enabled=true;
spark.sql.legacy.interval.enabled true
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created JIRA SPARK-35088 for ANSI intervals

> SELECT _FUNC_(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 month);
[2018-01-01,2018-02-01,2018-03-01]
> SET spark.sql.legacy.interval.enabled=false;
spark.sql.legacy.interval.enabled false
""",
group = "array_funcs",
since = "2.4.0"
)
// scalastyle:on line.size.limit line.contains.tab
case class Sequence(
start: Expression,
stop: Expression,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2392,6 +2392,7 @@ object DatePart {
}
}

// scalastyle:off line.size.limit line.contains.tab
// scalastyle:off line.size.limit
@ExpressionDescription(
usage = "_FUNC_(field, source) - Extracts a part of the date/timestamp or interval source.",
Expand All @@ -2410,17 +2411,22 @@ object DatePart {
224
> SELECT _FUNC_('SECONDS', timestamp'2019-10-01 00:00:01.000001');
1.000001
> SET spark.sql.legacy.interval.enabled=true;
spark.sql.legacy.interval.enabled true
Copy link
Member Author

@MaxGekk MaxGekk Apr 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SPARK-35090 for ANSI intervals

> SELECT _FUNC_('days', interval 1 year 10 months 5 days);
5
> SELECT _FUNC_('seconds', interval 5 hours 30 seconds 1 milliseconds 1 microseconds);
30.001001
> SET spark.sql.legacy.interval.enabled=false;
spark.sql.legacy.interval.enabled false
""",
note = """
The _FUNC_ function is equivalent to the SQL-standard function `EXTRACT(field FROM source)`
""",
group = "datetime_funcs",
since = "3.0.0")
// scalastyle:on line.size.limit
// scalastyle:on line.size.limit line.contains.tab
case class DatePart(field: Expression, source: Expression, child: Expression)
extends RuntimeReplaceable {

Expand All @@ -2437,6 +2443,7 @@ case class DatePart(field: Expression, source: Expression, child: Expression)
copy(child = newChild)
}

// scalastyle:off line.size.limit line.contains.tab
// scalastyle:off line.size.limit
@ExpressionDescription(
usage = "_FUNC_(field FROM source) - Extracts a part of the date/timestamp or interval source.",
Expand Down Expand Up @@ -2475,17 +2482,22 @@ case class DatePart(field: Expression, source: Expression, child: Expression)
224
> SELECT _FUNC_(SECONDS FROM timestamp'2019-10-01 00:00:01.000001');
1.000001
> SET spark.sql.legacy.interval.enabled=true;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created SPARK-35091 to support ANSI intervals in date_part()

spark.sql.legacy.interval.enabled true
> SELECT _FUNC_(days FROM interval 1 year 10 months 5 days);
5
> SELECT _FUNC_(seconds FROM interval 5 hours 30 seconds 1 milliseconds 1 microseconds);
30.001001
> SET spark.sql.legacy.interval.enabled=false;
spark.sql.legacy.interval.enabled false
""",
note = """
The _FUNC_ function is equivalent to `date_part(field, source)`.
""",
group = "datetime_funcs",
since = "3.0.0")
// scalastyle:on line.size.limit
// scalastyle:on line.size.limit line.contains.tab
case class Extract(field: Expression, source: Expression, child: Expression)
extends RuntimeReplaceable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.parser

import java.util.Locale
import java.util.concurrent.TimeUnit
import javax.xml.bind.DatatypeConverter

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -2098,7 +2099,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
ex.setStackTrace(e.getStackTrace)
throw ex
}
Literal(interval, CalendarIntervalType)
calendarIntervalToLiteral(interval, ctx)
case "X" =>
val padding = if (value.length % 2 != 0) "0" else ""
Literal(DatatypeConverter.parseHexBinary(padding + value))
Expand Down Expand Up @@ -2305,13 +2306,30 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
UnresolvedTableOrView(visitMultipartIdentifier(ctx), commandName, allowTempView)
}

/**
 * Wraps a parsed [[CalendarInterval]] into a [[Literal]] of the appropriate interval type.
 *
 *  - When `conf.legacyIntervalEnabled` is set, the value is kept as-is and typed as
 *    `CalendarIntervalType`.
 *  - Otherwise, an interval with a non-zero `months` field and no `days`/`microseconds`
 *    becomes a `YearMonthIntervalType` literal holding the month count.
 *  - Otherwise, an interval with `months == 0` becomes a `DayTimeIntervalType` literal holding
 *    the value returned by `IntervalUtils.getDuration(_, TimeUnit.MICROSECONDS)`
 *    (presumably the total duration in microseconds -- confirm against `IntervalUtils`).
 *  - An interval that has a non-zero `months` field together with a non-zero `days` or
 *    `microseconds` field is rejected with `QueryParsingErrors.mixedIntervalError`.
 *
 * NOTE(review): the year-month vs. day-time decision is made purely on `months != 0`, so an
 * interval whose fields are all zero always ends up in the day-time branch.
 *
 * @param calendarInterval the interval value produced by the parser
 * @param ctx the parser context, passed to the error builder for mixed intervals
 * @return the interval literal in legacy or ANSI representation
 */
private def calendarIntervalToLiteral(
    calendarInterval: CalendarInterval,
    ctx: ParserRuleContext): Literal = {
  if (conf.legacyIntervalEnabled) {
    // Legacy mode: no splitting into ANSI interval types.
    Literal(calendarInterval, CalendarIntervalType)
  } else {
    val hasYearMonthField = calendarInterval.months != 0
    val hasDayTimeField = calendarInterval.days != 0 || calendarInterval.microseconds != 0
    (hasYearMonthField, hasDayTimeField) match {
      case (true, true) =>
        // Year-month and day-time fields cannot be combined in a single ANSI interval.
        throw QueryParsingErrors.mixedIntervalError(ctx)
      case (true, false) =>
        Literal(calendarInterval.months, YearMonthIntervalType)
      case (false, _) =>
        val micros = IntervalUtils.getDuration(calendarInterval, TimeUnit.MICROSECONDS)
        Literal(micros, DayTimeIntervalType)
    }
  }
}

/**
* Create a [[CalendarInterval]] literal expression. Two syntaxes are supported:
* Create a [[CalendarInterval]] or ANSI interval literal expression.
* Two syntaxes are supported:
* - multiple unit value pairs, for instance: interval 2 months 2 days.
* - from-to unit, for instance: interval '1-2' year to month.
*/
override def visitInterval(ctx: IntervalContext): Literal = withOrigin(ctx) {
Literal(parseIntervalLiteral(ctx), CalendarIntervalType)
calendarIntervalToLiteral(parseIntervalLiteral(ctx), ctx)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.antlr.v4.runtime.ParserRuleContext
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.internal.SQLConf

/**
* Object for grouping all error messages of the query parsing.
Expand Down Expand Up @@ -367,4 +368,10 @@ object QueryParsingErrors {
new ParseException("LOCAL is supported only with file: scheme", ctx)
}

def mixedIntervalError(ctx: ParserRuleContext): Throwable = {
new ParseException(
"Mixing of year-month and day-time fields is not allowed. " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does ANSI allow mixing day-time interval fields with the same unit, e.g. day and day?

s"Set '${SQLConf.LEGACY_INTERVAL_ENABLED.key}' to true to enable the legacy interval type " +
"which supports mixed fields.", ctx)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,120 +21,129 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, Filter, LeafNode}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, TimestampType}

class StreamingJoinHelperSuite extends AnalysisTest {

test("extract watermark from time condition") {
val attributesToFindConstraintFor = Seq(
AttributeReference("leftTime", TimestampType)(),
AttributeReference("leftOther", IntegerType)())
val metadataWithWatermark = new MetadataBuilder()
.putLong(EventTimeWatermark.delayKey, 1000)
.build()
val attributesWithWatermark = Seq(
AttributeReference("rightTime", TimestampType, metadata = metadataWithWatermark)(),
AttributeReference("rightOther", IntegerType)())
// TODO(SPARK-35095): Use ANSI intervals in streaming join tests
withSQLConf(SQLConf.LEGACY_INTERVAL_ENABLED.key -> "true") {
val attributesToFindConstraintFor = Seq(
AttributeReference("leftTime", TimestampType)(),
AttributeReference("leftOther", IntegerType)())
val metadataWithWatermark = new MetadataBuilder()
.putLong(EventTimeWatermark.delayKey, 1000)
.build()
val attributesWithWatermark = Seq(
AttributeReference("rightTime", TimestampType, metadata = metadataWithWatermark)(),
AttributeReference("rightOther", IntegerType)())

case class DummyLeafNode() extends LeafNode {
override def output: Seq[Attribute] =
attributesToFindConstraintFor ++ attributesWithWatermark
}
case class DummyLeafNode() extends LeafNode {
override def output: Seq[Attribute] =
attributesToFindConstraintFor ++ attributesWithWatermark
}

def watermarkFrom(
conditionStr: String,
rightWatermark: Option[Long] = Some(10000)): Option[Long] = {
val conditionExpr = Some(conditionStr).map { str =>
val plan =
Filter(
CatalystSqlParser.parseExpression(str),
DummyLeafNode())
val optimized = SimpleTestOptimizer.execute(SimpleAnalyzer.execute(plan))
optimized.asInstanceOf[Filter].condition
def watermarkFrom(
conditionStr: String,
rightWatermark: Option[Long] = Some(10000)): Option[Long] = {
val conditionExpr = Some(conditionStr).map { str =>
val plan =
Filter(
CatalystSqlParser.parseExpression(str),
DummyLeafNode())
val optimized = SimpleTestOptimizer.execute(SimpleAnalyzer.execute(plan))
optimized.asInstanceOf[Filter].condition
}
StreamingJoinHelper.getStateValueWatermark(
AttributeSet(attributesToFindConstraintFor), AttributeSet(attributesWithWatermark),
conditionExpr, rightWatermark)
}
StreamingJoinHelper.getStateValueWatermark(
AttributeSet(attributesToFindConstraintFor), AttributeSet(attributesWithWatermark),
conditionExpr, rightWatermark)
}

// Test comparison directionality. E.g. if leftTime < rightTime and rightTime > watermark,
// then cannot define constraint on leftTime.
assert(watermarkFrom("leftTime > rightTime") === Some(10000))
assert(watermarkFrom("leftTime >= rightTime") === Some(9999))
assert(watermarkFrom("leftTime < rightTime") === None)
assert(watermarkFrom("leftTime <= rightTime") === None)
assert(watermarkFrom("rightTime > leftTime") === None)
assert(watermarkFrom("rightTime >= leftTime") === None)
assert(watermarkFrom("rightTime < leftTime") === Some(10000))
assert(watermarkFrom("rightTime <= leftTime") === Some(9999))
// Test comparison directionality. E.g. if leftTime < rightTime and rightTime > watermark,
// then cannot define constraint on leftTime.
assert(watermarkFrom("leftTime > rightTime") === Some(10000))
assert(watermarkFrom("leftTime >= rightTime") === Some(9999))
assert(watermarkFrom("leftTime < rightTime") === None)
assert(watermarkFrom("leftTime <= rightTime") === None)
assert(watermarkFrom("rightTime > leftTime") === None)
assert(watermarkFrom("rightTime >= leftTime") === None)
assert(watermarkFrom("rightTime < leftTime") === Some(10000))
assert(watermarkFrom("rightTime <= leftTime") === Some(9999))

// Test type conversions
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG)") === Some(10000))
assert(watermarkFrom("CAST(leftTime AS LONG) < CAST(rightTime AS LONG)") === None)
assert(watermarkFrom("CAST(leftTime AS DOUBLE) > CAST(rightTime AS DOUBLE)") === Some(10000))
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS DOUBLE)") === Some(10000))
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS FLOAT)") === Some(10000))
assert(watermarkFrom("CAST(leftTime AS DOUBLE) > CAST(rightTime AS FLOAT)") === Some(10000))
assert(watermarkFrom("CAST(leftTime AS STRING) > CAST(rightTime AS STRING)") === None)
// Test type conversions
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG)") === Some(10000))
assert(watermarkFrom("CAST(leftTime AS LONG) < CAST(rightTime AS LONG)") === None)
assert(watermarkFrom("CAST(leftTime AS DOUBLE) > CAST(rightTime AS DOUBLE)") === Some(10000))
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS DOUBLE)") === Some(10000))
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS FLOAT)") === Some(10000))
assert(watermarkFrom("CAST(leftTime AS DOUBLE) > CAST(rightTime AS FLOAT)") === Some(10000))
assert(watermarkFrom("CAST(leftTime AS STRING) > CAST(rightTime AS STRING)") === None)

// Test with timestamp type + calendar interval on either side of equation
// Note: timestamptype and calendar interval don't commute, so less valid combinations to test.
assert(watermarkFrom("leftTime > rightTime + interval 1 second") === Some(11000))
assert(watermarkFrom("leftTime + interval 2 seconds > rightTime ") === Some(8000))
assert(watermarkFrom("leftTime > rightTime - interval 3 second") === Some(7000))
assert(watermarkFrom("rightTime < leftTime - interval 3 second") === Some(13000))
assert(watermarkFrom("rightTime - interval 1 second < leftTime - interval 3 second")
=== Some(12000))
// Test with timestamp type + calendar interval on either side of equation
// Note: timestamptype and calendar interval don't commute,
// so less valid combinations to test.
assert(watermarkFrom("leftTime > rightTime + interval 1 second") === Some(11000))
assert(watermarkFrom("leftTime + interval 2 seconds > rightTime ") === Some(8000))
assert(watermarkFrom("leftTime > rightTime - interval 3 second") === Some(7000))
assert(watermarkFrom("rightTime < leftTime - interval 3 second") === Some(13000))
assert(watermarkFrom("rightTime - interval 1 second < leftTime - interval 3 second")
=== Some(12000))

// Test with casted long type + constants on either side of equation
// Note: long type and constants commute, so more combinations to test.
// -- Constants on the right
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) + 1") === Some(11000))
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) - 1") === Some(9000))
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST((rightTime + interval 1 second) AS LONG)")
=== Some(11000))
assert(watermarkFrom("CAST(leftTime AS LONG) > 2 + CAST(rightTime AS LONG)") === Some(12000))
assert(watermarkFrom("CAST(leftTime AS LONG) > -0.5 + CAST(rightTime AS LONG)") === Some(9500))
assert(watermarkFrom("CAST(leftTime AS LONG) - CAST(rightTime AS LONG) > 2") === Some(12000))
assert(watermarkFrom("-CAST(rightTime AS DOUBLE) + CAST(leftTime AS LONG) > 0.1")
=== Some(10100))
assert(watermarkFrom("0 > CAST(rightTime AS LONG) - CAST(leftTime AS LONG) + 0.2")
=== Some(10200))
// -- Constants on the left
assert(watermarkFrom("CAST(leftTime AS LONG) + 2 > CAST(rightTime AS LONG)") === Some(8000))
assert(watermarkFrom("1 + CAST(leftTime AS LONG) > CAST(rightTime AS LONG)") === Some(9000))
assert(watermarkFrom("CAST((leftTime + interval 3 second) AS LONG) > CAST(rightTime AS LONG)")
=== Some(7000))
assert(watermarkFrom("CAST(leftTime AS LONG) - 2 > CAST(rightTime AS LONG)") === Some(12000))
assert(watermarkFrom("CAST(leftTime AS LONG) + 0.5 > CAST(rightTime AS LONG)") === Some(9500))
assert(watermarkFrom("CAST(leftTime AS LONG) - CAST(rightTime AS LONG) - 2 > 0")
=== Some(12000))
assert(watermarkFrom("-CAST(rightTime AS LONG) + CAST(leftTime AS LONG) - 0.1 > 0")
=== Some(10100))
// -- Constants on both sides, mixed types
assert(watermarkFrom("CAST(leftTime AS LONG) - 2.0 > CAST(rightTime AS LONG) + 1")
=== Some(13000))
// Test with casted long type + constants on either side of equation
// Note: long type and constants commute, so more combinations to test.
// -- Constants on the right
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) + 1") === Some(11000))
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) - 1") === Some(9000))
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST((rightTime + interval 1 second) AS LONG)")
=== Some(11000))
assert(watermarkFrom("CAST(leftTime AS LONG) > 2 + CAST(rightTime AS LONG)") === Some(12000))
assert(watermarkFrom("CAST(leftTime AS LONG) > -0.5 + CAST(rightTime AS LONG)") ===
Some(9500))
assert(watermarkFrom("CAST(leftTime AS LONG) - CAST(rightTime AS LONG) > 2") === Some(12000))
assert(watermarkFrom("-CAST(rightTime AS DOUBLE) + CAST(leftTime AS LONG) > 0.1")
=== Some(10100))
assert(watermarkFrom("0 > CAST(rightTime AS LONG) - CAST(leftTime AS LONG) + 0.2")
=== Some(10200))
// -- Constants on the left
assert(watermarkFrom("CAST(leftTime AS LONG) + 2 > CAST(rightTime AS LONG)") === Some(8000))
assert(watermarkFrom("1 + CAST(leftTime AS LONG) > CAST(rightTime AS LONG)") === Some(9000))
assert(
watermarkFrom("CAST((leftTime + interval 3 second) AS LONG) > CAST(rightTime AS LONG)")
=== Some(7000))
assert(watermarkFrom("CAST(leftTime AS LONG) - 2 > CAST(rightTime AS LONG)") === Some(12000))
assert(watermarkFrom("CAST(leftTime AS LONG) + 0.5 > CAST(rightTime AS LONG)") === Some(9500))
assert(watermarkFrom("CAST(leftTime AS LONG) - CAST(rightTime AS LONG) - 2 > 0")
=== Some(12000))
assert(watermarkFrom("-CAST(rightTime AS LONG) + CAST(leftTime AS LONG) - 0.1 > 0")
=== Some(10100))
// -- Constants on both sides, mixed types
assert(watermarkFrom("CAST(leftTime AS LONG) - 2.0 > CAST(rightTime AS LONG) + 1")
=== Some(13000))

// Test multiple conditions, should return minimum watermark
assert(watermarkFrom(
"leftTime > rightTime - interval 3 second AND rightTime < leftTime + interval 2 seconds") ===
Some(7000)) // first condition wins
assert(watermarkFrom(
"leftTime > rightTime - interval 3 second AND rightTime < leftTime + interval 4 seconds") ===
Some(6000)) // second condition wins
// Test multiple conditions, should return minimum watermark
assert(watermarkFrom(
"leftTime > rightTime - interval 3 second AND rightTime < leftTime + interval 2 seconds")
=== Some(7000)) // first condition wins
assert(watermarkFrom(
"leftTime > rightTime - interval 3 second AND rightTime < leftTime + interval 4 seconds")
=== Some(6000)) // second condition wins

// Test invalid comparisons
assert(watermarkFrom("cast(leftTime AS LONG) > leftOther") === None) // non-time attributes
assert(watermarkFrom("leftOther > rightOther") === None) // non-time attributes
assert(watermarkFrom("leftOther > rightOther AND leftTime > rightTime") === Some(10000))
assert(watermarkFrom("cast(rightTime AS DOUBLE) < rightOther") === None) // non-time attributes
assert(watermarkFrom("leftTime > rightTime + interval 1 month") === None) // month not allowed
// Test invalid comparisons
assert(watermarkFrom("cast(leftTime AS LONG) > leftOther") === None) // non-time attributes
assert(watermarkFrom("leftOther > rightOther") === None) // non-time attributes
assert(watermarkFrom("leftOther > rightOther AND leftTime > rightTime") === Some(10000))
// non-time attributes
assert(watermarkFrom("cast(rightTime AS DOUBLE) < rightOther") === None)
assert(watermarkFrom("leftTime > rightTime + interval 1 month") === None) // month not allowed

// Test static comparisons
assert(watermarkFrom("cast(leftTime AS LONG) > 10") === Some(10000))
// Test static comparisons
assert(watermarkFrom("cast(leftTime AS LONG) > 10") === Some(10000))

// Test non-positive results
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) - 10") === Some(0))
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) - 100") === Some(-90000))
// Test non-positive results
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) - 10") === Some(0))
assert(watermarkFrom("CAST(leftTime AS LONG) > CAST(rightTime AS LONG) - 100") ===
Some(-90000))
}
}
}
Loading