Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public final class HiveSessionProperties
public static final String COLLECT_COLUMN_STATISTICS_ON_WRITE = "collect_column_statistics_on_write";
private static final String OPTIMIZE_MISMATCHED_BUCKET_COUNT = "optimize_mismatched_bucket_count";
private static final String S3_SELECT_PUSHDOWN_ENABLED = "s3_select_pushdown_enabled";
private static final String SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE = "shuffle_partitioned_columns_for_table_write";
public static final String SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE = "shuffle_partitioned_columns_for_table_write";
private static final String TEMPORARY_STAGING_DIRECTORY_ENABLED = "temporary_staging_directory_enabled";
private static final String TEMPORARY_STAGING_DIRECTORY_PATH = "temporary_staging_directory_path";
private static final String TEMPORARY_TABLE_SCHEMA = "temporary_table_schema";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.Set;

import static com.facebook.presto.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.OPTIMIZE_METADATA_QUERIES;
import static com.facebook.presto.common.function.OperatorType.EQUAL;
import static com.facebook.presto.common.predicate.Domain.multipleValues;
import static com.facebook.presto.common.predicate.Domain.notNull;
Expand All @@ -73,10 +74,12 @@
import static com.facebook.presto.hive.HiveSessionProperties.COLLECT_COLUMN_STATISTICS_ON_WRITE;
import static com.facebook.presto.hive.HiveSessionProperties.PUSHDOWN_FILTER_ENABLED;
import static com.facebook.presto.hive.HiveSessionProperties.RANGE_FILTERS_ON_SUBSCRIPTS_ENABLED;
import static com.facebook.presto.hive.HiveSessionProperties.SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE;
import static com.facebook.presto.hive.TestHiveIntegrationSmokeTest.assertRemoteExchangesCount;
import static com.facebook.presto.sql.analyzer.TypeSignatureProvider.fromTypes;
import static com.facebook.presto.sql.planner.assertions.MatchResult.NO_MATCH;
import static com.facebook.presto.sql.planner.assertions.MatchResult.match;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.any;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.equiJoinClause;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange;
Expand Down Expand Up @@ -277,6 +280,91 @@ public void testPartitionPruning()
}
}

/**
 * Checks the plans produced for {@code ds = (SELECT min/max(ds) FROM ...)} when
 * {@code OPTIMIZE_METADATA_QUERIES} is enabled: in every case the outer table scan is expected
 * to carry a single-value domain on the partition column {@code ds}.
 * <p>
 * Three partitioned tables are used: one built from {@code orderkey % 7}, one built from
 * {@code orderkey % 200}, and one that additionally receives a row whose {@code ds} is null.
 */
@Test
public void testMetadataAggregationFolding()
{
    QueryRunner queryRunner = getQueryRunner();
    Session optimizeMetadataQueries = Session.builder(queryRunner.getDefaultSession())
            .setSystemProperty(OPTIMIZE_METADATA_QUERIES, Boolean.toString(true))
            .build();
    Session shufflePartitionColumns = Session.builder(queryRunner.getDefaultSession())
            .setCatalogSessionProperty(HIVE_CATALOG, SHUFFLE_PARTITIONED_COLUMNS_FOR_TABLE_WRITE, Boolean.toString(true))
            .build();

    // Table setup lives inside the try block so that the finally clause still drops
    // whatever was already created when a later CREATE TABLE / INSERT fails.
    try {
        queryRunner.execute(
                shufflePartitionColumns,
                "CREATE TABLE test_metadata_aggregation_folding WITH (partitioned_by = ARRAY['ds']) AS " +
                        "SELECT orderkey, CAST(to_iso8601(date_add('DAY', orderkey % 7, date('2020-07-01'))) AS VARCHAR) AS ds FROM orders WHERE orderkey < 1000");
        queryRunner.execute(
                shufflePartitionColumns,
                "CREATE TABLE test_metadata_aggregation_folding_more_partitions WITH (partitioned_by = ARRAY['ds']) AS " +
                        "SELECT orderkey, CAST(to_iso8601(date_add('DAY', orderkey % 200, date('2020-07-01'))) AS VARCHAR) AS ds FROM orders WHERE orderkey < 1000");
        queryRunner.execute(
                shufflePartitionColumns,
                "CREATE TABLE test_metadata_aggregation_folding_null_partitions WITH (partitioned_by = ARRAY['ds']) AS " +
                        "SELECT orderkey, CAST(to_iso8601(date_add('DAY', orderkey % 7, date('2020-07-01'))) AS VARCHAR) AS ds FROM orders WHERE orderkey < 1000");
        queryRunner.execute(
                shufflePartitionColumns,
                "INSERT INTO test_metadata_aggregation_folding_null_partitions SELECT 0 as orderkey, null AS ds");

        assertMetadataAggregationFoldedScan(
                optimizeMetadataQueries,
                "SELECT * FROM test_metadata_aggregation_folding WHERE ds = (SELECT max(ds) from test_metadata_aggregation_folding)",
                "test_metadata_aggregation_folding",
                "2020-07-07");
        assertMetadataAggregationFoldedScan(
                optimizeMetadataQueries,
                "SELECT * FROM test_metadata_aggregation_folding WHERE ds = (SELECT min(ds) from test_metadata_aggregation_folding)",
                "test_metadata_aggregation_folding",
                "2020-07-01");

        assertMetadataAggregationFoldedScan(
                optimizeMetadataQueries,
                "SELECT * FROM test_metadata_aggregation_folding_more_partitions WHERE ds = (SELECT max(ds) from test_metadata_aggregation_folding_more_partitions)",
                "test_metadata_aggregation_folding_more_partitions",
                "2021-01-16");
        assertMetadataAggregationFoldedScan(
                optimizeMetadataQueries,
                "SELECT * FROM test_metadata_aggregation_folding_more_partitions WHERE ds = (SELECT min(ds) from test_metadata_aggregation_folding_more_partitions)",
                "test_metadata_aggregation_folding_more_partitions",
                "2020-07-01");

        // The aggregation reads the table that also holds a null ds row; the expected
        // single-value domains are the same as for the table without that row.
        assertMetadataAggregationFoldedScan(
                optimizeMetadataQueries,
                "SELECT * FROM test_metadata_aggregation_folding WHERE ds = (SELECT max(ds) from test_metadata_aggregation_folding_null_partitions)",
                "test_metadata_aggregation_folding",
                "2020-07-07");
        assertMetadataAggregationFoldedScan(
                optimizeMetadataQueries,
                "SELECT * FROM test_metadata_aggregation_folding WHERE ds = (SELECT min(ds) from test_metadata_aggregation_folding_null_partitions)",
                "test_metadata_aggregation_folding",
                "2020-07-01");
    }
    finally {
        queryRunner.execute("DROP TABLE IF EXISTS test_metadata_aggregation_folding");
        queryRunner.execute("DROP TABLE IF EXISTS test_metadata_aggregation_folding_more_partitions");
        queryRunner.execute("DROP TABLE IF EXISTS test_metadata_aggregation_folding_null_partitions");
    }
}

// Asserts that the plan for `query` contains an INNER join with no equi-clauses whose left side
// is a scan of `scannedTable` constrained to the single partition value ds = `expectedDs`.
private void assertMetadataAggregationFoldedScan(Session session, String query, String scannedTable, String expectedDs)
{
    assertPlan(
            session,
            query,
            anyTree(
                    join(INNER, ImmutableList.of(),
                            tableScan(scannedTable, getSingleValueColumnDomain("ds", expectedDs), TRUE_CONSTANT, ImmutableSet.of("ds")),
                            anyTree(any()))));
}

/**
 * Builds a tuple domain that constrains {@code column} to exactly one VARCHAR value.
 *
 * @param column name of the column to constrain
 * @param value the only value allowed for that column; it is passed through {@code utf8Slice}
 * @return a tuple domain holding that single column-to-domain mapping
 */
private static TupleDomain<String> getSingleValueColumnDomain(String column, String value)
{
    Slice encodedValue = utf8Slice(value);
    return withColumnDomains(
            ImmutableMap.of(
                    column,
                    singleValue(VARCHAR, encodedValue)));
}

private static List<Slice> utf8Slices(String... values)
{
return Arrays.stream(values).map(Slices::utf8Slice).collect(toImmutableList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ public SystemSessionProperties(
Duration::toString),
booleanProperty(
OPTIMIZE_METADATA_QUERIES,
"Enable optimization for metadata queries",
"Enable optimization for metadata queries. Note if metadata entry has empty data, the result might be different (e.g. empty Hive partition)",
featuresConfig.isOptimizeMetadataQueries(),
false),
integerProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,7 @@ public boolean isOptimizeMetadataQueries()
}

@Config("optimizer.optimize-metadata-queries")
@ConfigDescription("Enable optimization for metadata queries. Note if metadata entry has empty data, the result might be different (e.g. empty Hive partition)")
public FeaturesConfig setOptimizeMetadataQueries(boolean optimizeMetadataQueries)
{
this.optimizeMetadataQueries = optimizeMetadataQueries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.planner.optimizations.JoinNodeUtils;
import com.facebook.presto.sql.planner.plan.AssignUniqueId;
import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.JoinNode;
Expand Down Expand Up @@ -160,6 +161,15 @@ public Expression visitExchange(ExchangeNode node, Void context)
});
}

/**
 * Produces the expression for an {@link EnforceSingleRowNode}.
 * <p>
 * When the node's source is a {@link ProjectNode}, the result is whatever this visitor
 * returns for that source; for any other kind of source the result is {@code TRUE_LITERAL}.
 */
@Override
public Expression visitEnforceSingleRow(EnforceSingleRowNode node, Void context)
{
    // Only a projection directly below is looked through; everything else yields TRUE_LITERAL.
    if (!(node.getSource() instanceof ProjectNode)) {
        return TRUE_LITERAL;
    }
    return node.getSource().accept(this, context);
}

@Override
public Expression visitProject(ProjectNode node, Void context)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,20 +410,7 @@ public PlanOptimizers(
ruleStats,
statsCalculator,
estimatedExchangesCostCalculator,
ImmutableSet.of(new RemoveRedundantIdentityProjections())),
new MetadataQueryOptimizer(metadata),
new IterativeOptimizer(
ruleStats,
statsCalculator,
estimatedExchangesCostCalculator,
ImmutableSet.of(new EliminateCrossJoins())), // This can pull up Filter and Project nodes from between Joins, so we need to push them down again
predicatePushDown,
simplifyOptimizer, // Should be always run after PredicatePushDown
new IterativeOptimizer(
ruleStats,
statsCalculator,
estimatedExchangesCostCalculator,
new PickTableLayout(metadata, sqlParser).rules()));
ImmutableSet.of(new RemoveRedundantIdentityProjections())));

// TODO: move this before optimization if possible!!
// Replace all expressions with row expressions
Expand All @@ -434,6 +421,24 @@ public PlanOptimizers(
new TranslateExpressions(metadata, sqlParser).rules()));
// After this point, all planNodes should not contain OriginalExpression

builder.add(new MetadataQueryOptimizer(metadata));

// This can pull up Filter and Project nodes from between Joins, so we need to push them down again
builder.add(
new IterativeOptimizer(
ruleStats,
statsCalculator,
estimatedExchangesCostCalculator,
ImmutableSet.of(new EliminateCrossJoins())),
rowExpressionPredicatePushDown,
simplifyRowExpressionOptimizer); // Should always run simplifyOptimizer after rowExpressionPredicatePushDown

builder.add(new IterativeOptimizer(
ruleStats,
statsCalculator,
estimatedExchangesCostCalculator,
new PickTableLayout(metadata, sqlParser).rules()));

// PlanRemoteProjections only handles RowExpression, so this needs to run after TranslateExpressions
// Rules applied after this need to handle locality of ProjectNode properly.
builder.add(new IterativeOptimizer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.planner.plan.AssignUniqueId;
import com.facebook.presto.sql.planner.plan.EnforceSingleRowNode;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.JoinNode;
Expand Down Expand Up @@ -156,6 +157,15 @@ public RowExpression visitExchange(ExchangeNode node, Void context)
});
}

/**
 * Produces the row expression for an {@link EnforceSingleRowNode}.
 * <p>
 * A source that is a {@link ProjectNode} is visited and its result returned unchanged;
 * any other source makes this method return {@code TRUE_CONSTANT}.
 */
@Override
public RowExpression visitEnforceSingleRow(EnforceSingleRowNode node, Void context)
{
    // Sources other than a projection are not inspected at all.
    if (!(node.getSource() instanceof ProjectNode)) {
        return TRUE_CONSTANT;
    }
    return node.getSource().accept(this, context);
}

@Override
public RowExpression visitProject(ProjectNode node, Void context)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.plan.PlanNodeIdAllocator;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy;
import com.facebook.presto.sql.planner.iterative.Rule;
import com.facebook.presto.sql.planner.optimizations.joins.JoinGraph;
import com.facebook.presto.sql.planner.plan.JoinNode;
import com.facebook.presto.sql.relational.OriginalExpressionUtils;
import com.facebook.presto.sql.tree.Expression;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

Expand All @@ -45,11 +44,9 @@
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.ELIMINATE_CROSS_JOINS;
import static com.facebook.presto.sql.planner.iterative.rule.Util.restrictOutputs;
import static com.facebook.presto.sql.planner.plan.Patterns.join;
import static com.facebook.presto.sql.relational.OriginalExpressionUtils.castToRowExpression;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Maps.transformValues;
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -192,24 +189,24 @@ public static PlanNode buildJoinTree(List<VariableReferenceExpression> expectedO
Optional.empty());
}

List<Expression> filters = graph.getFilters();
List<RowExpression> filters = graph.getFilters();

for (Expression filter : filters) {
for (RowExpression filter : filters) {
result = new FilterNode(
idAllocator.getNextId(),
result,
castToRowExpression(filter));
filter);
}

if (graph.getAssignments().isPresent()) {
result = new ProjectNode(
idAllocator.getNextId(),
result,
Assignments.copyOf(transformValues(graph.getAssignments().get(), OriginalExpressionUtils::castToRowExpression)));
Assignments.copyOf(graph.getAssignments().get()));
}

// If needed, introduce a projection to constrain the outputs to what was originally expected
// Some nodes are sensitive to what's produced (e.g., DistinctLimit node)
return restrictOutputs(idAllocator, result, ImmutableSet.copyOf(expectedOutputVariables), false).orElse(result);
return restrictOutputs(idAllocator, result, ImmutableSet.copyOf(expectedOutputVariables), true).orElse(result);
}
}
Loading