@@ -173,6 +173,7 @@ private AggregationNode simpleAggregationSum(PlanBuilder pb, PlanNode source, Va
ImmutableList.of(),
SINGLE,
Optional.empty(),
Optional.empty(),
Optional.empty());
}
}
@@ -16,6 +16,7 @@
import com.facebook.presto.Session;
import com.facebook.presto.execution.SqlQueryManager;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.sql.planner.Plan;
@@ -32,9 +33,11 @@
import org.testng.annotations.Test;

import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static com.facebook.presto.SystemSessionProperties.PARTIAL_AGGREGATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.RESTRICT_HISTORY_BASED_OPTIMIZATION_TO_COMPLEX_QUERY;
import static com.facebook.presto.SystemSessionProperties.TRACK_HISTORY_BASED_PLAN_STATISTICS;
import static com.facebook.presto.SystemSessionProperties.USE_HISTORY_BASED_PLAN_STATISTICS;
import static com.facebook.presto.SystemSessionProperties.USE_PARTIAL_AGGREGATION_HISTORY;
import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG;
import static com.facebook.presto.hive.HiveSessionProperties.PUSHDOWN_FILTER_ENABLED;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.any;
@@ -80,7 +83,7 @@ public void testHistoryBasedStatsCalculator()
anyTree(node(ProjectNode.class, any())).withOutputRowCount(229.5));

// HBO Statistics
executeAndTrackHistory("SELECT *, 1 FROM test_orders where ds = '2020-09-01' and substr(orderpriority, 1, 1) = '1'");
executeAndTrackHistory("SELECT *, 1 FROM test_orders where ds = '2020-09-01' and substr(orderpriority, 1, 1) = '1'", defaultSession());
assertPlan(
"SELECT *, 2 FROM test_orders where ds = '2020-09-02' and substr(orderpriority, 1, 1) = '1'",
anyTree(node(ProjectNode.class, any()).withOutputRowCount(48)));
@@ -96,7 +99,7 @@ public void testInsertTable()
try {
getQueryRunner().execute("CREATE TABLE test_orders (orderkey integer, ds varchar) WITH (partitioned_by = ARRAY['ds'])");

Plan plan = plan("insert into test_orders (values (1, '2023-09-20'), (2, '2023-09-21'))", createSession());
Plan plan = plan("insert into test_orders (values (1, '2023-09-20'), (2, '2023-09-21'))", defaultSession());

assertTrue(PlanNodeSearcher.searchFrom(plan.getRoot())
.where(node -> node instanceof TableWriterMergeNode && !node.getStatsEquivalentPlanNode().isPresent())
@@ -125,7 +128,7 @@ public void testBroadcastJoin()
// CBO Statistics
Plan plan = plan("SELECT * FROM " +
"(SELECT * FROM test_orders where ds = '2020-09-01' and substr(CAST(custkey AS VARCHAR), 1, 3) <> '370') t1 JOIN " +
"(SELECT * FROM test_orders where ds = '2020-09-02' and substr(CAST(custkey AS VARCHAR), 1, 3) = '370') t2 ON t1.orderkey = t2.orderkey", createSession());
"(SELECT * FROM test_orders where ds = '2020-09-02' and substr(CAST(custkey AS VARCHAR), 1, 3) = '370') t2 ON t1.orderkey = t2.orderkey", defaultSession());

assertTrue(PlanNodeSearcher.searchFrom(plan.getRoot())
.where(node -> node instanceof JoinNode && ((JoinNode) node).getDistributionType().get().equals(JoinNode.DistributionType.PARTITIONED))
@@ -135,11 +138,12 @@ public void testBroadcastJoin()
// HBO Statistics
executeAndTrackHistory("SELECT * FROM " +
"(SELECT * FROM test_orders where ds = '2020-09-01' and substr(CAST(custkey AS VARCHAR), 1, 3) <> '370') t1 JOIN " +
"(SELECT * FROM test_orders where ds = '2020-09-02' and substr(CAST(custkey AS VARCHAR), 1, 3) = '370') t2 ON t1.orderkey = t2.orderkey");
"(SELECT * FROM test_orders where ds = '2020-09-02' and substr(CAST(custkey AS VARCHAR), 1, 3) = '370') t2 ON t1.orderkey = t2.orderkey",
defaultSession());

plan = plan("SELECT * FROM " +
"(SELECT * FROM test_orders where ds = '2020-09-01' and substr(CAST(custkey AS VARCHAR), 1, 3) <> '370') t1 JOIN " +
"(SELECT * FROM test_orders where ds = '2020-09-02' and substr(CAST(custkey AS VARCHAR), 1, 3) = '370') t2 ON t1.orderkey = t2.orderkey", createSession());
"(SELECT * FROM test_orders where ds = '2020-09-02' and substr(CAST(custkey AS VARCHAR), 1, 3) = '370') t2 ON t1.orderkey = t2.orderkey", defaultSession());

assertTrue(PlanNodeSearcher.searchFrom(plan.getRoot())
.where(node -> node instanceof JoinNode && ((JoinNode) node).getDistributionType().get().equals(JoinNode.DistributionType.REPLICATED))
@@ -151,28 +155,65 @@
}
}

@Test
public void testPartialAggStatistics()
Member: nit: @Test?

{
try {
// CBO Statistics
getQueryRunner().execute("CREATE TABLE test_orders WITH (partitioned_by = ARRAY['ds', 'ts']) AS " +
"SELECT orderkey, orderpriority, comment, custkey, '2020-09-01' as ds, '00:01' as ts FROM orders where orderkey < 2000 ");

String query = "SELECT count(*) FROM test_orders group by custkey";
Session session = createSession("always");
Plan plan = plan(query, session);

assertTrue(PlanNodeSearcher.searchFrom(plan.getRoot())
.where(node -> node instanceof AggregationNode && ((AggregationNode) node).getStep() == AggregationNode.Step.PARTIAL)
.findFirst()
.isPresent());

// collect HBO Statistics
executeAndTrackHistory(query, createSession("always"));

plan = plan(query, createSession("automatic"));

assertTrue(PlanNodeSearcher.searchFrom(plan.getRoot())
.where(node -> node instanceof AggregationNode && ((AggregationNode) node).getStep() == AggregationNode.Step.PARTIAL).findAll().isEmpty());
}
finally {
getQueryRunner().execute("DROP TABLE IF EXISTS test_orders");
}
}

@Override
protected void assertPlan(@Language("SQL") String query, PlanMatchPattern pattern)
{
assertPlan(createSession(), query, pattern);
assertPlan(defaultSession(), query, pattern);
}

private void executeAndTrackHistory(String sql)
private void executeAndTrackHistory(String sql, Session session)
{
DistributedQueryRunner queryRunner = (DistributedQueryRunner) getQueryRunner();
SqlQueryManager sqlQueryManager = (SqlQueryManager) queryRunner.getCoordinator().getQueryManager();
InMemoryHistoryBasedPlanStatisticsProvider provider = (InMemoryHistoryBasedPlanStatisticsProvider) sqlQueryManager.getHistoryBasedPlanStatisticsTracker().getHistoryBasedPlanStatisticsProvider();

queryRunner.execute(createSession(), sql);
queryRunner.execute(session, sql);
provider.waitProcessQueryEvents();
}

private Session createSession()
private Session defaultSession()
{
return createSession("automatic");
}

private Session createSession(String partialAggregationStrategy)
{
return Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(USE_HISTORY_BASED_PLAN_STATISTICS, "true")
.setSystemProperty(TRACK_HISTORY_BASED_PLAN_STATISTICS, "true")
.setSystemProperty(JOIN_DISTRIBUTION_TYPE, "automatic")
.setSystemProperty(PARTIAL_AGGREGATION_STRATEGY, partialAggregationStrategy)
.setSystemProperty(USE_PARTIAL_AGGREGATION_HISTORY, "true")
.setCatalogSessionProperty(HIVE_CATALOG, PUSHDOWN_FILTER_ENABLED, "true")
.setSystemProperty(RESTRICT_HISTORY_BASED_OPTIMIZATION_TO_COMPLEX_QUERY, "false")
.build();
@@ -294,6 +294,7 @@ public final class SystemSessionProperties
public static final String REWRITE_CONSTANT_ARRAY_CONTAINS_TO_IN_EXPRESSION = "rewrite_constant_array_contains_to_in_expression";
public static final String INFER_INEQUALITY_PREDICATES = "infer_inequality_predicates";
public static final String ENABLE_HISTORY_BASED_SCALED_WRITER = "enable_history_based_scaled_writer";
public static final String USE_PARTIAL_AGGREGATION_HISTORY = "use_partial_aggregation_history";
Member: nit: It's usually more convenient when config names and session property names are consistent. Also, I remember there used to be a rule of thumb to name boolean properties ...-enabled.

What do you think about history_based_partial_aggregation_optimization_enabled (optimizer.history-based-partial-aggregation-optimization-enabled), or something along those lines, to keep it close to how HISTORY_BASED_SCALED_WRITER is named?

Contributor (author): That makes sense. The only caveat is that this optimization was already history-based; the change is that we now use the statistics from the partial aggregation instead of the final aggregation. I added the flag to avoid possible regressions and to allow a gradual rollout. If you have more name suggestions, let me know :)

Member: Makes sense. Let's keep the name. However, it may still be worth having a consistent session property name (use_partial_aggregation_history) and config property name (optimizer.use-partial-aggregation-history).

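To keep the session property and config property consistent, as agreed above, the config side would normally be wired through FeaturesConfig using the standard Airlift @Config pattern. That change is not shown in this section, so the code below is only a sketch: the getter name matches the featuresConfig.isUsePartialAggregationHistory() call further down in this diff, while the class name FeaturesConfigSketch, the setter, and the optimizer.use-partial-aggregation-history key follow the reviewer's suggestion and are assumptions, not verified against the PR.

import com.facebook.airlift.configuration.Config;

// Sketch only: assumed FeaturesConfig wiring for the new flag (defaults to false).
public class FeaturesConfigSketch
{
    private boolean usePartialAggregationHistory;

    public boolean isUsePartialAggregationHistory()
    {
        return usePartialAggregationHistory;
    }

    @Config("optimizer.use-partial-aggregation-history")
    public FeaturesConfigSketch setUsePartialAggregationHistory(boolean usePartialAggregationHistory)
    {
        this.usePartialAggregationHistory = usePartialAggregationHistory;
        return this;
    }
}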
public static final String REMOVE_REDUNDANT_CAST_TO_VARCHAR_IN_JOIN = "remove_redundant_cast_to_varchar_in_join";
public static final String HANDLE_COMPLEX_EQUI_JOINS = "handle_complex_equi_joins";

@@ -1772,6 +1773,11 @@ public SystemSessionProperties(
"Enable setting the initial number of tasks for scaled writers with HBO",
featuresConfig.isUseHBOForScaledWriters(),
false),
booleanProperty(
USE_PARTIAL_AGGREGATION_HISTORY,
"Use collected partial aggregation statistics from HBO",
featuresConfig.isUsePartialAggregationHistory(),
false),
booleanProperty(
REMOVE_REDUNDANT_CAST_TO_VARCHAR_IN_JOIN,
"If both left and right side of join clause are varchar cast from int/bigint, remove the cast here",
@@ -2961,6 +2967,11 @@ public static boolean useHistoryBasedScaledWriters(Session session)
return session.getSystemProperty(ENABLE_HISTORY_BASED_SCALED_WRITER, Boolean.class);
}

public static boolean usePartialAggregationHistory(Session session)
{
return session.getSystemProperty(USE_PARTIAL_AGGREGATION_HISTORY, Boolean.class);
}
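For orientation only: usePartialAggregationHistory(Session) is the hook a planner component would consult when deciding whether to rely on statistics recorded for the partial aggregation step rather than the final step, per the review thread above. The real call site is not part of this section, so the snippet below is a hypothetical illustration and ExampleCallSite is a made-up name.

import com.facebook.presto.Session;

import static com.facebook.presto.SystemSessionProperties.usePartialAggregationHistory;

// Hypothetical call site, not part of this PR section.
final class ExampleCallSite
{
    private ExampleCallSite() {}

    static boolean preferPartialAggregationHistory(Session session)
    {
        // When enabled, prefer HBO statistics collected for the partial aggregation
        // over statistics derived from the final aggregation.
        return usePartialAggregationHistory(session);
    }
}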

public static boolean isRemoveRedundantCastToVarcharInJoinEnabled(Session session)
{
return session.getSystemProperty(REMOVE_REDUNDANT_CAST_TO_VARCHAR_IN_JOIN, Boolean.class);