Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Configuration Property Name Description
``optimizer.track-history-based-plan-statistics`` Record the statistics of the current query as history statistics so that they can be used by future queries. ``False``
``optimizer.track-history-stats-from-failed-queries`` Track history based plan statistics from complete plan fragments in failed queries. ``True``
``optimizer.history-based-optimizer-timeout`` Timeout for history based optimizer. ``10 seconds``
``optimizer.enforce-timeout-for-hbo-query-registration`` Enforce the timeout for query registration in the history based optimizer. ``False``
``optimizer.treat-low-confidence-zero-estimation-as-unknown`` Treat ``LOW`` confidence, zero estimations as ``UNKNOWN`` during joins. ``False``
``optimizer.confidence-based-broadcast`` Broadcast based on the confidence of the statistics that are being used, by broadcasting the side of a joinNode which ``False``
has the highest confidence statistics. If confidence is the same, then the original behavior will be followed.
Expand All @@ -61,6 +62,8 @@ Session property Name Description
``optimizer.track-history-stats-from-failed-queries`` in the current session.
``history_based_optimizer_timeout_limit`` Overrides the behavior of the configuration property ``optimizer.history-based-optimizer-timeout``
``optimizer.history-based-optimizer-timeout`` in the current session.
``enforce_history_based_optimizer_register_timeout`` Overrides the behavior of the configuration property ``optimizer.enforce-timeout-for-hbo-query-registration``
``optimizer.enforce-timeout-for-hbo-query-registration`` in the current session.
``restrict_history_based_optimization_to_complex_query`` Enable history based optimization only for complex queries, i.e. queries with join and aggregation. ``True``
``history_input_table_statistics_matching_threshold`` Overrides the behavior of the configuration property ``hbo.history-matching-threshold``
``hbo.history-matching-threshold`` in the current session.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ public final class SystemSessionProperties
public static final String HISTORY_BASED_OPTIMIZATION_PLAN_CANONICALIZATION_STRATEGY = "history_based_optimization_plan_canonicalization_strategy";
public static final String ENABLE_VERBOSE_HISTORY_BASED_OPTIMIZER_RUNTIME_STATS = "enable_verbose_history_based_optimizer_runtime_stats";
public static final String LOG_QUERY_PLANS_USED_IN_HISTORY_BASED_OPTIMIZER = "log_query_plans_used_in_history_based_optimizer";
public static final String ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT = "enforce_history_based_optimizer_register_timeout";
public static final String MAX_LEAF_NODES_IN_PLAN = "max_leaf_nodes_in_plan";
public static final String LEAF_NODE_LIMIT_ENABLED = "leaf_node_limit_enabled";
public static final String PUSH_REMOTE_EXCHANGE_THROUGH_GROUP_ID = "push_remote_exchange_through_group_id";
Expand Down Expand Up @@ -1589,6 +1590,11 @@ public SystemSessionProperties(
"Enable logging of query plans generated and used in history based optimizer",
featuresConfig.isLogPlansUsedInHistoryBasedOptimizer(),
false),
booleanProperty(
ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT,
"Enforce timeout for query registration in HBO optimizer",
featuresConfig.isEnforceTimeoutForHBOQueryRegistration(),
false),
new PropertyMetadata<>(
MAX_LEAF_NODES_IN_PLAN,
"Maximum number of leaf nodes in the logical plan of SQL statement",
Expand Down Expand Up @@ -3064,6 +3070,11 @@ public static boolean logQueryPlansUsedInHistoryBasedOptimizer(Session session)
return session.getSystemProperty(LOG_QUERY_PLANS_USED_IN_HISTORY_BASED_OPTIMIZER, Boolean.class);
}

/**
 * Looks up the {@link #ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT} system property
 * on the given session.
 *
 * @param session session whose system properties are consulted
 * @return the boolean value the session holds for that property
 */
