@@ -292,53 +292,59 @@ object ResolveWindowTime extends Rule[LogicalPlan] {
      val windowTimeExpressions =
        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet

-      if (windowTimeExpressions.size == 1 &&
-        windowTimeExpressions.head.windowColumn.resolved &&
-        windowTimeExpressions.head.checkInputDataTypes().isSuccess) {
-
-        val windowTime = windowTimeExpressions.head
+      val allWindowTimeExprsResolved = windowTimeExpressions.forall { w =>
+        w.windowColumn.resolved && w.checkInputDataTypes().isSuccess
+      }
+
+      if (windowTimeExpressions.nonEmpty && allWindowTimeExprsResolved) {
+        val windowTimeToAttrAndNewColumn = windowTimeExpressions.map { windowTime =>

-        val metadata = windowTime.windowColumn match {
-          case a: Attribute => a.metadata
-          case _ => Metadata.empty
-        }
+          val metadata = windowTime.windowColumn match {
+            case a: Attribute => a.metadata
+            case _ => Metadata.empty
+          }

-        if (!metadata.contains(TimeWindow.marker) &&
-          !metadata.contains(SessionWindow.marker)) {
-          // FIXME: error framework?
-          throw new AnalysisException(
-            "The input is not a correct window column: $windowTime", plan = Some(p))
-        }
+          if (!metadata.contains(TimeWindow.marker) &&
+            !metadata.contains(SessionWindow.marker)) {
+            // FIXME: error framework?
+            throw new AnalysisException(
+              "The input is not a correct window column: $windowTime", plan = Some(p))
+          }

-        val newMetadata = new MetadataBuilder()
-          .withMetadata(metadata)
-          .remove(TimeWindow.marker)
-          .remove(SessionWindow.marker)
-          .build()
+          val newMetadata = new MetadataBuilder()
+            .withMetadata(metadata)
+            .remove(TimeWindow.marker)
+            .remove(SessionWindow.marker)
+            .build()
+
+          val colName = windowTime.sql
Review thread on the added line "val colName = windowTime.sql":

@cloud-fan (Contributor), Oct 24, 2022:
Will this be materialized in the checkpoint or state store? The SQL string for an expression is unstable, since it depends on the resolved expression, and resolution may change over time (e.g. type coercion may add casts differently).

@HeartSaVioR (Contributor, Author), Oct 24, 2022:
The schema checker of the state store allows changing a column's name; it mainly checks type and nullability.

The ideal column name would be exactly what the user wrote in the call, but I can't find a way to do that, and I'm not sure it is possible for all usages, since there are multiple ways to call the SQL function. This seems to be a best effort, and using the expression's SQL string already seems to be how the resulting column name is defined for other SQL functions as well.

Contributor:
If we allow name changes, this is fine.

@HeartSaVioR (Contributor, Author):
In practice, end users will apply another time window function against the resulting column of window_time, so the final resulting column will be another "window".


-        val attr = AttributeReference(
-          "window_time", windowTime.dataType, metadata = newMetadata)()
+          val attr = AttributeReference(colName, windowTime.dataType, metadata = newMetadata)()

-        // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as
-        // it is, it is going to be bound to the different window even if we apply the same window
-        // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the
-        // correct window range.
-        val subtractExpr =
-          PreciseTimestampConversion(
-            Subtract(PreciseTimestampConversion(
-              GetStructField(windowTime.windowColumn, 1),
-              windowTime.dataType, LongType), Literal(1L)),
-            LongType,
-            windowTime.dataType)
+          // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as
+          // it is, it is going to be bound to the different window even if we apply the same window
+          // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the
+          // correct window range.
+          val subtractExpr =
+            PreciseTimestampConversion(
+              Subtract(PreciseTimestampConversion(
+                GetStructField(windowTime.windowColumn, 1),
+                windowTime.dataType, LongType), Literal(1L)),
+              LongType,
+              windowTime.dataType)

-        val newColumn = Alias(subtractExpr, "window_time")(
-          exprId = attr.exprId, explicitMetadata = Some(newMetadata))
+          val newColumn = Alias(subtractExpr, colName)(
+            exprId = attr.exprId, explicitMetadata = Some(newMetadata))
+
+          windowTime -> (attr, newColumn)
+        }.toMap

        val replacedPlan = p transformExpressions {
-          case w: WindowTime => attr
+          case w: WindowTime => windowTimeToAttrAndNewColumn(w)._1
        }

-        replacedPlan.withNewChildren(Project(newColumn +: child.output, child) :: Nil)
+        val newColumnsToAdd = windowTimeToAttrAndNewColumn.values.map(_._2)
+        replacedPlan.withNewChildren(
+          Project(newColumnsToAdd ++: child.output, child) :: Nil)
      } else {
        p // Return unchanged. Analyzer will throw exception later
      }
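
A minimal sketch of the follow-up usage the author describes in the thread above, i.e. applying another time window on top of the window_time result. This is not part of the PR's diff; it assumes Spark 3.4+ with a SparkSession named spark, and the events DataFrame and eventTime column are hypothetical names used only for illustration:

import org.apache.spark.sql.functions.{count, sum, window, window_time}
import spark.implicits._

// First aggregation: 5-minute tumbling windows produce a struct column named "window".
val firstAgg = events
  .groupBy(window($"eventTime", "5 minutes"))
  .agg(count("*").as("cnt"))

// window_time($"window") evaluates to window.end - 1 microsecond, an event-time value that
// still falls inside the original window, so it can be windowed again for a second-level
// aggregation; the resulting column is another "window" struct, as the author notes.
val secondAgg = firstAgg
  .groupBy(window(window_time($"window"), "10 minutes"))
  .agg(sum("cnt").as("total"))
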
@@ -345,7 +345,7 @@
| org.apache.spark.sql.catalyst.expressions.WeekDay | weekday | SELECT weekday('2009-07-30') | struct<weekday(2009-07-30):int> |
| org.apache.spark.sql.catalyst.expressions.WeekOfYear | weekofyear | SELECT weekofyear('2008-02-20') | struct<weekofyear(2008-02-20):int> |
| org.apache.spark.sql.catalyst.expressions.WidthBucket | width_bucket | SELECT width_bucket(5.3, 0.2, 10.6, 5) | struct<width_bucket(5.3, 0.2, 10.6, 5):bigint> |
-| org.apache.spark.sql.catalyst.expressions.WindowTime | window_time | SELECT a, window.start as start, window.end as end, window_time(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start) | struct<a:string,start:timestamp,end:timestamp,window_time:timestamp,cnt:bigint> |
+| org.apache.spark.sql.catalyst.expressions.WindowTime | window_time | SELECT a, window.start as start, window.end as end, window_time(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start) | struct<a:string,start:timestamp,end:timestamp,window_time(window):timestamp,cnt:bigint> |
| org.apache.spark.sql.catalyst.expressions.XxHash64 | xxhash64 | SELECT xxhash64('Spark', array(123), 2) | struct<xxhash64(Spark, array(123), 2):bigint> |
| org.apache.spark.sql.catalyst.expressions.Year | year | SELECT year('2016-07-30') | struct<year(2016-07-30):int> |
| org.apache.spark.sql.catalyst.expressions.ZipWith | zip_with | SELECT zip_with(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)) | struct<zip_with(array(1, 2, 3), array(a, b, c), lambdafunction(named_struct(y, namedlambdavariable(), x, namedlambdavariable()), namedlambdavariable(), namedlambdavariable())):array<struct<y:string,x:int>>> |
@@ -413,4 +413,4 @@
| org.apache.spark.sql.catalyst.expressions.xml.XPathList | xpath | SELECT xpath('<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>','a/b/text()') | struct<xpath(<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>, a/b/text()):array<string>> |
| org.apache.spark.sql.catalyst.expressions.xml.XPathLong | xpath_long | SELECT xpath_long('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_long(<a><b>1</b><b>2</b></a>, sum(a/b)):bigint> |
| org.apache.spark.sql.catalyst.expressions.xml.XPathShort | xpath_short | SELECT xpath_short('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_short(<a><b>1</b><b>2</b></a>, sum(a/b)):smallint> |
-| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> |
+| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> |
@@ -599,23 +599,37 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
).toDF("time")

-    val e = intercept[AnalysisException] {
-      df
-        .withColumn("time2", expr("time - INTERVAL 5 minutes"))
-        .select(
-          window($"time", "10 seconds").as("window1"),
-          window($"time2", "10 seconds").as("window2")
-        )
-        .select(
-          $"window1.end".cast("string"),
-          window_time($"window1").cast("string"),
-          $"window2.end".cast("string"),
-          window_time($"window2").cast("string")
-        )
-    }
-    assert(e.getMessage.contains(
-      "Multiple time/session window expressions would result in a cartesian product of rows, " +
-      "therefore they are currently not supported"))
+    val df2 = df
+      .withColumn("time2", expr("time - INTERVAL 15 minutes"))
+      .select(window($"time", "10 seconds").as("window1"), $"time2")
Review comment (@HeartSaVioR, Contributor, Author):
We have to select twice because the time window rule does not allow multiple time window function calls to co-exist in the same projection; the time_window/session_window function is effectively a table-valued function (TVF).

.select($"window1", window($"time2", "10 seconds").as("window2"))

checkAnswer(
df2.select(
$"window1.end".cast("string"),
window_time($"window1").cast("string"),
$"window2.end".cast("string"),
window_time($"window2").cast("string")),
Seq(
Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999",
"2016-03-27 19:23:20", "2016-03-27 19:23:19.999999"),
Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999",
"2016-03-27 19:24:30", "2016-03-27 19:24:29.999999"))
)

// check column names
val df3 = df2
.select(
window_time($"window1").cast("string"),
window_time($"window2").cast("string"),
window_time($"window2").as("wt2_aliased").cast("string")
Review comment (@HeartSaVioR, Contributor, Author):
This checks that the "same" window_time call does not cause a conflict and can be tagged with a different column name.

+      )

+    val schema = df3.schema

+    assert(schema.fields.exists(_.name == "window_time(window1)"))
+    assert(schema.fields.exists(_.name == "window_time(window2)"))
+    assert(schema.fields.exists(_.name == "wt2_aliased"))
}
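
The expected values in the checkAnswer above also show the effect of the one-microsecond subtraction performed by ResolveWindowTime. A minimal standalone sketch of the same semantics, not part of the PR; it assumes a SparkSession named spark and mirrors the first row of the test:

import org.apache.spark.sql.functions.{window, window_time}
import spark.implicits._

val times = Seq("2016-03-27 19:38:18").toDF("time")

// A 10-second tumbling window assigns this record to [19:38:10, 19:38:20). The upper bound
// window.end (19:38:20) is exclusive, so reusing it as an event time with the same window spec
// would land in the next window [19:38:20, 19:38:30). window_time therefore returns
// window.end - 1 microsecond = 19:38:19.999999, which still lies inside the original window.
times
  .select(window($"time", "10 seconds").as("w"))
  .select($"w.end".cast("string"), window_time($"w").cast("string"))
  .show(false)
// expected: 2016-03-27 19:38:20 | 2016-03-27 19:38:19.999999
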

test("window_time function on agg output") {