InMemoryCatalog.java
@@ -94,6 +94,7 @@ public void initialize(String name, Map<String, String> properties) {
closeableGroup.setSuppressCloseFailure(true);
}

// protected for testing
@Override
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
return new InMemoryTableOperations(io, tableIdentifier);
@@ -326,6 +327,7 @@ public List<TableIdentifier> listViews(Namespace namespace) {
.collect(Collectors.toList());
}

// protected for testing
@Override
protected InMemoryViewOperations newViewOps(TableIdentifier identifier) {
return new InMemoryViewOperations(io, identifier);
@@ -368,12 +370,12 @@ public void renameView(TableIdentifier from, TableIdentifier to) {
}
}

private class InMemoryTableOperations extends BaseMetastoreTableOperations {
public class InMemoryTableOperations extends BaseMetastoreTableOperations {
private final FileIO fileIO;
private final TableIdentifier tableIdentifier;
private final String fullTableName;

InMemoryTableOperations(FileIO fileIO, TableIdentifier tableIdentifier) {
public InMemoryTableOperations(FileIO fileIO, TableIdentifier tableIdentifier) {
this.fileIO = fileIO;
this.tableIdentifier = tableIdentifier;
this.fullTableName = fullTableName(catalogName, tableIdentifier);
@@ -439,12 +441,12 @@ protected String tableName() {
}
}

private class InMemoryViewOperations extends BaseViewOperations {
public class InMemoryViewOperations extends BaseViewOperations {
private final FileIO io;
private final TableIdentifier identifier;
private final String fullViewName;

InMemoryViewOperations(FileIO io, TableIdentifier identifier) {
public InMemoryViewOperations(FileIO io, TableIdentifier identifier) {
this.io = io;
this.identifier = identifier;
this.fullViewName = ViewUtil.fullViewName(catalogName, identifier);
CheckViews.scala
@@ -32,7 +32,7 @@ object CheckViews extends (LogicalPlan => Unit) {
override def apply(plan: LogicalPlan): Unit = {
plan foreach {
case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
_, _, _, _, _, _) =>
_, _, _, _, _, _, _) =>
verifyColumnCount(ident, columnAliases, query)
SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver)

ResolveViews.scala
@@ -63,7 +63,7 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
.map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
.getOrElse(u)

case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query, columnAliases, columnComments, _, _, _, _, _, _)
case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query, columnAliases, columnComments, _, _, _, _, _, _, _)
if query.resolved && !c.rewritten =>
val aliased = aliasColumns(query, columnAliases, columnComments)
c.copy(query = aliased, queryColumnNames = query.schema.fieldNames, rewritten = true)
RewriteViewCommands.scala
@@ -39,7 +39,8 @@ import org.apache.spark.sql.connector.catalog.ViewCatalog
* ResolveSessionCatalog exits early for some v2 View commands,
* thus they are pre-substituted here and then handled in ResolveViews
*/
case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
case class RewriteViewCommands(spark: SparkSession, materializedViewOptions: Option[MaterializedViewOptions])
extends Rule[LogicalPlan] with LookupCatalog {

protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager

@@ -59,7 +60,8 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
comment = comment,
properties = properties,
allowExisting = allowExisting,
replace = replace)
replace = replace,
materializedViewOptions = materializedViewOptions)
}

private def isTempView(nameParts: Seq[String]): Boolean = {
@@ -124,3 +126,4 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
collectTempViews(child)
}
}
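
/**
* Options parsed from CREATE MATERIALIZED VIEW syntax that have no counterpart in Spark's
* view DDL; currently only the optional STORED AS storage table identifier.
*/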
case class MaterializedViewOptions(storageTableIdentifier: Option[String])
IcebergSparkSqlExtensionsParser.scala
@@ -28,36 +28,33 @@ import org.antlr.v4.runtime.tree.TerminalNodeImpl
import org.apache.iceberg.common.DynConstructors
import org.apache.iceberg.spark.ExtendedParser
import org.apache.iceberg.spark.ExtendedParser.RawOrderField
import org.apache.iceberg.spark.Spark3Util
import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.analysis.MaterializedViewOptions
import org.apache.spark.sql.catalyst.analysis.RewriteViewCommands
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.NonReservedContext
import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.QuotedIdentifierContext
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.VariableSubstitution
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.StructType
import scala.jdk.CollectionConverters._
import scala.util.Try

class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserInterface with ExtendedParser {

import IcebergSparkSqlExtensionsParser._

private lazy val substitutor = substitutorCtor.newInstance(SQLConf.get)
private lazy val astBuilder = new IcebergSqlExtensionsAstBuilder(delegate)
private lazy final val CREATE_MATERIALIZED_VIEW_PATTERN = "(?i)(CREATE)\\s+MATERIALIZED\\s+(VIEW)".r
private lazy final val MATERIALIZED_VIEW_STORED_AS_PATTERN = "(?i)STORED AS\\s*'(\\w+)'\\s*".r


/**
* Parse a string to a DataType.
@@ -122,8 +119,13 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
val sqlTextAfterSubstitution = substitutor.substitute(sqlText)
if (isIcebergCommand(sqlTextAfterSubstitution)) {
parse(sqlTextAfterSubstitution) { parser => astBuilder.visit(parser.singleStatement()) }.asInstanceOf[LogicalPlan]
} else if (isCreateMaterializedView(sqlText)) {
val createMaterializedViewStatement = getCreateMaterializedViewStatement(sqlText)
RewriteViewCommands(SparkSession.active, Some(getMaterializedViewOptions(sqlText))).apply(
delegate.parsePlan(createMaterializedViewStatement)
)
} else {
RewriteViewCommands(SparkSession.active).apply(delegate.parsePlan(sqlText))
RewriteViewCommands(SparkSession.active, None).apply(delegate.parsePlan(sqlText))
}
}

@@ -151,6 +153,22 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
isSnapshotRefDdl(normalized)))
}

private def isCreateMaterializedView(sqlText: String): Boolean = {
// Reuse the rewrite pattern so detection tolerates arbitrary whitespace and casing
CREATE_MATERIALIZED_VIEW_PATTERN.findFirstIn(sqlText).isDefined
}

private def getCreateMaterializedViewStatement(sqlText: String): String = {
val replace1 = CREATE_MATERIALIZED_VIEW_PATTERN.replaceAllIn(sqlText, m => m.group(1) + " " + m.group(2))
MATERIALIZED_VIEW_STORED_AS_PATTERN.replaceAllIn(replace1, "")
}

private def getMaterializedViewOptions(sqlText: String): MaterializedViewOptions = {
val storageTableIdentifier = MATERIALIZED_VIEW_STORED_AS_PATTERN.findFirstMatchIn(sqlText).map(_.group(1))
MaterializedViewOptions(storageTableIdentifier)
}
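
// Illustrative trace only (hypothetical statement, not from this patch):
//   CREATE MATERIALIZED VIEW db.mv STORED AS 'mv_storage' AS SELECT * FROM db.t
// isCreateMaterializedView           => true
// getMaterializedViewOptions         => MaterializedViewOptions(Some("mv_storage"))
// getCreateMaterializedViewStatement => "CREATE VIEW db.mv AS SELECT * FROM db.t",
// which the delegate parser then handles as an ordinary CREATE VIEW.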


private def isSnapshotRefDdl(normalized: String): Boolean = {
normalized.contains("create branch") ||
normalized.contains("replace branch") ||
CreateIcebergView.scala
@@ -19,6 +19,7 @@

package org.apache.spark.sql.catalyst.plans.logical.views

import org.apache.spark.sql.catalyst.analysis.MaterializedViewOptions
import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

@@ -33,7 +34,8 @@ case class CreateIcebergView(
properties: Map[String, String],
allowExisting: Boolean,
replace: Boolean,
rewritten: Boolean = false) extends BinaryCommand {
rewritten: Boolean = false,
materializedViewOptions: Option[MaterializedViewOptions]) extends BinaryCommand {
override def left: LogicalPlan = child

override def right: LogicalPlan = query
CreateMaterializedViewExec.scala (new file)
@@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2


import java.util.UUID
import org.apache.iceberg.relocated.com.google.common.base.Preconditions
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap
import org.apache.iceberg.spark.MaterializedViewUtil
import org.apache.iceberg.spark.Spark3Util
import org.apache.iceberg.spark.SparkCatalog
import org.apache.iceberg.spark.source.SparkTable
import org.apache.iceberg.spark.source.SparkView
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.View
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._

case class CreateMaterializedViewExec(
catalog: ViewCatalog,
ident: Identifier,
queryText: String,
viewSchema: StructType,
columnAliases: Seq[String],
columnComments: Seq[Option[String]],
queryColumnNames: Seq[String],
comment: Option[String],
properties: Map[String, String],
allowExisting: Boolean,
replace: Boolean,
storageTableIdentifier: Option[String]) extends LeafV2CommandExec {

override def output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
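// Flow: resolve the storage table identifier, create the view with materialized-view
// metadata properties, create and populate the storage table from the view query, then
// record base table snapshot IDs and the view version as storage table properties.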

// Check if storageTableIdentifier is provided, if not, generate a default identifier
val sparkStorageTableIdentifier = storageTableIdentifier match {
case Some(identifier) => {
val catalogAndIdentifier = Spark3Util.catalogAndIdentifier(session, identifier)
val storageTableCatalogName = catalogAndIdentifier.catalog().name()
Preconditions.checkState(
storageTableCatalogName.equals(catalog.name()),
"Storage table identifier must be in the same catalog as the view." +
" Found storage table in catalog: %s, expected: %s",
storageTableCatalogName, catalog.name()
)
catalogAndIdentifier.identifier()
}
case None => MaterializedViewUtil.getDefaultMaterializedViewStorageTableIdentifier(ident)
}

val view = createView(sparkStorageTableIdentifier.toString)

view match {
case Some(v) => {
// TODO: Add support for partitioning the storage table
catalog.asInstanceOf[SparkCatalog].createTable(
sparkStorageTableIdentifier,
viewSchema, new Array[Transform](0), ImmutableMap.of[String, String]()
)

// Capture base table state before inserting into the storage table
val baseTables = MaterializedViewUtil.extractBaseTables(queryText).asScala.toList
val baseTableSnapshots = getBaseTableSnapshots(baseTables)
val baseTableSnapshotsProperties = baseTableSnapshots.map {
case (key, value) => (
MaterializedViewUtil.MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX + key.toString
) -> value.toString
}
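// Maps each base table's UUID (under the base-snapshot property prefix) to the snapshot
// ID that was current at materialization time; 0L is recorded for empty tables.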


val storageTableProperties = baseTableSnapshotsProperties +
(MaterializedViewUtil.MATERIALIZED_VIEW_VERSION_PROPERTY_KEY -> getViewVersion(v).toString)

// Insert into the storage table
session.sql("INSERT INTO " + sparkStorageTableIdentifier + " " + queryText)


// Update the storage table properties
val storageTablePropertyChanges = storageTableProperties.map {
case (key, value) => TableChange.setProperty(key, value)
}.toArray

catalog.asInstanceOf[SparkCatalog].alterTable(sparkStorageTableIdentifier, storageTablePropertyChanges: _*)
}
case None =>
}
Nil
}

override def simpleString(maxFields: Int): String = {
s"CreateMaterializedViewExec: ${ident}"
}

private def createView(storageTableIdentifier: String): Option[View] = {
val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name
val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null
val currentNamespace = session.sessionState.catalogManager.currentNamespace

val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
val newProperties = properties ++
comment.map(ViewCatalog.PROP_COMMENT -> _) +
(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion,
ViewCatalog.PROP_ENGINE_VERSION -> engineVersion) +
(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY -> "true") +
(MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY -> storageTableIdentifier)

if (replace) {
// CREATE OR REPLACE VIEW
if (catalog.viewExists(ident)) {
catalog.dropView(ident)
}
// FIXME: replaceView API doesn't exist in Spark 3.5
val view = catalog.createView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
Some(view)
} else {
try {
// CREATE VIEW [IF NOT EXISTS]
val view = catalog.createView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
Some(view)
} catch {
// TODO: Make sure the existing view is also a materialized view
case _: ViewAlreadyExistsException if allowExisting => None
}
}
}

private def getBaseTableSnapshots(baseTables: List[Table]): Map[UUID, Long] = {
baseTables.map {
case sparkTable: SparkTable =>
val snapshot = Option(sparkTable.table().currentSnapshot())
val snapshotId = snapshot.map(_.snapshotId().longValue()).getOrElse(0L)
(sparkTable.table().uuid(), snapshotId)
case _ =>
throw new UnsupportedOperationException("Only Spark tables are supported")
}.toMap
}

private def getViewVersion(view: View): Long = {
view match {
case sparkView: SparkView =>
sparkView.view().currentVersion().versionId()
case _ =>
throw new UnsupportedOperationException("Only Spark views are supported")
}
}
}
CreateOrReplaceTagExec.scala
@@ -40,7 +40,8 @@ case class CreateOrReplaceTagExec(
override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
catalog.loadTable(ident) match {
catalog
.loadTable(ident) match {
[Review comment (Member): Redundant change]
case iceberg: SparkTable =>
val snapshotId: java.lang.Long = tagOptions.snapshotId
.orElse(Option(iceberg.table.currentSnapshot()).map(_.snapshotId()))