diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index aec71747fde6..3d797994e77e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -807,8 +807,10 @@ class Analyzer(
     def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
       case u: UnresolvedRelation =>
         lookupV2Relation(u.multipartIdentifier)
-          .map(SubqueryAlias(u.multipartIdentifier, _))
-          .getOrElse(u)
+          .map { rel =>
+            val ident = rel.identifier.get
+            SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
+          }.getOrElse(u)
 
       case u @ UnresolvedTable(NonSessionCatalogAndIdentifier(catalog, ident)) =>
         CatalogV2Util.loadTable(catalog, ident)
@@ -933,7 +935,7 @@ class Analyzer(
             v1SessionCatalog.getRelation(v1Table.v1Table)
           case table =>
             SubqueryAlias(
-              identifier,
+              ident.asMultipartIdentifier,
               DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
         }
         val key = catalog.name +: ident.namespace :+ ident.name
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index f64211498329..4ff20939b3df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -685,12 +685,21 @@ class DataSourceV2SQLSuite
       sql(s"CREATE TABLE $t (id bigint, point struct<x: bigint, y: bigint>) USING foo")
       sql(s"INSERT INTO $t VALUES (1, (10, 20))")
 
-      checkAnswer(
-        sql(s"SELECT testcat.ns1.ns2.tbl.id, testcat.ns1.ns2.tbl.point.x FROM $t"),
-        Row(1, 10))
-      checkAnswer(sql(s"SELECT ns1.ns2.tbl.id, ns1.ns2.tbl.point.x FROM $t"), Row(1, 10))
-      checkAnswer(sql(s"SELECT ns2.tbl.id, ns2.tbl.point.x FROM $t"), Row(1, 10))
-      checkAnswer(sql(s"SELECT tbl.id, tbl.point.x FROM $t"), Row(1, 10))
+      def check(tbl: String): Unit = {
+        checkAnswer(
+          sql(s"SELECT testcat.ns1.ns2.tbl.id, testcat.ns1.ns2.tbl.point.x FROM $tbl"),
+          Row(1, 10))
+        checkAnswer(sql(s"SELECT ns1.ns2.tbl.id, ns1.ns2.tbl.point.x FROM $tbl"), Row(1, 10))
+        checkAnswer(sql(s"SELECT ns2.tbl.id, ns2.tbl.point.x FROM $tbl"), Row(1, 10))
+        checkAnswer(sql(s"SELECT tbl.id, tbl.point.x FROM $tbl"), Row(1, 10))
+      }
+
+      // Test with the qualified table name "testcat.ns1.ns2.tbl".
+      check(t)
+
+      // Test that the current catalog and namespace are respected in column resolution.
+      sql("USE testcat.ns1.ns2")
+      check("tbl")
 
       val ex = intercept[AnalysisException] {
         sql(s"SELECT ns1.ns2.ns3.tbl.id from $t")
@@ -700,19 +709,30 @@ class DataSourceV2SQLSuite
   }
 
   test("qualified column names for v1 tables") {
-    // unset this config to use the default v2 session catalog.
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
-
-    withTable("t") {
-      sql("CREATE TABLE t USING json AS SELECT 1 AS i")
-      checkAnswer(sql("select t.i from spark_catalog.default.t"), Row(1))
-      checkAnswer(sql("select default.t.i from spark_catalog.default.t"), Row(1))
+    Seq(true, false).foreach { useV1Table =>
+      val format = if (useV1Table) "json" else v2Format
+      if (useV1Table) {
+        // unset this config to use the default v2 session catalog.
+        spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
+      } else {
+        spark.conf.set(
+          V2_SESSION_CATALOG_IMPLEMENTATION.key, classOf[InMemoryTableSessionCatalog].getName)
+      }
 
-      // catalog name cannot be used for v1 tables.
-      val ex = intercept[AnalysisException] {
-        sql(s"select spark_catalog.default.t.i from spark_catalog.default.t")
+      withTable("t") {
+        sql(s"CREATE TABLE t USING $format AS SELECT 1 AS i")
+        checkAnswer(sql("select i from t"), Row(1))
+        checkAnswer(sql("select t.i from t"), Row(1))
+        checkAnswer(sql("select default.t.i from t"), Row(1))
+        checkAnswer(sql("select t.i from spark_catalog.default.t"), Row(1))
+        checkAnswer(sql("select default.t.i from spark_catalog.default.t"), Row(1))
+
+        // catalog name cannot be used for tables in the session catalog.
+        val ex = intercept[AnalysisException] {
+          sql(s"select spark_catalog.default.t.i from spark_catalog.default.t")
+        }
+        assert(ex.getMessage.contains("cannot resolve '`spark_catalog.default.t.i`"))
      }
-      assert(ex.getMessage.contains("cannot resolve '`spark_catalog.default.t.i`"))
     }
   }
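
Below is a minimal sketch (not part of the patch) of the resolution behavior the Analyzer change enables. It assumes a session where a catalog named "testcat" is registered via spark.sql.catalog.testcat and the suite's test-only "foo" provider is available, as in DataSourceV2SQLSuite above; neither ships with Spark by default. Table and column names are taken from the tests in the diff.

// Sketch only: "testcat" and the "foo" provider come from the test suite setup.
spark.sql("CREATE TABLE testcat.ns1.ns2.tbl (id bigint, point struct<x: bigint, y: bigint>) USING foo")
spark.sql("INSERT INTO testcat.ns1.ns2.tbl VALUES (1, (10, 20))")

// The relation is now aliased with its fully qualified name
// (catalog.name +: ident.namespace :+ ident.name) instead of the identifier
// as written in the FROM clause, so any suffix of testcat.ns1.ns2.tbl can
// qualify a column, even when the table is referenced by a bare name:
spark.sql("USE testcat.ns1.ns2")
spark.sql("SELECT testcat.ns1.ns2.tbl.id, ns2.tbl.point.x FROM tbl").show()  // expects (1, 10)

Before this change, SubqueryAlias(u.multipartIdentifier, _) aliased the relation with the identifier exactly as typed, so after USE testcat.ns1.ns2 a query over FROM tbl could only qualify columns as tbl.id, not as ns2.tbl.id or testcat.ns1.ns2.tbl.id.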