Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.plugin.hive.HiveColumnHandle;
import io.prestosql.plugin.hive.HiveHdfsConfiguration;
import io.prestosql.plugin.hive.HiveTableHandle;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.authentication.NoHdfsAuthentication;
import io.prestosql.plugin.hive.metastore.Database;
import io.prestosql.plugin.hive.metastore.HiveMetastore;
import io.prestosql.plugin.hive.metastore.file.FileHiveMetastore;
import io.prestosql.plugin.hive.testing.TestingHiveConnectorFactory;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.spi.security.PrincipalType;
import io.prestosql.sql.planner.assertions.BasePushdownPlanTest;
Expand All @@ -48,10 +50,16 @@
import static com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.prestosql.plugin.hive.TestHiveReaderProjectionsUtil.createProjectedColumnHandle;
import static io.prestosql.spi.type.BigintType.BIGINT;
import static io.prestosql.sql.planner.assertions.PlanMatchPattern.any;
import static io.prestosql.sql.planner.assertions.PlanMatchPattern.anyTree;
import static io.prestosql.sql.planner.assertions.PlanMatchPattern.equiJoinClause;
import static io.prestosql.sql.planner.assertions.PlanMatchPattern.expression;
import static io.prestosql.sql.planner.assertions.PlanMatchPattern.filter;
import static io.prestosql.sql.planner.assertions.PlanMatchPattern.join;
import static io.prestosql.sql.planner.assertions.PlanMatchPattern.project;
import static io.prestosql.sql.planner.assertions.PlanMatchPattern.tableScan;
import static io.prestosql.sql.planner.plan.JoinNode.Type.INNER;
import static io.prestosql.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -116,14 +124,14 @@ public void testPushdownDisabled()
}

@Test
public void testProjectionPushdown()
public void testDereferencePushdown()
{
String testTable = "test_simple_projection_pushdown";
QualifiedObjectName completeTableName = new QualifiedObjectName(HIVE_CATALOG_NAME, SCHEMA_NAME, testTable);

getQueryRunner().execute(format(
"CREATE TABLE %s (col0) AS" +
" SELECT cast(row(5, 6) as row(a bigint, b bigint)) AS col0 WHERE false",
"CREATE TABLE %s (col0, col1) AS" +
" SELECT cast(row(5, 6) as row(x bigint, y bigint)) AS col0, 5 AS col1 WHERE false",
testTable));

Session session = getQueryRunner().getDefaultSession();
Expand All @@ -132,18 +140,70 @@ public void testProjectionPushdown()
assertTrue(tableHandle.isPresent(), "expected the table handle to be present");

Map<String, ColumnHandle> columns = getColumnHandles(session, completeTableName);
assertTrue(columns.containsKey("col0"), "expected column not found");

HiveColumnHandle baseColumnHandle = (HiveColumnHandle) columns.get("col0");
HiveColumnHandle column0Handle = (HiveColumnHandle) columns.get("col0");
HiveColumnHandle column1Handle = (HiveColumnHandle) columns.get("col1");

HiveColumnHandle columnX = createProjectedColumnHandle(column0Handle, ImmutableList.of(0));
HiveColumnHandle columnY = createProjectedColumnHandle(column0Handle, ImmutableList.of(1));

// Simple Projection pushdown
assertPlan(
"SELECT col0.a expr_a, col0.b expr_b FROM " + testTable,
"SELECT col0.x expr_x, col0.y expr_y FROM " + testTable,
any(tableScan(
equalTo(tableHandle.get().getConnectorHandle()),
TupleDomain.all(),
ImmutableMap.of(
"col0#a", equalTo(createProjectedColumnHandle(baseColumnHandle, ImmutableList.of(0))),
"col0#b", equalTo(createProjectedColumnHandle(baseColumnHandle, ImmutableList.of(1)))))));
equalTo(tableHandle.get().getConnectorHandle()),
TupleDomain.all(),
ImmutableMap.of("col0#x", equalTo(columnX), "col0#y", equalTo(columnY)))));

// Projection and predicate pushdown
assertPlan(
format("SELECT col0.x FROM %s WHERE col0.x = col1 + 3 and col0.y = 2", testTable),
anyTree(
filter(
"col0_y = bigint '2' AND (col0_x = cast((col1 + 3) as bigint))",
tableScan(
table -> ((HiveTableHandle) table).getCompactEffectivePredicate().getDomains().get()
.equals(ImmutableMap.of(columnY, Domain.singleValue(BIGINT, 2L))),
TupleDomain.all(),
ImmutableMap.of("col0_y", equalTo(columnY), "col0_x", equalTo(columnX), "col1", equalTo(column1Handle))))));