public static boolean enforceHistoryBasedOptimizerRegistrationTimeout(Session session)
{
    Boolean enforceTimeout = session.getSystemProperty(ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT, Boolean.class);
    return enforceTimeout;
}

public static boolean shouldPushRemoteExchangeThroughGroupId(Session session)
{
return session.getSystemProperty(PUSH_REMOTE_EXCHANGE_THROUGH_GROUP_ID, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public class FeaturesConfig
private Duration historyBasedOptimizerTimeout = new Duration(10, SECONDS);
private String historyBasedOptimizerPlanCanonicalizationStrategies = "IGNORE_SAFE_CONSTANTS";
private boolean logPlansUsedInHistoryBasedOptimizer;
private boolean enforceTimeoutForHBOQueryRegistration;
private boolean redistributeWrites = true;
private boolean scaleWriters;
private DataSize writerMinSize = new DataSize(32, MEGABYTE);
Expand Down Expand Up @@ -1009,6 +1010,18 @@ public FeaturesConfig setLogPlansUsedInHistoryBasedOptimizer(boolean logPlansUse
return this;
}

/**
 * Returns the flag stored for the {@code optimizer.enforce-timeout-for-hbo-query-registration}
 * configuration property.
 *
 * @return the current value of the flag; the backing field has no initializer, so it is
 *         {@code false} until the setter is called
 */
public boolean isEnforceTimeoutForHBOQueryRegistration()
{
return enforceTimeoutForHBOQueryRegistration;
}

/**
 * Stores the flag bound to the {@code optimizer.enforce-timeout-for-hbo-query-registration}
 * configuration property.
 *
 * @param enforceTimeoutForHBOQueryRegistration new value of the flag
 * @return this config instance, so setter calls can be chained
 */
@Config("optimizer.enforce-timeout-for-hbo-query-registration")
public FeaturesConfig setEnforceTimeoutForHBOQueryRegistration(boolean enforceTimeoutForHBOQueryRegistration)
{
this.enforceTimeoutForHBOQueryRegistration = enforceTimeoutForHBOQueryRegistration;
return this;
}

public AggregationPartitioningMergingStrategy getAggregationPartitioningMergingStrategy()
{
return aggregationPartitioningMergingStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,18 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static com.facebook.presto.SystemSessionProperties.enforceHistoryBasedOptimizerRegistrationTimeout;
import static com.facebook.presto.SystemSessionProperties.getHistoryBasedOptimizerTimeoutLimit;
import static com.facebook.presto.SystemSessionProperties.getHistoryCanonicalPlanNodeLimit;
import static com.facebook.presto.SystemSessionProperties.restrictHistoryBasedOptimizationToComplexQuery;
import static com.facebook.presto.SystemSessionProperties.trackHistoryBasedPlanStatisticsEnabled;
import static com.facebook.presto.SystemSessionProperties.useHistoryBasedPlanStatisticsEnabled;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

public class HistoricalStatisticsEquivalentPlanMarkingOptimizer
Expand Down Expand Up @@ -120,9 +125,25 @@ public PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider
}

// Fetch and cache history based statistics of all plan nodes, so no serial network calls happen later.
boolean registerSucceed = statsCalculator.registerPlan(newPlan, session, startTimeInNano, timeoutInMilliseconds);
boolean registrationSucceeded = false;
if (enforceHistoryBasedOptimizerRegistrationTimeout(session)) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Boolean> future = executor.submit(() -> statsCalculator.registerPlan(newPlan, session, startTimeInNano, timeoutInMilliseconds));
try {
registrationSucceeded = future.get(timeoutInMilliseconds, MILLISECONDS);
}
catch (Exception ignored) {
}
finally {
executor.shutdownNow();
}
}
else {
registrationSucceeded = statsCalculator.registerPlan(newPlan, session, startTimeInNano, timeoutInMilliseconds);
}

// Return original plan if timeout or registration not successful
if (checkTimeOut(startTimeInNano, timeoutInMilliseconds) || !registerSucceed) {
if (checkTimeOut(startTimeInNano, timeoutInMilliseconds) || !registrationSucceeded) {
logOptimizerFailure(session);
return PlanOptimizerResult.optimizerResult(plan, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class InMemoryHistoryBasedPlanStatisticsProvider
Expand Down Expand Up @@ -80,6 +81,16 @@ public void waitProcessQueryEvents()
}
}

public void noProcessQueryEvents()
{
try {
assertFalse(semaphore.tryAcquire(10, TimeUnit.SECONDS));
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

semaphore is released during the putStats function call. If HBO failed, there will be no putStats function call, hence we will not get the semaphore here. Use this function to assert the case where HBO timeout

}
catch (InterruptedException e) {
throw new AssertionError("Expect no history statistics to be written");
}
}

// Returns boolean whether stats writing query events were processed
public boolean waitProcessQueryEventsIfAvailable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public void testDefaults()
.setHistoryBasedOptimizerTimeout(new Duration(10, SECONDS))
.setHistoryBasedOptimizerPlanCanonicalizationStrategies("IGNORE_SAFE_CONSTANTS")
.setLogPlansUsedInHistoryBasedOptimizer(false)
.setEnforceTimeoutForHBOQueryRegistration(false)
.setRedistributeWrites(true)
.setScaleWriters(false)
.setWriterMinSize(new DataSize(32, MEGABYTE))
Expand Down Expand Up @@ -330,6 +331,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.history-canonical-plan-node-limit", "2")
.put("optimizer.history-based-optimizer-plan-canonicalization-strategies", "IGNORE_SAFE_CONSTANTS,IGNORE_SCAN_CONSTANTS")
.put("optimizer.log-plans-used-in-history-based-optimizer", "true")
.put("optimizer.enforce-timeout-for-hbo-query-registration", "true")
.put("optimizer.history-based-optimizer-timeout", "1s")
.put("redistribute-writes", "false")
.put("scale-writers", "true")
Expand Down Expand Up @@ -541,6 +543,7 @@ public void testExplicitPropertyMappings()
.setHistoryBasedOptimizerTimeout(new Duration(1, SECONDS))
.setHistoryBasedOptimizerPlanCanonicalizationStrategies("IGNORE_SAFE_CONSTANTS,IGNORE_SCAN_CONSTANTS")
.setLogPlansUsedInHistoryBasedOptimizer(true)
.setEnforceTimeoutForHBOQueryRegistration(true)
.setRedistributeWrites(false)
.setScaleWriters(true)
.setWriterMinSize(new DataSize(42, GIGABYTE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.testng.annotations.Test;

import static com.facebook.presto.SystemSessionProperties.CONFIDENCE_BASED_BROADCAST_ENABLED;
import static com.facebook.presto.SystemSessionProperties.ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT;
import static com.facebook.presto.SystemSessionProperties.HISTORY_BASED_OPTIMIZER_TIMEOUT_LIMIT;
import static com.facebook.presto.SystemSessionProperties.HISTORY_CANONICAL_PLAN_NODE_LIMIT;
import static com.facebook.presto.SystemSessionProperties.JOIN_DISTRIBUTION_TYPE;
import static com.facebook.presto.SystemSessionProperties.JOIN_REORDERING_STRATEGY;
Expand Down Expand Up @@ -129,6 +131,88 @@ public void testHistoryBasedStatsCalculator()
anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(3).withOutputSize(54)));
}

@Test
public void testHistoryBasedStatsCalculatorEnforceTimeOut()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like it tests cases where the timeout is enabled, but never exceeded. Can you add a test case where the timeout gets enforced, eg. you set the timeout really low (like to 0?) and then check that we don't use hbo stats?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test with time limit to be 0

{
Session sessionWithDefaultTimeoutLimit = Session.builder(createSession())
.setSystemProperty(ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT, "true")
.build();
Session sessionWithZeroTimeoutLimit = Session.builder(createSession())
.setSystemProperty(ENFORCE_HISTORY_BASED_OPTIMIZER_REGISTRATION_TIMEOUT, "true")
.setSystemProperty(HISTORY_BASED_OPTIMIZER_TIMEOUT_LIMIT, "0ms")
.build();
// CBO Statistics
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT * FROM nation where substr(name, 1, 1) = 'A'",
anyTree(node(FilterNode.class, any()).withOutputRowCount(Double.NaN)));

// Write HBO statistics failed as we set timeout limit to be 0
executeAndNoHistoryWritten("SELECT * FROM nation where substr(name, 1, 1) = 'A'", sessionWithZeroTimeoutLimit);
// No HBO statistics read
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT * FROM nation where substr(name, 1, 1) = 'A'",
anyTree(node(FilterNode.class, any()).withOutputRowCount(Double.NaN)));

