Skip to content

Commit bab80bc

Browse files
committed
[SPARK-39800][SQL][WIP] DataSourceV2: View Support
1 parent ca3ea2e commit bab80bc

File tree

27 files changed

+1106
-48
lines changed

27 files changed

+1106
-48
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/ViewCatalog.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.connector.catalog;
1919

20+
import java.util.Arrays;
21+
import java.util.List;
2022
import java.util.Map;
2123

2224
import org.apache.spark.annotation.Experimental;
@@ -31,6 +33,35 @@
3133
@Experimental
3234
public interface ViewCatalog extends CatalogPlugin {
3335

36+
/**
37+
* A reserved property to specify the description of the view.
38+
*/
39+
String PROP_COMMENT = "comment";
40+
41+
/**
42+
* A reserved property to specify the owner of the view.
43+
*/
44+
String PROP_OWNER = "owner";
45+
46+
/**
47+
* A reserved property to specify the software version used to create the view.
48+
*/
49+
String PROP_CREATE_ENGINE_VERSION = "create_engine_version";
50+
51+
/**
52+
* A reserved property to specify the software version used to change the view.
53+
*/
54+
String PROP_ENGINE_VERSION = "engine_version";
55+
56+
/**
57+
* All reserved properties of the view.
58+
*/
59+
List<String> RESERVED_PROPERTIES = Arrays.asList(
60+
PROP_COMMENT,
61+
PROP_OWNER,
62+
PROP_CREATE_ENGINE_VERSION,
63+
PROP_ENGINE_VERSION);
64+
3465
/**
3566
* List the views in a namespace from the catalog.
3667
* <p>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 106 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
3535
import org.apache.spark.sql.catalyst.expressions.aggregate._
3636
import org.apache.spark.sql.catalyst.expressions.objects._
3737
import org.apache.spark.sql.catalyst.optimizer.OptimizeUpdateFields
38+
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
3839
import org.apache.spark.sql.catalyst.plans._
3940
import org.apache.spark.sql.catalyst.plans.logical._
4041
import org.apache.spark.sql.catalyst.rules._
@@ -44,7 +45,7 @@ import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
4445
import org.apache.spark.sql.catalyst.trees.TreePattern._
4546
import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils}
4647
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
47-
import org.apache.spark.sql.connector.catalog.{View => _, _}
48+
import org.apache.spark.sql.connector.catalog.{View => V2View, _}
4849
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
4950
import org.apache.spark.sql.connector.catalog.TableChange.{After, ColumnPosition}
5051
import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => V2AggregateFunction, BoundFunction, ScalarFunction, UnboundFunction}
@@ -238,6 +239,11 @@ class Analyzer(override val catalogManager: CatalogManager)
238239
errorOnExceed = true,
239240
maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)
240241

242+
/**
243+
* Override to provide additional rules for the "Substitution" batch.
244+
*/
245+
val extendedSubstitutionRules: Seq[Rule[LogicalPlan]] = Nil
246+
241247
/**
242248
* Override to provide additional rules for the "Resolution" batch.
243249
*/
@@ -262,11 +268,12 @@ class Analyzer(override val catalogManager: CatalogManager)
262268
// However, when manipulating deeply nested schema, `UpdateFields` expression tree could be
263269
// very complex and make analysis impossible. Thus we need to optimize `UpdateFields` early
264270
// at the beginning of analysis.
265-
OptimizeUpdateFields,
266-
CTESubstitution,
267-
WindowsSubstitution,
268-
EliminateUnions,
269-
SubstituteUnresolvedOrdinals),
271+
OptimizeUpdateFields +:
272+
CTESubstitution +:
273+
WindowsSubstitution +:
274+
EliminateUnions +:
275+
SubstituteUnresolvedOrdinals +:
276+
extendedSubstitutionRules : _*),
270277
Batch("Disable Hints", Once,
271278
new ResolveHints.DisableHints),
272279
Batch("Hints", fixedPoint,
@@ -447,6 +454,74 @@ class Analyzer(override val catalogManager: CatalogManager)
447454
}
448455
}
449456

457+
/**
 * Rule that swaps an [[UnresolvedRelation]] naming a persisted view for the plan obtained by
 * parsing that view's SQL text.
 *
 * Relations that name a temporary view are left untouched. Only identifiers matched by
 * `NonSessionCatalogAndIdentifier` are looked up, and a relation is left as-is when
 * `CatalogV2Util.loadView` returns nothing for it.
 *
 * @param sqlParser parser used to turn the stored view SQL text into a logical plan
 */