// Projection and predicate pushdown with overlapping columns
assertPlan(
format("SELECT col0, col0.y expr_y FROM %s WHERE col0.x = 5", testTable),
anyTree(
filter(
"col0_x = bigint '5'",
tableScan(
table -> ((HiveTableHandle) table).getCompactEffectivePredicate().getDomains().get()
.equals(ImmutableMap.of(columnX, Domain.singleValue(BIGINT, 5L))),
TupleDomain.all(),
ImmutableMap.of("col0", equalTo(column0Handle), "col0_x", equalTo(columnX))))));

// Projection and predicate pushdown with joins
assertPlan(
format("SELECT T.col0.x, T.col0, T.col0.y FROM %s T join %s S on T.col1 = S.col1 WHERE (T.col0.x = 2)", testTable, testTable),
anyTree(
project(
ImmutableMap.of(
"expr_0_x", expression("expr_0.x"),
"expr_0", expression("expr_0"),
"expr_0_y", expression("expr_0.y")),
join(
INNER,
ImmutableList.of(equiJoinClause("t_expr_1", "s_expr_1")),
anyTree(
filter(
"expr_0_x = BIGINT '2'",
tableScan(
table -> ((HiveTableHandle) table).getCompactEffectivePredicate().getDomains().get()
.equals(ImmutableMap.of(columnX, Domain.singleValue(BIGINT, 2L))),
TupleDomain.all(),
ImmutableMap.of("expr_0_x", equalTo(columnX), "expr_0", equalTo(column0Handle), "t_expr_1", equalTo(column1Handle))))),
anyTree(
tableScan(
equalTo(tableHandle.get().getConnectorHandle()),
TupleDomain.all(),
ImmutableMap.of("s_expr_1", equalTo(column1Handle))))))));
}

