Views, Spark: Add support for Materialized Views; Integrate with Spark SQL #9830
Conversation
```java
ViewMetadata internalApply() {
  // Replacing a materialized view is not supported because the old storage location will wrongly
  // transfer to the new version if not handled properly.
```
I think we will want to allow this by adding the view version ID to the metadata in the table. If you load the view, then load the table, and the versions don't match between them, then the table cannot be used.
Agreed, just keeping it out of scope for this PR. Let me know if we should keep it in this PR's scope. (I also agree that we can change the comment if we leave it out of scope.)
Since this relates to the spec, I have decided to add this support. There is now a new view version property, materialized.view.version.id, that is tracked at the table level and factored into the freshness evaluation.
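Concretely, the pinned version on the storage table can be compared against the view's current version at load time. A minimal sketch of that check; only the property key comes from this PR, the class and method names are hypothetical:

```java
import java.util.Map;

// Sketch of the view-version pinning check discussed above. Only the property
// key "materialized.view.version.id" comes from this PR; the rest is illustrative.
public class ViewVersionCheck {
  // Table property recording the materialized view version the storage table was built for.
  static final String MATERIALIZED_VIEW_VERSION_ID = "materialized.view.version.id";

  // The storage table can only serve the view if it was built for the view's
  // current version; otherwise the view definition changed underneath it.
  public static boolean storageTableUsable(Map<String, String> tableProps, int currentViewVersionId) {
    String pinned = tableProps.get(MATERIALIZED_VIEW_VERSION_ID);
    return pinned != null && Integer.parseInt(pinned) == currentViewVersionId;
  }
}
```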
```java
MaterializedViewUtil
    .MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX))
    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
List<Table> baseTables = MaterializedViewUtil.extractBaseTables(view.sqlFor("spark").sql());
```
Suppose view A references view B and you materialize view A. Do the base tables include view B? A change to view B's SQL definition would also invalidate the MV. I'm not sure whether baseTables also includes views in the plan.
Base tables do not include view B, but rather its (leaf) base tables. That is a good point; we should track view B's version as well. The current patch tracks the parent view's version ID, but we should do the same for the child views (if any).
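The extension suggested here would pin the version of every referenced view, not just the parent's. A rough sketch, assuming the view reference graph has already been resolved (all names here are hypothetical):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: pin the version of every view referenced (directly or
// transitively) by the materialized view, so that a change to any child view's
// definition also invalidates the MV. All names are hypothetical.
public class ViewVersionPins {
  // A resolved view: a name, a version, and the views it references.
  public record ResolvedView(String name, int versionId, List<ResolvedView> childViews) {}

  // Collect version pins for the view and all of its transitive children.
  public static Map<String, Integer> collectVersionPins(ResolvedView root) {
    Map<String, Integer> pins = new HashMap<>();
    collect(root, pins);
    return pins;
  }

  private static void collect(ResolvedView view, Map<String, Integer> pins) {
    pins.put(view.name(), view.versionId());
    for (ResolvedView child : view.childViews()) {
      collect(child, pins);
    }
  }
}
```

At refresh time, each pin would be compared against the corresponding view's current version, mirroring the snapshot-ID comparison done for base tables.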
```java
    "iceberg.base.snapshot.";
public static final String MATERIALIZED_VIEW_VERSION_PROPERTY_KEY =
    "iceberg.materialized.view.version";
private static final String MATERIALIZED_VIEW_STORAGE_TABLE_IDENTIFIER_SUFFIX = ".storage.table";
```
Does this work with SparkSessionCatalog, which requires a single-part namespace?
```scala
override protected def run(): Seq[InternalRow] = {
  catalog.loadTable(ident) match {
    catalog
```
Redundant change
```java
icebergBaseTable.currentSnapshot() == null
    ? 0
    : icebergBaseTable.currentSnapshot().snapshotId());
if (!baseTableSnapshotsProperties
```
This could also be optimized a bit to not count base table snapshot changes where the DataOperations is REPLACE.
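One way to implement that optimization is to walk the snapshot history forward from the pinned snapshot and treat the base table as effectively unchanged if every newer snapshot is a REPLACE (e.g., a compaction or file rewrite). A self-contained sketch using plain records in place of Iceberg's Snapshot/DataOperations API:

```java
import java.util.List;

// Sketch of the suggested optimization: a base table whose only changes since
// the pinned snapshot are REPLACE operations does not make the materialized
// view stale, because REPLACE rewrites files without changing table contents.
// Plain records stand in for Iceberg's Snapshot and DataOperations types.
public class ReplaceAwareFreshness {
  // Minimal stand-in for a snapshot log entry: id plus operation name.
  public record SnapshotEntry(long snapshotId, String operation) {}

  // history is ordered oldest to newest; pinnedSnapshotId is the id stored in
  // the materialized view's base-snapshot property.
  public static boolean baseTableEffectivelyUnchanged(List<SnapshotEntry> history, long pinnedSnapshotId) {
    boolean afterPinned = false;
    for (SnapshotEntry entry : history) {
      if (afterPinned && !"replace".equals(entry.operation())) {
        return false; // a real data change (append, overwrite, delete) since the pin
      }
      if (entry.snapshotId() == pinnedSnapshotId) {
        afterPinned = true;
      }
    }
    return afterPinned; // the pinned snapshot must still be in the history
  }
}
```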
I was wondering, if it's not too much, does it make sense to make the freshness check configurable? This would give the view creator more control over the freshness check and avoid unnecessary reloading when only the current snapshot ID changed.
```java
.get(
    MaterializedViewUtil.MATERIALIZED_VIEW_BASE_SNAPSHOT_PROPERTY_KEY_PREFIX
        + icebergBaseTable.uuid())
```
Per the spec, it looks like the UUID was optional for v1; how do we handle that here?
It looks like it was added via https://github.com/apache/iceberg/pull/264/files
singhpk234 left a comment:
However, if the materialized view is stale, the method simply returns to allow SparkCatalog's loadView to run. In turn, loadView returns the metadata for the virtual view itself, triggering the usual Spark view logic that computes the result set based on the current state of the base tables.
1/ I was wondering whether auto-refresh of the MV on staleness detection should be an opt-in feature?
2/ Any ideas/plans for incremental refresh?
These are very good questions. For (1): it looks to me like, if there is an external process that guarantees freshness, then the current implementation still holds. For (2): we have not discussed incremental refresh plans in the Iceberg community, but there is some relevant work here. You can review some of the test cases here.
@wmoustafa, read this today. I was wondering if there is something we can utilize from a CDC perspective (considering Iceberg has support for that)? How expensive are refreshes of PB-size tables, and what is the ideal frequency of updates in this model, if you can share some data points? Rewriting to get incremental refresh by computing deltas between the snapshots, then joining them with other deltas and taking the union of those, does seem user-friendly though.
It really depends on the query, the size of the delta versus the whole table, etc. There is an extension of that work currently taking place to get an idea of the cost of some basic queries (e.g., a few joins/aggregations plus filters and projections) and to come up with a reasonable cost model (including choosing not to perform incremental maintenance at all if it is deemed more expensive).
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. |
Spec
This patch adds support for materialized views in Iceberg and integrates the implementation with Spark SQL. It reuses the current spec of Iceberg views and tables by leveraging table properties to capture materialized view metadata. Those properties can be added to the Iceberg spec to formalize materialized view support.
Below is a summary of all metadata properties introduced or utilized by this patch, classified based on whether they are associated with a table or a view, along with their purposes:
Properties on a View:
- `iceberg.materialized.view`: When set to `true`, the view is treated as a materialized view. This helps in differentiating between virtual and materialized views within the catalog and dictates specific handling and validation logic for materialized views.
- `iceberg.materialized.view.storage.table`: The identifier of the storage table that holds the materialized view's data.

Properties on a Table:
- `iceberg.base.snapshot.[UUID]`: One property per base table, keyed by `iceberg.base.snapshot.` followed by the UUID of the base table. These properties are used to track whether the materialized view's data is up to date with the base tables by comparing the stored snapshot IDs with the current snapshot IDs of the base tables. If all the base tables' current snapshot IDs match the ones stored in these properties, the materialized view's data is considered fresh.
- `iceberg.materialized.view.version`: The version of the materialized view for which the storage table was last refreshed; it is factored into the freshness evaluation.

Spark SQL
This patch introduces support for materialized views in the Spark module by adding the Spark SQL `CREATE MATERIALIZED VIEW` command and adding materialized view handling for the `DROP VIEW` DDL command. When a `CREATE MATERIALIZED VIEW` command is executed, the patch interprets the command to create a new materialized view, which involves not only registering the view's metadata (including marking it as a materialized view with the appropriate properties) but also setting up a corresponding storage table to hold the materialized data and recording the base tables' current snapshot IDs (at creation time). The storage table identifier is passed by a new clause, `STORED AS '...'`. If no `STORED AS` clause is specified, a default storage table identifier is assigned. When a `DROP VIEW` command is issued for a materialized view, the patch ensures that both the metadata for the materialized view and its associated storage table are properly removed from the catalog. Support for `REFRESH MATERIALIZED VIEW` is left as a future enhancement.

Spark Catalog
This patch enhances the `SparkCatalog` to decide intelligently whether to return the view text metadata for a materialized view or the data from its associated storage table, based on the freshness of the materialized view. Within the `loadTable` method, the patch first checks whether the requested table corresponds to a materialized view by loading the view from the Iceberg catalog. If the identified view is marked as a materialized view (using the `iceberg.materialized.view` property), the patch then assesses its freshness. If it is fresh, the `loadTable` method proceeds to load and return the storage table associated with the materialized view, allowing users to query the pre-computed data directly. However, if the materialized view is stale, the method simply returns to allow `SparkCatalog`'s `loadView` to run. In turn, `loadView` returns the metadata for the virtual view itself, triggering the usual Spark view logic that computes the result set based on the current state of the base tables.

Notes
`InMemoryCatalog` has been extended to use a test `LocalFileIO` due to an existing gap in a pure `InMemoryCatalog` (with `InMemoryFileIO`) when working with data files (which are required by the storage table). Extending `InMemoryCatalog` to use `LocalFileIO` ended up promoting a couple of methods to `public`, but the intention is, again, to start a discussion about the best way to address the current gap.
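The freshness evaluation described in the Spec section, comparing each `iceberg.base.snapshot.[UUID]` property on the storage table to the base table's current snapshot ID, can be sketched with plain maps. Only the property prefix comes from this PR; everything else here is illustrative:

```java
import java.util.Map;

// Illustrative sketch of the freshness check described above: the materialized
// view is fresh only if, for every base table, the snapshot id pinned under
// "iceberg.base.snapshot.<uuid>" matches that table's current snapshot id.
// Only the property prefix comes from this PR; the rest is hypothetical.
public class MaterializedViewFreshness {
  static final String BASE_SNAPSHOT_PREFIX = "iceberg.base.snapshot.";

  // storageTableProps: all properties on the storage table.
  // currentSnapshotIds: base table uuid -> that table's current snapshot id.
  public static boolean isFresh(Map<String, String> storageTableProps, Map<String, Long> currentSnapshotIds) {
    for (Map.Entry<String, Long> base : currentSnapshotIds.entrySet()) {
      String pinned = storageTableProps.get(BASE_SNAPSHOT_PREFIX + base.getKey());
      if (pinned == null || Long.parseLong(pinned) != base.getValue()) {
        return false; // missing pin, or the base table has moved on: stale
      }
    }
    return true;
  }
}
```

On the fresh path, `loadTable` would return the storage table; on the stale path it would fall through so `loadView` serves the virtual view text instead.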