
Conversation

@alex-balikov (Contributor)

What changes were proposed in this pull request?

This PR introduces a window_time function to extract the streaming event time from a window column produced by the window aggregating operators. This is one step in a sequence of fixes required to add support for multiple stateful operators in Spark Structured Streaming, as described in https://issues.apache.org/jira/browse/SPARK-40821

Why are the changes needed?

The window_time function is a convenience function to compute the correct event time for window aggregate records. Such records, produced by window aggregating operators, have no explicit event time but rather a window column of type StructType { start: TimestampType, end: TimestampType }, where start is inclusive and end is exclusive. The correct event time for such a record is window.end - 1. The event time is necessary when chaining other stateful operators after the window aggregating operators.
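
A minimal usage sketch (illustrative only; assumes spark.implicits._ and org.apache.spark.sql.functions._ are in scope, and the source and column names are chosen just for the example):

// Windowed streaming aggregation over an illustrative source with a "timestamp" column.
val counts = spark.readStream.format("rate").load()
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "5 minutes"))
  .count()
  // window.end is exclusive, so window_time($"window") yields the inclusive event time
  // (window.end - 1) of each aggregated row, which a downstream stateful operator can use.
  .select(window_time($"window").as("eventTime"), $"count")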

Does this PR introduce any user-facing change?

Yes: the PR introduces a new window_time SQL function for both the Scala and Python APIs.

How was this patch tested?

Added new unit tests.

@AmplabJenkins
Can one of the admins verify this patch?

@HeartSaVioR (Contributor) left a comment

Looks OK in general. Left several comments.

Given that this adds a new SQL function (with PySpark), let me cc others to get more eyes on the review. cc @cloud-fan @HyukjinKwon

if (!metadata.contains(TimeWindow.marker) &&
    !metadata.contains(SessionWindow.marker)) {
  // FIXME: error framework?
  throw new AnalysisException("The input is not a correct window column!")
Contributor

Let's include some information about the column/expression in the error message. As it stands, it's hard for users to react based on the error message alone.

Btw I don't have a good suggestion for the error framework - we can ping some folks to get guidance. cc @MaxGekk
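
A rough sketch of what such a message could look like (hypothetical wording; windowColumn here stands for the window expression being checked, and this is not tied to any particular error framework):

if (!metadata.contains(TimeWindow.marker) &&
    !metadata.contains(SessionWindow.marker)) {
  // Include the rejected expression and its type so users can tell which column was wrong.
  throw new AnalysisException(
    s"The input '${windowColumn.sql}' of type ${windowColumn.dataType.catalogString} " +
      "is not a window column produced by window() or session_window()")
}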

Contributor

cc. @MaxGekk friendly reminder.

with Unevaluable
with NonSQLExpression {

// FIXME: Should we check that windowColumn is a correct one - generated by
Contributor

I think we are fine as long as the rule checks the marker.

Contributor Author

ResolveWindowTime does check the marker. Removed.

Contributor Author

Removed.

val windowTimeExpressions =
  p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet

if (windowTimeExpressions.size == 1 &&
Contributor

So one operator can only host one window_time expression?

@HeartSaVioR (Contributor), Oct 20, 2022

Alex and I have been discussing the complicated corner cases that come up once we enable this... It should be totally feasible from a SQL point of view, but it is tricky in the context of streaming. (Alex will play more with this tomorrow.)

Btw I just figured out the simplest rationalization for allowing only one. We reserve the output column name for the function (as "window_time"), the same as we do for the window()/session_window() functions. Otherwise we would have to address the resulting column as window_time(window). So it comes down to whether we want to keep treating the window-family functions as special or not.
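
Purely to illustrate the naming point (a sketch; it assumes the reserved output name is "window_time", as proposed above):

val agg = df
  .groupBy(window($"time", "10 seconds"))
  .count()
  .select($"window", window_time($"window"), $"count")

// Reserved-name style, as window()/session_window() do:
agg.select($"window_time")
// Otherwise the column would have to be addressed by its expression text,
// e.g. something like col("window_time(window)").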

Contributor Author

This matches the condition in the other window resolution rules above. I added a test which has two window_time calls in the select and it passes. I admit I am not well versed in the query plan code.

Contributor

The test case should construct two different windows and call window_time on each window to "fail".

Contributor Author

Modified the test. Indeed the scenario fails with the unsupported ops checker error.

Contributor

I left a comment on how to work around the ops checker error and still hit this condition.

Contributor

@cloud-fan
Let me propose the fix in a separate PR. I have a fix, but it may be better to use a separate PR so the comment can be addressed easily.

@alex-balikov alex-balikov requested review from HeartSaVioR and removed request for HeartSaVioR October 20, 2022 23:18
@alex-balikov alex-balikov requested review from HeartSaVioR and HyukjinKwon and removed request for HeartSaVioR and HyukjinKwon October 20, 2022 23:18
@HyukjinKwon (Member) left a comment

Python side LGTM. I don't know much about SS side.

df.select(window($"time", "10 seconds").as("window"))
  .select(
    $"window.end".cast("string"),
    window_time($"window").cast("string").as("window1"),
@HeartSaVioR (Contributor), Oct 21, 2022

This is very interesting... @cloud-fan Any idea on this? Do we capture the expressions as the same? Otherwise it seems odd that this passes the condition below:

val child = p.children.head
val windowTimeExpressions =
  p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet

if (windowTimeExpressions.size == 1

Contributor

OK, this was pretty straightforward... Refer to the definition of WindowTime.

case class WindowTime(windowColumn: Expression)

We apply .toSet to collect the instances with deduplication, so as long as windowColumn resolves to the same expression, the WindowTime instances are considered equal.
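
A small, self-contained illustration of that point (the String-based stand-in is only for demonstration; the real WindowTime wraps an Expression):

// Case class instances with equal children are equal, so .toSet deduplicates
// repeated window_time calls over the same resolved window column.
case class WindowTime(windowColumn: String) // stand-in for the Expression-based class

val exprs = Seq(WindowTime("window#10"), WindowTime("window#10"), WindowTime("window#15"))
println(exprs.toSet.size) // 2 - the two calls over window#10 collapse into one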

Contributor Author

Thanks for figuring this out. Redid the test.

Contributor

Although the test is failing now, it's failing due to the time window rule and the corresponding unsupported ops checker. Is that intentional? I guess we want to test the case of multiple window_time calls.

Refer to this comment: #38288 (comment)

@HeartSaVioR (Contributor) left a comment

Looks good overall. I only have one uncertainty, from @cloud-fan's comment #38288 (comment) and the new, interesting test case that passes.

)
}

test("2 window_time functions on raw window column") {
@HeartSaVioR (Contributor), Oct 22, 2022

The actual test code which fails due to the rule is the following:

  test("2 window_time functions on raw window column") {
    val df = Seq(
      ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
    ).toDF("time")

    val df2 = df
      .withColumn("time2", expr("time - INTERVAL 5 minutes"))
      .select(window($"time", "10 seconds", "5 seconds").as("window1"), $"time2")
      .select($"window1", window($"time2", "10 seconds", "5 seconds").as("window2"))

    /*
      unresolved operator 'Project [window1#10.end AS end#19, unresolvedalias(window_time(window1#10), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549)), window2#15.end AS end#20, unresolvedalias(window_time(window2#15), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549))];
      'Project [window1#10.end AS end#19, unresolvedalias(window_time(window1#10), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549)), window2#15.end AS end#20, unresolvedalias(window_time(window2#15), Some(org.apache.spark.sql.Column$$Lambda$1637/93974967@5d7dd549))]
      +- Project [window1#10, window#16 AS window2#15]
         +- Filter isnotnull(cast(time2#6 as timestamp))
            +- Expand [[named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0) + 10000000), LongType, TimestampType)), window1#10, time2#6], [named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time2#6 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000) + 10000000), LongType, TimestampType)), window1#10, time2#6]], [window#16, window1#10, time2#6]
               +- Project [window#11 AS window1#10, time2#6]
                  +- Filter isnotnull(cast(time#4 as timestamp))
                     +- Expand [[named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 0) + 10000000), LongType, TimestampType)), time#4, time2#6], [named_struct(start, precisetimestampconversion(((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000), LongType, TimestampType), end, precisetimestampconversion((((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - (((precisetimestampconversion(cast(time#4 as timestamp), TimestampType, LongType) - 0) + 5000000) % 5000000)) - 5000000) + 10000000), LongType, TimestampType)), time#4, time2#6]], [window#11, time#4, time2#6]
                        +- Project [time#4, cast(time#4 - INTERVAL '05' MINUTE as string) AS time2#6]
                           +- Project [value#1 AS time#4]
                              +- LocalRelation [value#1]
     */
    df2.select(
      $"window1.end",
      window_time($"window1"),
      $"window2.end",
      window_time($"window2")
    )
  }

The reason the above test case fails with an unresolved operator error is that we do not resolve the two window_time calls with different windows. If we fix the rule to allow multiple window_time calls with different windows, it should just work.

Btw, this code leads to a cartesian product of windows, but it passes the unsupported operation checker, whereas you'd hit the checker if you placed everything in a single select. Spark's unsupported operation checker is rule based and not smart enough to capture all possibilities.

That said, Spark can handle the cartesian product of windows. The unsupported operation checker is more restrictive than what Spark can actually do.

Contributor

Btw, if we want to simply retain the limitation, we should add the check to the unsupported ops checker and provide a better error message than an unresolved operator error, like we do for time window.

@HeartSaVioR (Contributor)

My comments are all minor and my follow-up fix will resolve them.

@HeartSaVioR (Contributor) left a comment

+1

@HeartSaVioR (Contributor)

Thanks! Merging to master.

@rxin (Contributor) commented Dec 23, 2022

Folks - don't we need to do pruning? ResolveWindowTime is super expensive right now and will be applied to every node repeatedly until fixed point.
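
A hedged sketch of the pruning idea (the guard and the WindowTime import path are assumptions for illustration, not the actual follow-up fix; a TreePattern-based resolveOperatorsUpWithPruning would avoid visiting unrelated operators even earlier):

import org.apache.spark.sql.catalyst.expressions.WindowTime // package assumed
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object ResolveWindowTimeSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    // Cheap pre-check so the fix-point iteration skips operators without window_time.
    case p if p.expressions.flatMap(_.collect { case w: WindowTime => w }).nonEmpty =>
      // ... existing window_time resolution logic would go here ...
      p
  }
}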

@rxin (Contributor) commented Dec 23, 2022

Also we need to add proper SQL tests.

@HeartSaVioR (Contributor) commented Dec 27, 2022

I'm sorry, I am on vacation - you're right, we seem to have missed pruning, and we also seem to have missed the same for session window. My bad.

I've submitted PRs separately for both cases. Please take a look.