// Write HBO Statistics is successful, as we use the default 10 seconds timeout limit
executeAndTrackHistory("SELECT * FROM nation where substr(name, 1, 1) = 'A'", sessionWithDefaultTimeoutLimit);
// Read HBO statistics successfully with default timeout
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT * FROM nation where substr(name, 1, 1) = 'A'",
anyTree(node(FilterNode.class, any()).withOutputRowCount(2).withOutputSize(199)));
// Read HBO statistics fail due to timeout
assertPlan(
sessionWithZeroTimeoutLimit,
"SELECT * FROM nation where substr(name, 1, 1) = 'A'",
anyTree(node(FilterNode.class, any()).withOutputRowCount(Double.NaN)));

// CBO Statistics
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(ProjectNode.class, node(FilterNode.class, any())).withOutputRowCount(12.5)));
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(Double.NaN)));

// Write HBO statistics failed as we set timeout limit to be 0
executeAndNoHistoryWritten("SELECT max(nationkey) FROM nation where name < 'D' group by regionkey", sessionWithZeroTimeoutLimit);
// No HBO statistics read
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(ProjectNode.class, node(FilterNode.class, any())).withOutputRowCount(12.5)));
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(Double.NaN)));

// Write HBO Statistics is successful, as we use the default 10 seconds timeout limit
executeAndTrackHistory("SELECT max(nationkey) FROM nation where name < 'D' group by regionkey", sessionWithDefaultTimeoutLimit);
// Read HBO statistics successfully with default timeout
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(ProjectNode.class, node(FilterNode.class, any())).withOutputRowCount(5).withOutputSize(90)));
assertPlan(
sessionWithDefaultTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(3).withOutputSize(54)));

