Skip to content

Conversation

@HeartSaVioR
Copy link
Contributor

What changes were proposed in this pull request?

This PR proposes to introduce WATERMARK clause in SQL statement. WATERMARK clause is to define the watermark against the relation, especially streaming relation where STREAM keyword is added to the relation (or table valued function).

Please refer to the SQL reference doc for WATERMARK clause about its definition.
https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-qry-select-watermark

The PR also contains new tests which show how to use it.

Why are the changes needed?

This is needed to unblock the stateful workload in Streaming Table & Flow being described as SQL statement.

Does this PR introduce any user-facing change?

Yes, users can define the watermark for stateful workloads with SQL statement.

How was this patch tested?

New UTs.

Was this patch authored or co-authored using generative AI tooling?

No.

@HeartSaVioR
Copy link
Contributor Author

cc. @sryza @cloud-fan @viirya
Could you please take a look? Thanks!

Also, do we have test purposed TVF definition for "streaming source"? I couldn't add E2E test for STREAM TVF + WATERMARK and it'd be ideal if we could add one.

@HeartSaVioR HeartSaVioR changed the title [SPARK-53687][SQL][SS] Introduce WATERMARK clause in SQL statement [SPARK-53687][SQL][SS][SDP] Introduce WATERMARK clause in SQL statement Sep 24, 2025
@HeartSaVioR
Copy link
Contributor Author

cc. @anishshri-db as well

@@ -9880,6 +9880,11 @@
"Doesn't support month or year interval: <interval>"
]
},
"_LEGACY_ERROR_TEMP_3263" : {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Need to assign one error class

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: remove this

@github-actions github-actions bot added the DOCS label Sep 24, 2025
optionsClause? sample? tableAlias #tableName
| LEFT_PAREN query RIGHT_PAREN sample? tableAlias #aliasedQuery
| LEFT_PAREN relation RIGHT_PAREN sample? tableAlias #aliasedRelation
optionsClause? sample? watermarkClause? tableAlias #tableName
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

watermarkClause is already defined in streamRelationPrimary. Do we still need it here? Is it also applied for non-stream relation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relation here can be a temp view which could be technically streaming without STREAM keyword

functionTable
: funcName=functionName LEFT_PAREN
(functionTableArgument (COMMA functionTableArgument)*)?
RIGHT_PAREN tableAlias
RIGHT_PAREN watermarkClause? tableAlias
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the doc https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-qry-select-watermark, it looks like table_valued_function has no watermark_clause support, but we want to have it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ouch I think it's missed in that doc.

}

// TODO: This seems to need to find the new expression by index. We can't rely on exprId
// due to UnresolvedAlias. Are there better ways to do?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need a new logical plan like UnresolvedEventTimeWatermark, which will be rewritten into EventTimeWatermark + Project during analysis.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed. PTAL!

case e: Expression => UnresolvedAlias(e)
}

val isAttributeReference = namedExpression.isInstanceOf[AttributeReference]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan
Please help verify what I'm doing is correct.

What EventTimeWatermark node does is simply to attach metadata marker against the column (produced by child.output) referred by the expression. The node itself does not do any projection and just updates the accumulators and does the passthrough. If the eventTimeColExpr has to be evaluated, we have to inject Project to evaluate the expression, which we expect the result to be explicitly named.

Hence I skip injecting Project only if the expression is an AttributeReference and the column is available in the child.output, otherwise I simply add Project.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it doesn't matter. We can also always add the Project, as optimizer can remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I'm concerned is that we "change" the order of column when we add the project.

For expression with alias, that is not an existing column hence I think it's OK to consider it like withColumn, appending the column in the end. But if the expression is simply referring to the existing column, probably the ideal behavior is to not have a project which might change the column order.

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
_.containsPattern(TreePattern.UNRESOLVED_EVENT_TIME_WATERMARK), ruleId) {

case u: UnresolvedEventTimeWatermark if u.childrenResolved =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we wait for u.eventTimeColExpr.resolved?

}

val isAttributeReference = namedExpression.isInstanceOf[AttributeReference]
val exprInChildOutput = u.child.output.exists(_.name == namedExpression.name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's weird to search by name for resolved attributes, we can just do u.child.outputSet.contains(namedExpression)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So when eventTimeColExpr is resolved, the expression can refer to the column by exprId? If then yes we can do that.


final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_EVENT_TIME_WATERMARK)

private val delay = IntervalUtils.fromIntervalString(delayInterval.toString)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do it in the parser? This logical plan should take CalendarInterval directly.

@@ -81,8 +81,9 @@ case class EventTimeWatermark(
// This is not allowed by default - WatermarkPropagator will throw an exception. We keep the
// logic here because we also maintain the compatibility flag. (See
// SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE for details.)
// Should be defined as lazy so that attributes can be resolved before calling this.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need this hack?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me try removing it.


val namedExpression = u.eventTimeColExpr match {
case e: NamedExpression => e
case e: Expression => UnresolvedAlias(e)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This alias is not user-facing and doesn't matter. We can simply do Alias(e, "watermark_col")()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The watermark column "name" is determined by the expression, not something we use the reserved name.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But wait, now this pattern is only executed "after" the eventTimeColExpr is resolved. Maybe we can just throw an exception if it's still not a named expression.

sqlContext.streams.active.foreach(_.stop())
}

test("event time and watermark metrics with watermark in select DML - case 1") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does DML mean here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant Data Manipulation Language - I can just specify it as "select statement".

// attribute reference vs expression

// tableName, attribute reference, alias
assertEqual(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is assertEqual defined?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad. I'll push the fix sooner.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably proper to have these tests in PlanParserSuite.

_.containsPattern(TreePattern.UNRESOLVED_EVENT_TIME_WATERMARK), ruleId) {

case u: UnresolvedEventTimeWatermark if u.eventTimeColExpr.resolved && u.childrenResolved =>
if (u.eventTimeColExpr.isInstanceOf[MultiAlias]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can fail it earlier in the parser

// We need to inject projection as we can't find the matching column directly in the
// child output.
val proj = Project(Seq(u.eventTimeColExpr) ++ u.child.output, u.child)
val attrRef = u.eventTimeColExpr.toAttribute
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let's move it before the if-else, as we need it in the if branch as well

@HeartSaVioR
Copy link
Contributor Author

I've moved the check from AstBuilder to the analyzer rule since it's likely to be read more time than the AstBuilder impl.

@cloud-fan
Copy link
Contributor

the docker test failure is unrelated, thanks, merging to master!

@cloud-fan cloud-fan closed this in d65ed4a Oct 22, 2025
@HeartSaVioR
Copy link
Contributor Author

Thanks for your thoughtful review!

huangxiaopingRD pushed a commit to huangxiaopingRD/spark that referenced this pull request Nov 25, 2025
### What changes were proposed in this pull request?

This PR proposes to introduce WATERMARK clause in SQL statement. WATERMARK clause is to define the watermark against the relation, especially streaming relation where STREAM keyword is added to the relation (or table valued function).

Please refer to the SQL reference doc for WATERMARK clause about its definition.
https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-qry-select-watermark

The PR also contains new tests which show how to use it.

### Why are the changes needed?

This is needed to unblock the stateful workload in Streaming Table & Flow being described as SQL statement.

### Does this PR introduce _any_ user-facing change?

Yes, users can define the watermark for stateful workloads with SQL statement.

### How was this patch tested?

New UTs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52428 from HeartSaVioR/SPARK-53687.

Authored-by: Jungtaek Lim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants