Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2469,9 +2469,6 @@ public MaterializedViewStatus getMaterializedViewStatus(ConnectorSession session
Map<SchemaTableName, Map<String, String>> viewToBasePartitionMap = getViewToBasePartitionMap(materializedViewTable, baseTables, directColumnMappings);

MaterializedDataPredicates materializedDataPredicates = getMaterializedDataPredicates(metastore, metastoreContext, typeManager, materializedViewTable, timeZone);
if (materializedDataPredicates.getPredicateDisjuncts().isEmpty()) {
return new MaterializedViewStatus(NOT_MATERIALIZED);
}

// Partitions to keep track of for materialized view freshness are the partitions of every base table
// that are not available/updated to the materialized view yet.
Expand Down Expand Up @@ -2501,6 +2498,9 @@ public MaterializedViewStatus getMaterializedViewStatus(ConnectorSession session
}
}

if (materializedDataPredicates.getPredicateDisjuncts().isEmpty()) {
return new MaterializedViewStatus(NOT_MATERIALIZED, partitionsFromBaseTables);
}
if (missingPartitions > HiveSessionProperties.getMaterializedViewMissingPartitionsThreshold(session)) {
return new MaterializedViewStatus(TOO_MANY_PARTITIONS_MISSING, partitionsFromBaseTables);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6106,7 +6106,25 @@ public void testRefreshMaterializedView()
// Test invalid predicates
assertQueryFails("REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE nationname = 'UNITED STATES'", ".*Refresh materialized view by column nationname is not supported.*");
assertQueryFails("REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE regionkey + nationkey = 25", ".*Only column references are supported on the left side of comparison expressions in WHERE clause.*");
assertQueryFails("REFRESH MATERIALIZED VIEW test_customer_view_5", ".*Refresh Materialized View without predicates is not supported.");
}

@Test
public void testAutoRefreshMaterializedViewFailsWithoutFlag()
{
QueryRunner queryRunner = getQueryRunner();

computeActual("CREATE TABLE test_orders_no_flag WITH (partitioned_by = ARRAY['orderstatus']) " +
"AS SELECT orderkey, totalprice, orderstatus FROM orders WHERE orderkey < 100");
computeActual(
"CREATE MATERIALIZED VIEW test_orders_no_flag_view WITH (partitioned_by = ARRAY['orderstatus']" + retentionDays(30) + ") " +
"AS SELECT SUM(totalprice) AS total, orderstatus FROM test_orders_no_flag GROUP BY orderstatus");

assertQueryFails(
"REFRESH MATERIALIZED VIEW test_orders_no_flag_view",
".*misses too many partitions or is never refreshed and may incur high cost.*");

computeActual("DROP MATERIALIZED VIEW test_orders_no_flag_view");
computeActual("DROP TABLE test_orders_no_flag");
Comment on lines +6126 to +6127
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick (testing): Nitpick: Consider using try-finally for cleanup to ensure resources are dropped even if assertions fail.

Wrap the cleanup statements in a try-finally block to ensure resources are always released, even if an assertion fails.

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2725,6 +2725,147 @@ public void testRefreshMaterializedViewAccessControl()
}
}

@Test
public void testAutoRefreshMaterializedViewWithoutPredicates()
{
QueryRunner queryRunner = getQueryRunner();
String table = "test_orders_auto_refresh_source";
String view = "test_orders_auto_refresh_target_mv";
String view2 = "test_orders_auto_refresh_target_mv2";

Session nonFullRefreshSession = getSession();

Session fullRefreshSession = Session.builder(getSession())
.setSystemProperty("materialized_view_allow_full_refresh_enabled", "true")
.setSystemProperty("materialized_view_data_consistency_enabled", "false")
.build();

queryRunner.execute(
fullRefreshSession,
format("CREATE TABLE %s WITH (partitioned_by = ARRAY['orderstatus']) " +
"AS SELECT orderkey, custkey, totalprice, orderstatus FROM orders WHERE orderkey < 100", table));

queryRunner.execute(
fullRefreshSession,
format("CREATE MATERIALIZED VIEW %s " +
"WITH (partitioned_by = ARRAY['orderstatus']) " +
"AS SELECT SUM(totalprice) AS total, COUNT(*) AS cnt, orderstatus " +
"FROM %s GROUP BY orderstatus", view, table));

queryRunner.execute(
nonFullRefreshSession,
format("CREATE MATERIALIZED VIEW %s " +
"WITH (partitioned_by = ARRAY['orderstatus']) " +
"AS SELECT SUM(totalprice) AS total, COUNT(*) AS cnt, orderstatus " +
"FROM %s GROUP BY orderstatus", view2, table));

try {
// Test that refresh without predicates succeeds when flag is enabled
queryRunner.execute(fullRefreshSession, format("REFRESH MATERIALIZED VIEW %s", view));

// Verify all partitions are refreshed
MaterializedResult result = queryRunner.execute(fullRefreshSession,
format("SELECT COUNT(DISTINCT orderstatus) FROM %s", view));
assertTrue(((Long) result.getOnlyValue()) > 0, "Materialized view should contain data after auto-refresh");

// Test that refresh without predicates fails when flag is not enabled
assertQueryFails(
nonFullRefreshSession,
format("REFRESH MATERIALIZED VIEW %s", view2),
".*misses too many partitions or is never refreshed and may incur high cost.*");
}
finally {
queryRunner.execute(fullRefreshSession, format("DROP MATERIALIZED VIEW %s", view));
queryRunner.execute(fullRefreshSession, format("DROP TABLE %s", table));
}
}

