diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index c016fd52b611..14a4ac395c53 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -416,7 +416,7 @@ case class InMemoryRelation( } override def doCanonicalize(): logical.LogicalPlan = - copy(output = output.map(QueryPlan.normalizeExpressions(_, cachedPlan.output)), + copy(output = output.map(QueryPlan.normalizeExpressions(_, output)), cacheBuilder, outputOrdering) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala index 0ab8926c016f..c0d7aa61e30c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala @@ -2157,6 +2157,21 @@ class DataFrameAggregateSuite extends QueryTest def createAggregate(df: DataFrame): DataFrame = df.groupBy("c0").agg(count("*")) } + + test("SPARK-46779: Group by subquery with a cached relation") { + withTempView("data") { + sql( + """create or replace temp view data(c1, c2) as values + |(1, 2), + |(1, 3), + |(3, 7)""".stripMargin) + sql("cache table data") + val df = sql( + """select c1, (select count(*) from data d1 where d1.c1 = d2.c1), count(c2) + |from data d2 group by all""".stripMargin) + checkAnswer(df, Row(1, 2, 2) :: Row(3, 1, 1) :: Nil) + } + } } case class B(c: Option[Double]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala index 72b3a4bc1095..a5c5ec40af6f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala @@ -34,4 +34,11 @@ class InMemoryRelationSuite extends SparkFunSuite with SharedSparkSessionBase { assert(!relationCachedPlan.eq(clonedCachedPlan)) assert(relationCachedPlan === clonedCachedPlan) } + + test("SPARK-46779: InMemoryRelations with the same cached plan are semantically equivalent") { + val d = spark.range(1) + val r1 = InMemoryRelation(StorageLevel.MEMORY_ONLY, d.queryExecution, None) + val r2 = r1.withOutput(r1.output.map(_.newInstance())) + assert(r1.sameResult(r2)) + } }