Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ object FakeV2SessionCatalog extends TableCatalog with FunctionCatalog {
* @param nestedViewDepth The nested depth in the view resolution, this enables us to limit the
* depth of nested views.
* @param maxNestedViewDepth The maximum allowed depth of nested view resolution.
* @param relationCache A mapping from qualified table names to resolved relations. This can ensure
* that the table is resolved only once if a table is used multiple times
* in a query.
* @param relationCache A mapping from qualified table names and time travel spec to resolved
* relations. This can ensure that the table is resolved only once if a table
* is used multiple times in a query.
* @param referredTempViewNames All the temp view names referred by the current view we are
* resolving. It's used to make sure the relation resolution is
* consistent between view creation and view resolution. For example,
Expand All @@ -128,7 +128,8 @@ case class AnalysisContext(
catalogAndNamespace: Seq[String] = Nil,
nestedViewDepth: Int = 0,
maxNestedViewDepth: Int = -1,
relationCache: mutable.Map[Seq[String], LogicalPlan] = mutable.Map.empty,
relationCache: mutable.Map[(Seq[String], Option[TimeTravelSpec]), LogicalPlan] =
mutable.Map.empty,
referredTempViewNames: Seq[Seq[String]] = Seq.empty,
// 1. If we are resolving a view, this field will be restored from the view metadata,
// by calling `AnalysisContext.withAnalysisContext(viewDesc)`.
Expand Down Expand Up @@ -1188,7 +1189,7 @@ class Analyzer(override val catalogManager: CatalogManager)
lookupTempView(u.multipartIdentifier, u.isStreaming, timeTravelSpec.isDefined).orElse {
expandIdentifier(u.multipartIdentifier) match {
case CatalogAndIdentifier(catalog, ident) =>
val key = catalog.name +: ident.namespace :+ ident.name
val key = ((catalog.name +: ident.namespace :+ ident.name).toSeq, timeTravelSpec)
AnalysisContext.get.relationCache.get(key).map(_.transform {
case multi: MultiInstanceRelation =>
val newRelation = multi.newInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2576,6 +2576,23 @@ class DataSourceV2SQLSuite
}
}

test("SPARK-41154: Incorrect relation caching for queries with time travel spec") {
sql("use testcat")
val t1 = "testcat.t1"
val t2 = "testcat.t2"
withTable(t1, t2) {
sql(s"CREATE TABLE $t1 USING foo AS SELECT 1 as c")
sql(s"CREATE TABLE $t2 USING foo AS SELECT 2 as c")
assert(
sql("""
|SELECT * FROM t VERSION AS OF '1'
|UNION ALL
|SELECT * FROM t VERSION AS OF '2'
|""".stripMargin
).collect() === Array(Row(1), Row(2)))
}
}

private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
Expand Down