diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
index fd5da3ff13d88..df6b1c400bb70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala
@@ -292,53 +292,59 @@ object ResolveWindowTime extends Rule[LogicalPlan] {
       val windowTimeExpressions =
         p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
 
-      if (windowTimeExpressions.size == 1 &&
-        windowTimeExpressions.head.windowColumn.resolved &&
-        windowTimeExpressions.head.checkInputDataTypes().isSuccess) {
+      val allWindowTimeExprsResolved = windowTimeExpressions.forall { w =>
+        w.windowColumn.resolved && w.checkInputDataTypes().isSuccess
+      }
 
-        val windowTime = windowTimeExpressions.head
+      if (windowTimeExpressions.nonEmpty && allWindowTimeExprsResolved) {
+        val windowTimeToAttrAndNewColumn = windowTimeExpressions.map { windowTime =>
+          val metadata = windowTime.windowColumn match {
+            case a: Attribute => a.metadata
+            case _ => Metadata.empty
+          }
 
-        val metadata = windowTime.windowColumn match {
-          case a: Attribute => a.metadata
-          case _ => Metadata.empty
-        }
+          if (!metadata.contains(TimeWindow.marker) &&
+            !metadata.contains(SessionWindow.marker)) {
+            // FIXME: error framework?
+            throw new AnalysisException(
+              s"The input is not a correct window column: $windowTime", plan = Some(p))
+          }
 
-        if (!metadata.contains(TimeWindow.marker) &&
-          !metadata.contains(SessionWindow.marker)) {
-          // FIXME: error framework?
-          throw new AnalysisException(
-            "The input is not a correct window column: $windowTime", plan = Some(p))
-        }
+          val newMetadata = new MetadataBuilder()
+            .withMetadata(metadata)
+            .remove(TimeWindow.marker)
+            .remove(SessionWindow.marker)
+            .build()
 
-        val newMetadata = new MetadataBuilder()
-          .withMetadata(metadata)
-          .remove(TimeWindow.marker)
-          .remove(SessionWindow.marker)
-          .build()
+          val colName = windowTime.sql
+
+          val attr = AttributeReference(colName, windowTime.dataType, metadata = newMetadata)()
 
-        val attr = AttributeReference(
-          "window_time", windowTime.dataType, metadata = newMetadata)()
+          // NOTE: "window.end" is the "exclusive" upper bound of the window, so if we used that
+          // value as-is, it would be bound to a different window even under the same window
+          // spec. Subtract 1 microsecond from window.end so that window_time falls inside the
+          // correct window range.
+          val subtractExpr =
+            PreciseTimestampConversion(
+              Subtract(PreciseTimestampConversion(
+                GetStructField(windowTime.windowColumn, 1),
+                windowTime.dataType, LongType), Literal(1L)),
+              LongType,
+              windowTime.dataType)
 
-        // NOTE: "window.end" is "exclusive" upper bound of window, so if we use this value as
-        // it is, it is going to be bound to the different window even if we apply the same window
-        // spec. Decrease 1 microsecond from window.end to let the window_time be bound to the
-        // correct window range.
-        val subtractExpr =
-          PreciseTimestampConversion(
-            Subtract(PreciseTimestampConversion(
-              GetStructField(windowTime.windowColumn, 1),
-              windowTime.dataType, LongType), Literal(1L)),
-            LongType,
-            windowTime.dataType)
+          val newColumn = Alias(subtractExpr, colName)(
+            exprId = attr.exprId, explicitMetadata = Some(newMetadata))
 
-        val newColumn = Alias(subtractExpr, "window_time")(
-          exprId = attr.exprId, explicitMetadata = Some(newMetadata))
+          windowTime -> (attr, newColumn)
+        }.toMap
 
         val replacedPlan = p transformExpressions {
-          case w: WindowTime => attr
+          case w: WindowTime => windowTimeToAttrAndNewColumn(w)._1
         }
 
-        replacedPlan.withNewChildren(Project(newColumn +: child.output, child) :: Nil)
+        val newColumnsToAdd = windowTimeToAttrAndNewColumn.values.map(_._2)
+        replacedPlan.withNewChildren(
+          Project(newColumnsToAdd ++: child.output, child) :: Nil)
       } else {
         p // Return unchanged. Analyzer will throw exception later
       }
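Reviewer note: the rewrite above generalizes the single-expression path to N distinct `WindowTime` expressions, each projected as its own column, while keeping the existing semantics: `window_time` is the inclusive upper bound of a window, i.e. `window.end` minus 1 microsecond, which is what the `Subtract`/`PreciseTimestampConversion` expression tree computes. A minimal plain-Scala sketch of that semantic (illustrative names only, not the rule's actual classes):

```scala
import java.time.Instant
import java.time.temporal.ChronoUnit

// Illustrative stand-in for the window struct; `end` is the exclusive upper bound.
final case class TimeWindowValue(start: Instant, end: Instant)

object WindowTimeSemantics extends App {
  // Mirrors Subtract(PreciseTimestampConversion(window.end, ..., LongType), Literal(1L)):
  // convert to microseconds, subtract one, convert back.
  def windowTime(w: TimeWindowValue): Instant = w.end.minus(1, ChronoUnit.MICROS)

  val w = TimeWindowValue(
    Instant.parse("2016-03-27T19:38:10Z"),
    Instant.parse("2016-03-27T19:38:20Z"))

  val wt = windowTime(w)
  // The result still falls inside [start, end), so it re-binds to the same window.
  assert(!wt.isBefore(w.start) && wt.isBefore(w.end))
  println(wt) // 2016-03-27T19:38:19.999999Z
}
```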
diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
index 6f111b777a6d0..482c72679bb82 100644
--- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
+++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md
@@ -345,7 +345,7 @@
 | org.apache.spark.sql.catalyst.expressions.WeekDay | weekday | SELECT weekday('2009-07-30') | struct<weekday(2009-07-30):int> |
 | org.apache.spark.sql.catalyst.expressions.WeekOfYear | weekofyear | SELECT weekofyear('2008-02-20') | struct<weekofyear(2008-02-20):int> |
 | org.apache.spark.sql.catalyst.expressions.WidthBucket | width_bucket | SELECT width_bucket(5.3, 0.2, 10.6, 5) | struct<width_bucket(5.3, 0.2, 10.6, 5):bigint> |
-| org.apache.spark.sql.catalyst.expressions.WindowTime | window_time | SELECT a, window.start as start, window.end as end, window_time(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start) | struct<a:string,start:timestamp,end:timestamp,window_time:timestamp,cnt:bigint> |
+| org.apache.spark.sql.catalyst.expressions.WindowTime | window_time | SELECT a, window.start as start, window.end as end, window_time(window), cnt FROM (SELECT a, window, count(*) as cnt FROM VALUES ('A1', '2021-01-01 00:00:00'), ('A1', '2021-01-01 00:04:30'), ('A1', '2021-01-01 00:06:00'), ('A2', '2021-01-01 00:01:00') AS tab(a, b) GROUP by a, window(b, '5 minutes') ORDER BY a, window.start) | struct<a:string,start:timestamp,end:timestamp,window_time(window):timestamp,cnt:bigint> |
 | org.apache.spark.sql.catalyst.expressions.XxHash64 | xxhash64 | SELECT xxhash64('Spark', array(123), 2) | struct<xxhash64(Spark, array(123), 2):bigint> |
 | org.apache.spark.sql.catalyst.expressions.Year | year | SELECT year('2016-07-30') | struct<year(2016-07-30):int> |
 | org.apache.spark.sql.catalyst.expressions.ZipWith | zip_with | SELECT zip_with(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)) | struct<zip_with(array(1, 2, 3), array(a, b, c), lambdafunction(named_struct(y, namedlambdavariable(), x, namedlambdavariable()), namedlambdavariable(), namedlambdavariable())):array<struct<y:string,x:int>>> |
@@ -413,4 +413,4 @@
 | org.apache.spark.sql.catalyst.expressions.xml.XPathList | xpath | SELECT xpath('<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>','a/b/text()') | struct<xpath(<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>, a/b/text()):array<string>> |
 | org.apache.spark.sql.catalyst.expressions.xml.XPathLong | xpath_long | SELECT xpath_long('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_long(<a><b>1</b><b>2</b></a>, sum(a/b)):bigint> |
 | org.apache.spark.sql.catalyst.expressions.xml.XPathShort | xpath_short | SELECT xpath_short('<a><b>1</b><b>2</b></a>', 'sum(a/b)') | struct<xpath_short(<a><b>1</b><b>2</b></a>, sum(a/b)):smallint> |
-| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> |
\ No newline at end of file
+| org.apache.spark.sql.catalyst.expressions.xml.XPathString | xpath_string | SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c') | struct<xpath_string(<a><b>b</b><c>cc</c></a>, a/c):string> |
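The golden-file rows above are regenerated because the resolved attribute is now named from `windowTime.sql` (e.g. `window_time(window)`) rather than the hard-coded `window_time`, so distinct expressions get distinct, self-describing column names. A hedged spark-shell sketch of the observable effect (assumes an ambient `spark` session; schema output abridged; not part of this patch):

```scala
import org.apache.spark.sql.functions.{count, window, window_time}
import spark.implicits._

val agg = Seq(("A1", "2021-01-01 00:00:00"), ("A1", "2021-01-01 00:04:30"))
  .toDF("a", "b")
  .groupBy($"a", window($"b".cast("timestamp"), "5 minutes"))
  .agg(count("*").as("cnt"))

// Before this patch the projected column was always named "window_time";
// now it is named after the expression itself.
agg.select(window_time($"window")).printSchema()
// root
//  |-- window_time(window): timestamp
```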
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
index f775eb9ecfc0d..a878e0ffa51f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
@@ -599,23 +599,37 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSparkSession {
       ("2016-03-27 19:38:18"), ("2016-03-27 19:39:25")
     ).toDF("time")
 
-    val e = intercept[AnalysisException] {
-      df
-        .withColumn("time2", expr("time - INTERVAL 5 minutes"))
-        .select(
-          window($"time", "10 seconds").as("window1"),
-          window($"time2", "10 seconds").as("window2")
-        )
-        .select(
-          $"window1.end".cast("string"),
-          window_time($"window1").cast("string"),
-          $"window2.end".cast("string"),
-          window_time($"window2").cast("string")
-        )
-    }
-    assert(e.getMessage.contains(
-      "Multiple time/session window expressions would result in a cartesian product of rows, " +
-        "therefore they are currently not supported"))
+    val df2 = df
+      .withColumn("time2", expr("time - INTERVAL 15 minutes"))
+      .select(window($"time", "10 seconds").as("window1"), $"time2")
+      .select($"window1", window($"time2", "10 seconds").as("window2"))
+
+    checkAnswer(
+      df2.select(
+        $"window1.end".cast("string"),
+        window_time($"window1").cast("string"),
+        $"window2.end".cast("string"),
+        window_time($"window2").cast("string")),
+      Seq(
+        Row("2016-03-27 19:38:20", "2016-03-27 19:38:19.999999",
+          "2016-03-27 19:23:20", "2016-03-27 19:23:19.999999"),
+        Row("2016-03-27 19:39:30", "2016-03-27 19:39:29.999999",
+          "2016-03-27 19:24:30", "2016-03-27 19:24:29.999999"))
+    )
+
+    // check column names
+    val df3 = df2
+      .select(
+        window_time($"window1").cast("string"),
+        window_time($"window2").cast("string"),
+        window_time($"window2").as("wt2_aliased").cast("string")
+      )
+
+    val schema = df3.schema
+
+    assert(schema.fields.exists(_.name == "window_time(window1)"))
+    assert(schema.fields.exists(_.name == "window_time(window2)"))
+    assert(schema.fields.exists(_.name == "wt2_aliased"))
   }
 
   test("window_time function on agg output") {
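For context, the updated test exercises the pattern this change enables: a second tumbling window applied over a projection that already carries a window column, with `window_time` evaluated on both. Before this patch the same shape failed analysis with the "Multiple time/session window expressions would result in a cartesian product of rows" error. A hedged usage sketch, assuming an ambient `spark` session (column names as asserted in the test above):

```scala
import org.apache.spark.sql.functions.{window, window_time}
import spark.implicits._

val events = Seq("2016-03-27 19:38:18", "2016-03-27 19:39:25")
  .toDF("time")
  .select($"time".cast("timestamp").as("time"))

// One window expression per projection, chained across two selects.
val chained = events
  .select(window($"time", "10 seconds").as("window1"), $"time")
  .select($"window1", window($"time", "1 minute").as("window2"))

// Each WindowTime expression now resolves to its own projected column:
// "window_time(window1)" and "window_time(window2)".
chained
  .select(window_time($"window1"), window_time($"window2"))
  .show(truncate = false)
```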