From 654e73309a8e145dd0590b121224f1cdbd50f594 Mon Sep 17 00:00:00 2001 From: Vic Zhang Date: Fri, 9 Aug 2019 09:49:36 -0400 Subject: [PATCH 1/2] Add current and peak running tasks to query state machine Tracking task count to identifiy expensive query --- .../presto/execution/QueryStateMachine.java | 28 +++++++++++++++++++ .../facebook/presto/execution/QueryStats.java | 11 ++++++++ .../presto/execution/SqlQueryExecution.java | 2 +- .../execution/SqlQueryManagerStats.java | 13 +++++++++ ...ry.java => TrackingRemoteTaskFactory.java} | 22 +++++++++++---- .../presto/execution/MockQueryExecution.java | 1 + .../presto/execution/TestQueryStats.java | 2 ++ .../presto/server/TestBasicQueryInfo.java | 1 + .../presto/server/TestQueryStateInfo.java | 1 + 9 files changed, 75 insertions(+), 6 deletions(-) rename presto-main/src/main/java/com/facebook/presto/execution/{MemoryTrackingRemoteTaskFactory.java => TrackingRemoteTaskFactory.java} (79%) diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java index d5e5417f49c44..9b6c2c28fcc09 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryStateMachine.java @@ -62,6 +62,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -116,6 +117,9 @@ public class QueryStateMachine private final AtomicLong peakTaskUserMemory = new AtomicLong(); private final AtomicLong peakTaskTotalMemory = new AtomicLong(); + private final AtomicInteger currentRunningTaskCount = new AtomicInteger(); + private final AtomicInteger peakRunningTaskCount = new AtomicInteger(); + private final QueryStateTimer queryStateTimer; private final StateMachine queryState; @@ -277,6 +281,28 @@ public long getPeakTaskUserMemory() return peakTaskUserMemory.get(); } + public int getCurrentRunningTaskCount() + { + return currentRunningTaskCount.get(); + } + + public int incrementCurrentRunningTaskCount() + { + int runningTaskCount = currentRunningTaskCount.incrementAndGet(); + peakRunningTaskCount.accumulateAndGet(runningTaskCount, Math::max); + return runningTaskCount; + } + + public int decrementCurrentRunningTaskCount() + { + return currentRunningTaskCount.decrementAndGet(); + } + + public int getPeakRunningTaskCount() + { + return peakRunningTaskCount.get(); + } + public WarningCollector getWarningCollector() { return warningCollector; @@ -541,6 +567,7 @@ private QueryStats getQueryStats(Optional rootStage) totalTasks, runningTasks, + getPeakRunningTaskCount(), completedTasks, totalDrivers, @@ -1018,6 +1045,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats) queryStats.getTotalTasks(), queryStats.getRunningTasks(), queryStats.getCompletedTasks(), + queryStats.getPeakRunningTasks(), queryStats.getTotalDrivers(), queryStats.getQueuedDrivers(), queryStats.getRunningDrivers(), diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryStats.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryStats.java index 5c716384ae43b..b869de1d56e66 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryStats.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryStats.java @@ -55,6 +55,7 @@ public class QueryStats private final int totalTasks; private final int runningTasks; + private final int peakRunningTasks; private final int completedTasks; private final int totalDrivers; @@ -114,6 +115,7 @@ public QueryStats( @JsonProperty("totalTasks") int totalTasks, @JsonProperty("runningTasks") int runningTasks, + @JsonProperty("peakRunningTasks") int peakRunningTasks, @JsonProperty("completedTasks") int completedTasks, @JsonProperty("totalDrivers") int totalDrivers, @@ -173,6 +175,8 @@ public QueryStats( this.totalTasks = totalTasks; checkArgument(runningTasks >= 0, "runningTasks is negative"); this.runningTasks = runningTasks; + checkArgument(peakRunningTasks >= 0, "peakRunningTasks is negative"); + this.peakRunningTasks = peakRunningTasks; checkArgument(completedTasks >= 0, "completedTasks is negative"); this.completedTasks = completedTasks; @@ -248,6 +252,7 @@ public static QueryStats immediateFailureQueryStats() 0, 0, 0, + 0, new DataSize(0, BYTE), new DataSize(0, BYTE), new DataSize(0, BYTE), @@ -353,6 +358,12 @@ public int getRunningTasks() return runningTasks; } + @JsonProperty + public int getPeakRunningTasks() + { + return peakRunningTasks; + } + @JsonProperty public int getCompletedTasks() { diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java index d9ce34157960a..de0b175287459 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java @@ -229,7 +229,7 @@ private SqlQueryExecution( } }); - this.remoteTaskFactory = new MemoryTrackingRemoteTaskFactory(requireNonNull(remoteTaskFactory, "remoteTaskFactory is null"), stateMachine); + this.remoteTaskFactory = new TrackingRemoteTaskFactory(requireNonNull(remoteTaskFactory, "remoteTaskFactory is null"), stateMachine); } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManagerStats.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManagerStats.java index 2f621a8bcfafe..0b458c7541931 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManagerStats.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManagerStats.java @@ -47,6 +47,7 @@ public class SqlQueryManagerStats private final TimeStat queuedTime = new TimeStat(MILLISECONDS); private final DistributionStat wallInputBytesRate = new DistributionStat(); private final DistributionStat cpuInputByteRate = new DistributionStat(); + private final DistributionStat peakRunningTasksStat = new DistributionStat(); public void queryQueued() { @@ -88,6 +89,11 @@ public void queryFinished(QueryInfo info) cpuInputByteRate.add(rawInputBytes * 1000 / executionCpuMillis); } + long peakRunningTasks = info.getQueryStats().getPeakRunningTasks(); + if (peakRunningTasks > 0) { + peakRunningTasksStat.add(peakRunningTasks); + } + if (info.getErrorCode() != null) { switch (info.getErrorCode().getType()) { case USER_ERROR: @@ -245,4 +251,11 @@ public DistributionStat getCpuInputByteRate() { return cpuInputByteRate; } + + @Managed(description = "Distribution of query peak running tasks") + @Nested + public DistributionStat getPeakRunningTasksStat() + { + return peakRunningTasksStat; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/MemoryTrackingRemoteTaskFactory.java b/presto-main/src/main/java/com/facebook/presto/execution/TrackingRemoteTaskFactory.java similarity index 79% rename from presto-main/src/main/java/com/facebook/presto/execution/MemoryTrackingRemoteTaskFactory.java rename to presto-main/src/main/java/com/facebook/presto/execution/TrackingRemoteTaskFactory.java index 846f15f713fb5..278d8b32d1fed 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/MemoryTrackingRemoteTaskFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/TrackingRemoteTaskFactory.java @@ -25,15 +25,17 @@ import java.util.OptionalInt; +import static com.facebook.presto.execution.TaskState.PLANNED; +import static com.facebook.presto.execution.TaskState.RUNNING; import static java.util.Objects.requireNonNull; -public class MemoryTrackingRemoteTaskFactory +public class TrackingRemoteTaskFactory implements RemoteTaskFactory { private final RemoteTaskFactory remoteTaskFactory; private final QueryStateMachine stateMachine; - public MemoryTrackingRemoteTaskFactory(RemoteTaskFactory remoteTaskFactory, QueryStateMachine stateMachine) + public TrackingRemoteTaskFactory(RemoteTaskFactory remoteTaskFactory, QueryStateMachine stateMachine) { this.remoteTaskFactory = requireNonNull(remoteTaskFactory, "remoteTaskFactory is null"); this.stateMachine = requireNonNull(stateMachine, "stateMachine is null"); @@ -60,20 +62,22 @@ public RemoteTask createRemoteTask(Session session, partitionedSplitCountTracker, summarizeTaskInfo); - task.addStateChangeListener(new UpdatePeakMemory(stateMachine)); + task.addStateChangeListener(new UpdateQueryStats(stateMachine)); return task; } - private static final class UpdatePeakMemory + private static final class UpdateQueryStats implements StateChangeListener { private final QueryStateMachine stateMachine; private long previousUserMemory; private long previousSystemMemory; + private TaskState state; - public UpdatePeakMemory(QueryStateMachine stateMachine) + public UpdateQueryStats(QueryStateMachine stateMachine) { this.stateMachine = stateMachine; + this.state = PLANNED; } @Override @@ -87,6 +91,14 @@ public synchronized void stateChanged(TaskStatus newStatus) previousUserMemory = currentUserMemory; previousSystemMemory = currentSystemMemory; stateMachine.updateMemoryUsage(deltaUserMemoryInBytes, deltaTotalMemoryInBytes, currentUserMemory, currentTotalMemory); + + if (state == PLANNED && newStatus.getState() == RUNNING) { + stateMachine.incrementCurrentRunningTaskCount(); + } + else if (state == RUNNING && newStatus.getState().isDone()) { + stateMachine.decrementCurrentRunningTaskCount(); + } + state = newStatus.getState(); } } } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java b/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java index 0138fa10d7c7c..2d9b31b3e9375 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java @@ -122,6 +122,7 @@ public QueryInfo getQueryInfo() 9, 10, 11, + 11, 12, 13, diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryStats.java b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryStats.java index e0a85018cfd32..fa3a4ccfa1061 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryStats.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryStats.java @@ -155,6 +155,7 @@ public class TestQueryStats 9, 10, 11, + 11, 12, 13, @@ -231,6 +232,7 @@ public static void assertExpectedQueryStats(QueryStats actual) assertEquals(actual.getTotalTasks(), 9); assertEquals(actual.getRunningTasks(), 10); + assertEquals(actual.getPeakRunningTasks(), 11); assertEquals(actual.getCompletedTasks(), 11); assertEquals(actual.getTotalDrivers(), 12); diff --git a/presto-main/src/test/java/com/facebook/presto/server/TestBasicQueryInfo.java b/presto-main/src/test/java/com/facebook/presto/server/TestBasicQueryInfo.java index 5c4305b69f176..06ed427c748b4 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestBasicQueryInfo.java +++ b/presto-main/src/test/java/com/facebook/presto/server/TestBasicQueryInfo.java @@ -66,6 +66,7 @@ public void testConstructor() Duration.valueOf("11m"), 13, 14, + 21, 15, 16, 17, diff --git a/presto-main/src/test/java/com/facebook/presto/server/TestQueryStateInfo.java b/presto-main/src/test/java/com/facebook/presto/server/TestQueryStateInfo.java index a2bd04c3eec75..8125edf950c7c 100644 --- a/presto-main/src/test/java/com/facebook/presto/server/TestQueryStateInfo.java +++ b/presto-main/src/test/java/com/facebook/presto/server/TestQueryStateInfo.java @@ -117,6 +117,7 @@ private QueryInfo createQueryInfo(String queryId, QueryState state, String query 13, 14, 15, + 16, 100, 17, 18, From ff49687430c56743b67975489283a4f214c741c9 Mon Sep 17 00:00:00 2001 From: Vic Zhang Date: Sat, 10 Aug 2019 20:33:34 -0400 Subject: [PATCH 2/2] Fail query that has highest number of tasks When cluster is overloaded with too many running tasks, kills the most expensive query whose task count exceeds a custom-configured threshold --- .../execution/DataDefinitionExecution.java | 6 + .../execution/FailedQueryExecution.java | 6 + .../presto/execution/QueryManagerConfig.java | 30 +++++ .../presto/execution/QueryTracker.java | 66 +++++++++++ .../presto/execution/SqlQueryExecution.java | 6 + .../presto/execution/SqlQueryManager.java | 12 ++ .../presto/sql/planner/PlanFragmenter.java | 2 +- .../presto/execution/MockQueryExecution.java | 6 + .../execution/TestQueryManagerConfig.java | 6 + .../presto/tests/TestQueryTaskLimit.java | 107 ++++++++++++++++++ 10 files changed, 246 insertions(+), 1 deletion(-) create mode 100644 presto-tests/src/test/java/com/facebook/presto/tests/TestQueryTaskLimit.java diff --git a/presto-main/src/main/java/com/facebook/presto/execution/DataDefinitionExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/DataDefinitionExecution.java index 75b4e74cd871e..e10a7ed4ae32f 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/DataDefinitionExecution.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/DataDefinitionExecution.java @@ -156,6 +156,12 @@ public BasicQueryInfo getBasicQueryInfo() .orElseGet(() -> stateMachine.getBasicQueryInfo(Optional.empty())); } + @Override + public int getRunningTaskCount() + { + return stateMachine.getCurrentRunningTaskCount(); + } + @Override public void start() { diff --git a/presto-main/src/main/java/com/facebook/presto/execution/FailedQueryExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/FailedQueryExecution.java index 417ca1578a0e9..b75e04a866a21 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/FailedQueryExecution.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/FailedQueryExecution.java @@ -151,6 +151,12 @@ public BasicQueryInfo getBasicQueryInfo() return new BasicQueryInfo(getQueryInfo()); } + @Override + public int getRunningTaskCount() + { + return 0; + } + @Override public void start() { diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java index 29ff95dcde5a0..3f068e6192518 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java @@ -49,6 +49,8 @@ public class QueryManagerConfig private int maxQueryLength = 1_000_000; private int maxStageCount = 100; private int stageCountWarningThreshold = 50; + private int maxTotalRunningTaskCount = Integer.MAX_VALUE; + private int maxQueryRunningTaskCount = Integer.MAX_VALUE; private Duration clientTimeout = new Duration(5, TimeUnit.MINUTES); @@ -234,6 +236,34 @@ public QueryManagerConfig setStageCountWarningThreshold(int stageCountWarningThr return this; } + @Min(1) + public int getMaxTotalRunningTaskCount() + { + return maxTotalRunningTaskCount; + } + + @Config("experimental.max-total-running-task-count") + @ConfigDescription("Maximal allowed running task from all queries") + public QueryManagerConfig setMaxTotalRunningTaskCount(int maxTotalRunningTaskCount) + { + this.maxTotalRunningTaskCount = maxTotalRunningTaskCount; + return this; + } + + @Min(1) + public int getMaxQueryRunningTaskCount() + { + return maxQueryRunningTaskCount; + } + + @Config("experimental.max-query-running-task-count") + @ConfigDescription("Maximal allowed running task for single query only if experimental.max-total-running-task-count is violated") + public QueryManagerConfig setMaxQueryRunningTaskCount(int maxQueryRunningTaskCount) + { + this.maxQueryRunningTaskCount = maxQueryRunningTaskCount; + return this; + } + @MinDuration("5s") @NotNull public Duration getClientTimeout() diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java index 9ef467d8f78ad..d71440e0f8642 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java @@ -35,12 +35,16 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import static com.facebook.presto.SystemSessionProperties.getQueryMaxExecutionTime; import static com.facebook.presto.SystemSessionProperties.getQueryMaxRunTime; import static com.facebook.presto.spi.StandardErrorCode.ABANDONED_QUERY; import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT; +import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES; import static com.facebook.presto.spi.StandardErrorCode.SERVER_SHUTTING_DOWN; +import static com.facebook.presto.sql.planner.PlanFragmenter.TOO_MANY_STAGES_MESSAGE; import static com.google.common.base.Preconditions.checkState; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -51,6 +55,12 @@ public class QueryTracker private static final Logger log = Logger.get(QueryTracker.class); private final int maxQueryHistory; + private final int maxTotalRunningTaskCount; + private final int maxQueryRunningTaskCount; + + private final AtomicInteger runningTaskCount = new AtomicInteger(); + private final AtomicLong queriesKilledDueToTooManyTask = new AtomicLong(); + private final Duration minQueryExpireAge; private final ConcurrentMap queries = new ConcurrentHashMap<>(); @@ -69,6 +79,8 @@ public QueryTracker(QueryManagerConfig queryManagerConfig, ScheduledExecutorServ this.minQueryExpireAge = queryManagerConfig.getMinQueryExpireAge(); this.maxQueryHistory = queryManagerConfig.getMaxQueryHistory(); this.clientTimeout = queryManagerConfig.getClientTimeout(); + this.maxTotalRunningTaskCount = queryManagerConfig.getMaxTotalRunningTaskCount(); + this.maxQueryRunningTaskCount = queryManagerConfig.getMaxQueryRunningTaskCount(); this.queryManagementExecutor = requireNonNull(queryManagementExecutor, "queryManagementExecutor is null"); } @@ -91,6 +103,15 @@ public synchronized void start() log.error(e, "Error enforcing query timeout limits"); } + try { + if (maxTotalRunningTaskCount != Integer.MAX_VALUE && maxQueryRunningTaskCount != Integer.MAX_VALUE) { + enforceTaskLimits(); + } + } + catch (Throwable e) { + log.error(e, "Error enforcing running task limits"); + } + try { removeExpiredQueries(); } @@ -167,6 +188,16 @@ public void expireQuery(QueryId queryId) .ifPresent(expirationQueue::add); } + public long getRunningTaskCount() + { + return runningTaskCount.get(); + } + + public long getQueriesKilledDueToTooManyTask() + { + return queriesKilledDueToTooManyTask.get(); + } + /** * Enforce query max runtime/execution time limits */ @@ -189,6 +220,39 @@ private void enforceTimeLimits() } } + /** + * When cluster reaches max tasks limit and also a single query + * exceeds a threshold, kill this query + */ + private void enforceTaskLimits() + { + int totalRunningTaskCount = 0; + int highestRunningTaskCount = 0; + Optional highestRunningTaskQuery = Optional.empty(); + for (T query : queries.values()) { + if (query.isDone()) { + continue; + } + int runningTaskCount = query.getRunningTaskCount(); + totalRunningTaskCount += runningTaskCount; + if (runningTaskCount > highestRunningTaskCount) { + highestRunningTaskCount = runningTaskCount; + highestRunningTaskQuery = Optional.of(query); + } + } + + runningTaskCount.set(totalRunningTaskCount); + + if (totalRunningTaskCount > maxTotalRunningTaskCount && + highestRunningTaskCount > maxQueryRunningTaskCount && + highestRunningTaskQuery.isPresent()) { + highestRunningTaskQuery.get().fail(new PrestoException(QUERY_HAS_TOO_MANY_STAGES, format( + "Query killed because the cluster is overloaded with too many tasks (%s) and this query was running with the highest number of tasks (%s). %s Otherwise, please try again later.", + totalRunningTaskCount, highestRunningTaskCount, TOO_MANY_STAGES_MESSAGE))); + queriesKilledDueToTooManyTask.incrementAndGet(); + } + } + /** * Prune extraneous info from old queries */ @@ -292,6 +356,8 @@ public interface TrackedQuery Optional getEndTime(); + int getRunningTaskCount(); + void fail(Throwable cause); // XXX: This should be removed when the client protocol is improved, so that we don't need to hold onto so much query history diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java index de0b175287459..c49d0d133d6aa 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java @@ -325,6 +325,12 @@ public BasicQueryInfo getBasicQueryInfo() .orElseGet(() -> stateMachine.getBasicQueryInfo(Optional.ofNullable(queryScheduler.get()).map(SqlQueryScheduler::getBasicStageStats))); } + @Override + public int getRunningTaskCount() + { + return stateMachine.getCurrentRunningTaskCount(); + } + @Override public void start() { diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java index adf7664b307f8..eca448a60e130 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java @@ -496,6 +496,18 @@ public ThreadPoolExecutorMBean getManagementExecutor() return queryManagementExecutorMBean; } + @Managed + public long getRunningTaskCount() + { + return queryTracker.getRunningTaskCount(); + } + + @Managed + public long getQueriesKilledDueToTooManyTask() + { + return queryTracker.getQueriesKilledDueToTooManyTask(); + } + /** * Enforce memory limits at the query level */ diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java index 80a3a53e17b1c..1dab8bc34c1ee 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenter.java @@ -130,7 +130,7 @@ */ public class PlanFragmenter { - private static final String TOO_MANY_STAGES_MESSAGE = "If the query contains multiple DISTINCTs, please set the 'use_mark_distinct' session property to false. " + + public static final String TOO_MANY_STAGES_MESSAGE = "If the query contains multiple DISTINCTs, please set the 'use_mark_distinct' session property to false. " + "If the query contains multiple CTEs that are referenced more than once, please create temporary table(s) for one or more of the CTEs."; private final Metadata metadata; diff --git a/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java b/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java index 2d9b31b3e9375..9b4b929f20761 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/MockQueryExecution.java @@ -265,6 +265,12 @@ public BasicQueryInfo getBasicQueryInfo() return new BasicQueryInfo(getQueryInfo()); } + @Override + public int getRunningTaskCount() + { + return getQueryInfo().getQueryStats().getRunningTasks(); + } + @Override public DataSize getUserMemoryReservation() { diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java index 04d67750d51bf..cff4b4017fa6e 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java @@ -33,6 +33,8 @@ public void testDefaults() .setMaxQueryLength(1_000_000) .setMaxStageCount(100) .setStageCountWarningThreshold(50) + .setMaxTotalRunningTaskCount(Integer.MAX_VALUE) + .setMaxQueryRunningTaskCount(Integer.MAX_VALUE) .setClientTimeout(new Duration(5, TimeUnit.MINUTES)) .setScheduleSplitBatchSize(1000) .setMinScheduleSplitBatchSize(100) @@ -65,6 +67,8 @@ public void testExplicitPropertyMappings() .put("query.max-length", "10000") .put("query.max-stage-count", "12345") .put("query.stage-count-warning-threshold", "12300") + .put("experimental.max-total-running-task-count", "60000") + .put("experimental.max-query-running-task-count", "10000") .put("query.schedule-split-batch-size", "99") .put("query.min-schedule-split-batch-size", "9") .put("query.max-concurrent-queries", "10") @@ -92,6 +96,8 @@ public void testExplicitPropertyMappings() .setMaxQueryLength(10000) .setMaxStageCount(12345) .setStageCountWarningThreshold(12300) + .setMaxTotalRunningTaskCount(60000) + .setMaxQueryRunningTaskCount(10000) .setClientTimeout(new Duration(10, TimeUnit.SECONDS)) .setScheduleSplitBatchSize(99) .setMinScheduleSplitBatchSize(9) diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryTaskLimit.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryTaskLimit.java new file mode 100644 index 0000000000000..9ab62af845df9 --- /dev/null +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryTaskLimit.java @@ -0,0 +1,107 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.tests; + +import com.facebook.presto.Session; +import com.facebook.presto.server.BasicQueryInfo; +import com.facebook.presto.tpch.TpchPlugin; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES; +import static com.facebook.presto.testing.TestingSession.testSessionBuilder; +import static java.util.concurrent.Executors.newCachedThreadPool; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +public class TestQueryTaskLimit +{ + private ExecutorService executor; + + @BeforeClass + public void setUp() + { + executor = newCachedThreadPool(); + } + + @AfterClass(alwaysRun = true) + public void shutdown() + { + executor.shutdownNow(); + executor = null; + } + + @Test(timeOut = 30_000, expectedExceptions = ExecutionException.class, expectedExceptionsMessageRegExp = ".*Query killed because the cluster is overloaded with too many tasks.*") + public void testExceedTaskLimit() + throws Exception + { + Session defaultSession = testSessionBuilder() + .setCatalog("tpch") + .setSchema("sf1000") + .build(); + + ImmutableMap extraProperties = ImmutableMap.builder() + .put("experimental.max-total-running-task-count", "4") + .put("experimental.max-query-running-task-count", "4") + .build(); + + try (DistributedQueryRunner queryRunner = createQueryRunner(defaultSession, extraProperties)) { + Future query = executor.submit(() -> queryRunner.execute("SELECT COUNT(*), clerk FROM orders GROUP BY clerk")); + + waitForQueryToBeKilled(queryRunner); + + query.get(); + } + } + + public static DistributedQueryRunner createQueryRunner(Session session, Map properties) + throws Exception + { + DistributedQueryRunner queryRunner = new DistributedQueryRunner(session, 2, properties); + + try { + queryRunner.installPlugin(new TpchPlugin()); + queryRunner.createCatalog("tpch", "tpch"); + return queryRunner; + } + catch (Exception e) { + queryRunner.close(); + throw e; + } + } + + private void waitForQueryToBeKilled(DistributedQueryRunner queryRunner) + throws InterruptedException + { + while (true) { + for (BasicQueryInfo info : queryRunner.getCoordinator().getQueryManager().getQueries()) { + if (info.getState().isDone()) { + assertNotNull(info.getErrorCode()); + assertEquals(info.getErrorCode().getCode(), QUERY_HAS_TOO_MANY_STAGES.toErrorCode().getCode()); + MILLISECONDS.sleep(100); + return; + } + } + MILLISECONDS.sleep(10); + } + } +}