@Test
public void testAutoRefreshMaterializedViewWithJoinWithoutPredicates()
{
QueryRunner queryRunner = getQueryRunner();

String table1 = "test_customer_auto_refresh";
String table2 = "test_orders_join_auto_refresh";
String view = "test_auto_refresh_join_target_mv";

Session fullRefreshSession = Session.builder(getSession())
.setSystemProperty("materialized_view_allow_full_refresh_enabled", "true")
.setSystemProperty("materialized_view_data_consistency_enabled", "false")
.build();
Session ownerSession = getSession();

queryRunner.execute(
fullRefreshSession,
format("CREATE TABLE %s WITH (partitioned_by = ARRAY['nationkey']) " +
"AS SELECT custkey, name, nationkey FROM customer WHERE custkey < 100", table1));
queryRunner.execute(
fullRefreshSession,
format("CREATE TABLE %s WITH (partitioned_by = ARRAY['orderstatus']) " +
"AS SELECT orderkey, custkey, totalprice, orderstatus FROM orders WHERE orderkey < 100", table2));
queryRunner.execute(
fullRefreshSession,
format("CREATE MATERIALIZED VIEW %s " +
"WITH (partitioned_by = ARRAY['nationkey', 'orderstatus']) " +
"AS SELECT c.name, SUM(o.totalprice) AS total, c.nationkey, o.orderstatus " +
"FROM %s c JOIN %s o ON c.custkey = o.custkey " +
"GROUP BY c.name, c.nationkey, o.orderstatus", view, table1, table2));

try {
queryRunner.execute(fullRefreshSession, format("REFRESH MATERIALIZED VIEW %s", view));

MaterializedResult result = queryRunner.execute(fullRefreshSession,
format("SELECT COUNT(*) FROM %s", view));
assertTrue(((Long) result.getOnlyValue()) > 0,
"Materialized view with join should contain data after auto-refresh");
}
finally {
queryRunner.execute(ownerSession, format("DROP MATERIALIZED VIEW %s", view));
queryRunner.execute(ownerSession, format("DROP TABLE %s", table1));
queryRunner.execute(ownerSession, format("DROP TABLE %s", table2));
}
}

@Test
public void testAutoRefreshMaterializedViewFullyRefreshed()
{
QueryRunner queryRunner = getQueryRunner();

String table = "test_customer_auto_refresh";
String view = "test_auto_refresh_join_target_mv";

Session fullRefreshSession = Session.builder(getSession())
.setSystemProperty("materialized_view_allow_full_refresh_enabled", "true")
.setSystemProperty("materialized_view_data_consistency_enabled", "false")
.build();
Session ownerSession = getSession();

queryRunner.execute(
fullRefreshSession,
format("CREATE TABLE %s WITH (partitioned_by = ARRAY['nationkey']) " +
"AS SELECT custkey, name, nationkey FROM customer WHERE custkey < 100", table));

queryRunner.execute(
fullRefreshSession,
format("CREATE MATERIALIZED VIEW %s " +
"WITH (partitioned_by = ARRAY['nationkey']) " +
"AS SELECT custkey, nationkey FROM %s", view, table));

try {
queryRunner.execute(fullRefreshSession, format("REFRESH MATERIALIZED VIEW %s", view));

MaterializedResult result = queryRunner.execute(fullRefreshSession,
format("REFRESH MATERIALIZED VIEW %s", view));

assertEquals(result.getWarnings().size(), 1);
assertTrue(result.getWarnings().get(0).getMessage().matches("Materialized view .* is already fully refreshed"));
}
finally {
queryRunner.execute(ownerSession, format("DROP MATERIALIZED VIEW %s", view));
queryRunner.execute(ownerSession, format("DROP TABLE %s", table));
}
}

