Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ public class Analysis
// The query specification (subquery) currently being visited. Kept so that base query information,
// such as its WHERE clause, is available when processing materialized view status.
private Optional<QuerySpecification> currentQuerySpecification = Optional.empty();

// WHERE clause of the query that accesses a view, recorded while that view's definition is analyzed.
// It is consulted together with the current subquery's own WHERE clause when computing materialized
// view status, so filters applied on a logical view can reach a materialized view underneath it.
// Empty when no accessing WHERE clause has been recorded.
private Optional<Expression> viewAccessorWhereClause = Optional.empty();

// Maps each output Field to its originating SourceColumn(s) for column-level lineage tracking.
private final Multimap<Field, SourceColumn> originColumnDetails = ArrayListMultimap.create();

Expand Down Expand Up @@ -1112,11 +1115,27 @@ public void setCurrentSubquery(QuerySpecification currentSubQuery)
{
this.currentQuerySpecification = Optional.of(currentSubQuery);
}

/**
 * Returns the query specification currently being visited.
 *
 * @return the recorded query specification, or an empty {@code Optional} if none has been set
 */
public Optional<QuerySpecification> getCurrentQuerySpecification()
{
    return this.currentQuerySpecification;
}

/**
 * Records the WHERE clause of the query that is accessing a view.
 *
 * @param whereClause the accessing query's WHERE expression; must not be null,
 *        because it is wrapped with {@code Optional.of}
 */
public void setViewAccessorWhereClause(Expression whereClause)
{
    Optional<Expression> accessorPredicate = Optional.of(whereClause);
    this.viewAccessorWhereClause = accessorPredicate;
}

/**
 * Forgets any recorded view-accessor WHERE clause.
 *
 * The field is reset to an empty {@code Optional}; a value that was recorded
 * before the most recent {@code setViewAccessorWhereClause} call is not restored.
 */
public void clearViewAccessorWhereClause()
{
    viewAccessorWhereClause = Optional.empty();
}

/**
 * Returns the WHERE clause of the query accessing a view, if one is recorded.
 *
 * @return the recorded WHERE expression, or an empty {@code Optional} when none is set
 */
public Optional<Expression> getViewAccessorWhereClause()
{
    return this.viewAccessorWhereClause;
}

public void setTargetQuery(QuerySpecification targetQuery)
{
this.targetQuery = Optional.of(targetQuery);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,233 @@ protected QueryRunner createQueryRunner()
Optional.empty());
}

