From 16281cd62a65f9e247c5b65981836d14174ebb34 Mon Sep 17 00:00:00 2001 From: Jiaqi Zhang Date: Sat, 29 Nov 2025 13:44:28 -0800 Subject: [PATCH] fix(analyzer): materialized view with logical view and cte (#26713) Summary: If a materialized view is part of a logical view, the WHERE predicate of the query accessing the logical view is not pushed down to the materialized view, so the partition overlap is not checked correctly. This caused the MV's data to be compared against ALL base table data instead of only the data specified in the query. This diff fixes it by storing that WHERE predicate when processing a logical view, so that it can be combined with the current subquery's predicate when getting the MV status. It also fixes the same issue when the logical view/MV is used in a CTE. Differential Revision: D87928199 --- .../presto/sql/analyzer/Analysis.java | 19 ++ ...estHiveMaterializedViewLogicalPlanner.java | 227 ++++++++++++++++++ .../sql/analyzer/StatementAnalyzer.java | 20 +- 3 files changed, 263 insertions(+), 3 deletions(-) diff --git a/presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java b/presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java index b5977d4a6d20b..d5329e375d58f 100644 --- a/presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java +++ b/presto-analyzer/src/main/java/com/facebook/presto/sql/analyzer/Analysis.java @@ -221,6 +221,9 @@ public class Analysis // Keeps track of the subquery we are visiting, so we have access to base query information when processing materialized view status private Optional currentQuerySpecification = Optional.empty(); + // Track WHERE clause from the query accessing a view for subquery analysis such as materialized view + private Optional viewAccessorWhereClause = Optional.empty(); + // Maps each output Field to its originating SourceColumn(s) for column-level lineage tracking. 
private final Multimap originColumnDetails = ArrayListMultimap.create(); @@ -1112,11 +1115,27 @@ public void setCurrentSubquery(QuerySpecification currentSubQuery) { this.currentQuerySpecification = Optional.of(currentSubQuery); } + public Optional getCurrentQuerySpecification() { return currentQuerySpecification; } + public void setViewAccessorWhereClause(Expression whereClause) + { + this.viewAccessorWhereClause = Optional.of(whereClause); + } + + public void clearViewAccessorWhereClause() + { + this.viewAccessorWhereClause = Optional.empty(); + } + + public Optional getViewAccessorWhereClause() + { + return viewAccessorWhereClause; + } + public void setTargetQuery(QuerySpecification targetQuery) { this.targetQuery = Optional.of(targetQuery); diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveMaterializedViewLogicalPlanner.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveMaterializedViewLogicalPlanner.java index f07a61d7d7c77..6767fc4d96b9c 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveMaterializedViewLogicalPlanner.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveMaterializedViewLogicalPlanner.java @@ -106,6 +106,233 @@ protected QueryRunner createQueryRunner() Optional.empty()); } + @Test + public void testMaterializedViewPartitionFilteringThroughLogicalView() + { + QueryRunner queryRunner = getQueryRunner(); + String table = "orders_partitioned_lv_test"; + String materializedView = "orders_mv_lv_test"; + String logicalView = "orders_lv_test"; + + try { + // Create a table partitioned by 'ds' (date string) + queryRunner.execute(format("CREATE TABLE %s WITH (partitioned_by = ARRAY['ds']) AS " + + "SELECT orderkey, totalprice, '2025-11-10' AS ds FROM orders WHERE orderkey < 1000 " + + "UNION ALL " + + "SELECT orderkey, totalprice, '2025-11-11' AS ds FROM orders WHERE orderkey >= 1000 AND orderkey < 2000 " + + "UNION ALL " + + "SELECT orderkey, totalprice, '2025-11-12' AS ds FROM 
orders WHERE orderkey >= 2000 AND orderkey < 3000", table)); + + // Create a materialized view partitioned by 'ds' + queryRunner.execute(format("CREATE MATERIALIZED VIEW %s WITH (partitioned_by = ARRAY['ds']) AS " + + "SELECT max(totalprice) as max_price, orderkey, ds FROM %s GROUP BY orderkey, ds", materializedView, table)); + + assertTrue(getQueryRunner().tableExists(getSession(), materializedView)); + + // Only refresh partition for '2025-11-10', leaving '2025-11-11' and '2025-11-12' missing + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE ds='2025-11-10'", materializedView), 255); + + // Create a logical view on top of the materialized view + queryRunner.execute(format("CREATE VIEW %s AS SELECT * FROM %s", logicalView, materializedView)); + + setReferencedMaterializedViews((DistributedQueryRunner) queryRunner, table, ImmutableList.of(materializedView)); + + Session session = Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(CONSIDER_QUERY_FILTERS_FOR_MATERIALIZED_VIEW_PARTITIONS, "true") + .setCatalogSessionProperty(HIVE_CATALOG, MATERIALIZED_VIEW_MISSING_PARTITIONS_THRESHOLD, Integer.toString(1)) + .build(); + + // Query the logical view with a predicate + // The predicate should be pushed down to the materialized view + // Since only ds='2025-11-10' is refreshed and that's what we're querying, + // the materialized view should be used (not fall back to base table) + String logicalViewQuery = format("SELECT max_price, orderkey FROM %s WHERE ds='2025-11-10' ORDER BY orderkey", logicalView); + String directMvQuery = format("SELECT max_price, orderkey FROM %s WHERE ds='2025-11-10' ORDER BY orderkey", materializedView); + String baseTableQuery = format("SELECT max(totalprice) as max_price, orderkey FROM %s " + + "WHERE ds='2025-11-10' " + + "GROUP BY orderkey ORDER BY orderkey", table); + + MaterializedResult baseQueryResult = computeActual(session, baseTableQuery); + MaterializedResult logicalViewResult = computeActual(session, 
logicalViewQuery); + MaterializedResult directMvResult = computeActual(session, directMvQuery); + + // All three queries should return the same results + assertEquals(baseQueryResult, logicalViewResult); + assertEquals(baseQueryResult, directMvResult); + + // The plan for the logical view query should use the materialized view + // (not fall back to base table) because we're only querying the refreshed partition + assertPlan(session, logicalViewQuery, anyTree( + constrainedTableScan( + materializedView, + ImmutableMap.of("ds", singleValue(createVarcharType(10), utf8Slice("2025-11-10"))), + ImmutableMap.of()))); + + // Test query for a missing partition through logical view + // This should fall back to base table because ds='2025-11-11' is not refreshed + String logicalViewQueryMissing = format("SELECT max_price, orderkey FROM %s WHERE ds='2025-11-11' ORDER BY orderkey", logicalView); + String baseTableQueryMissing = format("SELECT max(totalprice) as max_price, orderkey FROM %s " + + "WHERE ds='2025-11-11' " + + "GROUP BY orderkey ORDER BY orderkey", table); + + MaterializedResult baseQueryResultMissing = computeActual(session, baseTableQueryMissing); + MaterializedResult logicalViewResultMissing = computeActual(session, logicalViewQueryMissing); + + assertEquals(baseQueryResultMissing, logicalViewResultMissing); + + // Should fall back to base table for missing partition + assertPlan(session, logicalViewQueryMissing, anyTree( + constrainedTableScan(table, ImmutableMap.of(), ImmutableMap.of()))); + } + finally { + queryRunner.execute("DROP VIEW IF EXISTS " + logicalView); + queryRunner.execute("DROP MATERIALIZED VIEW IF EXISTS " + materializedView); + queryRunner.execute("DROP TABLE IF EXISTS " + table); + } + } + + @Test + public void testMaterializedViewPartitionFilteringThroughLogicalViewWithCTE() + { + QueryRunner queryRunner = getQueryRunner(); + String table = "orders_partitioned_cte_test"; + String materializedView = "orders_mv_cte_test"; + String 
logicalView = "orders_lv_cte_test"; + + try { + // Create a table partitioned by 'ds' (date string) + queryRunner.execute(format("CREATE TABLE %s WITH (partitioned_by = ARRAY['ds']) AS " + + "SELECT orderkey, totalprice, '2025-11-10' AS ds FROM orders WHERE orderkey < 1000 " + + "UNION ALL " + + "SELECT orderkey, totalprice, '2025-11-11' AS ds FROM orders WHERE orderkey >= 1000 AND orderkey < 2000 " + + "UNION ALL " + + "SELECT orderkey, totalprice, '2025-11-12' AS ds FROM orders WHERE orderkey >= 2000 AND orderkey < 3000", table)); + + // Create a materialized view partitioned by 'ds' + queryRunner.execute(format("CREATE MATERIALIZED VIEW %s WITH (partitioned_by = ARRAY['ds']) AS " + + "SELECT max(totalprice) as max_price, orderkey, ds FROM %s GROUP BY orderkey, ds", materializedView, table)); + + assertTrue(getQueryRunner().tableExists(getSession(), materializedView)); + + // Only refresh partition for '2025-11-11', leaving '2025-11-10' and '2025-11-12' missing + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE ds='2025-11-11'", materializedView), 248); + + // Create a logical view on top of the materialized view + queryRunner.execute(format("CREATE VIEW %s AS SELECT * FROM %s", logicalView, materializedView)); + + setReferencedMaterializedViews((DistributedQueryRunner) queryRunner, table, ImmutableList.of(materializedView)); + + Session session = Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(CONSIDER_QUERY_FILTERS_FOR_MATERIALIZED_VIEW_PARTITIONS, "true") + .setCatalogSessionProperty(HIVE_CATALOG, MATERIALIZED_VIEW_MISSING_PARTITIONS_THRESHOLD, Integer.toString(1)) + .build(); + + // Query the logical view through a CTE with a predicate + // The predicate should be pushed down to the materialized view + String cteQuery = format("WITH PreQuery AS (SELECT * FROM %s WHERE ds='2025-11-11') " + + "SELECT max_price, orderkey FROM PreQuery ORDER BY orderkey", logicalView); + String baseTableQuery = format("SELECT max(totalprice) as 
max_price, orderkey FROM %s " + + "WHERE ds='2025-11-11' " + + "GROUP BY orderkey ORDER BY orderkey", table); + + MaterializedResult baseQueryResult = computeActual(session, baseTableQuery); + MaterializedResult cteQueryResult = computeActual(session, cteQuery); + + // Both queries should return the same results + assertEquals(baseQueryResult, cteQueryResult); + + // The plan for the CTE query should use the materialized view + // (not fall back to base table) because we're only querying the refreshed partition + assertPlan(session, cteQuery, anyTree( + constrainedTableScan( + materializedView, + ImmutableMap.of("ds", singleValue(createVarcharType(10), utf8Slice("2025-11-11"))), + ImmutableMap.of()))); + } + finally { + queryRunner.execute("DROP VIEW IF EXISTS " + logicalView); + queryRunner.execute("DROP MATERIALIZED VIEW IF EXISTS " + materializedView); + queryRunner.execute("DROP TABLE IF EXISTS " + table); + } + } + + @Test + public void testMaterializedViewPartitionFilteringInCTE() + { + QueryRunner queryRunner = getQueryRunner(); + String table = "orders_partitioned_mv_cte_test"; + String materializedView = "orders_mv_direct_cte_test"; + + try { + // Create a table partitioned by 'ds' (date string) + queryRunner.execute(format("CREATE TABLE %s WITH (partitioned_by = ARRAY['ds']) AS " + + "SELECT orderkey, totalprice, '2025-11-10' AS ds FROM orders WHERE orderkey < 1000 " + + "UNION ALL " + + "SELECT orderkey, totalprice, '2025-11-11' AS ds FROM orders WHERE orderkey >= 1000 AND orderkey < 2000 " + + "UNION ALL " + + "SELECT orderkey, totalprice, '2025-11-12' AS ds FROM orders WHERE orderkey >= 2000 AND orderkey < 3000", table)); + + // Create a materialized view partitioned by 'ds' + queryRunner.execute(format("CREATE MATERIALIZED VIEW %s WITH (partitioned_by = ARRAY['ds']) AS " + + "SELECT max(totalprice) as max_price, orderkey, ds FROM %s GROUP BY orderkey, ds", materializedView, table)); + + assertTrue(getQueryRunner().tableExists(getSession(), 
materializedView)); + + // Only refresh partition for '2025-11-10', leaving '2025-11-11' and '2025-11-12' missing + assertUpdate(format("REFRESH MATERIALIZED VIEW %s WHERE ds='2025-11-10'", materializedView), 255); + + setReferencedMaterializedViews((DistributedQueryRunner) queryRunner, table, ImmutableList.of(materializedView)); + + Session session = Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(CONSIDER_QUERY_FILTERS_FOR_MATERIALIZED_VIEW_PARTITIONS, "true") + .setCatalogSessionProperty(HIVE_CATALOG, MATERIALIZED_VIEW_MISSING_PARTITIONS_THRESHOLD, Integer.toString(1)) + .build(); + + // Query the materialized view directly through a CTE with a predicate + // The predicate should be used to determine which partitions are needed + String cteQuery = format("WITH PreQuery AS (SELECT * FROM %s WHERE ds='2025-11-10') " + + "SELECT max_price, orderkey FROM PreQuery ORDER BY orderkey", materializedView); + String baseTableQuery = format("SELECT max(totalprice) as max_price, orderkey FROM %s " + + "WHERE ds='2025-11-10' " + + "GROUP BY orderkey ORDER BY orderkey", table); + + MaterializedResult baseQueryResult = computeActual(session, baseTableQuery); + MaterializedResult cteQueryResult = computeActual(session, cteQuery); + + // Both queries should return the same results + assertEquals(baseQueryResult, cteQueryResult); + + // The plan for the CTE query should use the materialized view + // (not fall back to base table) because we're only querying the refreshed partition + assertPlan(session, cteQuery, anyTree( + constrainedTableScan( + materializedView, + ImmutableMap.of("ds", singleValue(createVarcharType(10), utf8Slice("2025-11-10"))), + ImmutableMap.of()))); + + // Test query for a missing partition through CTE + // This should fall back to base table because ds='2025-11-11' is not refreshed + String cteQueryMissing = format("WITH PreQuery AS (SELECT * FROM %s WHERE ds='2025-11-11') " + + "SELECT max_price, orderkey FROM PreQuery ORDER BY 
orderkey", materializedView); + String baseTableQueryMissing = format("SELECT max(totalprice) as max_price, orderkey FROM %s " + + "WHERE ds='2025-11-11' " + + "GROUP BY orderkey ORDER BY orderkey", table); + + MaterializedResult baseQueryResultMissing = computeActual(session, baseTableQueryMissing); + MaterializedResult cteQueryResultMissing = computeActual(session, cteQueryMissing); + + assertEquals(baseQueryResultMissing, cteQueryResultMissing); + + // Should fall back to base table for missing partition + assertPlan(session, cteQueryMissing, anyTree( + constrainedTableScan(table, ImmutableMap.of(), ImmutableMap.of()))); + } + finally { + queryRunner.execute("DROP MATERIALIZED VIEW IF EXISTS " + materializedView); + queryRunner.execute("DROP TABLE IF EXISTS " + table); + } + } + @Test public void testMaterializedViewOptimization() { diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java index cace6c11606af..e4925a46b5f84 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java @@ -2372,12 +2372,21 @@ private Scope processView(Table table, Optional scope, QualifiedObjectNam analysis.getAccessControlReferences().addViewDefinitionReference(name, view); + Optional savedViewAccessorWhereClause = analysis.getCurrentQuerySpecification() + .flatMap(QuerySpecification::getWhere); + savedViewAccessorWhereClause.ifPresent(analysis::setViewAccessorWhereClause); + Query query = parseView(view.getOriginalSql(), name, table); analysis.registerNamedQuery(table, query, true); analysis.registerTableForView(table); RelationType descriptor = analyzeView(query, name, view.getCatalog(), view.getSchema(), view.getOwner(), table); analysis.unregisterTableForView(); + + if (savedViewAccessorWhereClause.isPresent()) { + 
analysis.clearViewAccessorWhereClause(); + } + if (isViewStale(view.getColumns(), descriptor.getVisibleFields())) { throw new SemanticException(VIEW_IS_STALE, table, "View '%s' is stale; it must be re-created", name); } @@ -2566,7 +2575,12 @@ private MaterializedViewStatus getMaterializedViewStatus(QualifiedObjectName mat checkArgument(analysis.getCurrentQuerySpecification().isPresent(), "Current subquery should be set when processing materialized view"); QuerySpecification currentSubquery = analysis.getCurrentQuerySpecification().get(); - if (currentSubquery.getWhere().isPresent() && isMaterializedViewPartitionFilteringEnabled(session)) { + // Collect where clause from both current subquery and possible logical view + List wherePredicates = new ArrayList<>(); + currentSubquery.getWhere().ifPresent(wherePredicates::add); + analysis.getViewAccessorWhereClause().ifPresent(wherePredicates::add); + + if (!wherePredicates.isEmpty() && isMaterializedViewPartitionFilteringEnabled(session)) { Optional materializedViewDefinition = getMaterializedViewDefinition(session, metadataResolver, analysis.getMetadataHandle(), materializedViewName); if (!materializedViewDefinition.isPresent()) { log.warn("Materialized view definition not present as expected when fetching materialized view status"); @@ -2574,7 +2588,7 @@ private MaterializedViewStatus getMaterializedViewStatus(QualifiedObjectName mat } Scope sourceScope = getScopeFromTable(table, scope); - Expression viewQueryWhereClause = currentSubquery.getWhere().get(); + Expression combinedWhereClause = ExpressionUtils.combineConjuncts(wherePredicates); // Extract column names from materialized view scope Set materializedViewColumns = sourceScope.getRelationType().getAllFields().stream() @@ -2585,7 +2599,7 @@ private MaterializedViewStatus getMaterializedViewStatus(QualifiedObjectName mat .collect(Collectors.toSet()); // Only proceed with partition filtering if there are conjuncts that reference MV columns - List conjuncts = 
ExpressionUtils.extractConjuncts(viewQueryWhereClause); + List conjuncts = ExpressionUtils.extractConjuncts(combinedWhereClause); List mvConjuncts = conjuncts.stream() .filter(conjunct -> { Set referencedColumns = VariablesExtractor.extractNames(conjunct, analysis.getColumnReferences());