case class ViewSubstitution(sqlParser: ParserInterface) extends Rule[LogicalPlan] {

  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    // Temporary views take precedence and are not substituted by this rule.
    case tempRef @ UnresolvedRelation(nameParts, _, _)
        if v1SessionCatalog.isTempView(nameParts) =>
      tempRef

    case relation @ UnresolvedRelation(
        parts @ NonSessionCatalogAndIdentifier(catalog, ident), _, _) if !isSQLOnFile(parts) =>
      CatalogV2Util.loadView(catalog, ident) match {
        case Some(loadedView) => createViewRelation(parts.quoted, loadedView)
        case None => relation
      }
  }

  /**
   * Returns true for a two-part name whose second part contains a '/'.
   */
  private def isSQLOnFile(parts: Seq[String]): Boolean =
    parts.lengthCompare(2) == 0 && parts(1).contains("/")

  /**
   * Builds the plan standing in for the view: the parsed (and, when the description carries
   * a catalog and namespace, qualified) view query wrapped in a [[View]] operator and a
   * [[SubqueryAlias]] named after the view.
   *
   * Throws an AnalysisException when the view's current catalog is not registered.
   */
  private def createViewRelation(name: String, view: V2View): LogicalPlan = {
    if (!catalogManager.isCatalogRegistered(view.currentCatalog)) {
      throw new AnalysisException(
        s"Invalid current catalog '${view.currentCatalog}' in view '$name'")
    }

    val parsedQuery = parseViewText(name, view.sql)
    val desc = V2ViewDescription(name, view)
    val defaultCatalogAndNamespace = desc.viewCatalogAndNamespace

    val viewQuery =
      if (defaultCatalogAndNamespace.isEmpty) {
        // Views from Spark 2.2 or prior do not store catalog or namespace,
        // however its sql text should already be fully qualified.
        parsedQuery
      } else {
        // CTEs inside the view are substituted first, then table identifiers are qualified.
        val withoutCtes = CTESubstitution.apply(parsedQuery)
        qualifyTableIdentifiers(withoutCtes, defaultCatalogAndNamespace)
      }

    // The View operator keeps track of the view description; the SubqueryAlias tracks the
    // name of the view.
    val viewNode = View(desc, false, viewQuery)
    SubqueryAlias(name, viewNode)
  }

  /**
   * Parses the view SQL text, converting a ParseException into the "invalid view text"
   * compilation error.
   */
  private def parseViewText(name: String, viewText: String): LogicalPlan =
    try sqlParser.parsePlan(viewText)
    catch {
      case _: ParseException =>
        throw QueryCompilationErrors.invalidViewText(viewText, name)
    }

  /**
   * Qualify table identifiers with default catalog and namespace if necessary.
   *
   * A single-part name gets the whole `catalogAndNamespace` prepended. A multi-part name
   * whose first part is not a registered catalog gets only the first element of
   * `catalogAndNamespace` prepended.
   */
  private def qualifyTableIdentifiers(
      child: LogicalPlan,
      catalogAndNamespace: Seq[String]): LogicalPlan = {
    child.transform {
      case bare @ UnresolvedRelation(Seq(tableName), _, _) =>
        bare.copy(multipartIdentifier = catalogAndNamespace :+ tableName)
      case partial @ UnresolvedRelation(nameParts, _, _)
          if !catalogManager.isCatalogRegistered(nameParts.head) =>
        partial.copy(multipartIdentifier = catalogAndNamespace.head +: nameParts)
    }
  }
}
524+
450525
/**
451526
* Substitute child plan with WindowSpecDefinitions.
452527
*/
@@ -1003,7 +1078,7 @@ class Analyzer(override val catalogManager: CatalogManager)
10031078
// The view's child should be a logical plan parsed from the `desc.viewText`, the variable
10041079
// `viewText` should be defined, or else we throw an error on the generation of the View
10051080
// operator.
1006-
case view @ View(desc, isTempView, child) if !child.resolved =>
1081+
case view @ View(CatalogTableViewDescription(desc), isTempView, child) if !child.resolved =>
10071082
// Resolve all the UnresolvedRelations and Views in the child.
10081083
val newChild = AnalysisContext.withAnalysisContext(desc) {
10091084
val nestedViewDepth = AnalysisContext.get.nestedViewDepth
@@ -1055,8 +1130,9 @@ class Analyzer(override val catalogManager: CatalogManager)
10551130
write.table match {
10561131
case u: UnresolvedRelation if !u.isStreaming =>
10571132
lookupRelation(u).map(unwrapRelationPlan).map {
1058-
case v: View => throw QueryCompilationErrors.writeIntoViewNotAllowedError(
1059-
v.desc.identifier, write)
1133+
case View(CatalogTableViewDescription(desc), _, _) =>
1134+
throw QueryCompilationErrors.writeIntoViewNotAllowedError(
1135+
desc.identifier, write)
10601136
case r: DataSourceV2Relation => write.withNewTable(r)
10611137
case u: UnresolvedCatalogRelation =>
10621138
throw QueryCompilationErrors.writeIntoV1TableNotAllowedError(
@@ -1139,20 +1215,32 @@ class Analyzer(override val catalogManager: CatalogManager)
11391215
}.orElse {
11401216
expandIdentifier(identifier) match {
11411217
case CatalogAndIdentifier(catalog, ident) =>
1142-
CatalogV2Util.loadTable(catalog, ident).map {
1143-
case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) &&
1144-
v1Table.v1Table.tableType == CatalogTableType.VIEW =>
1145-
val v1Ident = v1Table.catalogTable.identifier
1146-
val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier)
1147-
ResolvedView(v2Ident, isTemp = false)
1148-
case table =>
1149-
ResolvedTable.create(catalog.asTableCatalog, ident, table)
1150-
}
1218+
lookupView(catalog, ident)
1219+
.orElse(lookupTable(catalog, ident))
11511220
case _ => None
11521221
}
11531222
}
11541223
}
11551224

