Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;

Expand Down Expand Up @@ -2958,6 +2959,71 @@ public void testAutoRefreshMaterializedViewAfterInsertion()
}
}

/**
 * Verifies that a materialized view can be joined with another table while the
 * WHERE clause filters on columns of the joined table (columns that are not part
 * of the materialized view), on columns of the materialized view, or on both.
 *
 * All objects created by the test are dropped in a {@code finally} block so that a
 * failing assertion does not leave tables or the materialized view behind.
 */
@Test
public void testMVJoinQueryWithOtherTableColumnFiltering()
{
    QueryRunner queryRunner = getQueryRunner();
    Session session = getSession();

    try {
        assertUpdate("CREATE TABLE mv_base (mv_col1 int, mv_col2 varchar, mv_col3 varchar) " +
                "WITH (partitioned_by=ARRAY['mv_col3'])");
        assertUpdate("CREATE TABLE join_table (table_col1 int, table_col2 varchar, table_col3 varchar) " +
                " WITH (partitioned_by=ARRAY['table_col3'])");

        assertUpdate("INSERT INTO mv_base VALUES (1, 'Alice', 'A'), (2, 'Bob', 'B'), (3, 'Charlie', 'C')", 3);
        assertUpdate("INSERT INTO join_table VALUES (1, 'CityA', 'A'), (21, 'CityA', 'B'), (32, 'CityB', 'C')", 3);

        assertUpdate("CREATE MATERIALIZED VIEW mv " +
                "WITH (partitioned_by=ARRAY['mv_col3']) " +
                "AS SELECT mv_col1, mv_col2, mv_col3 FROM mv_base");

        // Refresh every partition ('A', 'B' and 'C') so the view holds all three base rows
        assertUpdate("REFRESH MATERIALIZED VIEW mv WHERE mv_col3>='A'", 3);

        // Query MV with JOIN and WHERE clause on column from joined table (not in MV)
        MaterializedResult result = queryRunner.execute(session,
                "SELECT mv_col2 FROM mv " +
                        "JOIN join_table ON mv_col3=table_col3 " +
                        "WHERE table_col1>10 ORDER BY mv_col1");
        assertEquals(result.getRowCount(), 2, "Materialized view join produced unexpected row counts");

        List<Object> expectedResults = List.of("Bob", "Charlie");
        List<Object> actualResults = result.getMaterializedRows().stream()
                .map(row -> row.getField(0))
                .collect(toList());
        assertEquals(actualResults, expectedResults, "Materialized view join returned unexpected row values");

        // WHERE clause on MV column
        result = queryRunner.execute(session, "SELECT mv_col2 FROM mv JOIN join_table " +
                "ON mv_col3=table_col3 WHERE mv_col2>'Alice' ORDER BY mv_col2");
        assertEquals(result.getRowCount(), 2, "Materialized view join produced unexpected row counts");

        expectedResults = List.of("Bob", "Charlie");
        actualResults = result.getMaterializedRows().stream()
                .map(row -> row.getField(0))
                .collect(toList());
        assertEquals(actualResults, expectedResults, "Materialized view join returned unexpected row values");

        // Test with multiple conditions in WHERE clause (non-partition column)
        result = queryRunner.execute(session, "SELECT mv_col1 FROM mv JOIN join_table ON mv_col3=table_col3 " +
                "WHERE table_col1>10 AND table_col3='B' AND mv_col1>1");
        assertEquals(result.getRowCount(), 1, "Materialized view join produced unexpected row counts");

        expectedResults = List.of(2);
        actualResults = result.getMaterializedRows().stream()
                .map(row -> row.getField(0))
                .collect(toList());
        assertEquals(actualResults, expectedResults, "Materialized view join returned unexpected row values");

        // Test with multiple conditions in WHERE clause (partition column).
        // The join forces mv_col3 = table_col3 = 'B', which contradicts mv_col3 = 'C', so no rows match.
        result = queryRunner.execute(session, "SELECT mv_col1 FROM mv JOIN join_table ON mv_col3=table_col3 " +
                "WHERE table_col1>10 AND table_col3='B' AND mv_col3='C'");
        assertEquals(result.getRowCount(), 0, "Materialized view join produced wrong results");
    }
    finally {
        // Always clean up, even when an assertion above fails. IF EXISTS keeps the cleanup
        // from throwing (and masking the original failure) when setup did not get far
        // enough to create every object.
        queryRunner.execute(session, "DROP MATERIALIZED VIEW IF EXISTS mv");
        queryRunner.execute(session, "DROP TABLE IF EXISTS join_table");
        queryRunner.execute(session, "DROP TABLE IF EXISTS mv_base");
    }
}