// Read HBO statistics fail due to timeout
assertPlan(
sessionWithZeroTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(ProjectNode.class, node(FilterNode.class, any())).withOutputRowCount(12.5)));
assertPlan(
sessionWithZeroTimeoutLimit,
"SELECT max(nationkey) FROM nation where name < 'D' group by regionkey",
anyTree(node(AggregationNode.class, node(ExchangeNode.class, anyTree(any()))).withOutputRowCount(Double.NaN)));
}

@Test
public void testFailedQuery()
{
Expand Down Expand Up @@ -525,6 +609,12 @@ private void executeAndTrackHistory(String sql, Session session)
getHistoryProvider().waitProcessQueryEvents();
}

/**
 * Runs {@code sql} in {@code session}, then calls the history provider's
 * {@code noProcessQueryEvents()} to verify that the query produced no history write.
 *
 * @param sql statement to execute
 * @param session session used for the execution
 */
private void executeAndNoHistoryWritten(String sql, Session session)
{
    getQueryRunner().execute(session, sql);
    InMemoryHistoryBasedPlanStatisticsProvider historyProvider = getHistoryProvider();
    historyProvider.noProcessQueryEvents();
}

private InMemoryHistoryBasedPlanStatisticsProvider getHistoryProvider()
{
DistributedQueryRunner queryRunner = (DistributedQueryRunner) getQueryRunner();
Expand Down