Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2866,6 +2866,61 @@ public void testAutoRefreshMaterializedViewFullyRefreshed()
}
}

@Test
public void testAutoRefreshMaterializedViewAfterInsertion()
{
QueryRunner queryRunner = getQueryRunner();

String table = "test_auto_refresh";
String view = "test_auto_refresh_mv";

Session fullRefreshSession = Session.builder(getSession())
.setSystemProperty("materialized_view_allow_full_refresh_enabled", "true")
.setSystemProperty("materialized_view_data_consistency_enabled", "false")
.build();
Session ownerSession = getSession();

queryRunner.execute(
fullRefreshSession,
format("CREATE TABLE %s (col1 bigint, col2 varchar, part_key varchar) " +
"WITH (partitioned_by = ARRAY['part_key'])", table));

queryRunner.execute(
fullRefreshSession,
format("INSERT INTO %s VALUES (1, 'aaa', 'p1'), " +
"(2, 'bbb', 'p2'), (3, 'aaa', 'p1')", table));

queryRunner.execute(
fullRefreshSession,
format("CREATE MATERIALIZED VIEW %s " +
"WITH (partitioned_by = ARRAY['part_key']) " +
"AS SELECT col1, part_key FROM %s", view, table));

try {
queryRunner.execute(fullRefreshSession, format("REFRESH MATERIALIZED VIEW %s", view));

MaterializedResult result = queryRunner.execute(fullRefreshSession,
format("SELECT COUNT(DISTINCT part_key) FROM %s", view));
assertEquals((long) ((Long) result.getOnlyValue()), 2, "Materialized view should contain all data after refreshes");

queryRunner.execute(
fullRefreshSession,
format("INSERT INTO %s VALUES (1, 'aaa', 'p3'), " +
"(2, 'bbb', 'p4'), (3, 'aaa', 'p5')", table));

queryRunner.execute(fullRefreshSession,
format("REFRESH MATERIALIZED VIEW %s", view));

result = queryRunner.execute(fullRefreshSession,
format("SELECT COUNT(DISTINCT part_key) FROM %s", view));
assertEquals((long) ((Long) result.getOnlyValue()), 5, "Materialized view should contain all data after refreshes");
}
finally {
queryRunner.execute(ownerSession, format("DROP MATERIALIZED VIEW %s", view));
queryRunner.execute(ownerSession, format("DROP TABLE %s", table));
}
}

