Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,13 @@ import org.apache.spark.sql.connector.catalog.ViewCatalog
* ResolveSessionCatalog exits early for some v2 View commands,
* thus they are pre-substituted here and then handled in ResolveViews
*/
case class HijackViewCommands(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {

protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case DropView(UnresolvedIdentifier(nameParts, allowTemp), ifExists)
if isViewCatalog(catalogManager.currentCatalog) && !isTempView(nameParts) =>
DropIcebergView(UnresolvedIdentifier(nameParts, allowTemp), ifExists)
case DropView(ResolvedView(resolved), ifExists) =>
DropIcebergView(resolved, ifExists)
}

private def isTempView(nameParts: Seq[String]): Boolean = {
Expand All @@ -50,4 +49,17 @@ case class HijackViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wit
private def isViewCatalog(catalog: CatalogPlugin): Boolean = {
catalog.isInstanceOf[ViewCatalog]
}

object ResolvedView {
  /**
   * Extractor that resolves an [[UnresolvedIdentifier]] against the current
   * catalog manager.
   *
   * Yields a [[ResolvedIdentifier]] only when the identifier maps to a
   * catalog that implements ViewCatalog; temp views (when temp resolution is
   * allowed) and non-view catalogs yield None so the command is left untouched.
   */
  def unapply(unresolved: UnresolvedIdentifier): Option[ResolvedIdentifier] = {
    val UnresolvedIdentifier(nameParts, allowTemp) = unresolved
    if (allowTemp && isTempView(nameParts)) {
      // A temp view shadows any catalog view with the same name; don't hijack it.
      None
    } else {
      nameParts match {
        case CatalogAndIdentifier(catalog, ident) if isViewCatalog(catalog) =>
          Some(ResolvedIdentifier(catalog, ident))
        case _ =>
          None
      }
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.analysis.HijackViewCommands
import org.apache.spark.sql.catalyst.analysis.RewriteViewCommands
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
Expand Down Expand Up @@ -123,7 +123,7 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
if (isIcebergCommand(sqlTextAfterSubstitution)) {
parse(sqlTextAfterSubstitution) { parser => astBuilder.visit(parser.singleStatement()) }.asInstanceOf[LogicalPlan]
} else {
HijackViewCommands(SparkSession.active).apply(delegate.parsePlan(sqlText))
RewriteViewCommands(SparkSession.active).apply(delegate.parsePlan(sqlText))
}
}

Expand Down Expand Up @@ -335,4 +335,4 @@ class IcebergParseException(
/**
 * Returns a copy of this parse exception annotated with the offending SQL
 * command text (wrapped in Option via the secondary constructor argument).
 */
def withCommand(cmd: String): IcebergParseException =
  new IcebergParseException(Option(cmd), message, start, stop)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -717,12 +717,13 @@ public void dropV1View() {
sql("CREATE NAMESPACE IF NOT EXISTS %s", NAMESPACE);
sql("CREATE TABLE %s (id INT, data STRING)", tableName);
sql("CREATE VIEW %s AS SELECT id FROM %s", v1View, tableName);
sql("USE %s", catalogName);
assertThat(
v1SessionCatalog()
.tableExists(new org.apache.spark.sql.catalyst.TableIdentifier(v1View)))
.isTrue();

sql("DROP VIEW %s", v1View);
sql("DROP VIEW spark_catalog.%s.%s", NAMESPACE, v1View);
assertThat(
v1SessionCatalog()
.tableExists(new org.apache.spark.sql.catalyst.TableIdentifier(v1View)))
Expand Down