@Test
public void testMaterializedViewPartitionFilteringThroughLogicalView()
{
    // Scenario: a logical view wraps a partitioned materialized view of which only one
    // partition has been refreshed. A 'ds' filter applied on the logical view decides whether
    // the plan scans the materialized view or falls back to the base table.
    QueryRunner runner = getQueryRunner();
    String baseTable = "orders_partitioned_lv_test";
    String mvName = "orders_mv_lv_test";
    String viewName = "orders_lv_test";

    try {
        // Base table with three 'ds' (date string) partitions, split by orderkey range
        String createTableSql = format("CREATE TABLE %s WITH (partitioned_by = ARRAY['ds']) AS " +
                "SELECT orderkey, totalprice, '2025-11-10' AS ds FROM orders WHERE orderkey < 1000 " +
                "UNION ALL " +
                "SELECT orderkey, totalprice, '2025-11-11' AS ds FROM orders WHERE orderkey >= 1000 AND orderkey < 2000 " +
                "UNION ALL " +
                "SELECT orderkey, totalprice, '2025-11-12' AS ds FROM orders WHERE orderkey >= 2000 AND orderkey < 3000", baseTable);
        runner.execute(createTableSql);

        // Materialized view over the base table, also partitioned by 'ds'
        String createMvSql = format("CREATE MATERIALIZED VIEW %s WITH (partitioned_by = ARRAY['ds']) AS " +
                "SELECT max(totalprice) as max_price, orderkey, ds FROM %s GROUP BY orderkey, ds", mvName, baseTable);
        runner.execute(createMvSql);

        assertTrue(getQueryRunner().tableExists(getSession(), mvName));

        // Refresh just ds='2025-11-10'; the '2025-11-11' and '2025-11-12' partitions stay missing
        String refreshSql = format("REFRESH MATERIALIZED VIEW %s WHERE ds='2025-11-10'", mvName);
        assertUpdate(refreshSql, 255);

        // Logical view layered on top of the materialized view
        String createViewSql = format("CREATE VIEW %s AS SELECT * FROM %s", viewName, mvName);
        runner.execute(createViewSql);

        setReferencedMaterializedViews((DistributedQueryRunner) runner, baseTable, ImmutableList.of(mvName));

        Session filteringSession = Session.builder(getQueryRunner().getDefaultSession())
                .setSystemProperty(CONSIDER_QUERY_FILTERS_FOR_MATERIALIZED_VIEW_PARTITIONS, "true")
                .setCatalogSessionProperty(HIVE_CATALOG, MATERIALIZED_VIEW_MISSING_PARTITIONS_THRESHOLD, String.valueOf(1))
                .build();

        // --- Refreshed partition ---
        // The same filtered projection is issued against the logical view and against the
        // materialized view directly; the base table query computes the expected rows.
        String refreshedPartitionTemplate = "SELECT max_price, orderkey FROM %s WHERE ds='2025-11-10' ORDER BY orderkey";
        String viewSql = format(refreshedPartitionTemplate, viewName);
        String mvSql = format(refreshedPartitionTemplate, mvName);
        String baseSql = format("SELECT max(totalprice) as max_price, orderkey FROM %s " +
                "WHERE ds='2025-11-10' " +
                "GROUP BY orderkey ORDER BY orderkey", baseTable);

        MaterializedResult expectedRows = computeActual(filteringSession, baseSql);
        MaterializedResult rowsViaView = computeActual(filteringSession, viewSql);
        MaterializedResult rowsViaMv = computeActual(filteringSession, mvSql);

        // All three access paths must agree
        assertEquals(expectedRows, rowsViaView);
        assertEquals(expectedRows, rowsViaMv);

        // Only the refreshed partition is requested, so the logical view query is expected to
        // scan the materialized view rather than the base table
        assertPlan(filteringSession, viewSql, anyTree(
                constrainedTableScan(
                        mvName,
                        ImmutableMap.of("ds", singleValue(createVarcharType(10), utf8Slice("2025-11-10"))),
                        ImmutableMap.of())));

        // --- Missing partition ---
        // ds='2025-11-11' was never refreshed, so the query through the logical view is
        // expected to fall back to the base table
        String missingViewSql = format("SELECT max_price, orderkey FROM %s WHERE ds='2025-11-11' ORDER BY orderkey", viewName);
        String missingBaseSql = format("SELECT max(totalprice) as max_price, orderkey FROM %s " +
                "WHERE ds='2025-11-11' " +
                "GROUP BY orderkey ORDER BY orderkey", baseTable);

        MaterializedResult expectedMissingRows = computeActual(filteringSession, missingBaseSql);
        MaterializedResult missingRowsViaView = computeActual(filteringSession, missingViewSql);

        assertEquals(expectedMissingRows, missingRowsViaView);

        assertPlan(filteringSession, missingViewSql, anyTree(
                constrainedTableScan(baseTable, ImmutableMap.of(), ImmutableMap.of())));
    }
    finally {
        // Drop in dependency order: logical view, then materialized view, then base table
        runner.execute("DROP VIEW IF EXISTS " + viewName);
        runner.execute("DROP MATERIALIZED VIEW IF EXISTS " + mvName);
        runner.execute("DROP TABLE IF EXISTS " + baseTable);
    }
}

