From aea1f32f69dab56206a93e74d2e7788f1d0a7243 Mon Sep 17 00:00:00 2001
From: Walaa Eldin Moustafa
Date: Tue, 27 Feb 2024 21:45:02 -0800
Subject: [PATCH 1/3] [Views] Implement Materialized Views; Integrate with Spark SQL

---
 .../iceberg/view/ViewVersionReplace.java      |   9 +
 .../sql/catalyst/analysis/CheckViews.scala    |   2 +-
 .../sql/catalyst/analysis/ResolveViews.scala  |   2 +-
 .../analysis/RewriteViewCommands.scala        |   6 +-
 .../IcebergSparkSqlExtensionsParser.scala     |  18 +-
 .../logical/views/CreateIcebergView.scala     |   3 +-
 .../v2/CreateMaterializedViewExec.scala       | 160 +++++++++++++++
 .../v2/CreateOrReplaceTagExec.scala           |   3 +-
 .../datasources/v2/DropV2ViewExec.scala       |  38 ++++
 .../v2/ExtendedDataSourceV2Strategy.scala     |  41 ++--
 .../extensions/TestMaterializedViews.java     | 185 ++++++++++++++++++
 .../iceberg/spark/MaterializedViewUtil.java   |  91 +++++++++
 .../apache/iceberg/spark/SparkCatalog.java    |  97 ++++++++-
 .../spark/source/SparkMaterializedView.java   |  57 ++++++
 14 files changed, 690 insertions(+), 22 deletions(-)
 create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala
 create mode 100644 spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMaterializedViews.java
 create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java
 create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMaterializedView.java

diff --git a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java
index 8b3d087940a5..0150fd2a2a44 100644
--- a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java
+++ b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java
@@ -28,6 +28,7 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
 
 import java.util.List;
+import java.util.Optional;
 import org.apache.iceberg.EnvironmentContext;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Namespace;
@@ -56,6 +57,14 @@ public ViewVersion apply() {
   }
 
   ViewMetadata internalApply() {
+    // Replacing a materialized view is not supported because, if not handled properly, the old
+    // storage location would wrongly carry over to the new version.
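+    // For example, a materialized view stored at '/warehouse/mv' keeps its data under
+    // '/warehouse/mv/storage/v1' (see CreateMaterializedViewExec in this patch); a replaced
+    // version would still point at that old storage table even though its query changed.
+    // (Illustrative path; only the '/storage/v1' suffix is fixed by this patch.)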
+ Preconditions.checkState( + Optional.ofNullable(base.properties().get("iceberg.materialized.view")) + .orElse("false") + .equals("false"), + "Cannot replace a materialized view with a new version"); Preconditions.checkState( !representations.isEmpty(), "Cannot replace view without specifying a query"); Preconditions.checkState(null != schema, "Cannot replace view without specifying schema"); diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala index 4a1736764d0d..0c0f071132f2 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala @@ -32,7 +32,7 @@ object CheckViews extends (LogicalPlan => Unit) { override def apply(plan: LogicalPlan): Unit = { plan foreach { case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _, - _, _, _, _, _, _) => + _, _, _, _, _, _, _) => verifyColumnCount(ident, columnAliases, query) SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver) diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala index 46a15331a164..cf95b04fbf04 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala @@ -63,7 +63,7 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look .map(_ => ResolvedV2View(catalog.asViewCatalog, ident)) .getOrElse(u) - case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query, columnAliases, columnComments, _, _, _, _, _, _) + case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query, columnAliases, columnComments, _, _, _, _, _, _, _) if query.resolved && !c.rewritten => val aliased = aliasColumns(query, columnAliases, columnComments) c.copy(query = aliased, queryColumnNames = query.schema.fieldNames, rewritten = true) diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala index 066ba59394d7..94434f058e98 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala @@ -39,7 +39,8 @@ import org.apache.spark.sql.connector.catalog.ViewCatalog * ResolveSessionCatalog exits early for some v2 View commands, * thus they are pre-substituted here and then handled in ResolveViews */ -case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog { +case class RewriteViewCommands(spark: SparkSession, materialized: Boolean) + extends Rule[LogicalPlan] with LookupCatalog { protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager @@ -59,7 +60,8 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi comment = comment, properties = properties, allowExisting = allowExisting, - replace = replace) + replace = 
replace, + materialized = materialized) } private def isTempView(nameParts: Seq[String]): Boolean = { diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 02bd59366c13..9f92556c19b7 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -122,8 +122,12 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI val sqlTextAfterSubstitution = substitutor.substitute(sqlText) if (isIcebergCommand(sqlTextAfterSubstitution)) { parse(sqlTextAfterSubstitution) { parser => astBuilder.visit(parser.singleStatement()) }.asInstanceOf[LogicalPlan] + } else if (isCreateMaterializedView(sqlText)) { + RewriteViewCommands(SparkSession.active, true).apply( + delegate.parsePlan(replaceCreateMaterializedViewWithCreateView(sqlText)) + ) } else { - RewriteViewCommands(SparkSession.active).apply(delegate.parsePlan(sqlText)) + RewriteViewCommands(SparkSession.active, false).apply(delegate.parsePlan(sqlText)) } } @@ -151,6 +155,18 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI isSnapshotRefDdl(normalized))) } + private def isCreateMaterializedView(sqlText: String): Boolean = { + sqlText.toLowerCase.contains("create materialized view") + } + + def replaceCreateMaterializedViewWithCreateView(input: String): String = { + // Regex pattern to match "create materialized view" in a case-insensitive manner + val pattern = "(?i)create materialized view".r + + // Replace all occurrences of the pattern with "create view" + pattern.replaceAllIn(input, "create view") + } + private def isSnapshotRefDdl(normalized: String): Boolean = { normalized.contains("create branch") || normalized.contains("replace branch") || diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala index 9366d5efe163..f9adb57a348c 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala @@ -33,7 +33,8 @@ case class CreateIcebergView( properties: Map[String, String], allowExisting: Boolean, replace: Boolean, - rewritten: Boolean = false) extends BinaryCommand { + rewritten: Boolean = false, + materialized: Boolean = false) extends BinaryCommand { override def left: LogicalPlan = child override def right: LogicalPlan = query diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala new file mode 100644 index 000000000000..265ed95e33ae --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import java.util.UUID
+import org.apache.hadoop.conf.Configuration
+import org.apache.iceberg
+import org.apache.iceberg.FileFormat
+import org.apache.iceberg.PartitionSpec
+import org.apache.iceberg.hadoop.HadoopTables
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions
+import org.apache.iceberg.spark.MaterializedViewUtil
+import org.apache.iceberg.spark.SparkSchemaUtil
+import org.apache.iceberg.spark.SparkWriteOptions
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.types.StructType
+import scala.collection.JavaConverters._
+
+case class CreateMaterializedViewExec(
+    catalog: ViewCatalog,
+    ident: Identifier,
+    queryText: String,
+    viewSchema: StructType,
+    columnAliases: Seq[String],
+    columnComments: Seq[Option[String]],
+    queryColumnNames: Seq[String],
+    comment: Option[String],
+    properties: Map[String, String],
+    allowExisting: Boolean,
+    replace: Boolean) extends LeafV2CommandExec {
+
+  override def output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+
+    val viewLocation = properties.get("location")
+    Preconditions.checkArgument(
+      viewLocation.isDefined, "Cannot create materialized view without a 'location' property")
+
+    // viewLocation is validated above, so unwrapping the Option here is safe; concatenating the
+    // Option itself would produce a "Some(...)" prefix in the path
+    val storageTableLocation = viewLocation.get + "/storage/v1"
+
+    // Create the storage table in the Hadoop catalog so it is explicitly registered in the Spark catalog
+    val tables: HadoopTables = new HadoopTables(new Configuration())
+    val icebergSchema = SparkSchemaUtil.convert(viewSchema)
+    // TODO: Add support for partitioning the storage table
+    val spec: PartitionSpec = PartitionSpec.builderFor(icebergSchema).build
+
+    val table: iceberg.Table = tables.create(icebergSchema, spec, storageTableLocation)
+
+    // Record the current snapshot of every base table referenced by the view query
+    val baseTables = MaterializedViewUtil.extractBaseTables(queryText).asScala.toList
+    val baseTableSnapshots = getBaseTableSnapshots(baseTables)
+    val baseTableSnapshotsProperties = baseTableSnapshots.map {
+      case (key, value) => (
+        MaterializedViewUtil.MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX + key.toString
+      ) -> value.toString
+    }
+
+    session.sql(queryText).write.format("iceberg").option(
+      SparkWriteOptions.WRITE_FORMAT, FileFormat.PARQUET.toString
+    ).mode(SaveMode.Append).save(storageTableLocation)
+
+    val updateProperties = table.updateProperties()
+    baseTableSnapshotsProperties.foreach {
+      case (key, value) => updateProperties.set(key, value)
+    }
+
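+    // The properties recorded above take the form (illustrative UUID and snapshot-id values):
+    //   base.snapshot.d20125c8-7284-442c-9aea-15fed620d6ed = 5591455515329135
+    // SparkCatalog.isFresh() later compares each entry against the corresponding base table's
+    // current snapshot to decide whether this materialization is stale.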
updateProperties.commit() + + table.refresh() + + createMaterializedView(storageTableLocation) + Nil + } + + override def simpleString(maxFields: Int): String = { + s"CreateMaterializedViewExec: ${ident}" + } + + private def createMaterializedView(storageTableLocation: String): Unit = { + val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name + val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null + val currentNamespace = session.sessionState.catalogManager.currentNamespace + + val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION + val newProperties = properties ++ + comment.map(ViewCatalog.PROP_COMMENT -> _) + + (ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion, + ViewCatalog.PROP_ENGINE_VERSION -> engineVersion) + + (MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY -> "true") + + (MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_LOCATION_PROPERTY_KEY -> storageTableLocation) + + if (replace) { + // CREATE OR REPLACE VIEW + if (catalog.viewExists(ident)) { + catalog.dropView(ident) + } + // FIXME: replaceView API doesn't exist in Spark 3.5 + catalog.createView( + ident, + queryText, + currentCatalog, + currentNamespace, + viewSchema, + queryColumnNames.toArray, + columnAliases.toArray, + columnComments.map(c => c.orNull).toArray, + newProperties.asJava) + } else { + try { + // CREATE VIEW [IF NOT EXISTS] + catalog.createView( + ident, + queryText, + currentCatalog, + currentNamespace, + viewSchema, + queryColumnNames.toArray, + columnAliases.toArray, + columnComments.map(c => c.orNull).toArray, + newProperties.asJava) + } catch { + // TODO: Make sure the existing view is also a materialized view + case _: ViewAlreadyExistsException if allowExisting => // Ignore + } + } + } + + private def getBaseTableSnapshots(baseTables: List[Table]): Map[UUID, Long] = { + baseTables.map { + case sparkTable: SparkTable => + val snapshot = Option(sparkTable.table().currentSnapshot()) + val snapshotId = snapshot.map(_.snapshotId().longValue()).getOrElse(0L) + (sparkTable.table().uuid(), snapshotId) + case _ => + throw new UnsupportedOperationException("Only Spark tables are supported") + }.toMap + } +} diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala index 372cd7548632..e7c35f8a771d 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala @@ -40,7 +40,8 @@ case class CreateOrReplaceTagExec( override lazy val output: Seq[Attribute] = Nil override protected def run(): Seq[InternalRow] = { - catalog.loadTable(ident) match { + catalog + .loadTable(ident) match { case iceberg: SparkTable => val snapshotId: java.lang.Long = tagOptions.snapshotId .orElse(Option(iceberg.table.currentSnapshot()).map(_.snapshotId())) diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala index c35af1486fc7..66ac78eff39d 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala +++ 
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala @@ -19,6 +19,14 @@ package org.apache.spark.sql.execution.datasources.v2 +import org.apache.hadoop.conf.Configuration +import org.apache.iceberg.catalog.Namespace +import org.apache.iceberg.catalog.TableIdentifier +import org.apache.iceberg.exceptions +import org.apache.iceberg.hadoop.HadoopTables +import org.apache.iceberg.spark.MaterializedViewUtil +import org.apache.iceberg.spark.SparkCatalog +import org.apache.iceberg.view.View import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NoSuchViewException import org.apache.spark.sql.catalyst.expressions.Attribute @@ -34,6 +42,36 @@ case class DropV2ViewExec( override lazy val output: Seq[Attribute] = Nil override protected def run(): Seq[InternalRow] = { + val icebergCatalog = catalog.asInstanceOf[SparkCatalog].icebergCatalog() + val icebergViewCatalog = icebergCatalog.asInstanceOf[org.apache.iceberg.catalog.ViewCatalog] + var view: Option[View] = None + try { + view = Some(icebergViewCatalog.loadView(TableIdentifier.of(Namespace.of(ident.namespace(): _*), ident.name()))) + } catch { + case e: exceptions.NoSuchViewException => { + if (!ifExists) { + throw new NoSuchViewException(ident) + } + } + } + // if view is not null read the properties and check if it is a materialized view + view match { + case Some(v) => { + val viewProperties = v.properties(); + if (Option( + viewProperties.get(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY + )).getOrElse("false").equals("true")) { + // get the storage table location then drop the storage table + val storageTableLocation = viewProperties.get( + MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_LOCATION_PROPERTY_KEY + ) + val tables: HadoopTables = new HadoopTables(new Configuration()) + tables.dropTable(storageTableLocation) + } + } + case _ => + } + val dropped = catalog.dropView(ident) if (!dropped && !ifExists) { throw new NoSuchViewException(ident) diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index 0505fe4e3030..24ce4a87a26b 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -109,19 +109,34 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), queryText, query, - columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _) => - CreateV2ViewExec( - catalog = viewCatalog, - ident = ident, - queryText = queryText, - columnAliases = columnAliases, - columnComments = columnComments, - queryColumnNames = queryColumnNames, - viewSchema = query.schema, - comment = comment, - properties = properties, - allowExisting = allowExisting, - replace = replace) :: Nil + columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _, materialized) => + if (materialized) { + CreateMaterializedViewExec( + catalog = viewCatalog, + ident = ident, + queryText = queryText, + columnAliases = columnAliases, + 
columnComments = columnComments, + queryColumnNames = queryColumnNames, + viewSchema = query.schema, + comment = comment, + properties = properties, + allowExisting = allowExisting, + replace = replace) :: Nil + } else { + CreateV2ViewExec( + catalog = viewCatalog, + ident = ident, + queryText = queryText, + columnAliases = columnAliases, + columnComments = columnComments, + queryColumnNames = queryColumnNames, + viewSchema = query.schema, + comment = comment, + properties = properties, + allowExisting = allowExisting, + replace = replace) :: Nil + } case _ => Nil } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMaterializedViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMaterializedViews.java new file mode 100644 index 000000000000..99e0cefd7048 --- /dev/null +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMaterializedViews.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Map;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.spark.MaterializedViewUtil;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.ViewCatalog;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestMaterializedViews extends SparkExtensionsTestBase {
+  private static final Namespace NAMESPACE = Namespace.of("default");
+  private final String tableName = "table";
+  private final String materializedViewName = "materialized_view";
+
+  @Before
+  public void before() {
+    spark.conf().set("spark.sql.defaultCatalog", catalogName);
+    sql("USE %s", catalogName);
+    sql("CREATE NAMESPACE IF NOT EXISTS %s", NAMESPACE);
+    sql("CREATE TABLE %s (id INT, data STRING)", tableName);
+  }
+
+  @After
+  public void removeTable() {
+    sql("USE %s", catalogName);
+    sql("DROP VIEW IF EXISTS %s", materializedViewName);
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        SparkCatalogConfig.SPARK_WITH_VIEWS.catalogName(),
+        SparkCatalogConfig.SPARK_WITH_VIEWS.implementation(),
+        SparkCatalogConfig.SPARK_WITH_VIEWS.properties()
+      }
+    };
+  }
+
+  public TestMaterializedViews(
+      String catalog, String implementation, Map<String, String> properties) {
+    super(catalog, implementation, properties);
+  }
+
+  @Test
+  public void assertReadFromStorageTableWhenFresh() throws IOException {
+    File location = Files.createTempDirectory("materialized-view-test").toFile();
+    sql("DROP VIEW IF EXISTS %s", materializedViewName);
+    sql(
+        "CREATE MATERIALIZED VIEW %s TBLPROPERTIES ('location' = '%s') AS SELECT id, data FROM %s",
+        materializedViewName, location.getAbsolutePath(), tableName);
+
+    // Assert that the number of records in the materialized view is the same as the number of
+    // records in the table
+    assertThat(sql("SELECT * FROM %s", materializedViewName).size())
+        .isEqualTo(sql("SELECT * FROM %s", tableName).size());
+
+    // Assert that the catalog loadView method throws NoSuchViewException because the view is fresh
+    assertThatThrownBy(
+            () ->
+                sparkViewCatalog()
+                    .loadView(Identifier.of(new String[] {"default"}, materializedViewName)))
+        .isInstanceOf(NoSuchViewException.class);
+
+    // Assert that the catalog loadTable method returns the materialized view storage table
+    try {
+      assertThat(
+              sparkTableCatalog()
+                  .loadTable(Identifier.of(new String[] {"default"}, materializedViewName))
+                  .name())
+          .isEqualTo(
+              icebergViewCatalog()
+                  .loadView(TableIdentifier.of("default", materializedViewName))
+                  .properties()
+                  .get(MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_LOCATION_PROPERTY_KEY));
+    } catch (NoSuchTableException e) {
+      fail("Materialized view storage table not found");
+    }
+  }
+
+  @Test
+  public void assertNotReadFromStorageTableWhenStale() throws IOException {
+    File location = Files.createTempDirectory("materialized-view-test").toFile();
+    sql(
+        "CREATE MATERIALIZED VIEW %s TBLPROPERTIES ('location' = '%s') AS SELECT id, data FROM %s",
+        materializedViewName, location.getAbsolutePath(), tableName);
+
+    // Insert one row into the table so the materialized view becomes stale
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+
+    // Assert that the number of records in the materialized view is the same as the number of
+    // records in the table
+    assertThat(sql("SELECT * FROM %s", materializedViewName).size())
+        .isEqualTo(sql("SELECT * FROM %s", tableName).size());
+
+    // Assert that the catalog loadView method returns the view object
+    try {
+      assertThat(
+              sparkViewCatalog()
+                  .loadView(Identifier.of(new String[] {"default"}, materializedViewName))
+                  .name())
+          .isEqualTo(
+              icebergViewCatalog()
+                  .loadView(TableIdentifier.of("default", materializedViewName))
+                  .name());
+    } catch (NoSuchViewException e) {
+      fail("Materialized view not found");
+    }
+
+    // Assert that the catalog loadTable fails with NoSuchTableException because the view is stale
+    assertThatThrownBy(
+            () ->
+                sparkTableCatalog()
+                    .loadTable(Identifier.of(new String[] {"default"}, materializedViewName)))
+        .isInstanceOf(NoSuchTableException.class);
+  }
+
+  @Test
+  public void assertShowTablesDoesNotShowStorageTable() throws IOException {
+    File location = Files.createTempDirectory("materialized-view-test").toFile();
+    sql(
+        "CREATE MATERIALIZED VIEW %s TBLPROPERTIES ('location' = '%s') AS SELECT id, data FROM %s",
+        materializedViewName, location.getAbsolutePath(), tableName);
+
+    // Assert that the storage table is not shown in the list of tables (only the base table and
+    // the materialized view entry are expected)
+    assertThat(sql("SHOW TABLES")).hasSize(2);
+  }
+
+  private ViewCatalog sparkViewCatalog() {
+    CatalogPlugin catalogPlugin = spark.sessionState().catalogManager().catalog(catalogName);
+    return (ViewCatalog) catalogPlugin;
+  }
+
+  private TableCatalog sparkTableCatalog() {
+    CatalogPlugin catalogPlugin = spark.sessionState().catalogManager().catalog(catalogName);
+    return (TableCatalog) catalogPlugin;
+  }
+
+  private org.apache.iceberg.catalog.ViewCatalog icebergViewCatalog() {
+    Catalog icebergCatalog = Spark3Util.loadIcebergCatalog(spark, catalogName);
+    assertThat(icebergCatalog).isInstanceOf(org.apache.iceberg.catalog.ViewCatalog.class);
+    return (org.apache.iceberg.catalog.ViewCatalog) icebergCatalog;
+  }
+
+  // TODO Add DROP MATERIALIZED VIEW test
+  // TODO Assert materialized view creation fails when the location is not provided
+  // TODO Test cannot replace a materialized view with a new version
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java
new file mode 100644
index 000000000000..f6dc101fe4fb
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import scala.collection.JavaConverters;
+
+// TODO: Consider merging with Spark3Util
+public class MaterializedViewUtil {
+
+  private MaterializedViewUtil() {}
+
+  public static final String MATERIALIZED_VIEW_PROPERTY_KEY = "iceberg.materialized.view";
+  public static final String MATERIALIZED_VIEW_STORAGE_LOCATION_PROPERTY_KEY =
+      "iceberg.materialized.view.storage.location";
+  public static final String MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX =
+      "base.snapshot.";
+
+  public static List<Table> extractBaseTables(String query) {
+    return extractBaseTableIdentifiers(query).stream()
+        .filter(identifier -> !identifier.isEmpty())
+        // throws NoSuchElementException if a referenced relation is not a catalog table
+        .map(id -> toSparkTable(id).get())
+        .collect(Collectors.toList());
+  }
+
+  private static List<List<String>> extractBaseTableIdentifiers(String query) {
+    try {
+      // Parse the SQL query to get the LogicalPlan
+      LogicalPlan logicalPlan = SparkSession.active().sessionState().sqlParser().parsePlan(query);
+
+      // Recursively traverse the LogicalPlan to extract base table names
+      return extractBaseTableIdentifiers(logicalPlan).stream()
+          .distinct()
+          .collect(Collectors.toList());
+    } catch (ParseException e) {
+      throw new IllegalArgumentException("Failed to parse the SQL query: " + query, e);
+    }
+  }
+
+  private static List<List<String>> extractBaseTableIdentifiers(LogicalPlan plan) {
+    if (plan instanceof UnresolvedRelation) {
+      UnresolvedRelation relation = (UnresolvedRelation) plan;
+      List<List<String>> result = Lists.newArrayListWithCapacity(1);
+      result.add(JavaConverters.seqAsJavaList(relation.multipartIdentifier()));
+      return result;
+    } else {
+      return (JavaConverters.seqAsJavaList(plan.children()))
+          .stream()
+          .flatMap(child -> extractBaseTableIdentifiers(child).stream())
+          .collect(Collectors.toList());
+    }
+  }
+
+  public static Optional<Table> toSparkTable(List<String> multipartIdent) {
+    Spark3Util.CatalogAndIdentifier catalogAndIdentifier =
+        Spark3Util.catalogAndIdentifier(SparkSession.active(), multipartIdent);
+    if (catalogAndIdentifier.catalog() instanceof TableCatalog) {
+      TableCatalog tableCatalog = (TableCatalog) catalogAndIdentifier.catalog();
+      try {
+        return Optional.of(tableCatalog.loadTable(catalogAndIdentifier.identifier()));
+      } catch (Exception e) {
+        return Optional.empty();
+      }
+    }
+    return Optional.empty();
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 37e7387d696d..237539dc502e 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CachingCatalog;
@@ -62,6 +63,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.iceberg.spark.source.SparkMaterializedView;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.spark.source.SparkView;
 import org.apache.iceberg.spark.source.StagedSparkTable;
@@ -548,7 +550,20 @@ public View loadView(Identifier ident) throws NoSuchViewException {
     if (null != asViewCatalog) {
       try {
         org.apache.iceberg.view.View view = asViewCatalog.loadView(buildIdentifier(ident));
-        return new SparkView(catalogName, view);
+        // Check if the view is a materialized view. If it is and the storage table is fresh,
+        // throw NoSuchViewException so that loadTable is attempted instead. Use
+        // "true".equals(...) because the property is absent on regular views.
+        if ("true"
+            .equals(view.properties().get(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY))) {
+          if (isFresh(view)) {
+            throw new NoSuchViewException(ident);
+          } else {
+            return new SparkView(catalogName, view);
+          }
+        } else {
+          return new SparkView(catalogName, view);
+        }
       } catch (org.apache.iceberg.exceptions.NoSuchViewException e) {
         throw new NoSuchViewException(ident);
       }
@@ -557,6 +572,48 @@ public View loadView(Identifier ident) throws NoSuchViewException {
     throw new NoSuchViewException(ident);
   }
 
+  private boolean isFresh(org.apache.iceberg.view.View view) {
+    Preconditions.checkState(
+        "true".equals(view.properties().get(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY)),
+        "Cannot check freshness of non-materialized view.");
+    String storageTableLocation =
+        view.properties().get(MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_LOCATION_PROPERTY_KEY);
+    try {
+      Table storageTable = loadTable(new PathIdentifier(storageTableLocation));
+      Map<String, String> baseTableSnapshotsProperties =
+          storageTable.properties().entrySet().stream()
+              .filter(
+                  entry ->
+                      entry
+                          .getKey()
+                          .startsWith(
+                              MaterializedViewUtil
+                                  .MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX))
+              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      List<Table> baseTables = MaterializedViewUtil.extractBaseTables(view.sqlFor("spark").sql());
+
+      for (Table baseTable : baseTables) {
+        org.apache.iceberg.Table icebergBaseTable = ((SparkTable) baseTable).table();
+        String snapshotId =
+            String.valueOf(
+                icebergBaseTable.currentSnapshot() == null
+                    ? 0
+                    : icebergBaseTable.currentSnapshot().snapshotId());
+        // A missing snapshot property is treated as stale rather than throwing
+        if (!snapshotId.equals(
+            baseTableSnapshotsProperties.get(
+                MaterializedViewUtil.MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX
+                    + icebergBaseTable.uuid()))) {
+          return false;
+        }
+      }
+      return true;
+    } catch (NoSuchTableException e) {
+      throw new IllegalStateException(
+          "Could not load materialized view storage table from catalog.", e);
+    }
+  }
+
   @Override
   public View createView(
       Identifier ident,
@@ -591,7 +648,19 @@ public View createView(
               .withLocation(properties.get("location"))
               .withProperties(props)
               .create();
-      return new SparkView(catalogName, view);
+      if ("true".equals(props.get(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY))) {
+        String storageTableLocation =
+            properties.get(MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_LOCATION_PROPERTY_KEY);
+        try {
+          Table storageTable = loadTable(new PathIdentifier(storageTableLocation));
+          return new SparkMaterializedView(catalogName, view, storageTable);
+        } catch (NoSuchTableException e) {
+          throw new IllegalStateException(
+              "Could not load materialized view storage table from catalog.", e);
+        }
+      } else {
+        return new SparkView(catalogName, view);
+      }
     } catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
       throw new NoSuchNamespaceException(currentNamespace);
     } catch (AlreadyExistsException e) {
@@ -746,11 +815,35 @@ private static void checkNotPathIdentifier(Identifier identifier, String method) {
     }
   }
 
+  // TODO Remove @SuppressWarnings
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   private Table load(Identifier ident) {
     if (isPathIdentifier(ident)) {
       return loadFromPathIdentifier((PathIdentifier) ident);
     }
 
+    // Check if this is a materialized view. If fresh, return the SparkMaterializedView backed
+    // by its storage table.
+    if (null != asViewCatalog) {
+      try {
+        org.apache.iceberg.view.View view = asViewCatalog.loadView(buildIdentifier(ident));
+        if ("true"
+            .equals(view.properties().get(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY))) {
+          if (isFresh(view)) {
+            String storageTableLocation =
+                view.properties()
+                    .get(MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_LOCATION_PROPERTY_KEY);
+            return new SparkMaterializedView(
+                catalogName,
+                view,
+                loadFromPathIdentifier(new PathIdentifier(storageTableLocation)));
+          }
+        }
+      } catch (org.apache.iceberg.exceptions.NoSuchViewException e) {
+        // Ignore. Just process as a normal table.
+      }
+    }
+
     try {
       org.apache.iceberg.Table table = icebergCatalog.loadTable(buildIdentifier(ident));
       return new SparkTable(table, !cacheEnabled);
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMaterializedView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMaterializedView.java
new file mode 100644
index 000000000000..0c01ae449292
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMaterializedView.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.view.View;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class SparkMaterializedView extends SparkView implements SupportsRead {
+  private final Table storageTable;
+  private SparkSession lazySpark;
+
+  public SparkMaterializedView(String catalogName, View icebergView, Table storageTable) {
+    super(catalogName, icebergView);
+    this.storageTable = storageTable;
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return ((SupportsRead) storageTable).newScanBuilder(options);
+  }
+
+  private SparkSession sparkSession() {
+    if (lazySpark == null) {
+      this.lazySpark = SparkSession.active();
+    }
+
+    return lazySpark;
+  }
+
+  @Override
+  public Set<TableCapability> capabilities() {
+    return ImmutableSet.of(TableCapability.BATCH_READ);
+  }
+}

From 0aac39612d71f319abfb14b271deae6e30bba866 Mon Sep 17 00:00:00 2001
From: Walaa Eldin Moustafa
Date: Sun, 10 Mar 2024 17:51:14 -0700
Subject: [PATCH 2/3] Represent the storage table using its catalog identifier

---
 .../iceberg/inmemory/InMemoryCatalog.java     |  10 +-
 .../analysis/RewriteViewCommands.scala        |   5 +-
 .../IcebergSparkSqlExtensionsParser.scala     |  32 ++--
 .../logical/views/CreateIcebergView.scala     |   3 +-
 .../v2/CreateMaterializedViewExec.scala       |  70 ++++----
 .../datasources/v2/DropV2ViewExec.scala       |  12 +-
 .../v2/ExtendedDataSourceV2Strategy.scala     |  38 +++--
 .../extensions/TestMaterializedViews.java     | 155 ++++++++++++------
 .../iceberg/spark/MaterializedViewUtil.java   |  16 +-
 .../apache/iceberg/spark/SparkCatalog.java    | 139 ++++++++--------
 .../iceberg/spark/SparkCatalogConfig.java     |   6 +-
 .../spark/SparkTestBaseWithCatalog.java       |   8 +-
 12 files changed, 291 insertions(+), 203 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
index a880f94f4385..d397b9fdbd7e 100644
--- a/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/inmemory/InMemoryCatalog.java
@@ -94,6 +94,7 @@ public void initialize(String name, Map<String, String> properties) {
     closeableGroup.setSuppressCloseFailure(true);
   }
 
+  // protected for testing
   @Override
   protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
     return new InMemoryTableOperations(io, tableIdentifier);
@@ -326,6 +327,7 @@ public List<TableIdentifier> listViews(Namespace namespace) {
         .collect(Collectors.toList());
   }
 
+  // protected for testing
   @Override
   protected InMemoryViewOperations
newViewOps(TableIdentifier identifier) { return new InMemoryViewOperations(io, identifier); @@ -368,12 +370,12 @@ public void renameView(TableIdentifier from, TableIdentifier to) { } } - private class InMemoryTableOperations extends BaseMetastoreTableOperations { + public class InMemoryTableOperations extends BaseMetastoreTableOperations { private final FileIO fileIO; private final TableIdentifier tableIdentifier; private final String fullTableName; - InMemoryTableOperations(FileIO fileIO, TableIdentifier tableIdentifier) { + public InMemoryTableOperations(FileIO fileIO, TableIdentifier tableIdentifier) { this.fileIO = fileIO; this.tableIdentifier = tableIdentifier; this.fullTableName = fullTableName(catalogName, tableIdentifier); @@ -439,12 +441,12 @@ protected String tableName() { } } - private class InMemoryViewOperations extends BaseViewOperations { + public class InMemoryViewOperations extends BaseViewOperations { private final FileIO io; private final TableIdentifier identifier; private final String fullViewName; - InMemoryViewOperations(FileIO io, TableIdentifier identifier) { + public InMemoryViewOperations(FileIO io, TableIdentifier identifier) { this.io = io; this.identifier = identifier; this.fullViewName = ViewUtil.fullViewName(catalogName, identifier); diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala index 94434f058e98..adeff7f32640 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.connector.catalog.ViewCatalog * ResolveSessionCatalog exits early for some v2 View commands, * thus they are pre-substituted here and then handled in ResolveViews */ -case class RewriteViewCommands(spark: SparkSession, materialized: Boolean) +case class RewriteViewCommands(spark: SparkSession, materializedViewOptions: Option[MaterializedViewOptions]) extends Rule[LogicalPlan] with LookupCatalog { protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager @@ -61,7 +61,7 @@ case class RewriteViewCommands(spark: SparkSession, materialized: Boolean) properties = properties, allowExisting = allowExisting, replace = replace, - materialized = materialized) + materializedViewOptions = materializedViewOptions) } private def isTempView(nameParts: Seq[String]): Boolean = { @@ -126,3 +126,4 @@ case class RewriteViewCommands(spark: SparkSession, materialized: Boolean) collectTempViews(child) } } +case class MaterializedViewOptions(storageTableIdentifier: Option[String]) diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 9f92556c19b7..5b04280863a8 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -28,29 +28,23 @@ import org.antlr.v4.runtime.tree.TerminalNodeImpl import org.apache.iceberg.common.DynConstructors import 
org.apache.iceberg.spark.ExtendedParser import org.apache.iceberg.spark.ExtendedParser.RawOrderField -import org.apache.iceberg.spark.Spark3Util -import org.apache.iceberg.spark.source.SparkTable import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.analysis.MaterializedViewOptions import org.apache.spark.sql.catalyst.analysis.RewriteViewCommands -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.NonReservedContext import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.QuotedIdentifierContext import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.Origin -import org.apache.spark.sql.connector.catalog.Table -import org.apache.spark.sql.connector.catalog.TableCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.VariableSubstitution import org.apache.spark.sql.types.DataType import org.apache.spark.sql.types.StructType import scala.jdk.CollectionConverters._ -import scala.util.Try class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserInterface with ExtendedParser { @@ -58,6 +52,9 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI private lazy val substitutor = substitutorCtor.newInstance(SQLConf.get) private lazy val astBuilder = new IcebergSqlExtensionsAstBuilder(delegate) + private lazy final val CREATE_MATERIALIZED_VIEW_PATTERN = "(?i)(CREATE)\\s+MATERIALIZED\\s+(VIEW)".r + private lazy final val MATERIALIZED_VIEW_STORED_AS_PATTERN = "(?i)STORED AS\\s*'(\\w+)'\\s*".r + /** * Parse a string to a DataType. 
@@ -123,11 +120,12 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
     if (isIcebergCommand(sqlTextAfterSubstitution)) {
       parse(sqlTextAfterSubstitution) { parser => astBuilder.visit(parser.singleStatement()) }.asInstanceOf[LogicalPlan]
     } else if (isCreateMaterializedView(sqlText)) {
-      RewriteViewCommands(SparkSession.active, true).apply(
-        delegate.parsePlan(replaceCreateMaterializedViewWithCreateView(sqlText))
+      val createMaterializedViewStatement = getCreateMaterializedViewStatement(sqlText)
+      RewriteViewCommands(SparkSession.active, Option(getMaterializedViewOptions(sqlText))).apply(
+        // reuse the rewritten statement bound above instead of rewriting it twice
+        delegate.parsePlan(createMaterializedViewStatement)
       )
     } else {
-      RewriteViewCommands(SparkSession.active, false).apply(delegate.parsePlan(sqlText))
+      RewriteViewCommands(SparkSession.active, None).apply(delegate.parsePlan(sqlText))
     }
   }
 
@@ -159,14 +157,18 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
     sqlText.toLowerCase.contains("create materialized view")
   }
 
-  def replaceCreateMaterializedViewWithCreateView(input: String): String = {
-    // Regex pattern to match "create materialized view" in a case-insensitive manner
-    val pattern = "(?i)create materialized view".r
+  private def getCreateMaterializedViewStatement(sqlText: String): String = {
+    // Rewrite CREATE MATERIALIZED VIEW to CREATE VIEW and strip the STORED AS clause so the
+    // statement can be handled by the vanilla Spark view parser
+    val createViewText = CREATE_MATERIALIZED_VIEW_PATTERN.replaceAllIn(sqlText, m => m.group(1) + " " + m.group(2))
+    MATERIALIZED_VIEW_STORED_AS_PATTERN.replaceAllIn(createViewText, "")
+  }
 
-    // Replace all occurrences of the pattern with "create view"
-    pattern.replaceAllIn(input, "create view")
+  private def getMaterializedViewOptions(sqlText: String): MaterializedViewOptions = {
+    val storageTableIdentifier =
+      MATERIALIZED_VIEW_STORED_AS_PATTERN.findFirstMatchIn(sqlText).map(_.group(1))
+    MaterializedViewOptions(storageTableIdentifier)
   }
 
+
   private def isSnapshotRefDdl(normalized: String): Boolean = {
     normalized.contains("create branch") ||
       normalized.contains("replace branch") ||
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
index f9adb57a348c..a4134476cbd8 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
@@ -19,6 +19,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical.views
 
+import org.apache.spark.sql.catalyst.analysis.MaterializedViewOptions
 import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
@@ -34,7 +35,7 @@ case class CreateIcebergView(
     allowExisting: Boolean,
     replace: Boolean,
     rewritten: Boolean = false,
-    materialized: Boolean = false) extends BinaryCommand {
+    materializedViewOptions: Option[MaterializedViewOptions]) extends BinaryCommand {
   override def left: LogicalPlan = child
 
   override def right: LogicalPlan = query
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala
index 265ed95e33ae..bcff93dd1ab7 100644
---
a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala @@ -19,24 +19,22 @@ package org.apache.spark.sql.execution.datasources.v2 + import java.util.UUID -import org.apache.hadoop.conf.Configuration -import org.apache.iceberg -import org.apache.iceberg.FileFormat -import org.apache.iceberg.PartitionSpec -import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.relocated.com.google.common.base.Preconditions +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap import org.apache.iceberg.spark.MaterializedViewUtil -import org.apache.iceberg.spark.SparkSchemaUtil -import org.apache.iceberg.spark.SparkWriteOptions +import org.apache.iceberg.spark.Spark3Util +import org.apache.iceberg.spark.SparkCatalog import org.apache.iceberg.spark.source.SparkTable -import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.Table +import org.apache.spark.sql.connector.catalog.TableChange import org.apache.spark.sql.connector.catalog.ViewCatalog +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType import scala.collection.JavaConverters._ @@ -51,25 +49,36 @@ case class CreateMaterializedViewExec( comment: Option[String], properties: Map[String, String], allowExisting: Boolean, - replace: Boolean) extends LeafV2CommandExec { + replace: Boolean, + storageTableIdentifier: Option[String]) extends LeafV2CommandExec { override def output: Seq[Attribute] = Nil override protected def run(): Seq[InternalRow] = { - val viewLocation = properties.get("location") - Preconditions.checkArgument(viewLocation.isDefined) - - val storageTableLocation = viewLocation + "/storage/v1" + // Check if storageTableIdentifier is provided, if not, generate a default identifier + val sparkStorageTableIdentifier = storageTableIdentifier match { + case Some(identifier) => { + val catalogAndIdentifier = Spark3Util.catalogAndIdentifier(session, identifier) + val storageTableCatalogName = catalogAndIdentifier.catalog().name() + Preconditions.checkState( + storageTableCatalogName.equals(catalog.name()), + "Storage table identifier must be in the same catalog as the view." 
+ + " Found storage table in catalog: %s, expected: %s", + Array[Object](storageTableCatalogName, catalog.name()) + ) + catalogAndIdentifier.identifier() + } + case None => MaterializedViewUtil.getDefaultMaterializedViewStorageTableIdentifier(ident) + } - // Create the storage table in the Hadoop catalog so it is explicitly registered in the Spark catalog - val tables: HadoopTables = new HadoopTables(new Configuration()) - val icebergSchema = SparkSchemaUtil.convert(viewSchema) // TODO: Add support for partitioning the storage table - val spec: PartitionSpec = PartitionSpec.builderFor(icebergSchema).build - - val table: iceberg.Table = tables.create(icebergSchema, spec, storageTableLocation) + catalog.asInstanceOf[SparkCatalog].createTable( + sparkStorageTableIdentifier, + viewSchema, new Array[Transform](0), ImmutableMap.of[String, String]() + ) + // Capture base table state before inserting into the storage table val baseTables = MaterializedViewUtil.extractBaseTables(queryText).asScala.toList val baseTableSnapshots = getBaseTableSnapshots(baseTables) val baseTableSnapshotsProperties = baseTableSnapshots.map{ @@ -78,19 +87,16 @@ case class CreateMaterializedViewExec( ) -> value.toString } - session.sql(queryText).write.format("iceberg").option( - SparkWriteOptions.WRITE_FORMAT, FileFormat.PARQUET.toString - ).mode(SaveMode.Append).save(storageTableLocation) - - val updateProperties = table.updateProperties() - baseTableSnapshotsProperties.foreach { - case (key, value) => updateProperties.set(key, value) - } - updateProperties.commit() + // Insert into the storage table + session.sql("INSERT INTO " + sparkStorageTableIdentifier + " " + queryText) - table.refresh() + // Update the base table snapshots properties + val baseTablePropertyChanges = baseTableSnapshotsProperties.map{ + case (key, value) => TableChange.setProperty(key, value) + }.toArray - createMaterializedView(storageTableLocation) + catalog.asInstanceOf[SparkCatalog].alterTable(sparkStorageTableIdentifier, baseTablePropertyChanges:_*) + createMaterializedView(sparkStorageTableIdentifier.toString) Nil } @@ -98,7 +104,7 @@ case class CreateMaterializedViewExec( s"CreateMaterializedViewExec: ${ident}" } - private def createMaterializedView(storageTableLocation: String): Unit = { + private def createMaterializedView(storageTableIdentifier: String): Unit = { val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null val currentNamespace = session.sessionState.catalogManager.currentNamespace @@ -109,7 +115,7 @@ case class CreateMaterializedViewExec( (ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion, ViewCatalog.PROP_ENGINE_VERSION -> engineVersion) + (MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY -> "true") + - (MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_LOCATION_PROPERTY_KEY -> storageTableLocation) + (MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY -> storageTableIdentifier) if (replace) { // CREATE OR REPLACE VIEW diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala index 66ac78eff39d..6242c6b874bf 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala +++ 
b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala @@ -19,14 +19,14 @@ package org.apache.spark.sql.execution.datasources.v2 -import org.apache.hadoop.conf.Configuration import org.apache.iceberg.catalog.Namespace import org.apache.iceberg.catalog.TableIdentifier import org.apache.iceberg.exceptions -import org.apache.iceberg.hadoop.HadoopTables import org.apache.iceberg.spark.MaterializedViewUtil +import org.apache.iceberg.spark.Spark3Util import org.apache.iceberg.spark.SparkCatalog import org.apache.iceberg.view.View +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.NoSuchViewException import org.apache.spark.sql.catalyst.expressions.Attribute @@ -63,10 +63,12 @@ case class DropV2ViewExec( )).getOrElse("false").equals("true")) { // get the storage table location then drop the storage table val storageTableLocation = viewProperties.get( - MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_LOCATION_PROPERTY_KEY + MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY ) - val tables: HadoopTables = new HadoopTables(new Configuration()) - tables.dropTable(storageTableLocation) + val storageTableIdentifier = Spark3Util.catalogAndIdentifier( + SparkSession.active, storageTableLocation).identifier() + // get active spark session + catalog.asInstanceOf[SparkCatalog].dropTable(storageTableIdentifier) } } case _ => diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index 24ce4a87a26b..388ed5a91948 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -108,9 +108,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) => DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil - case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), queryText, query, - columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _, materialized) => - if (materialized) { + case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), + queryText, query, columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _, + Some(materializedViewOptions)) => CreateMaterializedViewExec( catalog = viewCatalog, ident = ident, @@ -122,21 +122,23 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi comment = comment, properties = properties, allowExisting = allowExisting, - replace = replace) :: Nil - } else { - CreateV2ViewExec( - catalog = viewCatalog, - ident = ident, - queryText = queryText, - columnAliases = columnAliases, - columnComments = columnComments, - queryColumnNames = queryColumnNames, - viewSchema = query.schema, - comment = comment, - properties = properties, - allowExisting = allowExisting, - replace = replace) :: Nil - } + replace = replace, + storageTableIdentifier = materializedViewOptions.storageTableIdentifier) :: Nil + + case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), 
queryText, query, + columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _, None) => + CreateV2ViewExec( + catalog = viewCatalog, + ident = ident, + queryText = queryText, + columnAliases = columnAliases, + columnComments = columnComments, + queryColumnNames = queryColumnNames, + viewSchema = query.schema, + comment = comment, + properties = properties, + allowExisting = allowExisting, + replace = replace) :: Nil case _ => Nil } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMaterializedViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMaterializedViews.java index 99e0cefd7048..f71fbce9a35c 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMaterializedViews.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMaterializedViews.java @@ -26,12 +26,20 @@ import java.io.IOException; import java.nio.file.Files; import java.util.Map; -import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.MaterializedViewUtil; -import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkCatalogConfig; +import org.apache.iceberg.spark.source.SparkMaterializedView; +import org.apache.iceberg.spark.source.SparkView; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; import org.apache.spark.sql.connector.catalog.CatalogPlugin; @@ -65,15 +73,30 @@ public void removeTable() { @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}") public static Object[][] parameters() { + Map properties = + Maps.newHashMap(SparkCatalogConfig.SPARK_WITH_MATERIALIZED_VIEWS.properties()); + properties.put(CatalogProperties.WAREHOUSE_LOCATION, "file:" + getTempWarehouseDir()); + properties.put(CatalogProperties.CATALOG_IMPL, InMemoryCatalogWithLocalFileIO.class.getName()); return new Object[][] { { - SparkCatalogConfig.SPARK_WITH_VIEWS.catalogName(), - SparkCatalogConfig.SPARK_WITH_VIEWS.implementation(), - SparkCatalogConfig.SPARK_WITH_VIEWS.properties() + SparkCatalogConfig.SPARK_WITH_MATERIALIZED_VIEWS.catalogName(), + SparkCatalogConfig.SPARK_WITH_MATERIALIZED_VIEWS.implementation(), + properties } }; } + private static String getTempWarehouseDir() { + try { + File tempDir = Files.createTempDirectory("warehouse-").toFile(); + tempDir.delete(); + return tempDir.getAbsolutePath(); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + public TestMaterializedViews( String catalog, String implementation, Map properties) { super(catalog, implementation, properties); @@ -81,35 +104,24 @@ public TestMaterializedViews( @Test public void assertReadFromStorageTableWhenFresh() throws IOException { - File location = Files.createTempDirectory("materialized-view-test").toFile(); sql("DROP VIEW IF EXISTS %s", materializedViewName); - sql( - "CREATE MATERIALIZED VIEW %s TBLPROPERTIES 
('location' = '%s') AS SELECT id, data FROM %s", - materializedViewName, location.getAbsolutePath(), tableName); + sql("CREATE MATERIALIZED VIEW %s AS SELECT id, data FROM %s", materializedViewName, tableName); // Assert that number of records in the materialized view is the same as the number of records // in the table assertThat(sql("SELECT * FROM %s", materializedViewName).size()) .isEqualTo(sql("SELECT * FROM %s", tableName).size()); - // Assert that the catalog loadView method returns NoSuchViewException because the view is fresh - assertThatThrownBy( - () -> - sparkViewCatalog() - .loadView(Identifier.of(new String[] {"default"}, materializedViewName))) - .isInstanceOf(NoSuchViewException.class); + // Assert that the catalog loadView method throws IllegalStateException because the view is + // fresh + assertThatThrownBy(() -> sparkViewCatalog().loadView(viewIdentifier())) + .isInstanceOf(IllegalStateException.class); - // Assert that the catalog loadTable method returns the materialized view storage table + // Assert that the catalog loadTable method returns an object, and its type is + // SparkMaterializedView try { - assertThat( - sparkTableCatalog() - .loadTable(Identifier.of(new String[] {"default"}, materializedViewName)) - .name()) - .isEqualTo( - icebergViewCatalog() - .loadView(TableIdentifier.of("default", materializedViewName)) - .properties() - .get(MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_LOCATION_PROPERTY_KEY)); + assertThat(sparkTableCatalog().loadTable(viewIdentifier())) + .isInstanceOf(SparkMaterializedView.class); } catch (NoSuchTableException e) { fail("Materialized view storage table not found"); } @@ -117,10 +129,7 @@ public void assertReadFromStorageTableWhenFresh() throws IOException { @Test public void assertNotReadFromStorageTableWhenStale() throws IOException { - File location = Files.createTempDirectory("materialized-view-test").toFile(); - sql( - "CREATE MATERIALIZED VIEW %s TBLPROPERTIES ('location' = '%s') AS SELECT id, data FROM %s", - materializedViewName, location.getAbsolutePath(), tableName); + sql("CREATE MATERIALIZED VIEW %s AS SELECT id, data FROM %s", materializedViewName, tableName); // Insert one row to the table so the materialized view becomes stale sql("INSERT INTO %s VALUES (1, 'a')", tableName); @@ -130,37 +139,40 @@ public void assertNotReadFromStorageTableWhenStale() throws IOException { assertThat(sql("SELECT * FROM %s", materializedViewName).size()) .isEqualTo(sql("SELECT * FROM %s", tableName).size()); - // Assert that the catalog loadView method returns the view object + // Assert that the catalog loadView method returns an object, and of type SparkView try { - assertThat( - sparkViewCatalog() - .loadView(Identifier.of(new String[] {"default"}, materializedViewName)) - .name()) - .isEqualTo( - icebergViewCatalog() - .loadView(TableIdentifier.of("default", materializedViewName)) - .name()); + assertThat(sparkViewCatalog().loadView(viewIdentifier())).isInstanceOf(SparkView.class); } catch (NoSuchViewException e) { fail("Materialized view not found"); } // Assert that the catalog loadTable fails with NoSuchTableException because the view is stale - assertThatThrownBy( - () -> - sparkTableCatalog() - .loadTable(Identifier.of(new String[] {"default"}, materializedViewName))) + assertThatThrownBy(() -> sparkTableCatalog().loadTable(viewIdentifier())) .isInstanceOf(NoSuchTableException.class); } @Test - public void assertShowTablesDoesNotShowStorageTable() throws IOException { - File location = 
Files.createTempDirectory("materialized-view-test").toFile(); + public void testDefaultStorageTableIdentifier() { + sql("CREATE MATERIALIZED VIEW %s AS SELECT id, data FROM %s", materializedViewName, tableName); + + // Assert that the storage table is in the list of tables + final String materializedViewStorageTableName = + MaterializedViewUtil.getDefaultMaterializedViewStorageTableIdentifier( + Identifier.of(new String[] {NAMESPACE.toString()}, materializedViewName)) + .name(); + assertThat(sql("SHOW TABLES")) + .anySatisfy(row -> assertThat(row[1]).isEqualTo(materializedViewStorageTableName)); + } + + @Test + public void testStoredAsClause() { + String customTableName = "custom_table_name"; sql( - "CREATE MATERIALIZED VIEW %s TBLPROPERTIES ('location' = '%s') AS SELECT id, data FROM %s", - materializedViewName, location.getAbsolutePath(), tableName); + "CREATE MATERIALIZED VIEW %s STORED AS '%s' AS SELECT id, data FROM %s", + materializedViewName, customTableName, tableName); - // Assert that the storage table is not shown in the list of tables - assertThat(sql("SHOW TABLES").size() == 2); + // Assert that the storage table with the custom name is in the list of tables + assertThat(sql("SHOW TABLES")).anySatisfy(row -> assertThat(row[1]).isEqualTo(customTableName)); } private ViewCatalog sparkViewCatalog() { @@ -173,10 +185,49 @@ private TableCatalog sparkTableCatalog() { return (TableCatalog) catalogPlugin; } - private org.apache.iceberg.catalog.ViewCatalog icebergViewCatalog() { - Catalog icebergCatalog = Spark3Util.loadIcebergCatalog(spark, catalogName); - assertThat(icebergCatalog).isInstanceOf(org.apache.iceberg.catalog.ViewCatalog.class); - return (org.apache.iceberg.catalog.ViewCatalog) icebergCatalog; + private Identifier viewIdentifier() { + return Identifier.of(new String[] {NAMESPACE.toString()}, materializedViewName); + } + + // Required to be public since it is loaded by org.apache.iceberg.CatalogUtil.loadCatalog + public static class InMemoryCatalogWithLocalFileIO extends InMemoryCatalog { + private FileIO localFileIO; + + @Override + public void initialize(String name, Map properties) { + super.initialize(name, properties); + localFileIO = new LocalFileIO(); + } + + @Override + protected TableOperations newTableOps(TableIdentifier tableIdentifier) { + return new InMemoryTableOperations(localFileIO, tableIdentifier); + } + + @Override + protected InMemoryCatalog.InMemoryViewOperations newViewOps(TableIdentifier identifier) { + return new InMemoryViewOperations(localFileIO, identifier); + } + } + + private static class LocalFileIO implements FileIO { + + @Override + public InputFile newInputFile(String path) { + return org.apache.iceberg.Files.localInput(path); + } + + @Override + public OutputFile newOutputFile(String path) { + return org.apache.iceberg.Files.localOutput(path); + } + + @Override + public void deleteFile(String path) { + if (!new File(path).delete()) { + throw new RuntimeIOException("Failed to delete file: " + path); + } + } } // TODO Add DROP MATERIALIZED VIEW test diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java index f6dc101fe4fb..3d4114a1afe0 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation; import 
org.apache.spark.sql.catalyst.parser.ParseException;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import scala.collection.JavaConverters;
@@ -36,9 +37,11 @@ public class MaterializedViewUtil {
   private MaterializedViewUtil() {}
 
   public static final String MATERIALIZED_VIEW_PROPERTY_KEY = "iceberg.materialized.view";
-  public static final String MATERIALIZED_VIEW_STORAGE_LOCATION_PROPERTY_KEY =
-      "iceberg.materialized.view.storage.location";
-  public static final String MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX = "base.snapshot.";
+  public static final String MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY =
+      "iceberg.materialized.view.storage.table";
+  public static final String MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX =
+      "iceberg.base.snapshot.";
+  private static final String MATERIALIZED_VIEW_STORAGE_TABLE_IDENTIFIER_SUFFIX = ".storage.table";
 
   public static List<Table> extractBaseTables(String query) {
     return extractBaseTableIdentifiers(query).stream()
@@ -88,4 +91,11 @@ public static Optional<Table>
toSparkTable(List multipartIdent) { } return Optional.empty(); } + + public static Identifier getDefaultMaterializedViewStorageTableIdentifier( + Identifier viewIdentifier) { + return Identifier.of( + viewIdentifier.namespace(), + viewIdentifier.name() + MATERIALIZED_VIEW_STORAGE_TABLE_IDENTIFIER_SUFFIX); + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 237539dc502e..12965c4f10c9 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.StringJoiner; import java.util.TreeMap; @@ -77,6 +78,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException; +import org.apache.spark.sql.catalyst.parser.ParseException; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.NamespaceChange; import org.apache.spark.sql.connector.catalog.StagedTable; @@ -550,17 +552,11 @@ public View loadView(Identifier ident) throws NoSuchViewException { if (null != asViewCatalog) { try { org.apache.iceberg.view.View view = asViewCatalog.loadView(buildIdentifier(ident)); - // Check if the view is a materialized view. If it is, and storage table is fresh, return - // NoSuchViewException so - // loadTable is attempted instead. - if (view.properties() - .get(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY) - .equals("true")) { - if (isFresh(view)) { - throw new NoSuchViewException(ident); - } else { - return new SparkView(catalogName, view); - } + // Check if the view is a materialized view. If it is, and storage table is fresh, throw + // IllegalStateException + if (isMaterializedView(view) && isFresh(view)) { + throw new IllegalStateException( + "Materialized view is fresh. 
loadTable should be attempted instead.");
         } else {
           return new SparkView(catalogName, view);
         }
@@ -572,46 +568,67 @@ public View loadView(Identifier ident) throws NoSuchViewException {
     throw new NoSuchViewException(ident);
   }
 
-  private boolean isFresh(org.apache.iceberg.view.View view) {
+  // Candidate to be moved to org.apache.iceberg.view.View
+  private boolean isMaterializedView(org.apache.iceberg.view.View view) {
+    // Use ofNullable: the property is absent on ordinary views, and Optional.of(null)
+    // would throw a NullPointerException for every non-materialized view
+    return Optional.ofNullable(
+            view.properties().get(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY))
+        .orElse("false")
+        .equals("true");
+  }
+
+  // Candidate to be moved to org.apache.iceberg.view.View
+  private String getStorageTableIdentifier(org.apache.iceberg.view.View view) {
+    String identifier =
+        view.properties().get(MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_TABLE_PROPERTY_KEY);
     Preconditions.checkState(
-        view.properties().get(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY).equals("true"),
-        "Cannot check freshness of non-materialized view.");
-    String storageTableLocation =
-        view.properties().get(MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_LOCATION_PROPERTY_KEY);
+        identifier != null, "Storage table identifier is not set for materialized view.");
+    return identifier;
+  }
+
+  // Candidate to be moved to org.apache.iceberg.view.View but requires loadTable
+  private Table loadStorageTable(org.apache.iceberg.view.View view) {
+    String storageTableIdentifier = getStorageTableIdentifier(view);
     try {
-      Table storageTable = loadTable(new PathIdentifier(storageTableLocation));
-      Map<String, String> baseTableSnapshotsProperties =
-          storageTable.properties().entrySet().stream()
-              .filter(
-                  entry ->
-                      entry
-                          .getKey()
-                          .startsWith(
-                              MaterializedViewUtil
                                  .MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX))
-              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-      List<Table>
baseTables = MaterializedViewUtil.extractBaseTables(view.sqlFor("spark").sql()); - - for (Table baseTable : baseTables) { - org.apache.iceberg.Table icebergBaseTable = ((SparkTable) baseTable).table(); - String snapshotId = - String.valueOf( - icebergBaseTable.currentSnapshot() == null - ? 0 - : icebergBaseTable.currentSnapshot().snapshotId()); - if (!baseTableSnapshotsProperties - .get( - MaterializedViewUtil.MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX - + icebergBaseTable.uuid()) - .equals(snapshotId)) { - return false; - } + SparkSession session = SparkSession.active(); + Table storageTable = + loadTable(Spark3Util.catalogAndIdentifier(session, storageTableIdentifier).identifier()); + return storageTable; + } catch (ParseException | NoSuchTableException e) { + throw new IllegalStateException("Unable to load storage table for materialized view.", e); + } + } + + // Candidate to be moved to org.apache.iceberg.view.View but requires loadTable + // Second option is to move to SparkMaterializedView + private boolean isFresh(org.apache.iceberg.view.View view) { + Table storageTable = loadStorageTable(view); + Map baseTableSnapshotsProperties = + storageTable.properties().entrySet().stream() + .filter( + entry -> + entry + .getKey() + .startsWith( + MaterializedViewUtil + .MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + List
baseTables = MaterializedViewUtil.extractBaseTables(view.sqlFor("spark").sql()); + + for (Table baseTable : baseTables) { + org.apache.iceberg.Table icebergBaseTable = ((SparkTable) baseTable).table(); + String snapshotId = + String.valueOf( + icebergBaseTable.currentSnapshot() == null + ? 0 + : icebergBaseTable.currentSnapshot().snapshotId()); + if (!baseTableSnapshotsProperties + .get( + MaterializedViewUtil.MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX + + icebergBaseTable.uuid()) + .equals(snapshotId)) { + return false; } - return true; - } catch (NoSuchTableException e) { - throw new IllegalStateException( - "Could not load materialized view storage table from catalog.", e); } + return true; } @Override @@ -648,16 +665,9 @@ public View createView( .withLocation(properties.get("location")) .withProperties(props) .create(); - if (props.get(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY).equals("true")) { - String storageTableLocation = - properties.get(MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_LOCATION_PROPERTY_KEY); - try { - Table storageTable = loadTable(new PathIdentifier(storageTableLocation)); - return new SparkMaterializedView(catalogName, view, storageTable); - } catch (NoSuchTableException e) { - throw new IllegalStateException( - "Could not load materialized view storage table from catalog.", e); - } + if (isMaterializedView(view)) { + Table storageTable = loadStorageTable(view); + return new SparkMaterializedView(catalogName, view, storageTable); } else { return new SparkView(catalogName, view); } @@ -826,18 +836,9 @@ private Table load(Identifier ident) { if (null != asViewCatalog) { try { org.apache.iceberg.view.View view = asViewCatalog.loadView(buildIdentifier(ident)); - if (view.properties() - .get(MaterializedViewUtil.MATERIALIZED_VIEW_PROPERTY_KEY) - .equals("true")) { - if (isFresh(view)) { - String storageTableLocation = - view.properties() - .get(MaterializedViewUtil.MATERIALIZED_VIEW_STORAGE_LOCATION_PROPERTY_KEY); - return new SparkMaterializedView( - catalogName, - view, - loadFromPathIdentifier(new PathIdentifier(storageTableLocation))); - } + if (isMaterializedView(view) && isFresh(view)) { + Table storageTable = loadStorageTable(view); + return new SparkMaterializedView(catalogName, view, storageTable); } } catch (org.apache.iceberg.exceptions.NoSuchViewException e) { // Ignore. Just process as a normal table. 
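Note on the freshness protocol implemented above: at CREATE time, CreateMaterializedViewExec stamps one "iceberg.base.snapshot.<table-uuid>" property per base table onto the storage table; at read time, SparkCatalog#isFresh recomputes each base table's current snapshot id and compares it with the stamped value. When the view is fresh, loadView() throws so resolution falls through to loadTable(), which serves the storage table as a SparkMaterializedView; when stale, loadView() returns an ordinary SparkView so the defining SQL is re-executed. The following is a minimal, self-contained sketch of that comparison only; the FreshnessCheck class and the snapshot-id map are illustrative stand-ins for the Spark catalog lookups, and only the property prefix comes from MaterializedViewUtil:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the staleness rule in SparkCatalog#isFresh, with snapshot ids
    // supplied directly instead of being loaded through a Spark TableCatalog.
    public final class FreshnessCheck {
      static final String PREFIX = "iceberg.base.snapshot.";

      // storageTableProps: properties stamped on the materialized view's storage table.
      // currentSnapshotIds: base table uuid -> current snapshot id ("0" if none yet).
      static boolean isFresh(
          Map<String, String> storageTableProps, Map<String, String> currentSnapshotIds) {
        for (Map.Entry<String, String> entry : currentSnapshotIds.entrySet()) {
          String recorded = storageTableProps.get(PREFIX + entry.getKey());
          // A missing or outdated recorded snapshot means a base table moved on,
          // so the precomputed data cannot be served.
          if (recorded == null || !recorded.equals(entry.getValue())) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Map<String, String> stamped = new HashMap<>();
        stamped.put(PREFIX + "uuid-1", "42");
        Map<String, String> current = new HashMap<>();
        current.put("uuid-1", "42");
        System.out.println(isFresh(stamped, current)); // true: snapshots match
        current.put("uuid-1", "43"); // base table committed a new snapshot
        System.out.println(isFresh(stamped, current)); // false: stale
      }
    }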
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java index abfd7da0c7bd..720614ac76f6 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java @@ -53,7 +53,11 @@ public enum SparkCatalogConfig { "default-namespace", "default", "cache-enabled", - "false")); + "false")), + SPARK_WITH_MATERIALIZED_VIEWS( + "spark_with_materialized_views", + SparkCatalog.class.getName(), + ImmutableMap.of("default-namespace", "default", "cache-enabled", "false")); private final String catalogName; private final String implementation; diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java index d5708c9e575e..012e57fea80e 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java @@ -31,6 +31,7 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.util.PropertyUtil; import org.junit.AfterClass; import org.junit.Assert; @@ -75,8 +76,13 @@ public SparkTestBaseWithCatalog(SparkCatalogConfig config) { public SparkTestBaseWithCatalog( String catalogName, String implementation, Map config) { + if (catalogName.equals("spark_with_materialized_views")) { + this.catalogConfig = Maps.newHashMap(config); + this.catalogConfig.put(CatalogProperties.WAREHOUSE_LOCATION, "file:" + warehouse); + } else { + this.catalogConfig = config; + } this.catalogName = catalogName; - this.catalogConfig = config; this.validationCatalog = catalogName.equals("testhadoop") ? new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse) From a9e1bee3b5bf5914e5330d3b195042aea33868c9 Mon Sep 17 00:00:00 2001 From: Walaa Eldin Moustafa Date: Tue, 12 Mar 2024 22:38:22 -0700 Subject: [PATCH 3/3] Add support for replacing view version --- .../iceberg/view/ViewVersionReplace.java | 9 --- .../v2/CreateMaterializedViewExec.scala | 76 ++++++++++++------- .../iceberg/spark/MaterializedViewUtil.java | 2 + .../apache/iceberg/spark/SparkCatalog.java | 28 +++++-- 4 files changed, 73 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java index 0150fd2a2a44..8b3d087940a5 100644 --- a/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java +++ b/core/src/main/java/org/apache/iceberg/view/ViewVersionReplace.java @@ -28,7 +28,6 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; import java.util.List; -import java.util.Optional; import org.apache.iceberg.EnvironmentContext; import org.apache.iceberg.Schema; import org.apache.iceberg.catalog.Namespace; @@ -57,14 +56,6 @@ public ViewVersion apply() { } ViewMetadata internalApply() { - // Replacing a materialized view is not supported because the old storage location will wrongly - // transfer to the new version - // if not handled properly. 
- Preconditions.checkState( - Optional.ofNullable(base.properties().get("iceberg.materialized.view")) - .orElse("false") - .equals("false"), - "Cannot replace a materialized view with a new version"); Preconditions.checkState( !representations.isEmpty(), "Cannot replace view without specifying a query"); Preconditions.checkState(null != schema, "Cannot replace view without specifying schema"); diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala index bcff93dd1ab7..9b72a9d4e99b 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateMaterializedViewExec.scala @@ -27,12 +27,14 @@ import org.apache.iceberg.spark.MaterializedViewUtil import org.apache.iceberg.spark.Spark3Util import org.apache.iceberg.spark.SparkCatalog import org.apache.iceberg.spark.source.SparkTable +import org.apache.iceberg.spark.source.SparkView import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.catalog.Table import org.apache.spark.sql.connector.catalog.TableChange +import org.apache.spark.sql.connector.catalog.View import org.apache.spark.sql.connector.catalog.ViewCatalog import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.types.StructType @@ -72,31 +74,42 @@ case class CreateMaterializedViewExec( case None => MaterializedViewUtil.getDefaultMaterializedViewStorageTableIdentifier(ident) } - // TODO: Add support for partitioning the storage table - catalog.asInstanceOf[SparkCatalog].createTable( - sparkStorageTableIdentifier, - viewSchema, new Array[Transform](0), ImmutableMap.of[String, String]() - ) - - // Capture base table state before inserting into the storage table - val baseTables = MaterializedViewUtil.extractBaseTables(queryText).asScala.toList - val baseTableSnapshots = getBaseTableSnapshots(baseTables) - val baseTableSnapshotsProperties = baseTableSnapshots.map{ - case (key, value) => ( - MaterializedViewUtil.MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX + key.toString - ) -> value.toString - } + val view = createView(sparkStorageTableIdentifier.toString) + + view match { + case Some(v) => { + // TODO: Add support for partitioning the storage table + catalog.asInstanceOf[SparkCatalog].createTable( + sparkStorageTableIdentifier, + viewSchema, new Array[Transform](0), ImmutableMap.of[String, String]() + ) + + // Capture base table state before inserting into the storage table + val baseTables = MaterializedViewUtil.extractBaseTables(queryText).asScala.toList + val baseTableSnapshots = getBaseTableSnapshots(baseTables) + val baseTableSnapshotsProperties = baseTableSnapshots.map { + case (key, value) => ( + MaterializedViewUtil.MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX + key.toString + ) -> value.toString + } + + + val storageTableProperties = baseTableSnapshotsProperties + + (MaterializedViewUtil.MATERIALIZED_VIEW_VERSION_PROPERTY_KEY -> getViewVersion(v).toString) + + // Insert into the storage table + session.sql("INSERT INTO " + sparkStorageTableIdentifier + 
" " + queryText) - // Insert into the storage table - session.sql("INSERT INTO " + sparkStorageTableIdentifier + " " + queryText) - // Update the base table snapshots properties - val baseTablePropertyChanges = baseTableSnapshotsProperties.map{ - case (key, value) => TableChange.setProperty(key, value) - }.toArray + // Update the storage table properties + val storageTablePropertyChanges = storageTableProperties.map { + case (key, value) => TableChange.setProperty(key, value) + }.toArray - catalog.asInstanceOf[SparkCatalog].alterTable(sparkStorageTableIdentifier, baseTablePropertyChanges:_*) - createMaterializedView(sparkStorageTableIdentifier.toString) + catalog.asInstanceOf[SparkCatalog].alterTable(sparkStorageTableIdentifier, storageTablePropertyChanges: _*) + } + case None => + } Nil } @@ -104,7 +117,7 @@ case class CreateMaterializedViewExec( s"CreateMaterializedViewExec: ${ident}" } - private def createMaterializedView(storageTableIdentifier: String): Unit = { + private def createView(storageTableIdentifier: String): Option[View] = { val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null val currentNamespace = session.sessionState.catalogManager.currentNamespace @@ -123,7 +136,7 @@ case class CreateMaterializedViewExec( catalog.dropView(ident) } // FIXME: replaceView API doesn't exist in Spark 3.5 - catalog.createView( + val view = catalog.createView( ident, queryText, currentCatalog, @@ -133,10 +146,11 @@ case class CreateMaterializedViewExec( columnAliases.toArray, columnComments.map(c => c.orNull).toArray, newProperties.asJava) + Some(view) } else { try { // CREATE VIEW [IF NOT EXISTS] - catalog.createView( + val view = catalog.createView( ident, queryText, currentCatalog, @@ -146,9 +160,10 @@ case class CreateMaterializedViewExec( columnAliases.toArray, columnComments.map(c => c.orNull).toArray, newProperties.asJava) + Some(view) } catch { // TODO: Make sure the existing view is also a materialized view - case _: ViewAlreadyExistsException if allowExisting => // Ignore + case _: ViewAlreadyExistsException if allowExisting => None } } } @@ -163,4 +178,13 @@ case class CreateMaterializedViewExec( throw new UnsupportedOperationException("Only Spark tables are supported") }.toMap } + + private def getViewVersion(view: View): Long = { + view match { + case sparkView: SparkView => + sparkView.view().currentVersion().versionId() + case _ => + throw new UnsupportedOperationException("Only Spark views are supported") + } + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java index 3d4114a1afe0..633b705eb37a 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/MaterializedViewUtil.java @@ -41,6 +41,8 @@ private MaterializedViewUtil() {} "iceberg.materialized.view.storage.table"; public static final String MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX = "iceberg.base.snapshot."; + public static final String MATERIALIZED_VIEW_VERSION_PROPERTY_KEY = + "iceberg.materialized.view.version"; private static final String MATERIALIZED_VIEW_STORAGE_TABLE_IDENTIFIER_SUFFIX = ".storage.table"; public static List
<Table> extractBaseTables(String query) {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 12965c4f10c9..cab2b4ce2a10 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -601,8 +601,27 @@ private Table loadStorageTable(org.apache.iceberg.view.View view) {
   // Second option is to move to SparkMaterializedView
   private boolean isFresh(org.apache.iceberg.view.View view) {
     Table storageTable = loadStorageTable(view);
+    Map<String, String> storageTableProperties = storageTable.properties();
+
+    // Get the parent view version id from the storage table properties
+    String storageTableViewVersionIdPropertyValue =
+        storageTableProperties.get(
+            MaterializedViewUtil.MATERIALIZED_VIEW_VERSION_PROPERTY_KEY);
+    if (storageTableViewVersionIdPropertyValue == null) {
+      throw new IllegalStateException(
+          "Storage table properties do not contain the materialized view version id property.");
+    }
+    int storageTableViewVersionId = Integer.parseInt(storageTableViewVersionIdPropertyValue);
+
+    // If the storage table view version id is different from the current view version id, the
+    // materialized view is not fresh
+    if (storageTableViewVersionId != view.currentVersion().versionId()) {
+      return false;
+    }
+
+    // Get the base table snapshot ids from the storage table properties
     Map<String, String> baseTableSnapshotsProperties =
-        storageTable.properties().entrySet().stream()
+        storageTableProperties.entrySet().stream()
             .filter(
                 entry ->
                     entry
                         .getKey()
                         .startsWith(
                             MaterializedViewUtil
                                 .MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX))
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
@@ -665,12 +684,7 @@ public View createView(
             .withLocation(properties.get("location"))
             .withProperties(props)
             .create();
-      if (isMaterializedView(view)) {
-        Table storageTable = loadStorageTable(view);
-        return new SparkMaterializedView(catalogName, view, storageTable);
-      } else {
-        return new SparkView(catalogName, view);
-      }
+      return new SparkView(catalogName, view);
     } catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
       throw new NoSuchNamespaceException(currentNamespace);
     } catch (AlreadyExistsException e) {
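With this last change, staleness becomes two-dimensional: the storage table must record the view version it was materialized from (the new "iceberg.materialized.view.version" property) in addition to the base table snapshot ids, and isFresh rejects it on either mismatch. This is what makes it safe to lift the earlier ban on replacing materialized views: REPLACE bumps the view's version id, which immediately invalidates the old storage table even before any base table changes. A hedged sketch of the combined predicate, under the same illustrative assumptions as the earlier FreshnessCheck example:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch of the two-level freshness predicate after this patch; the property
    // names follow MaterializedViewUtil, everything else is illustrative.
    public final class VersionedFreshnessCheck {
      static final String VERSION_KEY = "iceberg.materialized.view.version";
      static final String SNAPSHOT_PREFIX = "iceberg.base.snapshot.";

      static boolean isFresh(
          Map<String, String> storageTableProps,
          int currentViewVersionId,
          Map<String, String> currentSnapshotIds) {
        String recordedVersion = storageTableProps.get(VERSION_KEY);
        if (recordedVersion == null) {
          // The patch treats a missing version property as corrupt state, not staleness.
          throw new IllegalStateException("Storage table is missing the view version property");
        }
        // Data built for an older view version is stale even if no base table
        // has committed a new snapshot since materialization.
        if (Integer.parseInt(recordedVersion) != currentViewVersionId) {
          return false;
        }
        for (Map.Entry<String, String> entry : currentSnapshotIds.entrySet()) {
          String recorded = storageTableProps.get(SNAPSHOT_PREFIX + entry.getKey());
          if (recorded == null || !recorded.equals(entry.getValue())) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Map<String, String> stamped = new HashMap<>();
        stamped.put(VERSION_KEY, "1");
        stamped.put(SNAPSHOT_PREFIX + "uuid-1", "42");
        Map<String, String> current = new HashMap<>();
        current.put("uuid-1", "42");
        System.out.println(isFresh(stamped, 1, current)); // true
        System.out.println(isFresh(stamped, 2, current)); // false: view was replaced
      }
    }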