From 50c7e6ed23cae5a06431b97d7f7c1d0962dfcc9d Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 1 Jan 2024 12:23:43 -0800 Subject: [PATCH 1/3] Update resolution to apply aliases and comments. --- .../sql/catalyst/analysis/ResolveViews.scala | 48 ++++++-- .../apache/iceberg/spark/SparkCatalog.java | 112 +++++++++--------- 2 files changed, 96 insertions(+), 64 deletions(-) diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala index 6869c60a8368..ed526fb0f1b8 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala @@ -19,12 +19,17 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.expressions.UpCast import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.connector.catalog.{View => V2View} +import org.apache.spark.sql.catalyst.trees.CurrentOrigin +import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.connector.catalog.CatalogPlugin import org.apache.spark.sql.connector.catalog.Identifier @@ -41,12 +46,12 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case u@UnresolvedRelation(nameParts, _, _) - if catalogManager.v1SessionCatalog.isTempView(Seq(nameParts.asIdentifier.name())) => + if catalogManager.v1SessionCatalog.isTempView(nameParts) => u case u@UnresolvedRelation(parts@CatalogAndIdentifier(catalog, ident), _, _) => loadView(catalog, ident) - .map(createViewRelation(parts.quoted, _)) + .map(createViewRelation(parts, _)) .getOrElse(u) } @@ -60,17 +65,44 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look case _ => None } - private def createViewRelation(name: String, view: V2View): LogicalPlan = { - val child = parseViewText(name, view.query) + private def createViewRelation(nameParts: Seq[String], view: View): LogicalPlan = { + val parsed = parseViewText(nameParts.quoted, view.query) + + // Apply the field aliases and column comments if necessary + val child: LogicalPlan = if (!parsed.schema.sameType(view.schema)) { + // This logic differs from how Spark handles views in SessionCatalog.fromCatalogTable. + // This is more strict because it doesn't allow resolution by name. + if (parsed.schema.fieldNames.length != view.schema.fieldNames.length) { + throw new AnalysisException( + "Cannot resolve view ${nameParts.quoted} with incompatible parsed schema:" + + s" ${parsed.schema.fieldNames.mkString(", ")} (expected ${view.schema.fieldNames.mkString(", ")})") + } + + val aliases = parsed.output.zip(view.schema.fields).map { case (attr, expected) => + Alias(UpCast(attr, expected.dataType), expected.name)(explicitMetadata = Some(expected.metadata)) + } + + Project(aliases, parsed) + } else { + parsed + } val viewCatalogAndNamespace: Seq[String] = view.currentCatalog +: view.currentNamespace.toSeq + // Substitute CTEs within the view before qualifying table identifiers - SubqueryAlias(name, qualifyTableIdentifiers(CTESubstitution.apply(child), viewCatalogAndNamespace)) + SubqueryAlias(nameParts, qualifyTableIdentifiers(CTESubstitution.apply(child), viewCatalogAndNamespace)) } private def parseViewText(name: String, viewText: String): LogicalPlan = { + val origin = Origin( + objectType = Some("VIEW"), + objectName = Some(name) + ) + try { - SparkSession.active.sessionState.sqlParser.parsePlan(viewText) + CurrentOrigin.withOrigin(origin) { + spark.sessionState.sqlParser.parseQuery(viewText) + } } catch { case _: ParseException => throw QueryCompilationErrors.invalidViewText(viewText, name) @@ -87,7 +119,7 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look case u@UnresolvedRelation(Seq(table), _, _) => u.copy(multipartIdentifier = catalogAndNamespace :+ table) case u@UnresolvedRelation(parts, _, _) - if !SparkSession.active.sessionState.catalogManager.isCatalogRegistered(parts.head) => + if !spark.sessionState.catalogManager.isCatalogRegistered(parts.head) => u.copy(multipartIdentifier = catalogAndNamespace.head +: parts) } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index ec4b2b821634..01804e010834 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -536,6 +536,62 @@ public boolean dropNamespace(String[] namespace, boolean cascade) return false; } + @Override + public Identifier[] listViews(String... namespace) { + throw new UnsupportedOperationException( + "Listing views is not supported by catalog: " + catalogName); + } + + @Override + public View loadView(Identifier ident) throws NoSuchViewException { + if (null != asViewCatalog) { + try { + org.apache.iceberg.view.View view = asViewCatalog.loadView(buildIdentifier(ident)); + return new SparkView(catalogName, view); + } catch (org.apache.iceberg.exceptions.NoSuchViewException e) { + throw new NoSuchViewException(ident); + } + } + + throw new NoSuchViewException(ident); + } + + @Override + public View createView( + Identifier ident, + String sql, + String currentCatalog, + String[] currentNamespace, + StructType schema, + String[] queryColumnNames, + String[] columnAliases, + String[] columnComments, + Map properties) + throws ViewAlreadyExistsException, NoSuchNamespaceException { + throw new UnsupportedOperationException( + "Creating a view is not supported by catalog: " + catalogName); + } + + @Override + public View alterView(Identifier ident, ViewChange... changes) + throws NoSuchViewException, IllegalArgumentException { + throw new UnsupportedOperationException( + "Altering a view is not supported by catalog: " + catalogName); + } + + @Override + public boolean dropView(Identifier ident) { + throw new UnsupportedOperationException( + "Dropping a view is not supported by catalog: " + catalogName); + } + + @Override + public void renameView(Identifier fromIdentifier, Identifier toIdentifier) + throws NoSuchViewException, ViewAlreadyExistsException { + throw new UnsupportedOperationException( + "Renaming a view is not supported by catalog: " + catalogName); + } + @Override public final void initialize(String name, CaseInsensitiveStringMap options) { this.cacheEnabled = @@ -822,60 +878,4 @@ private Catalog.TableBuilder newBuilder(Identifier ident, Schema schema) { public Catalog icebergCatalog() { return icebergCatalog; } - - @Override - public Identifier[] listViews(String... namespace) { - throw new UnsupportedOperationException( - "Listing views is not supported by catalog: " + catalogName); - } - - @Override - public View loadView(Identifier ident) throws NoSuchViewException { - if (null != asViewCatalog) { - try { - org.apache.iceberg.view.View view = asViewCatalog.loadView(buildIdentifier(ident)); - return new SparkView(catalogName, view); - } catch (org.apache.iceberg.exceptions.NoSuchViewException e) { - throw new NoSuchViewException(ident); - } - } - - throw new NoSuchViewException(ident); - } - - @Override - public View createView( - Identifier ident, - String sql, - String currentCatalog, - String[] currentNamespace, - StructType schema, - String[] queryColumnNames, - String[] columnAliases, - String[] columnComments, - Map properties) - throws ViewAlreadyExistsException, NoSuchNamespaceException { - throw new UnsupportedOperationException( - "Creating a view is not supported by catalog: " + catalogName); - } - - @Override - public View alterView(Identifier ident, ViewChange... changes) - throws NoSuchViewException, IllegalArgumentException { - throw new UnsupportedOperationException( - "Altering a view is not supported by catalog: " + catalogName); - } - - @Override - public boolean dropView(Identifier ident) { - throw new UnsupportedOperationException( - "Dropping a view is not supported by catalog: " + catalogName); - } - - @Override - public void renameView(Identifier fromIdentifier, Identifier toIdentifier) - throws NoSuchViewException, ViewAlreadyExistsException { - throw new UnsupportedOperationException( - "Renaming a view is not supported by catalog: " + catalogName); - } } From 7e957a3331ec3802121675f39ed3f153b22069db Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 1 Jan 2024 15:58:13 -0800 Subject: [PATCH 2/3] Fix projection and aliasing, update tests. --- .../sql/catalyst/analysis/ResolveViews.scala | 25 ++-- .../iceberg/spark/extensions/TestViews.java | 133 +++++++++++++----- 2 files changed, 102 insertions(+), 56 deletions(-) diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala index ed526fb0f1b8..90d63bb6bf1b 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala @@ -68,25 +68,16 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look private def createViewRelation(nameParts: Seq[String], view: View): LogicalPlan = { val parsed = parseViewText(nameParts.quoted, view.query) - // Apply the field aliases and column comments if necessary - val child: LogicalPlan = if (!parsed.schema.sameType(view.schema)) { - // This logic differs from how Spark handles views in SessionCatalog.fromCatalogTable. - // This is more strict because it doesn't allow resolution by name. - if (parsed.schema.fieldNames.length != view.schema.fieldNames.length) { - throw new AnalysisException( - "Cannot resolve view ${nameParts.quoted} with incompatible parsed schema:" + - s" ${parsed.schema.fieldNames.mkString(", ")} (expected ${view.schema.fieldNames.mkString(", ")})") - } - - val aliases = parsed.output.zip(view.schema.fields).map { case (attr, expected) => - Alias(UpCast(attr, expected.dataType), expected.name)(explicitMetadata = Some(expected.metadata)) - } - - Project(aliases, parsed) - } else { - parsed + // Apply the field aliases and column comments + // This logic differs from how Spark handles views in SessionCatalog.fromCatalogTable. + // This is more strict because it doesn't allow resolution by field name. + val aliases = view.schema.fields.zipWithIndex.map { case (expected, pos) => + val attr = GetColumnByOrdinal(pos, expected.dataType) + Alias(UpCast(attr, expected.dataType), expected.name)(explicitMetadata = Some(expected.metadata)) } + val child = Project(aliases, parsed) + val viewCatalogAndNamespace: Seq[String] = view.currentCatalog +: view.currentNamespace.toSeq // Substitute CTEs within the view before qualifying table identifiers diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index a63bc86b007f..7eb5d7dde106 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -34,7 +34,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.iceberg.types.Types; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -79,18 +81,18 @@ public TestViews(String catalog, String implementation, Map prop public void readFromView() throws NoSuchTableException { insertRows(10); String viewName = "simpleView"; + String sql = String.format("SELECT id FROM %s", tableName); ViewCatalog viewCatalog = viewCatalog(); - Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema(); viewCatalog .buildView(TableIdentifier.of(NAMESPACE, viewName)) - .withQuery("spark", String.format("SELECT id FROM %s", tableName)) + .withQuery("spark", sql) // use non-existing column name to make sure only the SQL definition for spark is loaded .withQuery("trino", String.format("SELECT non_existing FROM %s", tableName)) .withDefaultNamespace(NAMESPACE) .withDefaultCatalog(catalogName) - .withSchema(schema) + .withSchema(schema(sql)) .create(); List expected = @@ -105,16 +107,16 @@ public void readFromView() throws NoSuchTableException { public void readFromTrinoView() throws NoSuchTableException { insertRows(10); String viewName = "trinoView"; + String sql = String.format("SELECT id FROM %s", tableName); ViewCatalog viewCatalog = viewCatalog(); - Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema(); viewCatalog .buildView(TableIdentifier.of(NAMESPACE, viewName)) - .withQuery("trino", String.format("SELECT id FROM %s", tableName)) + .withQuery("trino", sql) .withDefaultNamespace(NAMESPACE) .withDefaultCatalog(catalogName) - .withSchema(schema) + .withSchema(schema(sql)) .create(); List expected = @@ -131,22 +133,23 @@ public void readFromMultipleViews() throws NoSuchTableException { insertRows(6); String viewName = "firstView"; String secondView = "secondView"; + String viewSQL = String.format("SELECT id FROM %s WHERE id <= 3", tableName); + String secondViewSQL = String.format("SELECT id FROM %s WHERE id > 3", tableName); ViewCatalog viewCatalog = viewCatalog(); - Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema(); viewCatalog .buildView(TableIdentifier.of(NAMESPACE, viewName)) - .withQuery("spark", String.format("SELECT id FROM %s WHERE id <= 3", tableName)) + .withQuery("spark", viewSQL) .withDefaultNamespace(NAMESPACE) - .withSchema(schema) + .withSchema(schema(viewSQL)) .create(); viewCatalog .buildView(TableIdentifier.of(NAMESPACE, secondView)) - .withQuery("spark", String.format("SELECT id FROM %s WHERE id > 3", tableName)) + .withQuery("spark", secondViewSQL) .withDefaultNamespace(NAMESPACE) - .withSchema(schema) + .withSchema(schema(secondViewSQL)) .create(); assertThat(sql("SELECT * FROM %s", viewName)) @@ -164,7 +167,7 @@ public void readFromViewUsingNonExistingTable() throws NoSuchTableException { String viewName = "viewWithNonExistingTable"; ViewCatalog viewCatalog = viewCatalog(); - Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema(); + Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get())); viewCatalog .buildView(TableIdentifier.of(NAMESPACE, viewName)) @@ -188,7 +191,7 @@ public void readFromViewUsingNonExistingTableColumn() throws NoSuchTableExceptio String viewName = "viewWithNonExistingColumn"; ViewCatalog viewCatalog = viewCatalog(); - Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema(); + Schema schema = new Schema(Types.NestedField.required(1, "non_existing", Types.LongType.get())); viewCatalog .buildView(TableIdentifier.of(NAMESPACE, viewName)) @@ -230,25 +233,26 @@ public void readFromViewUsingInvalidSQL() throws NoSuchTableException { public void readFromViewWithStaleSchema() throws NoSuchTableException { insertRows(10); String viewName = "staleView"; + String sql = String.format("SELECT id, data FROM %s", tableName); ViewCatalog viewCatalog = viewCatalog(); - Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema(); viewCatalog .buildView(TableIdentifier.of(NAMESPACE, viewName)) - .withQuery("spark", String.format("SELECT id FROM %s", tableName)) + .withQuery("spark", sql) .withDefaultNamespace(NAMESPACE) .withDefaultCatalog(catalogName) - .withSchema(schema) + .withSchema(schema(sql)) .create(); - // drop the column the view depends on - sql("ALTER TABLE %s DROP COLUMN id", tableName); + // drop a column the view depends on + // note that this tests `data` because it has an invalid ordinal + sql("ALTER TABLE %s DROP COLUMN data", tableName); // reading from the view should now fail assertThatThrownBy(() -> sql("SELECT * FROM %s", viewName)) .isInstanceOf(AnalysisException.class) - .hasMessageContaining("A column or function parameter with name `id` cannot be resolved"); + .hasMessageContaining("A column or function parameter with name `data` cannot be resolved"); } @Test @@ -282,18 +286,18 @@ public void readFromViewHiddenByTempView() throws NoSuchTableException { public void readFromViewWithGlobalTempView() throws NoSuchTableException { insertRows(10); String viewName = "viewWithGlobalTempView"; + String sql = String.format("SELECT id FROM %s WHERE id > 5", tableName); ViewCatalog viewCatalog = viewCatalog(); - Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema(); sql("CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", viewName, tableName); viewCatalog .buildView(TableIdentifier.of(NAMESPACE, viewName)) - .withQuery("spark", String.format("SELECT id FROM %s WHERE id > 5", tableName)) + .withQuery("spark", sql) .withDefaultNamespace(NAMESPACE) .withDefaultCatalog(catalogName) - .withSchema(schema) + .withSchema(schema(sql)) .create(); // GLOBAL TEMP VIEWS are stored in a global_temp namespace @@ -313,32 +317,30 @@ public void readFromViewReferencingAnotherView() throws NoSuchTableException { insertRows(10); String firstView = "viewBeingReferencedInAnotherView"; String viewReferencingOtherView = "viewReferencingOtherView"; + String firstSQL = String.format("SELECT id FROM %s WHERE id <= 5", tableName); + String secondSQL = String.format("SELECT id FROM %s WHERE id > 4", firstView); ViewCatalog viewCatalog = viewCatalog(); - Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema(); viewCatalog .buildView(TableIdentifier.of(NAMESPACE, firstView)) - .withQuery("spark", String.format("SELECT id FROM %s", tableName)) + .withQuery("spark", firstSQL) .withDefaultNamespace(NAMESPACE) .withDefaultCatalog(catalogName) - .withSchema(schema) + .withSchema(schema(firstSQL)) .create(); viewCatalog .buildView(TableIdentifier.of(NAMESPACE, viewReferencingOtherView)) - .withQuery("spark", String.format("SELECT id FROM %s", firstView)) + .withQuery("spark", secondSQL) .withDefaultNamespace(NAMESPACE) .withDefaultCatalog(catalogName) - .withSchema(schema) + .withSchema(schema(secondSQL)) .create(); - List expected = - IntStream.rangeClosed(1, 10).mapToObj(this::row).collect(Collectors.toList()); - assertThat(sql("SELECT * FROM %s", viewReferencingOtherView)) - .hasSize(10) - .containsExactlyInAnyOrderElementsOf(expected); + .hasSize(1) + .containsExactly(row(5)); } @Test @@ -346,9 +348,9 @@ public void readFromViewReferencingTempView() throws NoSuchTableException { insertRows(10); String tempView = "tempViewBeingReferencedInAnotherView"; String viewReferencingTempView = "viewReferencingTempView"; + String sql = String.format("SELECT id FROM %s", tempView); ViewCatalog viewCatalog = viewCatalog(); - Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema(); sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName); @@ -356,10 +358,10 @@ public void readFromViewReferencingTempView() throws NoSuchTableException { // but this can't be prevented when using the API directly viewCatalog .buildView(TableIdentifier.of(NAMESPACE, viewReferencingTempView)) - .withQuery("spark", String.format("SELECT id FROM %s", tempView)) + .withQuery("spark", sql) .withDefaultNamespace(NAMESPACE) .withDefaultCatalog(catalogName) - .withSchema(schema) + .withSchema(schema(sql)) .create(); List expected = @@ -377,6 +379,51 @@ public void readFromViewReferencingTempView() throws NoSuchTableException { .hasMessageContaining("cannot be found"); } + @Test + public void readFromViewReferencingAnotherViewHiddenByTempView() throws NoSuchTableException { + insertRows(10); + String innerViewName = "inner_view"; + String outerViewName = "outer_view"; + String innerViewSQL = String.format("SELECT * FROM %s WHERE id > 5", tableName); + String outerViewSQL = String.format("SELECT id FROM %s", innerViewName); + + ViewCatalog viewCatalog = viewCatalog(); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, innerViewName)) + .withQuery("spark", innerViewSQL) + .withDefaultNamespace(NAMESPACE) + .withDefaultCatalog(catalogName) + .withSchema(schema(innerViewSQL)) + .create(); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, outerViewName)) + .withQuery("spark", outerViewSQL) + .withDefaultNamespace(NAMESPACE) + .withDefaultCatalog(catalogName) + .withSchema(schema(outerViewSQL)) + .create(); + + // create a temporary view that conflicts with the inner view to verify the inner name is + // resolved using the catalog and namespace defaults from the outer view + sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", innerViewName, tableName); + + List tempViewRows = + IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList()); + + assertThat(sql("SELECT * FROM %s", innerViewName)) + .hasSize(5) + .containsExactlyInAnyOrderElementsOf(tempViewRows); + + List expectedViewRows = + IntStream.rangeClosed(6, 10).mapToObj(this::row).collect(Collectors.toList()); + + assertThat(sql("SELECT * FROM %s", outerViewName)) + .hasSize(5) + .containsExactlyInAnyOrderElementsOf(expectedViewRows); + } + @Test public void readFromViewReferencingGlobalTempView() throws NoSuchTableException { insertRows(10); @@ -419,19 +466,27 @@ public void readFromViewReferencingGlobalTempView() throws NoSuchTableException public void readFromViewWithCTE() throws NoSuchTableException { insertRows(10); String viewName = "viewWithCTE"; + String sql = + String.format( + "WITH max_by_data AS (SELECT max(id) as max FROM %s) " + + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max", + tableName); ViewCatalog viewCatalog = viewCatalog(); - Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema(); viewCatalog .buildView(TableIdentifier.of(NAMESPACE, viewName)) - .withQuery("spark", String.format("SELECT * FROM (SELECT max(id) FROM %s)", tableName)) + .withQuery("spark", sql) .withDefaultNamespace(NAMESPACE) .withDefaultCatalog(catalogName) - .withSchema(schema) + .withSchema(schema(sql)) .create(); - assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10)); + assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L)); + } + + private Schema schema(String sql) { + return SparkSchemaUtil.convert(spark.sql(sql).schema()); } private ViewCatalog viewCatalog() { From f0eece48ab74b62b11151ff76282c8433b4a3865 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 1 Jan 2024 17:23:27 -0800 Subject: [PATCH 3/3] Add function identifier rewriting. --- .../sql/catalyst/analysis/ResolveViews.scala | 48 ++++-- .../iceberg/spark/extensions/TestViews.java | 138 +++++++++++++++++- 2 files changed, 176 insertions(+), 10 deletions(-) diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala index 90d63bb6bf1b..a978b94f49ac 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.expressions.UpCast import org.apache.spark.sql.catalyst.parser.ParseException @@ -68,6 +68,10 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look private def createViewRelation(nameParts: Seq[String], view: View): LogicalPlan = { val parsed = parseViewText(nameParts.quoted, view.query) + // Apply any necessary rewrites to preserve correct resolution + val viewCatalogAndNamespace: Seq[String] = view.currentCatalog +: view.currentNamespace.toSeq + val rewritten = rewriteIdentifiers(parsed, viewCatalogAndNamespace); + // Apply the field aliases and column comments // This logic differs from how Spark handles views in SessionCatalog.fromCatalogTable. // This is more strict because it doesn't allow resolution by field name. @@ -76,12 +80,7 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look Alias(UpCast(attr, expected.dataType), expected.name)(explicitMetadata = Some(expected.metadata)) } - val child = Project(aliases, parsed) - - val viewCatalogAndNamespace: Seq[String] = view.currentCatalog +: view.currentNamespace.toSeq - - // Substitute CTEs within the view before qualifying table identifiers - SubqueryAlias(nameParts, qualifyTableIdentifiers(CTESubstitution.apply(child), viewCatalogAndNamespace)) + SubqueryAlias(nameParts, Project(aliases, rewritten)) } private def parseViewText(name: String, viewText: String): LogicalPlan = { @@ -100,6 +99,30 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look } } + private def rewriteIdentifiers( + plan: LogicalPlan, + catalogAndNamespace: Seq[String]): LogicalPlan = { + // Substitute CTEs within the view, then rewrite unresolved functions and relations + qualifyTableIdentifiers( + qualifyFunctionIdentifiers( + CTESubstitution.apply(plan), + catalogAndNamespace), + catalogAndNamespace) + } + + private def qualifyFunctionIdentifiers( + plan: LogicalPlan, + catalogAndNamespace: Seq[String]): LogicalPlan = plan transformExpressions { + case u@UnresolvedFunction(Seq(name), _, _, _, _) => + if (!isBuiltinFunction(name)) { + u.copy(nameParts = catalogAndNamespace :+ name) + } else { + u + } + case u@UnresolvedFunction(parts, _, _, _, _) if !isCatalog(parts.head) => + u.copy(nameParts = catalogAndNamespace.head +: parts) + } + /** * Qualify table identifiers with default catalog and namespace if necessary. */ @@ -109,8 +132,15 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look child transform { case u@UnresolvedRelation(Seq(table), _, _) => u.copy(multipartIdentifier = catalogAndNamespace :+ table) - case u@UnresolvedRelation(parts, _, _) - if !spark.sessionState.catalogManager.isCatalogRegistered(parts.head) => + case u@UnresolvedRelation(parts, _, _) if !isCatalog(parts.head) => u.copy(multipartIdentifier = catalogAndNamespace.head +: parts) } + + private def isCatalog(name: String): Boolean = { + spark.sessionState.catalogManager.isCatalogRegistered(name) + } + + private def isBuiltinFunction(name: String): Boolean = { + spark.sessionState.catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name)) + } } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index 7eb5d7dde106..94e86e5ee3cc 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -26,6 +26,7 @@ import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.iceberg.IcebergBuild; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; @@ -53,12 +54,14 @@ public class TestViews extends SparkExtensionsTestBase { @Before public void before() { spark.conf().set("spark.sql.defaultCatalog", catalogName); + sql("USE %s", catalogName); sql("CREATE NAMESPACE IF NOT EXISTS %s", NAMESPACE); sql("CREATE TABLE %s (id INT, data STRING)", tableName); } @After public void removeTable() { + sql("USE %s", catalogName); sql("DROP TABLE IF EXISTS %s", tableName); } @@ -409,6 +412,9 @@ public void readFromViewReferencingAnotherViewHiddenByTempView() throws NoSuchTa // resolved using the catalog and namespace defaults from the outer view sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", innerViewName, tableName); + // ensure that the inner view resolution uses the view namespace and catalog + sql("USE spark_catalog"); + List tempViewRows = IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList()); @@ -419,7 +425,7 @@ public void readFromViewReferencingAnotherViewHiddenByTempView() throws NoSuchTa List expectedViewRows = IntStream.rangeClosed(6, 10).mapToObj(this::row).collect(Collectors.toList()); - assertThat(sql("SELECT * FROM %s", outerViewName)) + assertThat(sql("SELECT * FROM %s.%s.%s", catalogName, NAMESPACE, outerViewName)) .hasSize(5) .containsExactlyInAnyOrderElementsOf(expectedViewRows); } @@ -485,6 +491,136 @@ public void readFromViewWithCTE() throws NoSuchTableException { assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L)); } + @Test + public void rewriteFunctionIdentifier() { + String viewName = "rewriteFunctionIdentifier"; + String sql = "SELECT iceberg_version() AS version"; + + assertThatThrownBy(() -> sql(sql)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Cannot resolve function") + .hasMessageContaining("iceberg_version"); + + ViewCatalog viewCatalog = viewCatalog(); + Schema schema = new Schema(Types.NestedField.required(1, "version", Types.StringType.get())); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, viewName)) + .withQuery("spark", sql) + .withDefaultNamespace(Namespace.of("system")) + .withDefaultCatalog(catalogName) + .withSchema(schema) + .create(); + + assertThat(sql("SELECT * FROM %s", viewName)) + .hasSize(1) + .containsExactly(row(IcebergBuild.version())); + } + + @Test + public void builtinFunctionIdentifierNotRewritten() { + String viewName = "builtinFunctionIdentifierNotRewritten"; + String sql = "SELECT trim(' abc ') AS result"; + + ViewCatalog viewCatalog = viewCatalog(); + Schema schema = new Schema(Types.NestedField.required(1, "result", Types.StringType.get())); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, viewName)) + .withQuery("spark", sql) + .withDefaultNamespace(Namespace.of("system")) + .withDefaultCatalog(catalogName) + .withSchema(schema) + .create(); + + assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row("abc")); + } + + @Test + public void rewriteFunctionIdentifierWithNamespace() { + String viewName = "rewriteFunctionIdentifierWithNamespace"; + String sql = "SELECT system.bucket(100, 'a') AS bucket_result, 'a' AS value"; + + ViewCatalog viewCatalog = viewCatalog(); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, viewName)) + .withQuery("spark", sql) + .withDefaultNamespace(Namespace.of("system")) + .withDefaultCatalog(catalogName) + .withSchema(schema(sql)) + .create(); + + sql("USE spark_catalog"); + + assertThatThrownBy(() -> sql(sql)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Cannot resolve function") + .hasMessageContaining("`system`.`bucket`"); + + assertThat(sql("SELECT * FROM %s.%s.%s", catalogName, NAMESPACE, viewName)) + .hasSize(1) + .containsExactly(row(50, "a")); + } + + @Test + public void fullFunctionIdentifier() { + String viewName = "fullFunctionIdentifier"; + String sql = + String.format( + "SELECT %s.system.bucket(100, 'a') AS bucket_result, 'a' AS value", catalogName); + + sql("USE spark_catalog"); + + ViewCatalog viewCatalog = viewCatalog(); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, viewName)) + .withQuery("spark", sql) + .withDefaultNamespace(Namespace.of("system")) + .withDefaultCatalog(catalogName) + .withSchema(schema(sql)) + .create(); + + assertThat(sql("SELECT * FROM %s.%s.%s", catalogName, NAMESPACE, viewName)) + .hasSize(1) + .containsExactly(row(50, "a")); + } + + @Test + public void fullFunctionIdentifierNotRewrittenLoadFailure() { + String viewName = "fullFunctionIdentifierNotRewrittenLoadFailure"; + String sql = + String.format( + "SELECT spark_catalog.system.bucket(100, 'a') AS bucket_result, 'a' AS value", + catalogName); + + // avoid namespace failures + sql("USE spark_catalog"); + sql("CREATE NAMESPACE IF NOT EXISTS system"); + sql("USE %s", catalogName); + + Schema schema = + new Schema( + Types.NestedField.required(1, "bucket_result", Types.IntegerType.get()), + Types.NestedField.required(2, "value", Types.StringType.get())); + + ViewCatalog viewCatalog = viewCatalog(); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, viewName)) + .withQuery("spark", sql) + .withDefaultNamespace(Namespace.of("system")) + .withDefaultCatalog(catalogName) + .withSchema(schema) + .create(); + + // verify the v1 error message + assertThatThrownBy(() -> sql("SELECT * FROM %s", viewName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("The function `system`.`bucket` cannot be found"); + } + private Schema schema(String sql) { return SparkSchemaUtil.convert(spark.sql(sql).schema()); }