@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.objects._
import org.apache.spark.sql.catalyst.optimizer.OptimizeUpdateFields
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -43,7 +44,7 @@ import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils, StringUtils}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.connector.catalog.{View => _, _}
import org.apache.spark.sql.connector.catalog.{View => V2View, _}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.TableChange.{After, ColumnPosition}
import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => V2AggregateFunction, ScalarFunction, UnboundFunction}
@@ -239,6 +240,11 @@ class Analyzer(override val catalogManager: CatalogManager)
errorOnExceed = true,
maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)

/**
* Override to provide additional rules for the "Substitution" batch.
*/
val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] = Nil
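  // Usage sketch (illustrative, not part of this change): the new hook is
  // presumably consumed by subclassing the Analyzer, e.g. to wire in the
  // ViewSubstitution rule introduced below. `CustomAnalyzer` is a made-up name;
  // CatalystSqlParser (org.apache.spark.sql.catalyst.parser) implements
  // ParserInterface and serves here as a stand-in parser.
  //
  //   class CustomAnalyzer(catalogManager: CatalogManager)
  //       extends Analyzer(catalogManager) {
  //     override val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] =
  //       Seq(ViewSubstitution(CatalystSqlParser))
  //   }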

/**
* Override to provide additional rules for the "Resolution" batch.
*/
@@ -263,11 +269,12 @@ class Analyzer(override val catalogManager: CatalogManager)
// However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
// very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early
// at the beginning of analysis.
OptimizeUpdateFields,
CTESubstitution,
WindowsSubstitution,
EliminateUnions,
SubstituteUnresolvedOrdinals),
OptimizeUpdateFields +:
CTESubstitution +:
WindowsSubstitution +:
EliminateUnions +:
SubstituteUnresolvedOrdinals +:
extendedSubstitutionRules : _*),
Batch("Disable Hints", Once,
new ResolveHints.DisableHints),
Batch("Hints", fixedPoint,
@@ -451,6 +458,74 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}

/**
 * Substitutes persisted views in parsed plans with plans parsed from their view SQL text.
*/
case class ViewSubstitution(sqlParser: ParserInterface) extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case u @ UnresolvedRelation(nameParts, _, _) if v1SessionCatalog.isTempView(nameParts) =>
u
case u @ UnresolvedRelation(
parts @ NonSessionCatalogAndIdentifier(catalog, ident), _, _) if !isSQLOnFile(parts) =>
CatalogV2Util.loadView(catalog, ident)
.map(createViewRelation(parts.quoted, _))
.getOrElse(u)
}

private def isSQLOnFile(parts: Seq[String]): Boolean = parts match {
case Seq(_, path) if path.contains("/") => true
case _ => false
}
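    // Example for the guard above: SELECT * FROM parquet.`/path/to/data`
    // reaches this rule as UnresolvedRelation(Seq("parquet", "/path/to/data"));
    // the slash marks it as SQL-on-file, so it is excluded from view lookup.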

private def createViewRelation(name: String, view: V2View): LogicalPlan = {
if (!catalogManager.isCatalogRegistered(view.currentCatalog)) {
throw new AnalysisException(
s"Invalid current catalog '${view.currentCatalog}' in view '$name'")
}

val child = parseViewText(name, view.query)
val desc = V2ViewDescription(name, view)
val qualifiedChild = desc.viewCatalogAndNamespace match {
case Seq() =>
          // Views created by Spark 2.2 or prior do not store the catalog or namespace;
          // however, their SQL text should already be fully qualified.
child
case catalogAndNamespace =>
// Substitute CTEs within the view before qualifying table identifiers
qualifyTableIdentifiers(CTESubstitution.apply(child), catalogAndNamespace)
}

      // The relation is a view, so we wrap it by:
      // 1. Adding a [[View]] operator over the relation to keep track of the view desc;
      // 2. Wrapping the logical plan in a [[SubqueryAlias]] which tracks the name of the view.
SubqueryAlias(name, View(desc, false, qualifiedChild))
}

private def parseViewText(name: String, viewText: String): LogicalPlan = {
try {
sqlParser.parsePlan(viewText)
} catch {
case _: ParseException =>
throw QueryCompilationErrors.invalidViewText(viewText, name)
}
}

/**
* Qualify table identifiers with default catalog and namespace if necessary.
*/
private def qualifyTableIdentifiers(
child: LogicalPlan,
catalogAndNamespace: Seq[String]): LogicalPlan =
child transform {
case u @ UnresolvedRelation(Seq(table), _, _) =>
u.copy(multipartIdentifier = catalogAndNamespace :+ table)
case u @ UnresolvedRelation(parts, _, _)
if !catalogManager.isCatalogRegistered(parts.head) =>
u.copy(multipartIdentifier = catalogAndNamespace.head +: parts)
}
}
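  // End-to-end sketch of ViewSubstitution, assuming a hypothetical non-session
  // V2 catalog `cat` whose view `db.v` stores the query "SELECT * FROM t" with
  // currentCatalog = "cat" and currentNamespace = ["db"]:
  //   1. UnresolvedRelation(Seq("cat", "db", "v")) matches the second case above.
  //   2. CatalogV2Util.loadView(cat, db.v) returns the view metadata.
  //   3. parseViewText parses the stored SQL into an unresolved plan.
  //   4. qualifyTableIdentifiers rewrites Seq("t") to Seq("cat", "db", "t").
  //   5. The result is wrapped as SubqueryAlias("cat.db.v", View(desc, false, child)).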

/**
* Substitute child plan with WindowSpecDefinitions.
*/
@@ -1067,7 +1142,7 @@ class Analyzer(override val catalogManager: CatalogManager)
// The view's child should be a logical plan parsed from the `desc.viewText`, the variable
// `viewText` should be defined, or else we throw an error on the generation of the View
// operator.
case view @ View(desc, isTempView, child) if !child.resolved =>
case view @ View(CatalogTableViewDescription(desc), isTempView, child) if !child.resolved =>
// Resolve all the UnresolvedRelations and Views in the child.
val newChild = AnalysisContext.withAnalysisContext(desc) {
val nestedViewDepth = AnalysisContext.get.nestedViewDepth
@@ -1206,23 +1281,32 @@
}.orElse {
expandIdentifier(identifier) match {
case CatalogAndIdentifier(catalog, ident) =>
if (viewOnly && !CatalogV2Util.isSessionCatalog(catalog)) {
throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views")
}
CatalogV2Util.loadTable(catalog, ident).map {
case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) &&
v1Table.v1Table.tableType == CatalogTableType.VIEW =>
val v1Ident = v1Table.catalogTable.identifier
val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier)
ResolvedPersistentView(catalog, v2Ident, v1Table.catalogTable.schema)
case table =>
ResolvedTable.create(catalog.asTableCatalog, ident, table)
}
lookupView(catalog, ident)
.orElse(lookupTable(catalog, ident))
case _ => None
}
}
}

private def lookupView(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] =
CatalogV2Util.loadView(catalog, ident).map {
case view if CatalogV2Util.isSessionCatalog(catalog) =>
ResolvedPersistentView(catalog, ident, view.schema)
case view =>
ResolvedV2View(catalog.asViewCatalog, ident, view)
}

private def lookupTable(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] =
CatalogV2Util.loadTable(catalog, ident).map {
case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) &&
v1Table.v1Table.tableType == CatalogTableType.VIEW =>
val v1Ident = v1Table.catalogTable.identifier
val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier)
analysis.ResolvedPersistentView(catalog, v2Ident, v1Table.catalogTable.schema)
case table =>
ResolvedTable.create(catalog.asTableCatalog, ident, table)
}
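    // Net effect of the two helpers above (identifiers hypothetical): views
    // shadow tables for the same identifier, since Option#orElse takes its
    // argument by name and only evaluates the table lookup on a view miss.
    //   spark_catalog.db.v -> ResolvedPersistentView (assuming the session
    //                         catalog exposes the view via loadView)
    //   cat.db.v           -> ResolvedV2View(cat.asViewCatalog, ident, view)
    //   cat.db.t           -> ResolvedTable.create(cat.asTableCatalog, ident, t)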

private def createRelation(
catalog: CatalogPlugin,
ident: Identifier,
@@ -607,6 +607,44 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsBase
case alter: AlterTableCommand =>
checkAlterTableCommand(alter)

case c: CreateView =>
if (c.originalText.isEmpty) {
throw new AnalysisException(
"'originalText' must be provided to create permanent view")
}

if (c.allowExisting && c.replace) {
throw new AnalysisException(
"CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
}

      // If the view output doesn't have the same number of columns as either the child
      // output or the query column names, throw an AnalysisException.
      // If the view's child output can't be up-cast to the view output,
      // throw an AnalysisException as well.
case v @ View(desc, _, child) if child.resolved && v.output != child.output =>
val queryColumnNames = desc.viewQueryColumnNames
val queryOutput = if (queryColumnNames.nonEmpty) {
if (v.output.length != queryColumnNames.length) {
// If the view output doesn't have the same number of columns with the query column
// names, throw an AnalysisException.
throw new AnalysisException(
s"The view output ${v.output.mkString("[", ",", "]")} doesn't have the same" +
"number of columns with the query column names " +
s"${queryColumnNames.mkString("[", ",", "]")}")
}
val resolver = SQLConf.get.resolver
queryColumnNames.map { colName =>
child.output.find { attr =>
resolver(attr.name, colName)
}.getOrElse(throw new AnalysisException(
s"Attribute with name '$colName' is not found in " +
s"'${child.output.map(_.name).mkString("(", ",", ")")}'"))
}
} else {
child.output
}

case _ => // Falls back to the following checks
}
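For intuition, the name-mapping step above can be sketched in isolation. This is a standalone illustration with made-up column names, using a case-insensitive resolver as with the default spark.sql.caseSensitive=false; the real check throws AnalysisException with the messages shown above.

  // Stored query column names are matched against the child output by name.
  val resolver: (String, String) => Boolean = _ equalsIgnoreCase _
  val childOutputNames = Seq("id", "name")     // child plan attributes
  val queryColumnNames = Seq("ID", "renamed")  // names stored at CREATE VIEW

  queryColumnNames.map { col =>
    childOutputNames.find(resolver(_, col)).getOrElse(
      sys.error(s"Attribute with name '$col' is not found in (id,name)"))
  }
  // "ID" resolves to "id"; "renamed" matches nothing, so analysis fails.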

@@ -19,14 +19,17 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, LookupCatalog}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, ViewChange}
import org.apache.spark.sql.errors.QueryCompilationErrors

/**
* Resolves the catalog of the name parts for table/view/function/namespace.
*/
class ResolveCatalogs(val catalogManager: CatalogManager)
extends Rule[LogicalPlan] with LookupCatalog {

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case UnresolvedIdentifier(nameParts, allowTemp) =>
if (allowTemp && catalogManager.v1SessionCatalog.isTempView(nameParts)) {
@@ -50,5 +53,52 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
ResolvedNamespace(currentCatalog, Seq.empty[String])
case UnresolvedNamespace(CatalogAndNamespace(catalog, ns)) =>
ResolvedNamespace(catalog, ns)

case DescribeRelation(ResolvedV2View(_, ident, view), _, isExtended, _) =>
DescribeV2View(V2ViewDescription(ident.quoted, view), isExtended)

case ShowCreateTable(ResolvedV2View(_, ident, view), _, _) =>
ShowCreateV2View(V2ViewDescription(ident.quoted, view))

case ShowTableProperties(ResolvedV2View(_, ident, view), propertyKeys, _) =>
ShowV2ViewProperties(V2ViewDescription(ident.quoted, view), propertyKeys)

case SetViewProperties(ResolvedV2View(catalog, ident, _), props) =>
val changes = props.map {
case (property, value) => ViewChange.setProperty(property, value)
}.toSeq
AlterV2View(catalog, ident, changes)

case UnsetViewProperties(ResolvedV2View(catalog, ident, _), propertyKeys, ifExists) =>
if (!ifExists) {
val view = catalog.loadView(ident)
propertyKeys.filterNot(view.properties.containsKey).foreach { property =>
          throw QueryCompilationErrors.cannotUnsetNonExistentViewProperty(ident, property)
}
}
val changes = propertyKeys.map(ViewChange.removeProperty)
AlterV2View(catalog, ident, changes)

case RenameTable(ResolvedV2View(oldCatalog, oldIdent, _),
NonSessionCatalogAndIdentifier(newCatalog, newIdent), true) =>
if (oldCatalog.name != newCatalog.name) {
        throw QueryCompilationErrors.cannotMoveViewBetweenCatalogs(
oldCatalog.name, newCatalog.name)
}
RenameV2View(oldCatalog, oldIdent, newIdent)

case RefreshTable(ResolvedV2View(catalog, ident, _)) =>
RefreshView(catalog, ident)

case DropView(ResolvedV2View(catalog, ident, _), ifExists) =>
DropV2View(catalog, ident, ifExists)
}

object NonSessionCatalogAndTable {
def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
case NonSessionCatalogAndIdentifier(catalog, ident) =>
Some(catalog -> ident.asMultipartIdentifier)
case _ => None
}
}
}
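To make the property translation above concrete, here is a sketch of how one ALTER VIEW statement would flow through this rule (identifier and values are made up; plan and change names are those introduced in this PR):

  // ALTER VIEW cat.db.v SET TBLPROPERTIES ('owner' = 'alice')
  // arrives at this rule as:
  //   SetViewProperties(ResolvedV2View(catalog, ident, view), Map("owner" -> "alice"))
  // and is rewritten to:
  //   AlterV2View(catalog, ident, Seq(ViewChange.setProperty("owner", "alice")))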
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.plans.logical.ViewDescription
import org.apache.spark.sql.connector.catalog.{View, ViewCatalog}
import org.apache.spark.sql.types.StructType

/**
 * A view description backed by a View in a V2 catalog.
 *
 * @param identifier the quoted, multi-part identifier of the view
 * @param view a view in a V2 catalog
*/
case class V2ViewDescription(
override val identifier: String,
view: View) extends ViewDescription {

override val schema: StructType = view.schema

override val viewText: Option[String] = Option(view.query)

override val viewCatalogAndNamespace: Seq[String] =
view.currentCatalog +: view.currentNamespace.toSeq

override val viewQueryColumnNames: Seq[String] = view.schema.fieldNames

val query: String = view.query

val comment: Option[String] = Option(view.properties.get(ViewCatalog.PROP_COMMENT))

val owner: Option[String] = Option(view.properties.get(ViewCatalog.PROP_OWNER))

val createEngineVersion: Option[String] =
Option(view.properties.get(ViewCatalog.PROP_CREATE_ENGINE_VERSION))

val properties: Map[String, String] =
view.properties.asScala.toMap -- ViewCatalog.RESERVED_PROPERTIES.asScala
}
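A usage sketch, mirroring how the analyzer rules above construct descriptions; the `viewCatalog` and `ident` values here are hypothetical, and `ident.quoted` assumes the CatalogV2Implicits syntax:

  // Given a ViewCatalog `viewCatalog` and an Identifier `ident`:
  val view: View = viewCatalog.loadView(ident)
  val desc = V2ViewDescription(ident.quoted, view)

  desc.comment      // Option of the reserved "comment" property
  desc.owner        // Option of the reserved "owner" property
  desc.properties   // remaining properties, reserved keys filtered out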
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, LeafExpression, Unevaluable}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.catalyst.trees.TreePattern.{TreePattern, UNRESOLVED_FUNC}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, Table, TableCatalog}
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, Table, TableCatalog, View => V2View, ViewCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
@@ -218,6 +218,13 @@ case class ResolvedTempView(identifier: Identifier, viewSchema: StructType)
override def output: Seq[Attribute] = Nil
}

case class ResolvedV2View(
catalog: ViewCatalog,
identifier: Identifier,
view: V2View) extends LeafNodeWithoutStats {
override def output: Seq[Attribute] = Nil
}

/**
* A plan containing resolved persistent function.
*/