@AfterClass(alwaysRun = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.prestosql.sql.planner.iterative.rule.DetermineSemiJoinDistributionType;
import io.prestosql.sql.planner.iterative.rule.EliminateCrossJoins;
import io.prestosql.sql.planner.iterative.rule.EvaluateZeroSample;
import io.prestosql.sql.planner.iterative.rule.ExtractDereferencesFromFilterAboveScan;
import io.prestosql.sql.planner.iterative.rule.ExtractSpatialJoins;
import io.prestosql.sql.planner.iterative.rule.GatherAndMergeWindows;
import io.prestosql.sql.planner.iterative.rule.ImplementBernoulliSampleAsFilter;
Expand Down Expand Up @@ -105,6 +106,18 @@
import io.prestosql.sql.planner.iterative.rule.PruneWindowColumns;
import io.prestosql.sql.planner.iterative.rule.PushAggregationThroughOuterJoin;
import io.prestosql.sql.planner.iterative.rule.PushDeleteIntoConnector;
import io.prestosql.sql.planner.iterative.rule.PushDownDereferenceThroughFilter;
import io.prestosql.sql.planner.iterative.rule.PushDownDereferenceThroughJoin;
import io.prestosql.sql.planner.iterative.rule.PushDownDereferenceThroughProject;
import io.prestosql.sql.planner.iterative.rule.PushDownDereferenceThroughSemiJoin;
import io.prestosql.sql.planner.iterative.rule.PushDownDereferenceThroughUnnest;
import io.prestosql.sql.planner.iterative.rule.PushDownDereferencesThroughAssignUniqueId;
import io.prestosql.sql.planner.iterative.rule.PushDownDereferencesThroughLimit;
import io.prestosql.sql.planner.iterative.rule.PushDownDereferencesThroughRowNumber;
import io.prestosql.sql.planner.iterative.rule.PushDownDereferencesThroughSort;
import io.prestosql.sql.planner.iterative.rule.PushDownDereferencesThroughTopN;
import io.prestosql.sql.planner.iterative.rule.PushDownDereferencesThroughTopNRowNumber;
import io.prestosql.sql.planner.iterative.rule.PushDownDereferencesThroughWindow;
import io.prestosql.sql.planner.iterative.rule.PushLimitIntoTableScan;
import io.prestosql.sql.planner.iterative.rule.PushLimitThroughMarkDistinct;
import io.prestosql.sql.planner.iterative.rule.PushLimitThroughOffset;
Expand Down Expand Up @@ -299,7 +312,21 @@ public PlanOptimizers(
Set<Rule<?>> projectionPushdownRules = ImmutableSet.of(
new PushProjectionIntoTableScan(metadata, typeAnalyzer),
new PushProjectionThroughUnion(),
new PushProjectionThroughExchange());
new PushProjectionThroughExchange(),
// Dereference pushdown rules
new PushDownDereferenceThroughProject(typeAnalyzer),
new PushDownDereferenceThroughUnnest(typeAnalyzer),
new PushDownDereferenceThroughSemiJoin(typeAnalyzer),
new PushDownDereferenceThroughJoin(typeAnalyzer),
new PushDownDereferenceThroughFilter(typeAnalyzer),
new ExtractDereferencesFromFilterAboveScan(typeAnalyzer),
new PushDownDereferencesThroughLimit(typeAnalyzer),
new PushDownDereferencesThroughSort(typeAnalyzer),
new PushDownDereferencesThroughAssignUniqueId(typeAnalyzer),
new PushDownDereferencesThroughWindow(typeAnalyzer),
new PushDownDereferencesThroughTopN(typeAnalyzer),
new PushDownDereferencesThroughRowNumber(typeAnalyzer),
new PushDownDereferencesThroughTopNRowNumber(typeAnalyzer));

IterativeOptimizer inlineProjections = new IterativeOptimizer(
ruleStats,
Expand Down Expand Up @@ -497,6 +524,16 @@ public PlanOptimizers(
inlineProjections,
simplifyOptimizer, // Re-run the SimplifyExpressions to simplify any recomposed expressions from other optimizations
projectionPushDown,
// Projection pushdown rules may push reducing projections (e.g. dereferences) below filters for potential
// pushdown into the connectors. We invoke PredicatePushdown and PushPredicateIntoTableScan after this
// to leverage predicate pushdown on projected columns.
new StatsRecordingPlanOptimizer(optimizerStats, new PredicatePushDown(metadata, typeAnalyzer, true, false)),
simplifyOptimizer, // Should be always run after PredicatePushDown
new IterativeOptimizer(
ruleStats,
statsCalculator,
estimatedExchangesCostCalculator,
ImmutableSet.of(new PushPredicateIntoTableScan(metadata, typeAnalyzer))),
new UnaliasSymbolReferences(metadata), // Run again because predicate pushdown and projection pushdown might add more projections
new PruneUnreferencedOutputs(metadata, typeAnalyzer), // Make sure to run this before index join. Filtered projections may not have all the columns.
new IndexJoinOptimizer(metadata), // Run this after projections and filters have been fully simplified and pushed down
Expand Down Expand Up @@ -539,6 +576,16 @@ public PlanOptimizers(
estimatedExchangesCostCalculator,
ImmutableSet.of(new PushPredicateIntoTableScan(metadata, typeAnalyzer))),
projectionPushDown,
// Projection pushdown rules may push reducing projections (e.g. dereferences) below filters for potential
// pushdown into the connectors. Invoke PredicatePushdown and PushPredicateIntoTableScan after this
// to leverage predicate pushdown on projected columns.
new StatsRecordingPlanOptimizer(optimizerStats, new PredicatePushDown(metadata, typeAnalyzer, true, false)),
Copy link
Copy Markdown

@JamesRTaylor JamesRTaylor Apr 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing call to RemoveUnsupportedDynamicFilters here is causing this query to be 4x slower:

                     new RemoveUnsupportedDynamicFilters(metadata),

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upon closer examination, performance is the same without the RemoveUnsupportedDynamicFilters. Sorry for the noise.

simplifyOptimizer, // Should be always run after PredicatePushDown
new IterativeOptimizer(
ruleStats,
statsCalculator,
estimatedExchangesCostCalculator,
ImmutableSet.of(new PushPredicateIntoTableScan(metadata, typeAnalyzer))),
new PruneUnreferencedOutputs(metadata, typeAnalyzer),
new IterativeOptimizer(
ruleStats,
Expand Down Expand Up @@ -627,6 +674,17 @@ public PlanOptimizers(
costCalculator,
ImmutableSet.of(new RemoveRedundantTableScanPredicate(metadata))));
builder.add(projectionPushDown);
// Projection pushdown rules may push reducing projections (e.g. dereferences) below filters for potential
// pushdown into the connectors. Invoke PredicatePushdown and PushPredicateIntoTableScan after this
// to leverage predicate pushdown on projected columns.
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new PredicatePushDown(metadata, typeAnalyzer, true, true)));
builder.add(new RemoveUnsupportedDynamicFilters(metadata)); // Remove unsupported dynamic filters introduced by PredicatePushdown
builder.add(simplifyOptimizer); // Should always run after PredicatePushdown
new IterativeOptimizer(
ruleStats,
statsCalculator,
costCalculator,
ImmutableSet.of(new PushPredicateIntoTableScan(metadata, typeAnalyzer)));
builder.add(inlineProjections);
builder.add(new UnaliasSymbolReferences(metadata)); // Run unalias after merging projections to simplify projections more efficiently
builder.add(new PruneUnreferencedOutputs(metadata, typeAnalyzer));
Expand Down
Loading