private void setReferencedMaterializedViews(DistributedQueryRunner queryRunner, String tableName, List<String> referencedMaterializedViews)
{
appendTableParameter(replicateHiveMetastore(queryRunner),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.SessionPropertyManager;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.MaterializedViewStatus;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.relation.DomainTranslator;
Expand Down Expand Up @@ -60,6 +59,7 @@
import static com.facebook.presto.common.predicate.TupleDomain.extractFixedValues;
import static com.facebook.presto.common.type.StandardTypes.HYPER_LOG_LOG;
import static com.facebook.presto.common.type.StandardTypes.VARBINARY;
import static com.facebook.presto.sql.ExpressionUtils.combineDisjuncts;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.sql.tree.ArithmeticBinaryExpression.Operator.DIVIDE;
import static com.facebook.presto.sql.tree.BooleanLiteral.FALSE_LITERAL;
Expand Down Expand Up @@ -115,13 +115,6 @@ public static Session buildOwnerSession(Session session, Optional<String> owner,
builder.setSystemProperty(property.getKey(), property.getValue());
}

for (Map.Entry<ConnectorId, Map<String, String>> connectorEntry : session.getConnectorProperties().entrySet()) {
String catalogName = connectorEntry.getKey().getCatalogName();
for (Map.Entry<String, String> property : connectorEntry.getValue().entrySet()) {
builder.setCatalogSessionProperty(catalogName, property.getKey(), property.getValue());
}
}

return builder.build();
}

Expand Down Expand Up @@ -393,10 +386,7 @@ public static Expression convertMaterializedDataPredicatesToExpression(
return disjuncts.get(0);
}
else {
return disjuncts.stream()
.reduce((left, right) -> new LogicalBinaryExpression(
LogicalBinaryExpression.Operator.OR, left, right))
.get();
return combineDisjuncts(disjuncts);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
import com.facebook.presto.spi.type.UnknownTypeException;
import com.facebook.presto.sql.ExpressionUtils;
import com.facebook.presto.sql.MaterializedViewUtils;
import com.facebook.presto.sql.SqlFormatterUtil;
import com.facebook.presto.sql.analyzer.Analysis.TableArgumentAnalysis;
import com.facebook.presto.sql.analyzer.Analysis.TableFunctionInvocationAnalysis;
import com.facebook.presto.sql.parser.ParsingException;
Expand Down Expand Up @@ -266,6 +265,8 @@
import static com.facebook.presto.sql.NodeUtils.mapFromProperties;
import static com.facebook.presto.sql.QueryUtil.selectList;
import static com.facebook.presto.sql.QueryUtil.simpleQuery;
import static com.facebook.presto.sql.SqlFormatter.formatSql;
import static com.facebook.presto.sql.SqlFormatterUtil.getFormattedSql;
import static com.facebook.presto.sql.analyzer.AggregationAnalyzer.verifyOrderByAggregations;
import static com.facebook.presto.sql.analyzer.AggregationAnalyzer.verifySourceAggregations;
import static com.facebook.presto.sql.analyzer.Analysis.MaterializedViewAnalysisState;
Expand Down Expand Up @@ -849,7 +850,7 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView node, Optio

// the original refresh statement will always be one line
analysis.setExpandedQuery(format("-- Expanded Query: %s%nINSERT INTO %s %s",
SqlFormatterUtil.getFormattedSql(node, sqlParser, Optional.empty()),
getFormattedSql(node, sqlParser, Optional.empty()),
viewName.getObjectName(),
view.getOriginalSql()));
analysis.addAccessControlCheckForTable(
Expand All @@ -870,7 +871,7 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView node, Optio

Query viewQuery = parseView(view.getOriginalSql(), viewName, node);
Query refreshQuery = tablePredicates.containsKey(toSchemaTableName(viewName)) ?
buildQueryWithPredicate(viewQuery, tablePredicates.get(toSchemaTableName(viewName)))
buildSubqueryWithPredicate(viewQuery, tablePredicates.get(toSchemaTableName(viewName)))
: viewQuery;
// Check if the owner has SELECT permission on the base tables
StatementAnalyzer queryAnalyzer = new StatementAnalyzer(
Expand Down Expand Up @@ -938,7 +939,7 @@ private Optional<RelationType> analyzeBaseTableForRefreshMaterializedView(Table

SchemaTableName baseTableName = toSchemaTableName(createQualifiedObjectName(session, baseTable, baseTable.getName(), metadata));
if (tablePredicates.containsKey(baseTableName)) {
Query tableSubquery = buildQueryWithPredicate(baseTable, tablePredicates.get(baseTableName));
Query tableSubquery = buildTableQueryWithPredicate(baseTable, tablePredicates.get(baseTableName));
analysis.registerNamedQuery(baseTable, tableSubquery, true);

Scope subqueryScope = process(tableSubquery, scope);
Expand Down Expand Up @@ -975,17 +976,19 @@ private Map<SchemaTableName, Expression> getTablePredicatesForMaterializedViewRe
}
}

private Query buildQueryWithPredicate(Table table, Expression predicate)
private Query buildTableQueryWithPredicate(Table table, Expression predicate)
{
Query query = simpleQuery(selectList(new AllColumns()), table, predicate);
return (Query) sqlParser.createStatement(
SqlFormatterUtil.getFormattedSql(query, sqlParser, Optional.empty()),
createParsingOptions(session, warningCollector));
String formattedSql = formatSql(query, Optional.empty());
return (Query) sqlParser.createStatement(formattedSql, createParsingOptions(session, warningCollector));
}

private Query buildQueryWithPredicate(Query originalQuery, Expression predicate)
private Query buildSubqueryWithPredicate(Query originalQuery, Expression predicate)
{
return simpleQuery(selectList(new AllColumns()), new TableSubquery(originalQuery), predicate);
Query query = simpleQuery(selectList(new AllColumns()), new TableSubquery(originalQuery), predicate);
return (Query) sqlParser.createStatement(
getFormattedSql(query, sqlParser, Optional.empty()),
createParsingOptions(session, warningCollector));
}

@Override
Expand Down Expand Up @@ -2366,7 +2369,7 @@ else if (materializedViewStatus.isPartiallyMaterialized()) {
Query unionQuery = new Query(predicateStitchedQuery.getWith(), union, predicateStitchedQuery.getOrderBy(), predicateStitchedQuery.getOffset(), predicateStitchedQuery.getLimit());
// can we return the above query object, instead of building a query string?
// in case of returning the query object, make sure to clone the original query object.
return SqlFormatterUtil.getFormattedSql(unionQuery, sqlParser, Optional.empty());
return getFormattedSql(unionQuery, sqlParser, Optional.empty());
}

/**
Expand Down
Loading