public void testMaterializedViewNotRefreshedInNonLegacyMode()
{
Session nonLegacySession = Session.builder(getSession())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2551,37 +2551,60 @@ private MaterializedViewStatus getMaterializedViewStatus(QualifiedObjectName mat
Scope sourceScope = getScopeFromTable(table, scope);
Expression viewQueryWhereClause = currentSubquery.getWhere().get();

analyzeWhere(currentSubquery, sourceScope, viewQueryWhereClause);

DomainTranslator domainTranslator = new RowExpressionDomainTranslator(metadata);
RowExpression rowExpression = SqlToRowExpressionTranslator.translate(
viewQueryWhereClause,
analysis.getTypes(),
ImmutableMap.of(),
metadata.getFunctionAndTypeManager(),
session);

TupleDomain<String> viewQueryDomain = MaterializedViewUtils.getDomainFromFilter(session, domainTranslator, rowExpression);

Map<String, Map<SchemaTableName, String>> directColumnMappings = materializedViewDefinition.get().getDirectColumnMappingsAsMap();
// Extract column names from materialized view scope
Set<QualifiedName> materializedViewColumns = sourceScope.getRelationType().getAllFields().stream()
.map(field -> field.getName())
.filter(Optional::isPresent)
.map(Optional::get)
.map(QualifiedName::of)
.collect(Collectors.toSet());

// Get base query domain we have mapped from view query- if there are not direct mappings, don't filter partition count for predicate
boolean mappedToOneTable = true;
Map<String, Domain> rewrittenDomain = new HashMap<>();
// Only proceed with partition filtering if there are conjuncts that reference MV columns
List<Expression> conjuncts = ExpressionUtils.extractConjuncts(viewQueryWhereClause);
List<Expression> mvConjuncts = conjuncts.stream()
.filter(conjunct -> {
Set<QualifiedName> referencedColumns = VariablesExtractor.extractNames(conjunct, analysis.getColumnReferences());
return !referencedColumns.isEmpty() && referencedColumns.stream().allMatch(materializedViewColumns::contains);
})
.collect(Collectors.toList());

if (!mvConjuncts.isEmpty()) {
Expression filteredWhereClause = ExpressionUtils.combineConjuncts(mvConjuncts);

// Analyze the filtered WHERE clause only for type inference, don't record it in analysis
// to avoid preventing the full WHERE clause from being analyzed later
ExpressionAnalysis expressionAnalysis = analyzeExpression(filteredWhereClause, sourceScope);

DomainTranslator domainTranslator = new RowExpressionDomainTranslator(metadata);
RowExpression rowExpression = SqlToRowExpressionTranslator.translate(
filteredWhereClause,
analysis.getTypes(),
ImmutableMap.of(),
metadata.getFunctionAndTypeManager(),
session);

TupleDomain<String> viewQueryDomain = MaterializedViewUtils.getDomainFromFilter(session, domainTranslator, rowExpression);

Map<String, Map<SchemaTableName, String>> directColumnMappings = materializedViewDefinition.get().getDirectColumnMappingsAsMap();

// Get base query domain we have mapped from view query- if there are not direct mappings, don't filter partition count for predicate
boolean mappedToOneTable = true;
Map<String, Domain> rewrittenDomain = new HashMap<>();

for (Map.Entry<String, Domain> entry : viewQueryDomain.getDomains().orElse(ImmutableMap.of()).entrySet()) {
Map<SchemaTableName, String> baseTableMapping = directColumnMappings.get(entry.getKey());
if (baseTableMapping == null || baseTableMapping.size() != 1) {
mappedToOneTable = false;
break;
}

for (Map.Entry<String, Domain> entry : viewQueryDomain.getDomains().orElse(ImmutableMap.of()).entrySet()) {
Map<SchemaTableName, String> baseTableMapping = directColumnMappings.get(entry.getKey());
if (baseTableMapping == null || baseTableMapping.size() != 1) {
mappedToOneTable = false;
break;
String baseColumnName = baseTableMapping.entrySet().stream().findAny().get().getValue();
rewrittenDomain.put(baseColumnName, entry.getValue());
}

String baseColumnName = baseTableMapping.entrySet().stream().findAny().get().getValue();
rewrittenDomain.put(baseColumnName, entry.getValue());
}

if (mappedToOneTable) {
baseQueryDomain = TupleDomain.withColumnDomains(rewrittenDomain);
if (mappedToOneTable) {
baseQueryDomain = TupleDomain.withColumnDomains(rewrittenDomain);
}
}
}

Expand Down
Loading