26 commits
d229cee
[SPARK-53687][SQL][SS] Introduce WATERMARK clause in SQL statement
HeartSaVioR Sep 23, 2025
01eab25
addressed error class
HeartSaVioR Sep 24, 2025
f06846a
fix CI failures
HeartSaVioR Sep 24, 2025
a2db250
removed unused error class
HeartSaVioR Sep 24, 2025
92dd249
another fix
HeartSaVioR Sep 24, 2025
150e9ef
Move the logic of resolving EventTimeWatermark to separate analysis rule
HeartSaVioR Sep 26, 2025
fc092b6
maybe correct one
HeartSaVioR Sep 26, 2025
c451b1b
reflect review comments
HeartSaVioR Sep 30, 2025
5fd9cf5
fix
HeartSaVioR Sep 30, 2025
b1c8619
Apply suggestion from @cloud-fan
HeartSaVioR Oct 19, 2025
ff96c94
fix
HeartSaVioR Oct 19, 2025
c2ec240
Apply suggestion from @cloud-fan
HeartSaVioR Oct 19, 2025
107f7b3
DML -> statement
HeartSaVioR Oct 19, 2025
9206997
parser test
HeartSaVioR Oct 19, 2025
31ddd57
fix
HeartSaVioR Oct 20, 2025
27e4de3
remove out test
HeartSaVioR Oct 20, 2025
855c453
parser tests
HeartSaVioR Oct 20, 2025
8805a58
reflect review comments
HeartSaVioR Oct 21, 2025
bba84a9
reflect review comments
HeartSaVioR Oct 21, 2025
d09b56e
remove redundant test as suggested
HeartSaVioR Oct 21, 2025
b06efdc
reflect recent change
HeartSaVioR Oct 21, 2025
6a40983
fix test
HeartSaVioR Oct 22, 2025
35cd9ed
[empty] this is a empty commit
HeartSaVioR Oct 22, 2025
66d4578
Move out the requirement of explicit name into Analyzer
HeartSaVioR Oct 22, 2025
612df08
add the lost test coverage from the last commit & fix the issue
HeartSaVioR Oct 22, 2025
c556393
fix
HeartSaVioR Oct 22, 2025
12 changes: 12 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -583,6 +583,12 @@
],
"sqlState" : "22KD3"
},
"CANNOT_USE_MULTI_ALIASES_IN_WATERMARK_CLAUSE" : {
"message" : [
"Multiple aliases are not supported in watermark clause."
],
"sqlState" : "42000"
},
"CANNOT_WRITE_STATE_STORE" : {
"message" : [
"Error writing state store files for provider <providerClass>."
@@ -4985,6 +4991,12 @@
],
"sqlState" : "4274K"
},
"REQUIRES_EXPLICIT_NAME_IN_WATERMARK_CLAUSE" : {
"message" : [
"The watermark clause requires an explicit name if expression is specified, but got <sqlExpr>."
],
"sqlState" : "42000"
},
"REQUIRES_SINGLE_PART_NAMESPACE" : {
"message" : [
"<sessionCatalog> requires a single-part namespace, but got <namespace>."
2 changes: 2 additions & 0 deletions docs/sql-ref-ansi-compliance.md
@@ -497,6 +497,7 @@ Below is a list of all the keywords in Spark SQL.
|DEFAULT|non-reserved|non-reserved|non-reserved|
|DEFINED|non-reserved|non-reserved|non-reserved|
|DEFINER|non-reserved|non-reserved|non-reserved|
|DELAY|non-reserved|non-reserved|non-reserved|
|DELETE|non-reserved|non-reserved|reserved|
|DELIMITED|non-reserved|non-reserved|non-reserved|
|DESC|non-reserved|non-reserved|non-reserved|
@@ -793,6 +794,7 @@ Below is a list of all the keywords in Spark SQL.
|VIEW|non-reserved|non-reserved|non-reserved|
|VIEWS|non-reserved|non-reserved|non-reserved|
|VOID|non-reserved|non-reserved|non-reserved|
|WATERMARK|non-reserved|non-reserved|non-reserved|
|WEEK|non-reserved|non-reserved|non-reserved|
|WEEKS|non-reserved|non-reserved|non-reserved|
|WHEN|reserved|non-reserved|reserved|
@@ -206,6 +206,7 @@ DECLARE: 'DECLARE';
DEFAULT: 'DEFAULT';
DEFINED: 'DEFINED';
DEFINER: 'DEFINER';
DELAY: 'DELAY';
DELETE: 'DELETE';
DELIMITED: 'DELIMITED';
DESC: 'DESC';
@@ -501,6 +502,7 @@ VERSION: 'VERSION';
VIEW: 'VIEW';
VIEWS: 'VIEWS';
VOID: 'VOID';
WATERMARK: 'WATERMARK';
WEEK: 'WEEK';
WEEKS: 'WEEKS';
WHEN: 'WHEN';
@@ -382,8 +382,10 @@ createPipelineDatasetHeader
;

streamRelationPrimary
: STREAM multipartIdentifier optionsClause? tableAlias #streamTableName
| STREAM LEFT_PAREN multipartIdentifier RIGHT_PAREN optionsClause? tableAlias #streamTableName
: STREAM multipartIdentifier optionsClause? watermarkClause?
tableAlias #streamTableName
| STREAM LEFT_PAREN multipartIdentifier RIGHT_PAREN
optionsClause? watermarkClause? tableAlias #streamTableName
;

setResetStatement
@@ -927,6 +929,10 @@ lateralView
: LATERAL VIEW (OUTER)? qualifiedName LEFT_PAREN (expression (COMMA expression)*)? RIGHT_PAREN tblName=identifier (AS? colName+=identifier (COMMA colName+=identifier)*)?
;

watermarkClause
: WATERMARK colName=namedExpression DELAY OF delay=interval
;
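
For context, a minimal sketch of how the new clause reads in a query. The table `events`, its column `eventTime`, and the `spark` session are assumed for illustration and are not part of this PR:

```scala
// Hypothetical usage of the WATERMARK clause on a streaming relation: declare the event-time
// column and the allowed lateness directly in SQL instead of calling Dataset.withWatermark.
val windowedCounts = spark.sql(
  """SELECT window(eventTime, '10 minutes') AS win, count(*) AS cnt
    |FROM STREAM events WATERMARK eventTime DELAY OF INTERVAL 10 SECONDS
    |GROUP BY window(eventTime, '10 minutes')""".stripMargin)
```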

setQuantifier
: DISTINCT
| ALL
@@ -1001,9 +1007,11 @@ identifierComment
relationPrimary
: streamRelationPrimary #streamRelation
| identifierReference temporalClause?
optionsClause? sample? tableAlias #tableName
| LEFT_PAREN query RIGHT_PAREN sample? tableAlias #aliasedQuery
| LEFT_PAREN relation RIGHT_PAREN sample? tableAlias #aliasedRelation
optionsClause? sample? watermarkClause? tableAlias #tableName

Member: watermarkClause is already defined in streamRelationPrimary. Do we still need it here? Is it also applied for non-stream relation?

Contributor Author: relation here can be a temp view which could be technically streaming without STREAM keyword

| LEFT_PAREN query RIGHT_PAREN sample? watermarkClause?
tableAlias #aliasedQuery
| LEFT_PAREN relation RIGHT_PAREN sample?
watermarkClause? tableAlias #aliasedRelation
| inlineTable #inlineTableDefault2
| functionTable #tableValuedFunction
;
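
As the review thread above notes, a plain relation can itself be streaming when it is a temporary view over a streaming source, which is why the clause also appears on the `#tableName`, `#aliasedQuery`, and `#aliasedRelation` alternatives. A hedged sketch (the view name is arbitrary, `spark` is an assumed session, and the `rate` source provides the `timestamp` column):

```scala
// A temp view over a streaming source is streaming even without the STREAM keyword,
// so the watermark clause is accepted on the plain table-name path as well.
spark.readStream.format("rate").load().createOrReplaceTempView("rates")
val withWm = spark.sql("SELECT * FROM rates WATERMARK timestamp DELAY OF INTERVAL 5 SECONDS")
```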
@@ -1012,6 +1020,8 @@ optionsClause
: WITH options=propertyList
;

// Unlike the other relation primaries, we do not support watermarkClause for
// inlineTable.
inlineTable
: VALUES expression (COMMA expression)* tableAlias
;
@@ -1048,10 +1058,13 @@ functionTableArgument
| functionArgument
;

// This is only used in relationPrimary, where having watermarkClause makes sense. If this becomes
// referenced by another clause, please check whether watermarkClause makes sense for that clause.
// If not, consider separating this rule.
functionTable
: funcName=functionName LEFT_PAREN
(functionTableArgument (COMMA functionTableArgument)*)?
RIGHT_PAREN tableAlias
RIGHT_PAREN watermarkClause? tableAlias

Member: From the doc https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-qry-select-watermark, it looks like table_valued_function has no watermark_clause support, but we want to have it here?

Contributor Author: Ouch I think it's missed in that doc.

;
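
Following the review thread above, a sketch of the table-valued-function path. `my_streaming_tvf` is purely hypothetical (not a Spark built-in and not part of this PR); it stands in for any TVF that returns a streaming relation, and `spark` is an assumed session:

```scala
// Watermark attached directly to a TVF call, as allowed by the rule above.
val fromTvf = spark.sql(
  """SELECT *
    |FROM my_streaming_tvf('some-arg')
    |  WATERMARK eventTime DELAY OF INTERVAL 1 MINUTE""".stripMargin)
```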

tableAlias
@@ -1819,6 +1832,7 @@ ansiNonReserved
| DEFAULT
| DEFINED
| DEFINER
| DELAY
| DELETE
| DELIMITED
| DESC
@@ -2063,6 +2077,7 @@ ansiNonReserved
| WEEK
| WEEKS
| WHILE
| WATERMARK
| WINDOW
| WITHOUT
| YEAR
@@ -2188,6 +2203,7 @@ nonReserved
| DEFAULT
| DEFINED
| DEFINER
| DELAY
| DELETE
| DELIMITED
| DESC
@@ -2469,6 +2485,7 @@ nonReserved
| VIEW
| VIEWS
| VOID
| WATERMARK
| WEEK
| WEEKS
| WHILE
@@ -636,6 +636,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
case c: CollectMetrics
if c.child.resolved && AliasResolution.hasUnresolvedAlias(c.metrics) =>
c.copy(metrics = AliasResolution.assignAliases(c.metrics))

case u: UnresolvedEventTimeWatermark
if u.child.resolved && AliasResolution.hasUnresolvedAlias(Seq(u.eventTimeColExpr)) =>
u.copy(eventTimeColExpr = AliasResolution.assignAliases(Seq(u.eventTimeColExpr)).head)
}
}
}
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern
import org.apache.spark.sql.catalyst.util.AUTO_GENERATED_ALIAS

/**
* Resolve [[UnresolvedEventTimeWatermark]] to [[EventTimeWatermark]].
*/
object ResolveEventTimeWatermark extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
_.containsPattern(TreePattern.UNRESOLVED_EVENT_TIME_WATERMARK), ruleId) {

case u: UnresolvedEventTimeWatermark if u.eventTimeColExpr.resolved && u.childrenResolved =>
if (u.eventTimeColExpr.metadata.contains(AUTO_GENERATED_ALIAS) &&
u.eventTimeColExpr.metadata.getString(AUTO_GENERATED_ALIAS) == "true") {
throw new AnalysisException(
errorClass = "REQUIRES_EXPLICIT_NAME_IN_WATERMARK_CLAUSE",
messageParameters = Map("sqlExpr" -> u.eventTimeColExpr.sql)
)
}

val uuid = java.util.UUID.randomUUID()

val attrRef = u.eventTimeColExpr.toAttribute
if (u.child.outputSet.contains(u.eventTimeColExpr)) {
// We don't need to have projection since the attribute being referenced will be available.
EventTimeWatermark(uuid, attrRef, u.delay, u.child)
} else {
// We need to inject projection as we can't find the matching column directly in the
// child output.
val proj = Project(Seq(u.eventTimeColExpr) ++ u.child.output, u.child)
EventTimeWatermark(uuid, attrRef, u.delay, proj)
}
}
}
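
A hedged illustration of the two paths this rule takes; the table and column names below are assumptions (using an assumed `spark` session), not from this PR's tests:

```scala
// Accepted: the derived watermark expression carries an explicit alias, so the rule wraps the
// child in a Project that exposes `event_time` before adding EventTimeWatermark on top.
spark.sql(
  """SELECT *
    |FROM STREAM events
    |  WATERMARK to_timestamp(ts_str) AS event_time DELAY OF INTERVAL 30 SECONDS""".stripMargin)

// Without "AS event_time", the alias is auto-generated (AUTO_GENERATED_ALIAS = "true") and the
// rule raises REQUIRES_EXPLICIT_NAME_IN_WATERMARK_CLAUSE instead.
```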
@@ -31,6 +31,7 @@ import org.apache.spark.sql.connector.catalog.TableWritePrivilege
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.ArrayImplicits._

/**
@@ -1228,3 +1229,15 @@ case class UnresolvedExecuteImmediate(

final override val nodePatterns: Seq[TreePattern] = Seq(EXECUTE_IMMEDIATE)
}

case class UnresolvedEventTimeWatermark(
eventTimeColExpr: NamedExpression,
delay: CalendarInterval,
child: LogicalPlan)
extends UnresolvedUnaryNode {

final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_EVENT_TIME_WATERMARK)

override protected def withNewChildInternal(
newChild: LogicalPlan): UnresolvedEventTimeWatermark = copy(child = newChild)
}
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

/**
* A collection of implicit conversions that create a DSL for constructing catalyst data structures.
@@ -566,6 +566,19 @@ package object dsl extends SQLConfHelper {
}

def deduplicate(colNames: Attribute*): LogicalPlan = Deduplicate(colNames, logicalPlan)

def withWatermark(
uuid: java.util.UUID,
expr: NamedExpression,
delayThreshold: CalendarInterval): LogicalPlan = {
EventTimeWatermark(uuid, expr.toAttribute, delayThreshold, logicalPlan)
}

def unresolvedWithWatermark(
expr: NamedExpression,
delayThreshold: CalendarInterval): LogicalPlan = {
UnresolvedEventTimeWatermark(expr, delayThreshold, logicalPlan)
}
}
}
}
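
A minimal sketch (assumed, not taken from this PR's test suites) of how the new DSL helpers above can drive ResolveEventTimeWatermark through both resolution paths:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.unsafe.types.CalendarInterval

val delay = new CalendarInterval(0, 0, 10L * 1000 * 1000) // 10 seconds, in microseconds
val ts = $"ts".timestamp
val relation = LocalRelation(ts, $"value".int)

// Watermark on an existing output attribute: resolves to EventTimeWatermark with no Project.
val onAttribute = relation.unresolvedWithWatermark(ts, delay)

// Watermark on an explicitly named derived expression: the rule injects a Project first.
val onExpression = relation.unresolvedWithWatermark(Alias(ts, "eventTime")(), delay)
```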
@@ -29,6 +29,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
import org.apache.spark.{SparkArithmeticException, SparkException, SparkIllegalArgumentException, SparkThrowable, SparkThrowableHelper}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys.PARTITION_SPECIFICATION
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FUNC_ALIAS
@@ -2076,6 +2077,34 @@ class AstBuilder extends DataTypeAstBuilder
query)
}

/**
* Add an [[EventTimeWatermark]] to a logical plan.
*/
private def withWatermark(
ctx: WatermarkClauseContext,
query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
val expression = visitNamedExpression(ctx.namedExpression())

val namedExpression = expression match {
// Need to check this earlier since MultiAlias is also a NamedExpression
case _: MultiAlias =>
throw new AnalysisException(
errorClass = "CANNOT_USE_MULTI_ALIASES_IN_WATERMARK_CLAUSE",
messageParameters = Map()
)
case e: NamedExpression => e
case e => UnresolvedAlias(e)
}

val delayInterval = visitInterval(ctx.delay)

val delay = IntervalUtils.fromIntervalString(delayInterval.toString)
require(!IntervalUtils.isNegative(delay),
s"delay threshold (${delayInterval.toString}) should not be negative.")

UnresolvedEventTimeWatermark(namedExpression, delay, query)
}
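
Two hedged examples of statements these guards reject; the identifiers and the `spark` session are illustrative only:

```scala
// Multiple aliases carry no meaning for a watermark column:
// fails with CANNOT_USE_MULTI_ALIASES_IN_WATERMARK_CLAUSE.
spark.sql("SELECT * FROM STREAM events WATERMARK ts AS (a, b) DELAY OF INTERVAL 10 SECONDS")

// A negative delay threshold fails the require(...) check above while the plan is being built.
spark.sql("SELECT * FROM STREAM events WATERMARK ts DELAY OF INTERVAL -10 SECONDS")
```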

/**
* Create a single relation referenced in a FROM clause. This method is used when a part of the
* join condition is nested, for example:
@@ -2255,7 +2284,8 @@ class AstBuilder extends DataTypeAstBuilder
val relation = createUnresolvedRelation(ctx.identifierReference, Option(ctx.optionsClause))
val table = mayApplyAliasPlan(
ctx.tableAlias, relation.optionalMap(ctx.temporalClause)(withTimeTravel))
table.optionalMap(ctx.sample)(withSample)
val sample = table.optionalMap(ctx.sample)(withSample)
sample.optionalMap(ctx.watermarkClause)(withWatermark)
}

override def visitVersion(ctx: VersionContext): Option[String] = {
@@ -2395,7 +2425,9 @@ class AstBuilder extends DataTypeAstBuilder

val tvfAliases = if (aliases.nonEmpty) UnresolvedTVFAliases(ident, tvf, aliases) else tvf

tvfAliases.optionalMap(func.tableAlias.strictIdentifier)(aliasPlan)
val watermarkClause = func.watermarkClause()
val tvfWithWatermark = tvfAliases.optionalMap(watermarkClause)(withWatermark)
tvfWithWatermark.optionalMap(func.tableAlias.strictIdentifier)(aliasPlan)
})
}

@@ -2407,7 +2439,9 @@ class AstBuilder extends DataTypeAstBuilder
optionsClause = Option(ctx.optionsClause),
writePrivileges = Seq.empty,
isStreaming = true)
mayApplyAliasPlan(ctx.tableAlias, tableStreamingRelation)

val tableWithWatermark = tableStreamingRelation.optionalMap(ctx.watermarkClause)(withWatermark)
mayApplyAliasPlan(ctx.tableAlias, tableWithWatermark)
}

/**
@@ -2450,7 +2484,8 @@ class AstBuilder extends DataTypeAstBuilder
*/
override def visitAliasedRelation(ctx: AliasedRelationContext): LogicalPlan = withOrigin(ctx) {
val relation = plan(ctx.relation).optionalMap(ctx.sample)(withSample)
mayApplyAliasPlan(ctx.tableAlias, relation)
val watermark = relation.optionalMap(ctx.watermarkClause)(withWatermark)
mayApplyAliasPlan(ctx.tableAlias, watermark)
}

/**
@@ -2463,7 +2498,7 @@ class AstBuilder extends DataTypeAstBuilder
*/
override def visitAliasedQuery(ctx: AliasedQueryContext): LogicalPlan = withOrigin(ctx) {
val relation = plan(ctx.query).optionalMap(ctx.sample)(withSample)
if (ctx.tableAlias.strictIdentifier == null) {
val alias = if (ctx.tableAlias.strictIdentifier == null) {
// For un-aliased subqueries, use a default alias name that is not likely to conflict with
// normal subquery names, so that parent operators can only access the columns in subquery by
// unqualified names. Users can still use this special qualifier to access columns if they
Expand All @@ -2472,6 +2507,7 @@ class AstBuilder extends DataTypeAstBuilder
} else {
mayApplyAliasPlan(ctx.tableAlias, relation)
}
alias.optionalMap(ctx.watermarkClause)(withWatermark)
}

/**
@@ -86,6 +86,7 @@ object RuleIdCollection {
"org.apache.spark.sql.catalyst.analysis.EliminateUnions" ::
"org.apache.spark.sql.catalyst.analysis.ResolveCollationName" ::
"org.apache.spark.sql.catalyst.analysis.ResolveDefaultColumns" ::
"org.apache.spark.sql.catalyst.analysis.ResolveEventTimeWatermark" ::
"org.apache.spark.sql.catalyst.analysis.ResolveExecuteImmediate" ::
"org.apache.spark.sql.catalyst.analysis.ResolveExpressionsWithNamePlaceholders" ::
"org.apache.spark.sql.catalyst.analysis.ResolveGroupByAll" ::
@@ -176,6 +176,7 @@ object TreePattern extends Enumeration {

// Unresolved Plan patterns (Alphabetically ordered)
val PLAN_WITH_UNRESOLVED_IDENTIFIER: Value = Value
val UNRESOLVED_EVENT_TIME_WATERMARK: Value = Value
val UNRESOLVED_HAVING: Value = Value
val UNRESOLVED_HINT: Value = Value
val UNRESOLVED_FUNC: Value = Value