Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public final class SystemSessionProperties
public static final String OPTIMIZE_HASH_GENERATION = "optimize_hash_generation";
public static final String JOIN_DISTRIBUTION_TYPE = "join_distribution_type";
public static final String JOIN_MAX_BROADCAST_TABLE_SIZE = "join_max_broadcast_table_size";
public static final String RETRY_QUERY_WITH_HISTORY_BASED_OPTIMIZATION = "retry_query_with_history_based_optimization";
public static final String SIZE_BASED_JOIN_DISTRIBUTION_TYPE = "size_based_join_distribution_type";
public static final String DISTRIBUTED_JOIN = "distributed_join";
public static final String DISTRIBUTED_INDEX_JOIN = "distributed_index_join";
Expand Down Expand Up @@ -432,6 +433,10 @@ public SystemSessionProperties(
"Enable confidence based broadcasting when enabled",
featuresConfig.isConfidenceBasedBroadcastEnabled(),
false),
booleanProperty(RETRY_QUERY_WITH_HISTORY_BASED_OPTIMIZATION,
"Automatically retry a query if it fails and HBO can change the query plan",
featuresConfig.isRetryQueryWithHistoryBasedOptimizationEnabled(),
false),
booleanProperty(
TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED,
"Treat low confidence zero estimations as unknowns during joins when enabled",
Expand Down Expand Up @@ -2057,6 +2062,11 @@ public static boolean treatLowConfidenceZeroEstimationAsUnknownEnabled(Session s
return session.getSystemProperty(TREAT_LOW_CONFIDENCE_ZERO_ESTIMATION_AS_UNKNOWN_ENABLED, Boolean.class);
}

public static boolean retryQueryWithHistoryBasedOptimizationEnabled(Session session)
{
return session.getSystemProperty(RETRY_QUERY_WITH_HISTORY_BASED_OPTIMIZATION, Boolean.class);
}

public static int getHashPartitionCount(Session session)
{
return session.getSystemProperty(HASH_PARTITION_COUNT, Integer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.facebook.presto.execution.StageInfo;
import com.facebook.presto.execution.buffer.PagesSerdeFactory;
import com.facebook.presto.operator.ExchangeClient;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.function.SqlFunctionId;
import com.facebook.presto.spi.function.SqlInvokedFunction;
Expand Down Expand Up @@ -79,6 +80,9 @@
import static com.facebook.presto.SystemSessionProperties.getTargetResultSize;
import static com.facebook.presto.SystemSessionProperties.isExchangeChecksumEnabled;
import static com.facebook.presto.SystemSessionProperties.isExchangeCompressionEnabled;
import static com.facebook.presto.SystemSessionProperties.retryQueryWithHistoryBasedOptimizationEnabled;
import static com.facebook.presto.SystemSessionProperties.trackHistoryBasedPlanStatisticsEnabled;
import static com.facebook.presto.SystemSessionProperties.useHistoryBasedPlanStatisticsEnabled;
import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryState.WAITING_FOR_PREREQUISITES;
import static com.facebook.presto.server.protocol.QueryResourceUtil.toStatementStats;
Expand All @@ -98,6 +102,9 @@ class Query
{
private static final Logger log = Logger.get(Query.class);
private static final Base64.Encoder BASE64_ENCODER = Base64.getEncoder();
private static Optional<QueryId> originalBeforeRetryQueryId = Optional.empty();
private static Optional<Integer> previousQueryTopLevelPlanHash = Optional.empty();
private static Optional<QueryError> previousQueryFailureError = Optional.empty();

private final QueryManager queryManager;
private final TransactionManager transactionManager;
Expand Down Expand Up @@ -383,27 +390,40 @@ private synchronized Optional<QueryResults> getCachedResult(long token)
private synchronized QueryResults getNextResultWithRetry(long token, UriInfo uriInfo, String scheme, DataSize targetResultSize, boolean binaryResults)
{
QueryResults queryResults = getNextResult(token, uriInfo, scheme, targetResultSize, binaryResults);
if (queryResults.getError() == null || !queryResults.getError().isRetriable()) {
return queryResults;
}

// check if we have exceeded the global limit
retryCircuitBreaker.incrementFailure();
if (!retryCircuitBreaker.isRetryAllowed() || hasProducedResult) {
if (queryResults.getError() == null) {
return queryResults;
}

// check if we have exceeded the local limit
if (queryManager.getQueryRetryCount(queryId) >= getQueryRetryLimit(session) ||
queryManager.getQueryInfo(queryId).getQueryStats().getExecutionTime().toMillis() > getQueryRetryMaxExecutionTime(session).toMillis()) {
return queryResults;
boolean historyBasedOptimizationEnabled = useHistoryBasedPlanStatisticsEnabled(session) && trackHistoryBasedPlanStatisticsEnabled(session);
boolean hasNotRetried = queryManager.getQueryRetryCount(queryId) < 1;

if (historyBasedOptimizationEnabled && hasNotRetried && retryConditionsMet(queryResults) && retryQueryWithHistoryBasedOptimizationEnabled(session)) {
originalBeforeRetryQueryId = Optional.of(queryId);
previousQueryTopLevelPlanHash = getCurrentTopLevelPlanHash();
previousQueryFailureError = Optional.of(queryResults.getError());
}
else if (queryManager.getQueryRetryCount(queryId) == 1 && retryQueryWithHistoryBasedOptimizationEnabled(session) && retryConditionsMet(queryResults) && historyBasedOptimizationEnabled) {
Optional<Integer> currentTopLevelPlanHash = getCurrentTopLevelPlanHash();

if (previousQueryTopLevelPlanHash.isPresent() && currentTopLevelPlanHash.isPresent() && currentTopLevelPlanHash.equals(previousQueryTopLevelPlanHash)
|| (!previousQueryTopLevelPlanHash.isPresent() && !currentTopLevelPlanHash.isPresent())) {
queryManager.failQuery(queryId, new PrestoException(GENERIC_INTERNAL_ERROR, "Since the plan hashes did not change, your retry query will not execute." +
"Your original error was " + previousQueryFailureError.get() + ". Original QueryId: " + originalBeforeRetryQueryId +
". Retry QueryId: " + queryId));
}

originalBeforeRetryQueryId = Optional.empty();
previousQueryTopLevelPlanHash = Optional.empty();
previousQueryFailureError = Optional.empty();

// no support for transactions
if (session.getTransactionId().isPresent() &&
!transactionManager.getOptionalTransactionInfo(session.getRequiredTransactionId()).map(TransactionInfo::isAutoCommitContext).orElse(true)) {
return queryResults;
}
else {
if (!retryConditionsMet(queryResults)) {
return queryResults;
}
}

// build a new query with next uri
// we expect failed nodes have been removed from discovery server upon query failure
Expand Down Expand Up @@ -687,6 +707,54 @@ private static URI findCancelableLeafStage(StageInfo stage)
return stage.getSelf();
}

private boolean retryConditionsMet(QueryResults queryResults)
{
if (queryResults.getError() == null) {
return false;
}

if (!retryQueryWithHistoryBasedOptimizationEnabled(session)) {
if (!queryResults.getError().isRetriable()) {
return false;
}

// check if we have exceeded the global limit
retryCircuitBreaker.incrementFailure();
if (!retryCircuitBreaker.isRetryAllowed()) {
return false;
}

if (queryManager.getQueryRetryCount(queryId) >= getQueryRetryLimit(session)) {
return false;
}
}

if (hasProducedResult) {
return false;
}

// check if we have exceeded the local limit
if (queryManager.getQueryInfo(queryId).getQueryStats().getExecutionTime().toMillis() > getQueryRetryMaxExecutionTime(session).toMillis()) {
return false;
}

// no support for transactions
if (session.getTransactionId().isPresent() &&
!transactionManager.getOptionalTransactionInfo(session.getRequiredTransactionId()).map(TransactionInfo::isAutoCommitContext).orElse(true)) {
return false;
}

return true;
}

private Optional<Integer> getCurrentTopLevelPlanHash()
{
if (queryManager.getFullQueryInfo(queryId).getPlanCanonicalInfo().isEmpty()) {
return Optional.empty();
}
return Optional.of(queryManager.getFullQueryInfo(queryId).getPlanCanonicalInfo().get(0).getCanonicalPlan().getPlan().hashCode());
}

private static QueryError toQueryError(QueryInfo queryInfo)
{
QueryState state = queryInfo.getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public class FeaturesConfig
private boolean optimizeNullsInJoin;
private boolean optimizePayloadJoins;
private boolean confidenceBasedBroadcastEnabled;
private boolean retryQueryWithHistoryBasedOptimizationEnabled;
private boolean treatLowConfidenceZeroEstimationAsUnknownEnabled;
private boolean pushdownDereferenceEnabled;
private boolean inlineSqlFunctions = true;
Expand Down Expand Up @@ -1266,6 +1267,18 @@ public FeaturesConfig setConfidenceBasedBroadcastEnabled(boolean confidenceBased
return this;
}

public boolean isRetryQueryWithHistoryBasedOptimizationEnabled()
{
return retryQueryWithHistoryBasedOptimizationEnabled;
}

@Config("optimizer.retry-query-with-history-based-optimization")
public FeaturesConfig setRetryQueryWithHistoryBasedOptimizationEnabled(boolean retryQueryWithHistoryBasedOptimizationEnabled)
{
this.retryQueryWithHistoryBasedOptimizationEnabled = retryQueryWithHistoryBasedOptimizationEnabled;
return this;
}

public boolean isTreatLowConfidenceZeroEstimationAsUnknownEnabled()
{
return treatLowConfidenceZeroEstimationAsUnknownEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public void testDefaults()
.setPushTableWriteThroughUnion(true)
.setDictionaryAggregation(false)
.setConfidenceBasedBroadcastEnabled(false)
.setRetryQueryWithHistoryBasedOptimizationEnabled(false)
.setTreatLowConfidenceZeroEstimationAsUnknownEnabled(false)
.setAggregationPartitioningMergingStrategy(LEGACY)
.setLegacyArrayAgg(false)
Expand Down Expand Up @@ -343,6 +344,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.push-table-write-through-union", "false")
.put("optimizer.dictionary-aggregation", "true")
.put("optimizer.confidence-based-broadcast", "true")
.put("optimizer.retry-query-with-history-based-optimization", "true")
.put("optimizer.treat-low-confidence-zero-estimation-as-unknown", "true")
.put("optimizer.push-aggregation-through-join", "false")
.put("optimizer.aggregation-partition-merging", "top_down")
Expand Down Expand Up @@ -553,6 +555,7 @@ public void testExplicitPropertyMappings()
.setPushTableWriteThroughUnion(false)
.setDictionaryAggregation(true)
.setConfidenceBasedBroadcastEnabled(true)
.setRetryQueryWithHistoryBasedOptimizationEnabled(true)
.setTreatLowConfidenceZeroEstimationAsUnknownEnabled(true)
.setAggregationPartitioningMergingStrategy(TOP_DOWN)
.setPushAggregationThroughJoin(false)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.tests;

import com.facebook.presto.Session;
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.testing.InMemoryHistoryBasedPlanStatisticsProvider;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

import static com.facebook.presto.SystemSessionProperties.QUERY_RETRY_LIMIT;
import static com.facebook.presto.SystemSessionProperties.RESTRICT_HISTORY_BASED_OPTIMIZATION_TO_COMPLEX_QUERY;
import static com.facebook.presto.SystemSessionProperties.RETRY_QUERY_WITH_HISTORY_BASED_OPTIMIZATION;
import static com.facebook.presto.SystemSessionProperties.TRACK_HISTORY_BASED_PLAN_STATISTICS;
import static com.facebook.presto.SystemSessionProperties.TRACK_HISTORY_STATS_FROM_FAILED_QUERIES;
import static com.facebook.presto.SystemSessionProperties.USE_HISTORY_BASED_PLAN_STATISTICS;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.testng.Assert.assertEquals;

public class TestHistoryBasedRetry
{
private ListeningExecutorService executor;

@BeforeClass(alwaysRun = true)
public void setUp()
{
executor = MoreExecutors.listeningDecorator(newCachedThreadPool());
}

@AfterClass(alwaysRun = true)
public void shutdown()
{
executor.shutdownNow();
}

@Test
public void testQueryRetryWithHBONaturalFail()
throws Exception
{
int retryLimit = 3;
int retryNum = 0;

String sql = "SELECT o.orderkey, l.partkey, l.mapcol[o.orderkey] FROM (select orderkey, partkey, mapcol " +
"FROM (SELECT *, map(array[1], array[2]) mapcol FROM lineitem)) l " +
"JOIN orders o ON l.partkey = o.custkey WHERE length(comment) > 10";

Session session = Session.builder(createSession())
.setSystemProperty(QUERY_RETRY_LIMIT, String.valueOf(retryLimit))
.build();

try (QueryRunner queryRunner = createQueryRunner(session)) {
List<ListenableFuture<?>> queryFutures = new ArrayList<>();

ListenableFuture<?> future = executor.submit(() -> queryRunner.execute(session, sql));
queryFutures.add(future);

waitForQueryToFinish(queryFutures);

retryNum = getRetryCount(queryRunner);
}

assertEquals(retryNum, 1, "Retry count should be one as the query plan has changed.");
}

@Test
public void testQueryRetryWithHBOForceFail()
throws Exception
{
int retryNum = 0;

String query = "SELECT if(COUNT(*)=1,1,fail(1, 'failed')) FROM part p LEFT JOIN lineitem l ON p.partkey = l.partkey WHERE l.comment like '%a%'";

Session session = Session.builder(createSession())
.build();

try (QueryRunner queryRunner = createQueryRunner(session)) {
List<ListenableFuture<?>> queryFutures = new ArrayList<>();

ListenableFuture<?> future = executor.submit(() -> queryRunner.execute(session, query));
queryFutures.add(future);

waitForQueryToFinish(queryFutures);

retryNum = getRetryCount(queryRunner);
}

assertEquals(retryNum, 1, "Retry count should be one as the query plan has changed and is the max retry limit.");
}

private int getRetryCount(QueryRunner queryRunner)
{
int retryNum = 0;

DistributedQueryRunner distributedQueryRunner = (DistributedQueryRunner) queryRunner;
List<BasicQueryInfo> queryInfos = distributedQueryRunner.getCoordinator().getQueryManager().getQueries();

for (BasicQueryInfo info : queryInfos) {
if (info.getQuery().contains("-- retry query")) {
retryNum++;
}
}
return retryNum;
}

private void waitForQueryToFinish(List<ListenableFuture<?>> queryFutures)
throws Exception
{
for (ListenableFuture<?> future : queryFutures) {
try {
future.get();
}
catch (ExecutionException e) {
//it is okay to fail, we are forcing it to test the retry mechanism
}
}
}

private QueryRunner createQueryRunner(Session session)
throws Exception
{
QueryRunner queryRunner = new DistributedQueryRunner(session, 1);

queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch", ImmutableMap.of("tpch.splits-per-node", "3"));

queryRunner.installPlugin(new Plugin()
{
@Override
public Iterable<HistoryBasedPlanStatisticsProvider> getHistoryBasedPlanStatisticsProviders()
{
return ImmutableList.of(new InMemoryHistoryBasedPlanStatisticsProvider());
}
});
return queryRunner;
}

private static Session createSession()
{
return testSessionBuilder()
.setCatalog("tpch")
.setSchema("tiny")
.setSystemProperty(USE_HISTORY_BASED_PLAN_STATISTICS, "true")
.setSystemProperty(TRACK_HISTORY_BASED_PLAN_STATISTICS, "true")
.setSystemProperty(RETRY_QUERY_WITH_HISTORY_BASED_OPTIMIZATION, "true")
.setSystemProperty(RESTRICT_HISTORY_BASED_OPTIMIZATION_TO_COMPLEX_QUERY, "false")
.setSystemProperty(TRACK_HISTORY_STATS_FROM_FAILED_QUERIES, "true")
.build();
}
}