@Test
public void testMaterializedViewPartitionFilteringThroughLogicalViewWithCTE()
{
    // Scenario: the 'ds' filter sits inside a CTE that reads a logical view, which in turn
    // reads a materialized view with a single refreshed partition.
    QueryRunner runner = getQueryRunner();
    String baseTable = "orders_partitioned_cte_test";
    String mvName = "orders_mv_cte_test";
    String viewName = "orders_lv_cte_test";

    try {
        // Base table with three 'ds' (date string) partitions, split by orderkey range
        String createTableSql = format("CREATE TABLE %s WITH (partitioned_by = ARRAY['ds']) AS " +
                "SELECT orderkey, totalprice, '2025-11-10' AS ds FROM orders WHERE orderkey < 1000 " +
                "UNION ALL " +
                "SELECT orderkey, totalprice, '2025-11-11' AS ds FROM orders WHERE orderkey >= 1000 AND orderkey < 2000 " +
                "UNION ALL " +
                "SELECT orderkey, totalprice, '2025-11-12' AS ds FROM orders WHERE orderkey >= 2000 AND orderkey < 3000", baseTable);
        runner.execute(createTableSql);

        // Materialized view over the base table, also partitioned by 'ds'
        String createMvSql = format("CREATE MATERIALIZED VIEW %s WITH (partitioned_by = ARRAY['ds']) AS " +
                "SELECT max(totalprice) as max_price, orderkey, ds FROM %s GROUP BY orderkey, ds", mvName, baseTable);
        runner.execute(createMvSql);

        assertTrue(getQueryRunner().tableExists(getSession(), mvName));

        // Refresh just ds='2025-11-11'; the '2025-11-10' and '2025-11-12' partitions stay missing
        String refreshSql = format("REFRESH MATERIALIZED VIEW %s WHERE ds='2025-11-11'", mvName);
        assertUpdate(refreshSql, 248);

        // Logical view layered on top of the materialized view
        String createViewSql = format("CREATE VIEW %s AS SELECT * FROM %s", viewName, mvName);
        runner.execute(createViewSql);

        setReferencedMaterializedViews((DistributedQueryRunner) runner, baseTable, ImmutableList.of(mvName));

        Session filteringSession = Session.builder(getQueryRunner().getDefaultSession())
                .setSystemProperty(CONSIDER_QUERY_FILTERS_FOR_MATERIALIZED_VIEW_PARTITIONS, "true")
                .setCatalogSessionProperty(HIVE_CATALOG, MATERIALIZED_VIEW_MISSING_PARTITIONS_THRESHOLD, String.valueOf(1))
                .build();

        // The CTE carries the partition predicate; the outer query only projects and sorts.
        // The base table query computes the expected rows for the same partition.
        String cteSql = format("WITH PreQuery AS (SELECT * FROM %s WHERE ds='2025-11-11') " +
                "SELECT max_price, orderkey FROM PreQuery ORDER BY orderkey", viewName);
        String baseSql = format("SELECT max(totalprice) as max_price, orderkey FROM %s " +
                "WHERE ds='2025-11-11' " +
                "GROUP BY orderkey ORDER BY orderkey", baseTable);

        MaterializedResult expectedRows = computeActual(filteringSession, baseSql);
        MaterializedResult rowsViaCte = computeActual(filteringSession, cteSql);

        // Same rows regardless of access path
        assertEquals(expectedRows, rowsViaCte);

        // Only the refreshed partition is requested, so the CTE query is expected to scan the
        // materialized view rather than the base table
        assertPlan(filteringSession, cteSql, anyTree(
                constrainedTableScan(
                        mvName,
                        ImmutableMap.of("ds", singleValue(createVarcharType(10), utf8Slice("2025-11-11"))),
                        ImmutableMap.of())));
    }
    finally {
        // Drop in dependency order: logical view, then materialized view, then base table
        runner.execute("DROP VIEW IF EXISTS " + viewName);
        runner.execute("DROP MATERIALIZED VIEW IF EXISTS " + mvName);
        runner.execute("DROP TABLE IF EXISTS " + baseTable);
    }
}