1225+
/**
 * Looks up `ident` as a view through `CatalogV2Util.loadView`.
 *
 * A view loaded from the session catalog becomes a [[ResolvedView]]; a view from any other
 * catalog becomes a [[ResolvedV2View]] carrying the loaded view. Returns None when no view
 * is loaded.
 */
private def lookupView(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] = {
  for (loadedView <- CatalogV2Util.loadView(catalog, ident)) yield {
    if (CatalogV2Util.isSessionCatalog(catalog)) {
      ResolvedView(ident, isTemp = false)
    } else {
      ResolvedV2View(catalog.asViewCatalog, ident, loadedView)
    }
  }
}
1232+
1233+
/**
 * Looks up `ident` as a table through `CatalogV2Util.loadTable`.
 *
 * A [[V1Table]] from the session catalog whose table type is VIEW becomes a
 * [[ResolvedView]] built from the v1 identifier; everything else becomes a
 * [[ResolvedTable]]. Returns None when no table is loaded.
 */
private def lookupTable(catalog: CatalogPlugin, ident: Identifier): Option[LogicalPlan] = {
  for (loaded <- CatalogV2Util.loadTable(catalog, ident)) yield {
    loaded match {
      case legacy: V1Table
          if CatalogV2Util.isSessionCatalog(catalog) &&
            legacy.v1Table.tableType == CatalogTableType.VIEW =>
        val sessionIdent = legacy.catalogTable.identifier
        ResolvedView(
          Identifier.of(sessionIdent.database.toArray, sessionIdent.identifier),
          isTemp = false)
      case other =>
        ResolvedTable.create(catalog.asTableCatalog, ident, other)
    }
  }
}
1243+
11561244
private def createRelation(
11571245
catalog: CatalogPlugin,
11581246
ident: Identifier,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,12 +131,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
131131
case u: UnresolvedTable =>
132132
u.failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")
133133

134-
case u @ UnresolvedView(NonSessionCatalogAndIdentifier(catalog, ident), cmd, _, _) =>
135-
u.failAnalysis(
136-
s"Cannot specify catalog `${catalog.name}` for view ${ident.quoted} " +
137-
"because view support in v2 catalog has not been implemented yet. " +
138-
s"$cmd expects a view.")
139-
140134
case u: UnresolvedView =>
141135
u.failAnalysis(s"View not found: ${u.multipartIdentifier.quoted}")
142136

@@ -521,6 +515,44 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
521515
case alter: AlterTableCommand =>
522516
checkAlterTableCommand(alter)
523517

518+
case c: CreateView =>
519+
if (c.originalText.isEmpty) {
520+
throw new AnalysisException(
521+
"'originalText' must be provided to create permanent view")
522+
}
523+
524+
if (c.allowExisting && c.replace) {
525+
throw new AnalysisException(
526+
"CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
527+
}
528+
529+
// Validates a resolved view whose output differs from its child's output.
// When the view description carries query column names:
//   - the view output must have exactly as many columns as there are query column names;
//   - every query column name must resolve (using the session resolver) to an attribute
//     of the child output.
// Either violation raises an AnalysisException.
// NOTE(review): `queryOutput` is computed but not consumed in this clause, so the up-cast
// compatibility check between the child output and the view output is not performed
// here -- confirm whether it is still to be added.
case v @ View(desc, _, child) if child.resolved && v.output != child.output =>
  val queryColumnNames = desc.viewQueryColumnNames
  val queryOutput = if (queryColumnNames.nonEmpty) {
    if (v.output.length != queryColumnNames.length) {
      // The first fragment ends with a space so the message does not read "samenumber".
      throw new AnalysisException(
        s"The view output ${v.output.mkString("[", ",", "]")} doesn't have the same " +
          "number of columns with the query column names " +
          s"${queryColumnNames.mkString("[", ",", "]")}")
    }
    val resolver = SQLConf.get.resolver
    queryColumnNames.map { colName =>
      child.output.find { attr =>
        resolver(attr.name, colName)
      }.getOrElse(throw new AnalysisException(
        s"Attribute with name '$colName' is not found in " +
          s"'${child.output.map(_.name).mkString("(", ",", ")")}'"))
    }
  } else {
    child.output
  }
555+
524556
case _ => // Falls back to the following checks
525557
}
526558

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,17 @@ package org.apache.spark.sql.catalyst.analysis
1919

2020
import org.apache.spark.sql.catalyst.plans.logical._
2121
import org.apache.spark.sql.catalyst.rules.Rule
22-
import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
22+
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog, ViewChange}
23+
import org.apache.spark.sql.errors.QueryCompilationErrors
2324

