diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index af52af0d1d7e..b8e18b89b54b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -77,7 +77,8 @@ class IncrementalExecution(
    */
   override lazy val optimizedPlan: LogicalPlan = tracker.measurePhase(
     QueryPlanningTracker.OPTIMIZATION) {
-    sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
+    sparkSession.sessionState.optimizer.executeAndTrack(withCachedData,
+      tracker) transformAllExpressions {
       case ts @ CurrentBatchTimestamp(timestamp, _, _) =>
         logInfo(s"Current batch timestamp = $timestamp")
         ts.toLiteral
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
index 76006efda992..987338cf6cbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
+import org.apache.spark.sql.streaming.StreamTest
 
-class QueryPlanningTrackerEndToEndSuite extends SharedSparkSession {
+class QueryPlanningTrackerEndToEndSuite extends StreamTest {
+  import testImplicits._
 
   test("programmatic API") {
     val df = spark.range(1000).selectExpr("count(*)")
@@ -38,4 +40,22 @@ class QueryPlanningTrackerEndToEndSuite extends SharedSparkSession {
     assert(tracker.rules.nonEmpty)
   }
 
+  test("SPARK-29227: Track rule info in optimization phase in streaming") {
+    val inputData = MemoryStream[Int]
+    val df = inputData.toDF()
+
+    def assertStatus(stream: StreamExecution): Unit = {
+      stream.processAllAvailable()
+      val tracker = stream.lastExecution.tracker
+      assert(tracker.phases.keys == Set("analysis", "optimization", "planning"))
+      assert(tracker.rules.nonEmpty)
+    }
+
+    testStream(df)(
+      StartStream(),
+      AddData(inputData, 1, 2, 3),
+      Execute(assertStatus),
+      StopStream)
+  }
+
 }
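
Not part of the patch, but for anyone who wants to reproduce the new assertion outside the `StreamTest` harness, a minimal standalone sketch is below. It deliberately reaches through internals (`MemoryStream` and `StreamingQueryWrapper` live in `org.apache.spark.sql.execution.streaming` and are not public API), and the master URL, app name, and sink query name are arbitrary placeholders for illustration.

```scala
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}

object StreamingTrackerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")          // placeholder master for a local run
      .appName("tracker-demo")
      .getOrCreate()
    import spark.implicits._
    // MemoryStream picks up the SQLContext (and the Int encoder) implicitly.
    implicit val sqlContext: SQLContext = spark.sqlContext

    val inputData = MemoryStream[Int]
    val query = inputData.toDF()
      .writeStream
      .format("memory")
      .queryName("tracker_demo")   // arbitrary name, required by the memory sink
      .start()

    inputData.addData(1, 2, 3)
    query.processAllAvailable()

    // lastExecution is the IncrementalExecution of the latest micro-batch. The
    // "optimization" phase was already timed by measurePhase; what the patch adds
    // is the per-rule statistics that executeAndTrack records into tracker.rules.
    val tracker = query.asInstanceOf[StreamingQueryWrapper]
      .streamingQuery.lastExecution.tracker
    assert(tracker.phases.keys == Set("analysis", "optimization", "planning"))
    assert(tracker.rules.nonEmpty)

    query.stop()
    spark.stop()
  }
}
```

The design point, as I understand it: `measurePhase` alone only records the phase duration, while `executeAndTrack` runs the optimizer with the tracker installed so each rule invocation is recorded, which is why `tracker.rules` is now non-empty for streaming batches.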