@Test
public void testMaterializedViewPartitionFilteringInCTE()
{
    // Scenario: the 'ds' filter sits inside a CTE that reads the materialized view directly
    // (no logical view in between); only one partition of the view has been refreshed.
    QueryRunner runner = getQueryRunner();
    String baseTable = "orders_partitioned_mv_cte_test";
    String mvName = "orders_mv_direct_cte_test";

    try {
        // Base table with three 'ds' (date string) partitions, split by orderkey range
        String createTableSql = format("CREATE TABLE %s WITH (partitioned_by = ARRAY['ds']) AS " +
                "SELECT orderkey, totalprice, '2025-11-10' AS ds FROM orders WHERE orderkey < 1000 " +
                "UNION ALL " +
                "SELECT orderkey, totalprice, '2025-11-11' AS ds FROM orders WHERE orderkey >= 1000 AND orderkey < 2000 " +
                "UNION ALL " +
                "SELECT orderkey, totalprice, '2025-11-12' AS ds FROM orders WHERE orderkey >= 2000 AND orderkey < 3000", baseTable);
        runner.execute(createTableSql);

        // Materialized view over the base table, also partitioned by 'ds'
        String createMvSql = format("CREATE MATERIALIZED VIEW %s WITH (partitioned_by = ARRAY['ds']) AS " +
                "SELECT max(totalprice) as max_price, orderkey, ds FROM %s GROUP BY orderkey, ds", mvName, baseTable);
        runner.execute(createMvSql);

        assertTrue(getQueryRunner().tableExists(getSession(), mvName));

        // Refresh just ds='2025-11-10'; the '2025-11-11' and '2025-11-12' partitions stay missing
        String refreshSql = format("REFRESH MATERIALIZED VIEW %s WHERE ds='2025-11-10'", mvName);
        assertUpdate(refreshSql, 255);

        setReferencedMaterializedViews((DistributedQueryRunner) runner, baseTable, ImmutableList.of(mvName));

        Session filteringSession = Session.builder(getQueryRunner().getDefaultSession())
                .setSystemProperty(CONSIDER_QUERY_FILTERS_FOR_MATERIALIZED_VIEW_PARTITIONS, "true")
                .setCatalogSessionProperty(HIVE_CATALOG, MATERIALIZED_VIEW_MISSING_PARTITIONS_THRESHOLD, String.valueOf(1))
                .build();

        // --- Refreshed partition ---
        // The CTE carries the partition predicate that determines which partitions are needed;
        // the base table query computes the expected rows.
        String cteSql = format("WITH PreQuery AS (SELECT * FROM %s WHERE ds='2025-11-10') " +
                "SELECT max_price, orderkey FROM PreQuery ORDER BY orderkey", mvName);
        String baseSql = format("SELECT max(totalprice) as max_price, orderkey FROM %s " +
                "WHERE ds='2025-11-10' " +
                "GROUP BY orderkey ORDER BY orderkey", baseTable);

        MaterializedResult expectedRows = computeActual(filteringSession, baseSql);
        MaterializedResult rowsViaCte = computeActual(filteringSession, cteSql);

        // Same rows regardless of access path
        assertEquals(expectedRows, rowsViaCte);

        // Only the refreshed partition is requested, so the CTE query is expected to scan the
        // materialized view rather than the base table
        assertPlan(filteringSession, cteSql, anyTree(
                constrainedTableScan(
                        mvName,
                        ImmutableMap.of("ds", singleValue(createVarcharType(10), utf8Slice("2025-11-10"))),
                        ImmutableMap.of())));

        // --- Missing partition ---
        // ds='2025-11-11' was never refreshed, so the CTE query is expected to fall back to
        // the base table
        String missingCteSql = format("WITH PreQuery AS (SELECT * FROM %s WHERE ds='2025-11-11') " +
                "SELECT max_price, orderkey FROM PreQuery ORDER BY orderkey", mvName);
        String missingBaseSql = format("SELECT max(totalprice) as max_price, orderkey FROM %s " +
                "WHERE ds='2025-11-11' " +
                "GROUP BY orderkey ORDER BY orderkey", baseTable);

        MaterializedResult expectedMissingRows = computeActual(filteringSession, missingBaseSql);
        MaterializedResult missingRowsViaCte = computeActual(filteringSession, missingCteSql);

        assertEquals(expectedMissingRows, missingRowsViaCte);

        assertPlan(filteringSession, missingCteSql, anyTree(
                constrainedTableScan(baseTable, ImmutableMap.of(), ImmutableMap.of())));
    }
    finally {
        // Drop the materialized view before the base table it is defined over
        runner.execute("DROP MATERIALIZED VIEW IF EXISTS " + mvName);
        runner.execute("DROP TABLE IF EXISTS " + baseTable);
    }
}