2425
/**
2526
* Resolves the catalog of the name parts for table/view/function/namespace.
2627
*/
2728
class ResolveCatalogs(val catalogManager: CatalogManager)
2829
extends Rule[LogicalPlan] with LookupCatalog {
2930

31+
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
32+
3033
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
3134
case UnresolvedIdentifier(CatalogAndIdentifier(catalog, identifier)) =>
3235
ResolvedIdentifier(catalog, identifier)
@@ -44,5 +47,52 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
4447
ResolvedNamespace(currentCatalog, Seq.empty[String])
4548
case UnresolvedNamespace(CatalogAndNamespace(catalog, ns)) =>
4649
ResolvedNamespace(catalog, ns)
50+
51+
case DescribeRelation(ResolvedV2View(_, ident, view), _, isExtended, _) =>
52+
DescribeV2View(V2ViewDescription(ident.quoted, view), isExtended)
53+
54+
case ShowCreateTable(ResolvedV2View(_, ident, view), _, _) =>
55+
ShowCreateV2View(V2ViewDescription(ident.quoted, view))
56+
57+
case ShowTableProperties(ResolvedV2View(_, ident, view), propertyKeys, _) =>
58+
ShowV2ViewProperties(V2ViewDescription(ident.quoted, view), propertyKeys)
59+
60+
case SetViewProperties(ResolvedV2View(catalog, ident, _), props) =>
61+
val changes = props.map {
62+
case (property, value) => ViewChange.setProperty(property, value)
63+
}.toSeq
64+
AlterV2View(catalog, ident, changes)
65+
66+
// Unsetting properties of a v2 view: each key becomes a ViewChange.removeProperty and the
// changes are applied through AlterV2View.
// Unless `ifExists` is set, the view is reloaded and the first key missing from its
// properties is reported as an error.
case UnsetViewProperties(ResolvedV2View(catalog, ident, _), propertyKeys, ifExists) =>
  if (!ifExists) {
    val view = catalog.loadView(ident)
    propertyKeys.find(key => !view.properties.containsKey(key)).foreach { missingKey =>
      // The error has to be thrown: merely constructing it would let the command proceed.
      throw QueryCompilationErrors.cannotUnsetNonExistentViewProperty(ident, missingKey)
    }
  }
  val changes = propertyKeys.map(ViewChange.removeProperty)
  AlterV2View(catalog, ident, changes)
75+
76+
// Renaming a v2 view: the source and target must live in the same catalog, compared by
// catalog name.
case RenameTable(ResolvedV2View(oldCatalog, oldIdent, _),
    NonSessionCatalogAndIdentifier(newCatalog, newIdent), true) =>
  if (oldCatalog.name != newCatalog.name) {
    // The error has to be thrown: merely constructing it would let a cross-catalog rename
    // fall through to RenameV2View against the old catalog.
    throw QueryCompilationErrors.cannotMoveViewBetweenCatalogs(
      oldCatalog.name, newCatalog.name)
  }
  RenameV2View(oldCatalog, oldIdent, newIdent)
83+
84+
case RefreshTable(ResolvedV2View(catalog, ident, _)) =>
85+
RefreshView(catalog, ident)
86+
87+
case DropView(ResolvedV2View(catalog, ident, _), ifExists) =>
88+
DropV2View(catalog, ident, ifExists)
89+
}
90+
91+
/**
 * Extractor that matches the same name parts as `NonSessionCatalogAndIdentifier`, yielding
 * the catalog together with the identifier converted to a multi-part name.
 */
object NonSessionCatalogAndTable {
  def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] =
    Some(nameParts).collect {
      case NonSessionCatalogAndIdentifier(catalog, ident) =>
        (catalog, ident.asMultipartIdentifier)
    }
}
4898
}

0 commit comments

Comments
 (0)