private void setReferencedMaterializedViews(DistributedQueryRunner queryRunner, String tableName, List<String> referencedMaterializedViews)
{
appendTableParameter(replicateHiveMetastore(queryRunner),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ public final class SystemSessionProperties
public static final String CONSIDER_QUERY_FILTERS_FOR_MATERIALIZED_VIEW_PARTITIONS = "consider-query-filters-for-materialized-view-partitions";
public static final String QUERY_OPTIMIZATION_WITH_MATERIALIZED_VIEW_ENABLED = "query_optimization_with_materialized_view_enabled";
public static final String LEGACY_MATERIALIZED_VIEWS = "legacy_materialized_views";
public static final String MATERIALIZED_VIEW_ALLOW_FULL_REFRESH_ENABLED = "materialized_view_allow_full_refresh_enabled";
public static final String AGGREGATION_IF_TO_FILTER_REWRITE_STRATEGY = "aggregation_if_to_filter_rewrite_strategy";
public static final String JOINS_NOT_NULL_INFERENCE_STRATEGY = "joins_not_null_inference_strategy";
public static final String RESOURCE_AWARE_SCHEDULING_STRATEGY = "resource_aware_scheduling_strategy";
Expand Down Expand Up @@ -1360,6 +1361,11 @@ public SystemSessionProperties(
"or be removed at any time. Do not disable in production environments.",
featuresConfig.isLegacyMaterializedViews(),
true),
booleanProperty(
MATERIALIZED_VIEW_ALLOW_FULL_REFRESH_ENABLED,
"Allow full refresh of MV when it's empty - potentially high cost.",
featuresConfig.isMaterializedViewAllowFullRefreshEnabled(),
true),
stringProperty(
DISTRIBUTED_TRACING_MODE,
"Mode for distributed tracing. NO_TRACE, ALWAYS_TRACE, or SAMPLE_BASED",
Expand Down Expand Up @@ -2894,6 +2900,11 @@ public static boolean isLegacyMaterializedViews(Session session)
return session.getSystemProperty(LEGACY_MATERIALIZED_VIEWS, Boolean.class);
}

public static boolean isMaterializedViewAllowFullRefreshEnabled(Session session)
{
return session.getSystemProperty(MATERIALIZED_VIEW_ALLOW_FULL_REFRESH_ENABLED, Boolean.class);
}

public static boolean isVerboseRuntimeStatsEnabled(Session session)
{
return session.getSystemProperty(VERBOSE_RUNTIME_STATS_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,27 @@
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.security.Identity;
import com.facebook.presto.sql.analyzer.SemanticException;
import com.facebook.presto.sql.planner.ExpressionDomainTranslator;
import com.facebook.presto.sql.planner.LiteralEncoder;
import com.facebook.presto.sql.tree.ArithmeticBinaryExpression;
import com.facebook.presto.sql.tree.BooleanLiteral;
import com.facebook.presto.sql.tree.Cast;
import com.facebook.presto.sql.tree.ComparisonExpression;
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.ExpressionRewriter;
import com.facebook.presto.sql.tree.ExpressionTreeRewriter;
import com.facebook.presto.sql.tree.FunctionCall;
import com.facebook.presto.sql.tree.Identifier;
import com.facebook.presto.sql.tree.IsNullPredicate;
import com.facebook.presto.sql.tree.LogicalBinaryExpression;
import com.facebook.presto.sql.tree.QualifiedName;
import com.facebook.presto.sql.tree.SymbolReference;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -61,6 +66,7 @@
import static com.facebook.presto.sql.tree.ComparisonExpression.Operator.EQUAL;
import static com.facebook.presto.sql.tree.LogicalBinaryExpression.Operator.AND;
import static com.facebook.presto.sql.tree.LogicalBinaryExpression.Operator.OR;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;

Expand Down Expand Up @@ -327,4 +333,82 @@ public boolean validate(Identifier baseTableColumn, Map<Expression, Identifier>
return baseToViewColumnMap.containsKey(new Cast(new FunctionCall(APPROX_SET, ImmutableList.of(baseTableColumn)), VARBINARY));
}
}

/**
* Generate WHERE predicates for missing partitions from MaterializedDataPredicates.
* Used for auto-refresh of materialized views without explicit WHERE clause.
*/
public static Map<SchemaTableName, Expression> generatePredicatesForMissingPartitions(
Map<SchemaTableName, MaterializedViewStatus.MaterializedDataPredicates> missingPartitionsPerTable,
Metadata metadata)
{
Map<SchemaTableName, Expression> predicates = new HashMap<>();

for (Map.Entry<SchemaTableName, MaterializedViewStatus.MaterializedDataPredicates> entry :
missingPartitionsPerTable.entrySet()) {
SchemaTableName tableName = entry.getKey();
MaterializedViewStatus.MaterializedDataPredicates missingPartitions = entry.getValue();

Expression predicate = convertMaterializedDataPredicatesToExpression(missingPartitions, metadata);

predicates.put(tableName, predicate);
}

return predicates;
}

/**
* Convert MaterializedDataPredicates to a SQL Expression tree.
* Builds an OR expression of partition predicates, where each partition is an AND expression of column filters.
*/
public static Expression convertMaterializedDataPredicatesToExpression(
MaterializedViewStatus.MaterializedDataPredicates predicates,
Metadata metadata)
{
List<String> columnNames = predicates.getColumnNames();
List<TupleDomain<String>> predicateDisjuncts = predicates.getPredicateDisjuncts();

ExpressionDomainTranslator translator = new ExpressionDomainTranslator(
new LiteralEncoder(metadata.getBlockEncodingSerde()));

List<Expression> disjuncts = new ArrayList<>();

for (TupleDomain<String> tupleDomain : predicateDisjuncts) {
checkState(!tupleDomain.isAll(), "TupleDomain.isAll() should not appear in MaterializedDataPredicates");
if (tupleDomain.isNone()) {
continue;
}

Expression conjunction = translator.toPredicate(tupleDomain);
conjunction = convertSymbolReferencesToIdentifiers(conjunction);

disjuncts.add(conjunction);
}

if (disjuncts.isEmpty()) {
throw new IllegalStateException("No predicates generated for missing partitions");
}

if (disjuncts.size() == 1) {
return disjuncts.get(0);
}
else {
return disjuncts.stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I would use ExpressionUtils#or here to ensure the expression tree is balanced.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or perhaps ExpressionUtils#combineDisjuncts

.reduce((left, right) -> new LogicalBinaryExpression(
LogicalBinaryExpression.Operator.OR, left, right))
.get();
}
}

private static Expression convertSymbolReferencesToIdentifiers(Expression expression)
{
return ExpressionTreeRewriter.rewriteWith(new ExpressionRewriter<Void>()
{
@Override
public Expression rewriteSymbolReference(SymbolReference node, Void context, ExpressionTreeRewriter<Void> treeRewriter)
{
return new Identifier(node.getName());
}
}, expression);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ public class FeaturesConfig
private boolean materializedViewPartitionFilteringEnabled = true;
private boolean queryOptimizationWithMaterializedViewEnabled;
private boolean legacyMaterializedViewRefresh = true;
private boolean materializedViewAllowFullRefreshEnabled;

private AggregationIfToFilterRewriteStrategy aggregationIfToFilterRewriteStrategy = AggregationIfToFilterRewriteStrategy.DISABLED;
private String analyzerType = "BUILTIN";
Expand Down Expand Up @@ -2169,6 +2170,19 @@ public FeaturesConfig setLegacyMaterializedViews(boolean value)
return this;
}

public boolean isMaterializedViewAllowFullRefreshEnabled()
{
return materializedViewAllowFullRefreshEnabled;
}

@Config("materialized-view-allow-full-refresh-enabled")
@ConfigDescription("Allow full refresh of MV when it's empty - potentially high cost.")
public FeaturesConfig setMaterializedViewAllowFullRefreshEnabled(boolean value)
{
this.materializedViewAllowFullRefreshEnabled = value;
return this;
}

public boolean isVerboseRuntimeStatsEnabled()
{
return verboseRuntimeStatsEnabled;
Expand Down
Loading
Loading