@Test
public void testMaterializedViewOptimization()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2372,12 +2372,21 @@ private Scope processView(Table table, Optional<Scope> scope, QualifiedObjectNam

analysis.getAccessControlReferences().addViewDefinitionReference(name, view);

Optional<Expression> savedViewAccessorWhereClause = analysis.getCurrentQuerySpecification()
.flatMap(QuerySpecification::getWhere);
savedViewAccessorWhereClause.ifPresent(analysis::setViewAccessorWhereClause);

Query query = parseView(view.getOriginalSql(), name, table);

analysis.registerNamedQuery(table, query, true);
analysis.registerTableForView(table);
RelationType descriptor = analyzeView(query, name, view.getCatalog(), view.getSchema(), view.getOwner(), table);
analysis.unregisterTableForView();

if (savedViewAccessorWhereClause.isPresent()) {
analysis.clearViewAccessorWhereClause();
}

if (isViewStale(view.getColumns(), descriptor.getVisibleFields())) {
throw new SemanticException(VIEW_IS_STALE, table, "View '%s' is stale; it must be re-created", name);
}
Expand Down Expand Up @@ -2566,15 +2575,20 @@ private MaterializedViewStatus getMaterializedViewStatus(QualifiedObjectName mat
checkArgument(analysis.getCurrentQuerySpecification().isPresent(), "Current subquery should be set when processing materialized view");
QuerySpecification currentSubquery = analysis.getCurrentQuerySpecification().get();

if (currentSubquery.getWhere().isPresent() && isMaterializedViewPartitionFilteringEnabled(session)) {
// Collect where clause from both current subquery and possible logical view
List<Expression> wherePredicates = new ArrayList<>();
currentSubquery.getWhere().ifPresent(wherePredicates::add);
analysis.getViewAccessorWhereClause().ifPresent(wherePredicates::add);

if (!wherePredicates.isEmpty() && isMaterializedViewPartitionFilteringEnabled(session)) {
Optional<MaterializedViewDefinition> materializedViewDefinition = getMaterializedViewDefinition(session, metadataResolver, analysis.getMetadataHandle(), materializedViewName);
if (!materializedViewDefinition.isPresent()) {
log.warn("Materialized view definition not present as expected when fetching materialized view status");
return metadataResolver.getMaterializedViewStatus(materializedViewName, baseQueryDomain);
}

Scope sourceScope = getScopeFromTable(table, scope);
Expression viewQueryWhereClause = currentSubquery.getWhere().get();
Expression combinedWhereClause = ExpressionUtils.combineConjuncts(wherePredicates);

// Extract column names from materialized view scope
Set<QualifiedName> materializedViewColumns = sourceScope.getRelationType().getAllFields().stream()
Expand All @@ -2585,7 +2599,7 @@ private MaterializedViewStatus getMaterializedViewStatus(QualifiedObjectName mat
.collect(Collectors.toSet());

// Only proceed with partition filtering if there are conjuncts that reference MV columns
List<Expression> conjuncts = ExpressionUtils.extractConjuncts(viewQueryWhereClause);
List<Expression> conjuncts = ExpressionUtils.extractConjuncts(combinedWhereClause);
List<Expression> mvConjuncts = conjuncts.stream()
.filter(conjunct -> {
Set<QualifiedName> referencedColumns = VariablesExtractor.extractNames(conjunct, analysis.getColumnReferences());
Expand Down
Loading