diff --git a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
index a880f94f4385..d397b9fdbd7e 100644
--- a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
@@ -94,6 +94,7 @@ public void initialize(String name, Map<String, String> properties) {
     closeableGroup.setSuppressCloseFailure(true);
   }
 
+  // protected for testing
   @Override
   protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
     return new InMemoryTableOperations(io, tableIdentifier);
@@ -326,6 +327,7 @@ public List<TableIdentifier> listViews(Namespace namespace) {
         .collect(Collectors.toList());
   }
 
+  // protected for testing
   @Override
   protected InMemoryViewOperations newViewOps(TableIdentifier identifier) {
     return new InMemoryViewOperations(io, identifier);
@@ -368,12 +370,12 @@ public void renameView(TableIdentifier from, TableIdentifier to) {
     }
   }
 
-  private class InMemoryTableOperations extends BaseMetastoreTableOperations {
+  public class InMemoryTableOperations extends BaseMetastoreTableOperations {
     private final FileIO fileIO;
     private final TableIdentifier tableIdentifier;
     private final String fullTableName;
 
-    InMemoryTableOperations(FileIO fileIO, TableIdentifier tableIdentifier) {
+    public InMemoryTableOperations(FileIO fileIO, TableIdentifier tableIdentifier) {
       this.fileIO = fileIO;
       this.tableIdentifier = tableIdentifier;
       this.fullTableName = fullTableName(catalogName, tableIdentifier);
@@ -439,12 +441,12 @@ protected String tableName() {
     }
   }
 
-  private class InMemoryViewOperations extends BaseViewOperations {
+  public class InMemoryViewOperations extends BaseViewOperations {
     private final FileIO io;
     private final TableIdentifier identifier;
     private final String fullViewName;
 
-    InMemoryViewOperations(FileIO io, TableIdentifier identifier) {
+    public InMemoryViewOperations(FileIO io, TableIdentifier identifier) {
       this.io = io;
       this.identifier = identifier;
       this.fullViewName = ViewUtil.fullViewName(catalogName, identifier);
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
index 4a1736764d0d..0c0f071132f2 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
@@ -32,7 +32,7 @@ object CheckViews extends (LogicalPlan => Unit) {
   override def apply(plan: LogicalPlan): Unit = {
     plan foreach {
       case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
-      _, _, _, _, _, _) =>
+      _, _, _, _, _, _, _) =>
         verifyColumnCount(ident, columnAliases, query)
         SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver)
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index 46a15331a164..cf95b04fbf04 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -63,7 +63,7 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
       .map(_ =>
        ResolvedV2View(catalog.asViewCatalog, ident))
      .getOrElse(u)
 
-    case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query, columnAliases, columnComments, _, _, _, _, _, _)
+    case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query, columnAliases, columnComments, _, _, _, _, _, _, _)
       if query.resolved && !c.rewritten =>
       val aliased = aliasColumns(query, columnAliases, columnComments)
       c.copy(query = aliased, queryColumnNames = query.schema.fieldNames, rewritten = true)
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
index 066ba59394d7..adeff7f32640 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
@@ -39,7 +39,8 @@ import org.apache.spark.sql.connector.catalog.ViewCatalog
  * ResolveSessionCatalog exits early for some v2 View commands,
  * thus they are pre-substituted here and then handled in ResolveViews
  */
-case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
+case class RewriteViewCommands(spark: SparkSession, materializedViewOptions: Option[MaterializedViewOptions])
+  extends Rule[LogicalPlan] with LookupCatalog {
 
   protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager
 
@@ -59,7 +60,8 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
         comment = comment,
         properties = properties,
         allowExisting = allowExisting,
-        replace = replace)
+        replace = replace,
+        materializedViewOptions = materializedViewOptions)
     }
 
   private def isTempView(nameParts: Seq[String]): Boolean = {
@@ -124,3 +126,5 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
     collectTempViews(child)
   }
 }
+
+case class MaterializedViewOptions(storageTableIdentifier: Option[String])
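Before reading the parser changes below, it helps to trace how the new option travels: the parser extracts the STORED AS clause, RewriteViewCommands stamps it onto CreateIcebergView, and the physical-planning strategy pattern-matches on it. A minimal Scala sketch of that flow (names mirror the diff; the value is illustrative):

    case class MaterializedViewOptions(storageTableIdentifier: Option[String])

    // 1. The SQL extensions parser captures the STORED AS clause, if any.
    val options: Option[MaterializedViewOptions] =
      Some(MaterializedViewOptions(Some("mv_storage")))
    // 2. RewriteViewCommands(spark, options) copies it onto the logical plan:
    //      CreateIcebergView(..., materializedViewOptions = options)
    // 3. ExtendedDataSourceV2Strategy matches Some(opts) and plans
    //      CreateMaterializedViewExec(storageTableIdentifier = opts.storageTableIdentifier)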
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index 02bd59366c13..5b04280863a8 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -28,29 +28,23 @@ import org.antlr.v4.runtime.tree.TerminalNodeImpl
 import org.apache.iceberg.common.DynConstructors
 import org.apache.iceberg.spark.ExtendedParser
 import org.apache.iceberg.spark.ExtendedParser.RawOrderField
-import org.apache.iceberg.spark.Spark3Util
-import org.apache.iceberg.spark.source.SparkTable
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.analysis.MaterializedViewOptions
 import org.apache.spark.sql.catalyst.analysis.RewriteViewCommands
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.NonReservedContext
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.QuotedIdentifierContext
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.sql.connector.catalog.Table
-import org.apache.spark.sql.connector.catalog.TableCatalog
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.VariableSubstitution
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.types.StructType
 import scala.jdk.CollectionConverters._
-import scala.util.Try
 
 class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserInterface with ExtendedParser {
@@ -58,6 +52,9 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
   private lazy val substitutor = substitutorCtor.newInstance(SQLConf.get)
   private lazy val astBuilder = new IcebergSqlExtensionsAstBuilder(delegate)
 
+  private final val CREATE_MATERIALIZED_VIEW_PATTERN = "(?i)(CREATE)\\s+MATERIALIZED\\s+(VIEW)".r
+  private final val MATERIALIZED_VIEW_STORED_AS_PATTERN = "(?i)STORED AS\\s*'(\\w+)'\\s*".r
+
   /**
    * Parse a string to a DataType.
@@ -122,8 +119,13 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
     val sqlTextAfterSubstitution = substitutor.substitute(sqlText)
     if (isIcebergCommand(sqlTextAfterSubstitution)) {
       parse(sqlTextAfterSubstitution) { parser => astBuilder.visit(parser.singleStatement()) }.asInstanceOf[LogicalPlan]
+    } else if (isCreateMaterializedView(sqlTextAfterSubstitution)) {
+      val createViewStatement = getCreateMaterializedViewStatement(sqlTextAfterSubstitution)
+      RewriteViewCommands(SparkSession.active, Some(getMaterializedViewOptions(sqlTextAfterSubstitution)))
+        .apply(delegate.parsePlan(createViewStatement))
     } else {
-      RewriteViewCommands(SparkSession.active).apply(delegate.parsePlan(sqlText))
+      RewriteViewCommands(SparkSession.active, None).apply(delegate.parsePlan(sqlText))
     }
   }
@@ -151,6 +153,21 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
         isSnapshotRefDdl(normalized)))
   }
 
+  private def isCreateMaterializedView(sqlText: String): Boolean = {
+    CREATE_MATERIALIZED_VIEW_PATTERN.findFirstIn(sqlText).isDefined
+  }
+
+  private def getCreateMaterializedViewStatement(sqlText: String): String = {
+    val withoutMaterialized =
+      CREATE_MATERIALIZED_VIEW_PATTERN.replaceAllIn(sqlText, m => m.group(1) + " " + m.group(2))
+    MATERIALIZED_VIEW_STORED_AS_PATTERN.replaceAllIn(withoutMaterialized, "")
+  }
+
+  private def getMaterializedViewOptions(sqlText: String): MaterializedViewOptions = {
+    val storageTableIdentifier = MATERIALIZED_VIEW_STORED_AS_PATTERN.findFirstMatchIn(sqlText).map(_.group(1))
+    MaterializedViewOptions(storageTableIdentifier)
+  }
+
   private def isSnapshotRefDdl(normalized: String): Boolean = {
     normalized.contains("create branch") ||
       normalized.contains("replace branch") ||
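The two regexes drive a purely textual rewrite: CREATE MATERIALIZED VIEW becomes a plain CREATE VIEW that the delegate parser understands, while the STORED AS clause is captured separately. A self-contained sketch of the transformation (patterns copied from the parser above; the statement is illustrative):

    object RewriteSketch extends App {
      val createPattern = "(?i)(CREATE)\\s+MATERIALIZED\\s+(VIEW)".r
      val storedAsPattern = "(?i)STORED AS\\s*'(\\w+)'\\s*".r

      val sql = "CREATE MATERIALIZED VIEW mv STORED AS 'mv_storage' AS SELECT id FROM t"

      // Drop the MATERIALIZED keyword and the STORED AS clause so the delegate
      // parser sees ordinary CREATE VIEW syntax
      val plainCreateView = storedAsPattern.replaceAllIn(
        createPattern.replaceAllIn(sql, m => m.group(1) + " " + m.group(2)), "")
      // Capture the optional custom storage table name
      val storageTable = storedAsPattern.findFirstMatchIn(sql).map(_.group(1))

      println(plainCreateView) // CREATE VIEW mv AS SELECT id FROM t
      println(storageTable)    // Some(mv_storage)
    }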
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
index 9366d5efe163..a4134476cbd8 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
@@ -19,6 +19,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical.views
 
+import org.apache.spark.sql.catalyst.analysis.MaterializedViewOptions
 import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
@@ -33,7 +34,8 @@ case class CreateIcebergView(
     properties: Map[String, String],
     allowExisting: Boolean,
     replace: Boolean,
-    rewritten: Boolean = false) extends BinaryCommand {
+    rewritten: Boolean = false,
+    materializedViewOptions: Option[MaterializedViewOptions]) extends BinaryCommand {
   override def left: LogicalPlan = child
   override def right: LogicalPlan = query
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala
new file mode 100644
index 000000000000..9b72a9d4e99b
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util.UUID
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap
+import org.apache.iceberg.spark.MaterializedViewUtil
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.SparkCatalog
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.iceberg.spark.source.SparkView
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.catalog.View
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.types.StructType
+import scala.collection.JavaConverters._
+
+case class CreateMaterializedViewExec(
+    catalog: ViewCatalog,
+    ident: Identifier,
+    queryText: String,
+    viewSchema: StructType,
+    columnAliases: Seq[String],
+    columnComments: Seq[Option[String]],
+    queryColumnNames: Seq[String],
+    comment: Option[String],
+    properties: Map[String, String],
+    allowExisting: Boolean,
+    replace: Boolean,
+    storageTableIdentifier: Option[String]) extends LeafV2CommandExec {
+
+  override def output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    // Resolve the storage table identifier; generate a default one when none was provided
+    val sparkStorageTableIdentifier = storageTableIdentifier match {
+      case Some(identifier) =>
+        val catalogAndIdentifier = Spark3Util.catalogAndIdentifier(session, identifier)
+        val storageTableCatalogName = catalogAndIdentifier.catalog().name()
+        Preconditions.checkState(
+          storageTableCatalogName.equals(catalog.name()),
+          "Storage table identifier must be in the same catalog as the view."
+            + " Found storage table in catalog: %s, expected: %s",
+          Array[Object](storageTableCatalogName, catalog.name())
+        )
+        catalogAndIdentifier.identifier()
+      case None => MaterializedViewUtil.getDefaultMaterializedViewStorageTableIdentifier(ident)
+    }
+
+    val view = createView(sparkStorageTableIdentifier.toString)
+
+    view match {
+      case Some(v) =>
+        // TODO: Add support for partitioning the storage table
+        catalog.asInstanceOf[SparkCatalog].createTable(
+          sparkStorageTableIdentifier,
+          viewSchema, new Array[Transform](0), ImmutableMap.of[String, String]()
+        )
+
+        // Capture base table state before inserting into the storage table
+        val baseTables = MaterializedViewUtil.extractBaseTables(queryText).asScala.toList
+        val baseTableSnapshots = getBaseTableSnapshots(baseTables)
+        val baseTableSnapshotsProperties = baseTableSnapshots.map {
+          case (key, value) => (
+            MaterializedViewUtil.MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX + key.toString
+          ) -> value.toString
+        }
+
+        val storageTableProperties = baseTableSnapshotsProperties +
+          (MaterializedViewUtil.MATERIALIZED_VIEW_VERSION_PROPERTY_KEY -> getViewVersion(v).toString)
+
+        // Insert into the storage table
+        session.sql("INSERT INTO " + sparkStorageTableIdentifier + " " + queryText)
+
+        // Update the storage table properties
+        val storageTablePropertyChanges = storageTableProperties.map {
+          case (key, value) => TableChange.setProperty(key, value)
+        }.toArray
+
+        catalog.asInstanceOf[SparkCatalog].alterTable(sparkStorageTableIdentifier, storageTablePropertyChanges: _*)
+      case None =>
+    }
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"CreateMaterializedViewExec: ${ident}"
+  }
+
+  private def createView(storageTableIdentifier: String): Option[View] = {
+    val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name
+    val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null
+    val currentNamespace = session.sessionState.catalogManager.currentNamespace
+
+    val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
+    val newProperties = properties ++
+      comment.map(ViewCatalog.PROP_COMMENT -> _) +
+      (ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion,
+        ViewCatalog.PROP_ENGINE_VERSION -> engineVersion) +
+      (MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY -> "true") +
+      (MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY -> storageTableIdentifier)
+
+    if (replace) {
+      // CREATE OR REPLACE VIEW
+      if (catalog.viewExists(ident)) {
+        catalog.dropView(ident)
+      }
+      // FIXME: replaceView API doesn't exist in Spark 3.5
+      val view = catalog.createView(
+        ident,
+        queryText,
+        currentCatalog,
+        currentNamespace,
+        viewSchema,
+        queryColumnNames.toArray,
+        columnAliases.toArray,
+        columnComments.map(c => c.orNull).toArray,
+        newProperties.asJava)
+      Some(view)
+    } else {
+      try {
+        // CREATE VIEW [IF NOT EXISTS]
+        val view = catalog.createView(
+          ident,
+          queryText,
+          currentCatalog,
+          currentNamespace,
+          viewSchema,
+          queryColumnNames.toArray,
+          columnAliases.toArray,
+          columnComments.map(c => c.orNull).toArray,
+          newProperties.asJava)
+        Some(view)
+      } catch {
+        // TODO: Make sure the existing view is also a materialized view
+        case _: ViewAlreadyExistsException if allowExisting => None
+      }
+    }
+  }
+
+  private def getBaseTableSnapshots(baseTables: List[Table]): Map[UUID, Long] = {
+    baseTables.map {
+      case sparkTable: SparkTable =>
+        val snapshot = Option(sparkTable.table().currentSnapshot())
+        val snapshotId = snapshot.map(_.snapshotId().longValue()).getOrElse(0L)
+        (sparkTable.table().uuid(), snapshotId)
+      case _ =>
+        throw new UnsupportedOperationException("Only Spark tables are supported")
+    }.toMap
+  }
+
+  private def getViewVersion(view: View): Long = {
+    view match {
+      case sparkView: SparkView =>
+        sparkView.view().currentVersion().versionId()
+      case _ =>
+        throw new UnsupportedOperationException("Only Spark views are supported")
+    }
+  }
+}
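The properties written to the storage table above are what later freshness checks compare against: one snapshot id per base table, plus the view version the rows were computed for. A sketch of the resulting property layout (illustrative values; the key prefixes mirror the constants in MaterializedViewUtil):

    import java.util.UUID

    // Illustrative only: snapshot-pinning properties stored on the storage table.
    val baseTableUuid = UUID.fromString("12345678-1234-1234-1234-123456789012")
    val pinnedProperties: Map[String, String] = Map(
      // one entry per base table: table UUID -> snapshot id at refresh time
      "iceberg.base.snapshot." + baseTableUuid -> "5891356377378637411",
      // the view version the stored rows were computed for
      "iceberg.materialized.view.version" -> "1"
    )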
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala
index 372cd7548632..e7c35f8a771d 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala
@@ -40,7 +40,8 @@ case class CreateOrReplaceTagExec(
   override lazy val output: Seq[Attribute] = Nil
 
   override protected def run(): Seq[InternalRow] = {
-    catalog.loadTable(ident) match {
+    catalog
+      .loadTable(ident) match {
       case iceberg: SparkTable =>
         val snapshotId: java.lang.Long = tagOptions.snapshotId
           .orElse(Option(iceberg.table.currentSnapshot()).map(_.snapshotId()))
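The next file wires DROP VIEW up to storage-table cleanup: the view is loaded through the Iceberg view catalog, and if it carries the materialized-view property, the storage table named in its properties is dropped before the view itself. A condensed sketch of that order of operations (property keys as above; catalog wiring abstracted away):

    // Hedged sketch of the cleanup order, not the exact implementation below.
    def dropMaterializedView(loadProps: () => Map[String, String],
                             dropStorageTable: String => Unit,
                             dropView: () => Boolean): Boolean = {
      val props = loadProps()
      if (props.getOrElse("iceberg.materialized.view", "false") == "true") {
        // drop the storage table first so no orphaned data outlives the view
        props.get("iceberg.materialized.view.storage.table").foreach(dropStorageTable)
      }
      dropView()
    }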
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala
index c35af1486fc7..6242c6b874bf 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala
@@ -19,6 +19,14 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.iceberg.catalog.Namespace
+import org.apache.iceberg.catalog.TableIdentifier
+import org.apache.iceberg.exceptions
+import org.apache.iceberg.spark.MaterializedViewUtil
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.iceberg.spark.SparkCatalog
+import org.apache.iceberg.view.View
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchViewException
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -34,6 +42,38 @@ case class DropV2ViewExec(
   override lazy val output: Seq[Attribute] = Nil
 
   override protected def run(): Seq[InternalRow] = {
+    val icebergCatalog = catalog.asInstanceOf[SparkCatalog].icebergCatalog()
+    val icebergViewCatalog = icebergCatalog.asInstanceOf[org.apache.iceberg.catalog.ViewCatalog]
+    var view: Option[View] = None
+    try {
+      view = Some(icebergViewCatalog.loadView(TableIdentifier.of(Namespace.of(ident.namespace(): _*), ident.name())))
+    } catch {
+      case _: exceptions.NoSuchViewException =>
+        if (!ifExists) {
+          throw new NoSuchViewException(ident)
+        }
+    }
+    // if the view exists, read its properties and check whether it is a materialized view
+    view match {
+      case Some(v) =>
+        val viewProperties = v.properties()
+        if (Option(
+          viewProperties.get(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY
+          )).getOrElse("false").equals("true")) {
+          // resolve the storage table identifier, then drop the storage table
+          val storageTableLocation = viewProperties.get(
+            MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY
+          )
+          val storageTableIdentifier = Spark3Util.catalogAndIdentifier(
+            SparkSession.active, storageTableLocation).identifier()
+          catalog.asInstanceOf[SparkCatalog].dropTable(storageTableIdentifier)
+        }
+      case _ =>
+    }
+
     val dropped = catalog.dropView(ident)
     if (!dropped && !ifExists) {
       throw new NoSuchViewException(ident)
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 0505fe4e3030..388ed5a91948 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -108,8 +108,25 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
     case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
       DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil
 
+    case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident),
+        queryText, query, columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _,
+        Some(materializedViewOptions)) =>
+      CreateMaterializedViewExec(
+        catalog = viewCatalog,
+        ident = ident,
+        queryText = queryText,
+        columnAliases = columnAliases,
+        columnComments = columnComments,
+        queryColumnNames = queryColumnNames,
+        viewSchema = query.schema,
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace,
+        storageTableIdentifier = materializedViewOptions.storageTableIdentifier) :: Nil
+
     case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), queryText, query,
-        columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _) =>
+        columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _, None) =>
       CreateV2ViewExec(
         catalog = viewCatalog,
         ident = ident,
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMaterializedViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMaterializedViews.java
new file mode 100644
index 000000000000..f71fbce9a35c
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMaterializedViews.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Map;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.MaterializedViewUtil;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.source.SparkMaterializedView;
+import org.apache.iceberg.spark.source.SparkView;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.ViewCatalog;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestMaterializedViews extends SparkExtensionsTestBase {
+  private static final Namespace NAMESPACE = Namespace.of("default");
+  private final String tableName = "table";
+  private final String materializedViewName = "materialized_view";
+
+  @Before
+  public void before() {
+    spark.conf().set("spark.sql.defaultCatalog", catalogName);
+    sql("USE %s", catalogName);
+    sql("CREATE NAMESPACE IF NOT EXISTS %s", NAMESPACE);
+    sql("CREATE TABLE %s (id INT, data STRING)", tableName);
+  }
+
+  @After
+  public void removeTable() {
+    sql("USE %s", catalogName);
+    sql("DROP VIEW IF EXISTS %s", materializedViewName);
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  public static Object[][] parameters() {
+    Map<String, String> properties =
+        Maps.newHashMap(SparkCatalogConfig.SPARK_WITH_MATERIALIZED_VIEWS.properties());
+    properties.put(CatalogProperties.WAREHOUSE_LOCATION, "file:" + getTempWarehouseDir());
+    properties.put(CatalogProperties.CATALOG_IMPL, InMemoryCatalogWithLocalFileIO.class.getName());
+    return new Object[][] {
+      {
+        SparkCatalogConfig.SPARK_WITH_MATERIALIZED_VIEWS.catalogName(),
+        SparkCatalogConfig.SPARK_WITH_MATERIALIZED_VIEWS.implementation(),
+        properties
+      }
+    };
+  }
+
+  private static String getTempWarehouseDir() {
+    try {
+      File tempDir = Files.createTempDirectory("warehouse-").toFile();
+      tempDir.delete();
+      return tempDir.getAbsolutePath();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public TestMaterializedViews(
+      String catalog, String implementation, Map<String, String> properties) {
+    super(catalog, implementation, properties);
+  }
+
+  @Test
+  public void assertReadFromStorageTableWhenFresh() throws IOException {
+    sql("DROP VIEW IF EXISTS %s", materializedViewName);
+    sql("CREATE MATERIALIZED VIEW %s AS SELECT id, data FROM %s", materializedViewName, tableName);
+
+    // Assert that the number of records in the materialized view is the same as the number of
+    // records in the table
+    assertThat(sql("SELECT * FROM %s", materializedViewName).size())
+        .isEqualTo(sql("SELECT * FROM %s", tableName).size());
+
+    // Assert that the catalog loadView method throws IllegalStateException because the view is
+    // fresh
+    assertThatThrownBy(() -> sparkViewCatalog().loadView(viewIdentifier()))
+        .isInstanceOf(IllegalStateException.class);
+
+    // Assert that the catalog loadTable method returns an object, and its type is
+    // SparkMaterializedView
+    try {
+      assertThat(sparkTableCatalog().loadTable(viewIdentifier()))
+          .isInstanceOf(SparkMaterializedView.class);
+    } catch (NoSuchTableException e) {
+      fail("Materialized view storage table not found");
+    }
+  }
+
+  @Test
+  public void assertNotReadFromStorageTableWhenStale() throws IOException {
+    sql("CREATE MATERIALIZED VIEW %s AS SELECT id, data FROM %s", materializedViewName, tableName);
+
+    // Insert one row into the table so the materialized view becomes stale
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+
+    // Assert that the number of records in the materialized view is the same as the number of
+    // records in the table
+    assertThat(sql("SELECT * FROM %s", materializedViewName).size())
+        .isEqualTo(sql("SELECT * FROM %s", tableName).size());
+
+    // Assert that the catalog loadView method returns an object of type SparkView
+    try {
+      assertThat(sparkViewCatalog().loadView(viewIdentifier())).isInstanceOf(SparkView.class);
+    } catch (NoSuchViewException e) {
+      fail("Materialized view not found");
+    }
+
+    // Assert that the catalog loadTable fails with NoSuchTableException because the view is stale
+    assertThatThrownBy(() -> sparkTableCatalog().loadTable(viewIdentifier()))
+        .isInstanceOf(NoSuchTableException.class);
+  }
+
+  @Test
+  public void testDefaultStorageTableIdentifier() {
+    sql("CREATE MATERIALIZED VIEW %s AS SELECT id, data FROM %s", materializedViewName, tableName);
+
+    // Assert that the storage table is in the list of tables
+    final String materializedViewStorageTableName =
+        MaterializedViewUtil.getDefaultMaterializedViewStorageTableIdentifier(
+                Identifier.of(new String[] {NAMESPACE.toString()}, materializedViewName))
+            .name();
+    assertThat(sql("SHOW TABLES"))
+        .anySatisfy(row -> assertThat(row[1]).isEqualTo(materializedViewStorageTableName));
+  }
+
+  @Test
+  public void testStoredAsClause() {
+    String customTableName = "custom_table_name";
+    sql(
+        "CREATE MATERIALIZED VIEW %s STORED AS '%s' AS SELECT id, data FROM %s",
+        materializedViewName, customTableName, tableName);
+
+    // Assert that the storage table with the custom name is in the list of tables
+    assertThat(sql("SHOW TABLES")).anySatisfy(row -> assertThat(row[1]).isEqualTo(customTableName));
+  }
+
+  private ViewCatalog sparkViewCatalog() {
+    CatalogPlugin catalogPlugin = spark.sessionState().catalogManager().catalog(catalogName);
+    return (ViewCatalog) catalogPlugin;
+  }
+
+  private TableCatalog sparkTableCatalog() {
+    CatalogPlugin catalogPlugin = spark.sessionState().catalogManager().catalog(catalogName);
+    return (TableCatalog) catalogPlugin;
+  }
+
+  private Identifier viewIdentifier() {
+    return Identifier.of(new String[] {NAMESPACE.toString()}, materializedViewName);
+  }
+
+  // Required to be public since it is loaded by org.apache.iceberg.CatalogUtil.loadCatalog
+  public static class InMemoryCatalogWithLocalFileIO extends InMemoryCatalog {
+    private FileIO localFileIO;
+
+    @Override
+    public void initialize(String name, Map<String, String> properties) {
+      super.initialize(name, properties);
+      localFileIO = new LocalFileIO();
+    }
+
+    @Override
+    protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+      return new InMemoryTableOperations(localFileIO, tableIdentifier);
+    }
+
+    @Override
+    protected InMemoryCatalog.InMemoryViewOperations newViewOps(TableIdentifier identifier) {
+      return new InMemoryViewOperations(localFileIO, identifier);
+    }
+  }
+
+  private static class LocalFileIO implements FileIO {
+
+    @Override
+    public InputFile newInputFile(String path) {
+      return org.apache.iceberg.Files.localInput(path);
+    }
+
+    @Override
+    public OutputFile newOutputFile(String path) {
+      return org.apache.iceberg.Files.localOutput(path);
+    }
+
+    @Override
+    public void deleteFile(String path) {
+      if (!new File(path).delete()) {
+        throw new RuntimeIOException("Failed to delete file: " + path);
+      }
+    }
+  }
+
+  // TODO Add DROP MATERIALIZED VIEW test
+  // TODO Assert materialized view creation fails when the location is not provided
+  // TODO Test cannot replace a materialized view with a new version
+}
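The utility class that follows recovers the base tables of a view by parsing its SQL and collecting the UnresolvedRelation leaves of the logical plan. The same traversal, sketched in Scala (session wiring assumed; like the Java version, CTE names would surface as relations too):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Collect the multipart names of every table referenced by a query.
    // Mirrors MaterializedViewUtil.extractBaseTableIdentifiers; illustrative only.
    def baseTableNames(spark: SparkSession, query: String): Seq[Seq[String]] = {
      def collect(plan: LogicalPlan): Seq[Seq[String]] = plan match {
        case r: UnresolvedRelation => Seq(r.multipartIdentifier)
        case other => other.children.flatMap(collect)
      }
      collect(spark.sessionState.sqlParser.parsePlan(query)).distinct
    }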
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java
new file mode 100644
index 000000000000..633b705eb37a
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import scala.collection.JavaConverters;
+
+// Possible to merge with Spark3Util
+public class MaterializedViewUtil {
+
+  private MaterializedViewUtil() {}
+
+  public static final String MATERIALIZED_VIEW_PROPERTY_KEY = "iceberg.materialized.view";
+  public static final String MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY =
+      "iceberg.materialized.view.storage.table";
+  public static final String MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX =
+      "iceberg.base.snapshot.";
+  public static final String MATERIALIZED_VIEW_VERSION_PROPERTY_KEY =
+      "iceberg.materialized.view.version";
+  private static final String MATERIALIZED_VIEW_STORAGE_TABLE_IDENTIFIER_SUFFIX = ".storage.table";
+
+  public static List<Table> extractBaseTables(String query) {
+    return extractBaseTableIdentifiers(query).stream()
+        .filter(identifier -> !identifier.isEmpty())
+        .map(MaterializedViewUtil::toSparkTable)
+        .filter(Optional::isPresent)
+        .map(Optional::get)
+        .collect(Collectors.toList());
+  }
+
+  private static List<List<String>> extractBaseTableIdentifiers(String query) {
+    try {
+      // Parse the SQL query to get the LogicalPlan
+      LogicalPlan logicalPlan = SparkSession.active().sessionState().sqlParser().parsePlan(query);
+
+      // Recursively traverse the LogicalPlan to extract base table names
+      return extractBaseTableIdentifiers(logicalPlan).stream()
+          .distinct()
+          .collect(Collectors.toList());
+    } catch (ParseException e) {
+      throw new IllegalArgumentException("Failed to parse the SQL query: " + query, e);
+    }
+  }
+
+  private static List<List<String>> extractBaseTableIdentifiers(LogicalPlan plan) {
+    if (plan instanceof UnresolvedRelation) {
+      UnresolvedRelation relation = (UnresolvedRelation) plan;
+      List<List<String>> result = Lists.newArrayListWithCapacity(1);
+      result.add(JavaConverters.seqAsJavaList(relation.multipartIdentifier()));
+      return result;
+    } else {
+      return JavaConverters.seqAsJavaList(plan.children()).stream()
+          .flatMap(child -> extractBaseTableIdentifiers(child).stream())
+          .collect(Collectors.toList());
+    }
+  }
+
+  public static Optional<Table> toSparkTable(List<String> multipartIdent) {
+    Spark3Util.CatalogAndIdentifier catalogAndIdentifier =
+        Spark3Util.catalogAndIdentifier(SparkSession.active(), multipartIdent);
+    if (catalogAndIdentifier.catalog() instanceof TableCatalog) {
+      TableCatalog tableCatalog = (TableCatalog) catalogAndIdentifier.catalog();
+      try {
+        return Optional.of(tableCatalog.loadTable(catalogAndIdentifier.identifier()));
+      } catch (Exception e) {
+        return Optional.empty();
+      }
+    }
+    return Optional.empty();
+  }
+
+  public static Identifier getDefaultMaterializedViewStorageTableIdentifier(
+      Identifier viewIdentifier) {
+    return Identifier.of(
+        viewIdentifier.namespace(),
+        viewIdentifier.name() + MATERIALIZED_VIEW_STORAGE_TABLE_IDENTIFIER_SUFFIX);
+  }
+}
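The SparkCatalog changes below route loads through a freshness check: a materialized view is fresh only when the storage table was computed for the current view version and every pinned base-table snapshot still matches the live table. The predicate, condensed into a Scala sketch (not the exact implementation; property keys from MaterializedViewUtil):

    // currentSnapshots: base table UUID -> current snapshot id (0 for empty tables)
    def isFresh(viewVersionId: Int,
                storageProps: Map[String, String],
                currentSnapshots: Map[String, Long]): Boolean = {
      val versionMatches = storageProps
        .get("iceberg.materialized.view.version")
        .contains(viewVersionId.toString)
      val snapshotsMatch = currentSnapshots.forall { case (tableUuid, snapshotId) =>
        storageProps.get("iceberg.base.snapshot." + tableUuid).contains(snapshotId.toString)
      }
      versionMatches && snapshotsMatch
    }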
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 37e7387d696d..cab2b4ce2a10 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -25,12 +25,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.StringJoiner;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CachingCatalog;
@@ -62,6 +64,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.iceberg.spark.source.SparkMaterializedView;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.spark.source.SparkView;
 import org.apache.iceberg.spark.source.StagedSparkTable;
@@ -75,6 +78,7 @@
 import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.StagedTable;
@@ -548,7 +552,14 @@ public View loadView(Identifier ident) throws NoSuchViewException {
     if (null != asViewCatalog) {
       try {
         org.apache.iceberg.view.View view = asViewCatalog.loadView(buildIdentifier(ident));
-        return new SparkView(catalogName, view);
+        // Check if the view is a materialized view. If it is and the storage table is fresh,
+        // throw IllegalStateException so the caller falls back to loadTable
+        if (isMaterializedView(view) && isFresh(view)) {
+          throw new IllegalStateException(
+              "Materialized view is fresh. loadTable should be attempted instead.");
+        } else {
+          return new SparkView(catalogName, view);
+        }
       } catch (org.apache.iceberg.exceptions.NoSuchViewException e) {
         throw new NoSuchViewException(ident);
       }
@@ -557,6 +568,88 @@ public View loadView(Identifier ident) throws NoSuchViewException {
     throw new NoSuchViewException(ident);
   }
 
+  // Candidate to be moved to org.apache.iceberg.view.View
+  private boolean isMaterializedView(org.apache.iceberg.view.View view) {
+    return Optional.ofNullable(
+            view.properties().get(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY))
+        .orElse("false")
+        .equals("true");
+  }
+
+  // Candidate to be moved to org.apache.iceberg.view.View
+  private String getStorageTableIdentifier(org.apache.iceberg.view.View view) {
+    String identifier =
+        view.properties().get(MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY);
+    Preconditions.checkState(
+        identifier != null, "Storage table identifier is not set for materialized view.");
+    return identifier;
+  }
+
+  // Candidate to be moved to org.apache.iceberg.view.View but requires loadTable
+  private Table loadStorageTable(org.apache.iceberg.view.View view) {
+    String storageTableIdentifier = getStorageTableIdentifier(view);
+    try {
+      SparkSession session = SparkSession.active();
+      return loadTable(
+          Spark3Util.catalogAndIdentifier(session, storageTableIdentifier).identifier());
+    } catch (ParseException | NoSuchTableException e) {
+      throw new IllegalStateException("Unable to load storage table for materialized view.", e);
+    }
+  }
+
+  // Candidate to be moved to org.apache.iceberg.view.View but requires loadTable
+  // Second option is to move to SparkMaterializedView
+  private boolean isFresh(org.apache.iceberg.view.View view) {
+    Table storageTable = loadStorageTable(view);
+    Map<String, String> storageTableProperties = storageTable.properties();
+
+    // Get the parent view version id from the storage table properties
+    String storageTableViewVersionIdPropertyValue =
+        storageTableProperties.get(MaterializedViewUtil.MATERIALIZED_VIEW_VERSION_PROPERTY_KEY);
+    if (storageTableViewVersionIdPropertyValue == null) {
+      throw new IllegalStateException(
+          "Storage table properties do not contain the view version id property.");
+    }
+    int storageTableViewVersionId = Integer.parseInt(storageTableViewVersionIdPropertyValue);
+
+    // If the storage table view version id is different from the current version id, the
+    // materialized view is not fresh
+    if (storageTableViewVersionId != view.currentVersion().versionId()) {
+      return false;
+    }
+
+    // Get the base table snapshot ids from the storage table properties
+    Map<String, String> baseTableSnapshotsProperties =
+        storageTableProperties.entrySet().stream()
+            .filter(
+                entry ->
+                    entry
+                        .getKey()
+                        .startsWith(
+                            MaterializedViewUtil
+                                .MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    List<Table> baseTables = MaterializedViewUtil.extractBaseTables(view.sqlFor("spark").sql());
+
+    for (Table baseTable : baseTables) {
+      org.apache.iceberg.Table icebergBaseTable = ((SparkTable) baseTable).table();
+      String snapshotId =
+          String.valueOf(
+              icebergBaseTable.currentSnapshot() == null
+                  ? 0
+                  : icebergBaseTable.currentSnapshot().snapshotId());
+      if (!snapshotId.equals(
+          baseTableSnapshotsProperties.get(
+              MaterializedViewUtil.MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX
+                  + icebergBaseTable.uuid()))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   @Override
   public View createView(
       Identifier ident,
@@ -746,11 +839,26 @@ private static void checkNotPathIdentifier(Identifier identifier, String method)
     }
   }
 
+  // TODO Remove @SuppressWarnings
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   private Table load(Identifier ident) {
     if (isPathIdentifier(ident)) {
       return loadFromPathIdentifier((PathIdentifier) ident);
     }
 
+    // Check if this is a materialized view. If fresh, return the SparkMaterializedView.
+    if (null != asViewCatalog) {
+      try {
+        org.apache.iceberg.view.View view = asViewCatalog.loadView(buildIdentifier(ident));
+        if (isMaterializedView(view) && isFresh(view)) {
+          Table storageTable = loadStorageTable(view);
+          return new SparkMaterializedView(catalogName, view, storageTable);
+        }
+      } catch (org.apache.iceberg.exceptions.NoSuchViewException e) {
+        // Ignore. Just process as a normal table.
+      }
+    }
+
     try {
       org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident));
       return new SparkTable(table, !cacheEnabled);
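SparkMaterializedView, added next, is a SparkView that answers scans from the storage table. The shape of that delegation, sketched against the DataSource V2 interfaces (illustrative wrapper, not the class below):

    import java.util.Collections
    import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
    import org.apache.spark.sql.connector.read.ScanBuilder
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    // Every scan is answered by the precomputed storage table.
    class DelegatingRead(storageTable: Table) extends SupportsRead {
      override def name(): String = "materialized_view_sketch"
      override def schema() = storageTable.schema()
      override def capabilities() = Collections.singleton(TableCapability.BATCH_READ)
      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
        storageTable.asInstanceOf[SupportsRead].newScanBuilder(options)
    }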
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMaterializedView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMaterializedView.java
new file mode 100644
index 000000000000..0c01ae449292
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMaterializedView.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.view.View;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkMaterializedView extends SparkView implements SupportsRead {
+  private final Table storageTable;
+
+  public SparkMaterializedView(String catalogName, View icebergView, Table storageTable) {
+    super(catalogName, icebergView);
+    this.storageTable = storageTable;
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return ((SupportsRead) storageTable).newScanBuilder(options);
+  }
+
+  @Override
+  public Set<TableCapability> capabilities() {
+    return ImmutableSet.of(TableCapability.BATCH_READ);
+  }
+}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
index abfd7da0c7bd..720614ac76f6 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
@@ -53,7 +53,11 @@ public enum SparkCatalogConfig {
           "default-namespace",
           "default",
           "cache-enabled",
-          "false"));
+          "false")),
+  SPARK_WITH_MATERIALIZED_VIEWS(
+      "spark_with_materialized_views",
+      SparkCatalog.class.getName(),
+      ImmutableMap.of("default-namespace", "default", "cache-enabled", "false"));
 
   private final String catalogName;
   private final String implementation;
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
index d5708c9e575e..012e57fea80e 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
@@ -31,6 +31,7 @@
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.PropertyUtil;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -75,8 +76,13 @@ public SparkTestBaseWithCatalog(SparkCatalogConfig config) {
 
   public SparkTestBaseWithCatalog(
       String catalogName, String implementation, Map<String, String> config) {
+    if (catalogName.equals("spark_with_materialized_views")) {
+      this.catalogConfig = Maps.newHashMap(config);
+      this.catalogConfig.put(CatalogProperties.WAREHOUSE_LOCATION, "file:" + warehouse);
+    } else {
+      this.catalogConfig = config;
+    }
     this.catalogName = catalogName;
-    this.catalogConfig = config;
     this.validationCatalog =
         catalogName.equals("testhadoop")
             ? new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse)