diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index bc41dd0465e3..6fa5203a06f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -81,7 +81,7 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { childPlans.map { childPlan => // Replace the placeholder by the child plan candidateWithPlaceholders.transformUp { - case p if p == placeholder => childPlan + case p if p.eq(placeholder) => childPlan } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 3db89ecfad9f..b10da6c70be1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -704,6 +704,23 @@ class PlannerSuite extends SharedSQLContext { df.queryExecution.executedPlan.execute() } + test("SPARK-25278: physical nodes should be different instances for same logical nodes") { + val range = Range(1, 1, 1, 1) + val df = Union(range, range) + val ranges = df.queryExecution.optimizedPlan.collect { + case r: Range => r + } + assert(ranges.length == 2) + // Ensure the two Range instances are equal according to their equal method + assert(ranges.head == ranges.last) + val execRanges = df.queryExecution.sparkPlan.collect { + case r: RangeExec => r + } + assert(execRanges.length == 2) + // Ensure the two RangeExec instances are different instances + assert(!execRanges.head.eq(execRanges.last)) + } + test("SPARK-24556: always rewrite output partitioning in ReusedExchangeExec " + "and InMemoryTableScanExec") { def checkOutputPartitioningRewrite( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index a3a3f3851e21..d45eb0c27a6b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -497,6 +497,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared } } + test("SPARK-25278: output metrics are wrong for plans repeated in the query") { + val name = "demo_view" + withView(name) { + sql(s"CREATE OR REPLACE VIEW $name AS VALUES 1,2") + val view = spark.table(name) + val union = view.union(view) + testSparkPlanMetrics(union, 1, Map( + 0L -> ("Union" -> Map()), + 1L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)), + 2L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)))) + } + } + test("writing data out metrics: parquet") { testMetricsNonDynamicPartition("parquet", "t1") }