From 53547b2e15d0a7bde7b9b66ddca918ad75d0f94b Mon Sep 17 00:00:00 2001
From: wenxuanguan
Date: Tue, 24 Sep 2019 16:35:24 +0800
Subject: [PATCH 1/4] track rule info in optimization phase

---
 .../org/apache/spark/sql/connector/catalog/SupportsDelete.java | 2 +-
 .../spark/sql/execution/streaming/IncrementalExecution.scala   | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java
index 80aa57ca1877..e53f1ed9d809 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java
@@ -29,7 +29,7 @@ public interface SupportsDelete {
   /**
    * Delete data from a data source table that matches filter expressions.
    * <p>
-   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
+   * Rows are deleted from the data source if all of the filter expressions match. That is, the
    * expressions must be interpreted as a set of filters that are ANDed together.
    * <p>
    * Implementations may reject a delete operation if the delete isn't possible without significant
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index af52af0d1d7e..b8e18b89b54b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -77,7 +77,8 @@ class IncrementalExecution(
    */
   override lazy val optimizedPlan: LogicalPlan = tracker.measurePhase(
     QueryPlanningTracker.OPTIMIZATION) {
-    sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
+    sparkSession.sessionState.optimizer.executeAndTrack(withCachedData,
+      tracker) transformAllExpressions {
       case ts @ CurrentBatchTimestamp(timestamp, _, _) =>
         logInfo(s"Current batch timestamp = $timestamp")
         ts.toLiteral
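
Note on patch 1: the functional change is the switch from optimizer.execute(withCachedData) to executeAndTrack(withCachedData, tracker). The optimizer batches run exactly as before, but each rule invocation is now recorded against the query's QueryPlanningTracker, so incremental (streaming) executions report rule info for the optimization phase the same way batch queries do. Below is a minimal sketch of reading that information back, assuming an active SparkSession named spark; topRulesByTime, durationMs and the RuleSummary field names are assumptions drawn from QueryPlanningTracker, not part of this patch. The SupportsDelete.java hunk above only rewords a Javadoc sentence and is reverted in the next patch.

    // Sketch only: inspect what the tracker recorded for a simple batch query.
    val df = spark.range(1000).selectExpr("count(*)")
    df.collect()
    val tracker = df.queryExecution.tracker

    // Phase-level timing recorded by measurePhase.
    tracker.phases.foreach { case (phase, summary) =>
      println(s"$phase: ${summary.durationMs} ms")
    }

    // Per-rule info recorded by executeAndTrack (assumed helper: topRulesByTime).
    tracker.topRulesByTime(5).foreach { case (rule, summary) =>
      println(s"$rule: ${summary.totalTimeNs} ns over ${summary.numInvocations} invocations")
    }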

From 8a33aba40a0d91784d2c603a0e3943e1971f3075 Mon Sep 17 00:00:00 2001
From: wenxuanguan
Date: Tue, 24 Sep 2019 21:50:41 +0800
Subject: [PATCH 2/4] revert doc change

---
 .../org/apache/spark/sql/connector/catalog/SupportsDelete.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java
index e53f1ed9d809..80aa57ca1877 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsDelete.java
@@ -29,7 +29,7 @@ public interface SupportsDelete {
   /**
    * Delete data from a data source table that matches filter expressions.
    * <p>
-   * Rows are deleted from the data source if all of the filter expressions match. That is, the
+   * Rows are deleted from the data source iff all of the filter expressions match. That is, the
    * expressions must be interpreted as a set of filters that are ANDed together.
    * <p>
    * Implementations may reject a delete operation if the delete isn't possible without significant
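
Note on patch 2: the revert restores "iff" (if and only if), which is the intended contract: the Filter[] handed to SupportsDelete#deleteWhere is an implicit conjunction, and a row may be deleted only when every filter in the array matches it. A hypothetical sketch of an implementation honoring that ANDed semantics follows; the InMemoryDeletableRows class and its matches helper are invented for illustration, only the deleteWhere(Filter[]) entry point comes from the interface shown above.

    import org.apache.spark.sql.connector.catalog.SupportsDelete
    import org.apache.spark.sql.sources.{EqualTo, Filter}

    // Hypothetical table that keeps its rows in memory.
    class InMemoryDeletableRows(var rows: Seq[Map[String, Any]]) extends SupportsDelete {
      override def deleteWhere(filters: Array[Filter]): Unit = {
        // "iff all of the filter expressions match": the filters are ANDed together,
        // so a row is removed only when every filter matches it.
        rows = rows.filterNot(row => filters.forall(f => matches(f, row)))
      }

      // Illustration only: evaluate one supported filter type per row.
      private def matches(filter: Filter, row: Map[String, Any]): Boolean = filter match {
        case EqualTo(attribute, value) => row.get(attribute).contains(value)
        case _ => false // unsupported filters never match in this sketch
      }
    }
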
From 060b3290040b82ca0a7c19c8ddf6baa4e7a7062a Mon Sep 17 00:00:00 2001
From: wenxuanguan
Date: Mon, 21 Oct 2019 16:30:43 +0800
Subject: [PATCH 3/4] add ut

---
 .../QueryPlanningTrackerEndToEndSuite.scala   | 24 +++++++++++++++++--
 1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
index 76006efda992..082e6234c8be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
+import org.apache.spark.sql.streaming.StreamTest
 
-class QueryPlanningTrackerEndToEndSuite extends SharedSparkSession {
+class QueryPlanningTrackerEndToEndSuite extends StreamTest {
+  import testImplicits._
 
   test("programmatic API") {
     val df = spark.range(1000).selectExpr("count(*)")
@@ -38,4 +40,22 @@
     assert(tracker.rules.nonEmpty)
   }
 
+  test("streaming") {
+    val inputData = MemoryStream[Int]
+    val df = inputData.toDF()
+
+    def assertStatus(stream: StreamExecution): Unit = {
+      stream.processAllAvailable()
+      val tracker = stream.lastExecution.tracker
+      assert(tracker.phases.keys == Set("analysis", "optimization", "planning"))
+      assert(tracker.rules.nonEmpty)
+    }
+
+    testStream(df)(
+      StartStream(),
+      AddData(inputData, 1, 2, 3),
+      Execute(assertStatus),
+      StopStream)
+  }
+
 }

From 41fddba4438b83eec539c1c8aa3bde92a4e9b85e Mon Sep 17 00:00:00 2001
From: wenxuanguan
Date: Tue, 22 Oct 2019 11:27:09 +0800
Subject: [PATCH 4/4] add jira info in test name

---
 .../spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
index 082e6234c8be..987338cf6cbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala
@@ -40,7 +40,7 @@ class QueryPlanningTrackerEndToEndSuite extends StreamTest {
     assert(tracker.rules.nonEmpty)
   }
 
-  test("streaming") {
+  test("SPARK-29227: Track rule info in optimization phase in streaming") {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()
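
Note on the new test: it drives a MemoryStream through the StreamTest harness and then inspects the tracker of the last IncrementalExecution, the object that patch 1 now feeds with per-rule optimization info. The expected phase set deliberately has no "parsing" entry, because the incremental plan of a micro-batch is built programmatically and never goes through the SQL parser. A small sketch of the contrast, assuming an active SparkSession named spark and that the batch path records phases into the same tracker as described earlier:

    // A query that starts from SQL text also records the "parsing" phase.
    val parsed = spark.sql("select count(*) from range(1000)")
    parsed.collect()
    assert(parsed.queryExecution.tracker.phases.contains("parsing"))

    // A streaming micro-batch plan is constructed directly from the logical plan,
    // so its tracker reports only analysis, optimization and planning.

Patch 4 then only renames the test so it carries the JIRA id, SPARK-29227.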