Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ case class InMemoryRelation(
}

override def doCanonicalize(): logical.LogicalPlan =
copy(output = output.map(QueryPlan.normalizeExpressions(_, cachedPlan.output)),
copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),
cacheBuilder,
outputOrdering)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2157,6 +2157,21 @@ class DataFrameAggregateSuite extends QueryTest

def createAggregate(df: DataFrame): DataFrame = df.groupBy("c0").agg(count("*"))
}

test("SPARK-46779: Group by subquery with a cached relation") {
withTempView("data") {
sql(
"""create or replace temp view data(c1, c2) as values
|(1, 2),
|(1, 3),
|(3, 7)""".stripMargin)
sql("cache table data")
val df = sql(
"""select c1, (select count(*) from data d1 where d1.c1 = d2.c1), count(c2)
|from data d2 group by all""".stripMargin)
checkAnswer(df, Row(1, 2, 2) :: Row(3, 1, 1) :: Nil)
}
}
}

case class B(c: Option[Double])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,11 @@ class InMemoryRelationSuite extends SparkFunSuite with SharedSparkSessionBase {
assert(!relationCachedPlan.eq(clonedCachedPlan))
assert(relationCachedPlan === clonedCachedPlan)
}

test("SPARK-46779: InMemoryRelations with the same cached plan are semantically equivalent") {
val d = spark.range(1)
val r1 = InMemoryRelation(StorageLevel.MEMORY_ONLY, d.queryExecution, None)
val r2 = r1.withOutput(r1.output.map(_.newInstance()))
assert(r1.sameResult(r2))
}
}