Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ public BasicQueryInfo getBasicQueryInfo()
.orElseGet(() -> stateMachine.getBasicQueryInfo(Optional.empty()));
}

@Override
public int getRunningTaskCount()
{
    // Delegates to the query's state machine, which maintains a live counter
    // of this query's currently running tasks (see incrementCurrentRunningTaskCount).
    return stateMachine.getCurrentRunningTaskCount();
}

@Override
public void start()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ public BasicQueryInfo getBasicQueryInfo()
return new BasicQueryInfo(getQueryInfo());
}

@Override
public int getRunningTaskCount()
{
    // Always zero: this execution type runs no distributed tasks — presumably a
    // DDL/metadata-only execution. NOTE(review): confirm against the enclosing class.
    return 0;
}

@Override
public void start()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class QueryManagerConfig
private int maxQueryLength = 1_000_000;
private int maxStageCount = 100;
private int stageCountWarningThreshold = 50;
private int maxTotalRunningTaskCount = Integer.MAX_VALUE;
private int maxQueryRunningTaskCount = Integer.MAX_VALUE;

private Duration clientTimeout = new Duration(5, TimeUnit.MINUTES);

Expand Down Expand Up @@ -234,6 +236,34 @@ public QueryManagerConfig setStageCountWarningThreshold(int stageCountWarningThr
return this;
}

// Cluster-wide cap on running tasks across all queries; Integer.MAX_VALUE (the
// default) effectively disables enforcement.
@Min(1)
public int getMaxTotalRunningTaskCount()
{
    return maxTotalRunningTaskCount;
}

@Config("experimental.max-total-running-task-count")
// Fixed ungrammatical user-facing description ("Maximal allowed running task from all queries").
@ConfigDescription("Maximum number of running tasks allowed across all queries")
public QueryManagerConfig setMaxTotalRunningTaskCount(int maxTotalRunningTaskCount)
{
    this.maxTotalRunningTaskCount = maxTotalRunningTaskCount;
    return this;
}

// Per-query soft cap on running tasks; only enforced when the cluster-wide
// total (max-total-running-task-count) is also exceeded. Integer.MAX_VALUE
// (the default) effectively disables enforcement.
@Min(1)
public int getMaxQueryRunningTaskCount()
{
    return maxQueryRunningTaskCount;
}

@Config("experimental.max-query-running-task-count")
// Fixed ungrammatical user-facing description; clarified that this is a soft,
// conditional limit (enforced only once the cluster-wide total is exceeded).
@ConfigDescription("Maximum number of running tasks allowed for a single query; enforced only when experimental.max-total-running-task-count is exceeded")
public QueryManagerConfig setMaxQueryRunningTaskCount(int maxQueryRunningTaskCount)
{
    this.maxQueryRunningTaskCount = maxQueryRunningTaskCount;
    return this;
}

@MinDuration("5s")
@NotNull
public Duration getClientTimeout()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand Down Expand Up @@ -116,6 +117,9 @@ public class QueryStateMachine
private final AtomicLong peakTaskUserMemory = new AtomicLong();
private final AtomicLong peakTaskTotalMemory = new AtomicLong();

private final AtomicInteger currentRunningTaskCount = new AtomicInteger();
private final AtomicInteger peakRunningTaskCount = new AtomicInteger();

private final QueryStateTimer queryStateTimer;

private final StateMachine<QueryState> queryState;
Expand Down Expand Up @@ -277,6 +281,28 @@ public long getPeakTaskUserMemory()
return peakTaskUserMemory.get();
}

// Returns the number of tasks currently running for this query, as tracked by
// the increment/decrement methods below.
public int getCurrentRunningTaskCount()
{
    return currentRunningTaskCount.get();
}

/**
 * Records that one more task has started running for this query and keeps the
 * high-water mark up to date.
 *
 * @return the running task count after the increment
 */
public int incrementCurrentRunningTaskCount()
{
    int updatedCount = currentRunningTaskCount.incrementAndGet();
    // Raise the peak if the new count exceeds it; atomic, so concurrent
    // increments cannot lose a larger value.
    peakRunningTaskCount.updateAndGet(peak -> Math.max(peak, updatedCount));
    return updatedCount;
}

// Records that a running task finished; returns the updated count. The peak
// counter is deliberately left untouched — it is a high-water mark.
public int decrementCurrentRunningTaskCount()
{
    return currentRunningTaskCount.decrementAndGet();
}

// Highest number of simultaneously running tasks observed over the query's
// lifetime (maintained by incrementCurrentRunningTaskCount).
public int getPeakRunningTaskCount()
{
    return peakRunningTaskCount.get();
}

public WarningCollector getWarningCollector()
{
return warningCollector;
Expand Down Expand Up @@ -541,6 +567,7 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)

totalTasks,
runningTasks,
getPeakRunningTaskCount(),
completedTasks,

totalDrivers,
Expand Down Expand Up @@ -1018,6 +1045,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
queryStats.getTotalTasks(),
queryStats.getRunningTasks(),
queryStats.getCompletedTasks(),
queryStats.getPeakRunningTasks(),
queryStats.getTotalDrivers(),
queryStats.getQueuedDrivers(),
queryStats.getRunningDrivers(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class QueryStats

private final int totalTasks;
private final int runningTasks;
private final int peakRunningTasks;
private final int completedTasks;

private final int totalDrivers;
Expand Down Expand Up @@ -114,6 +115,7 @@ public QueryStats(

@JsonProperty("totalTasks") int totalTasks,
@JsonProperty("runningTasks") int runningTasks,
@JsonProperty("peakRunningTasks") int peakRunningTasks,
@JsonProperty("completedTasks") int completedTasks,

@JsonProperty("totalDrivers") int totalDrivers,
Expand Down Expand Up @@ -173,6 +175,8 @@ public QueryStats(
this.totalTasks = totalTasks;
checkArgument(runningTasks >= 0, "runningTasks is negative");
this.runningTasks = runningTasks;
checkArgument(peakRunningTasks >= 0, "peakRunningTasks is negative");
this.peakRunningTasks = peakRunningTasks;
checkArgument(completedTasks >= 0, "completedTasks is negative");
this.completedTasks = completedTasks;

Expand Down Expand Up @@ -248,6 +252,7 @@ public static QueryStats immediateFailureQueryStats()
0,
0,
0,
0,
new DataSize(0, BYTE),
new DataSize(0, BYTE),
new DataSize(0, BYTE),
Expand Down Expand Up @@ -353,6 +358,12 @@ public int getRunningTasks()
return runningTasks;
}

// Peak number of simultaneously running tasks for the query; validated
// non-negative in the constructor.
@JsonProperty
public int getPeakRunningTasks()
{
    return peakRunningTasks;
}

@JsonProperty
public int getCompletedTasks()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static com.facebook.presto.SystemSessionProperties.getQueryMaxExecutionTime;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxRunTime;
import static com.facebook.presto.spi.StandardErrorCode.ABANDONED_QUERY;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES;
import static com.facebook.presto.spi.StandardErrorCode.SERVER_SHUTTING_DOWN;
import static com.facebook.presto.sql.planner.PlanFragmenter.TOO_MANY_STAGES_MESSAGE;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand All @@ -51,6 +55,12 @@ public class QueryTracker<T extends TrackedQuery>
private static final Logger log = Logger.get(QueryTracker.class);

private final int maxQueryHistory;
private final int maxTotalRunningTaskCount;
private final int maxQueryRunningTaskCount;

private final AtomicInteger runningTaskCount = new AtomicInteger();
private final AtomicLong queriesKilledDueToTooManyTask = new AtomicLong();

private final Duration minQueryExpireAge;

private final ConcurrentMap<QueryId, T> queries = new ConcurrentHashMap<>();
Expand All @@ -69,6 +79,8 @@ public QueryTracker(QueryManagerConfig queryManagerConfig, ScheduledExecutorServ
this.minQueryExpireAge = queryManagerConfig.getMinQueryExpireAge();
this.maxQueryHistory = queryManagerConfig.getMaxQueryHistory();
this.clientTimeout = queryManagerConfig.getClientTimeout();
this.maxTotalRunningTaskCount = queryManagerConfig.getMaxTotalRunningTaskCount();
this.maxQueryRunningTaskCount = queryManagerConfig.getMaxQueryRunningTaskCount();

this.queryManagementExecutor = requireNonNull(queryManagementExecutor, "queryManagementExecutor is null");
}
Expand All @@ -91,6 +103,15 @@ public synchronized void start()
log.error(e, "Error enforcing query timeout limits");
}

try {
if (maxTotalRunningTaskCount != Integer.MAX_VALUE && maxQueryRunningTaskCount != Integer.MAX_VALUE) {
enforceTaskLimits();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we do this in the SqlQueryManager where all the other limits (memory, cpu) are enforced?

alternatively, could we do it during scheduling before exceeding the task limit in the first place?

Copy link
Copy Markdown
Contributor Author

@viczhang861 viczhang861 Aug 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Currently both QueryTracker and SqlQueryManager enforce some limits (time, memory, cpu)
    QueryTracker.java is added later in October 2018,
    https://github.com/prestodb/presto/commits/fe743c0b1836343d428a8d07fba11691cbf29541/presto-main/src/main/java/com/facebook/presto/execution/QueryTracker.java
    They both use the same Executor, I assume I should use the new one

  2. @arhimondr mentioned some tasks are dynamically generated. A following task I am investigating is to not create new query execution when cluster is overloaded. This needs more caution thus I want to collect enough production statistics before making a decision about threshold.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it sounds like the abstraction between SqlQueryManager and QueryTracker is a bit fuzzy. You can leave it as is for now and we can look at refactoring to make the abstraction clearer.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rschlussel , @viczhang861

QueryTracker.java is added later in October 2018

Correct. Query timeout tracking is refactored into QueryTracker through #11518 . But memory and CPU enforcement is still in SqlQueryManager

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wenleix. yeah, I saw that, but it's not clear to me what unites things that go in query tracker vs. SqlQueryManager/what's the purpose of each.

QueryTracker right now does a few things:

  1. keeps track of query info and history and removes queries when they expire
  2. kills queries that exceed a time limit
  3. kills abandoned queries

SqlQueryManager does some other stuff:

  1. creates queries and other lifecycle operations on the query
  2. kills queries that exceed a memory limit
  3. kills queries that exceed a cpu limit
  4. is the thing that calls the query tracker to do its things

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rschlussel : Yeah, unfortunately resource management in Presto is a bit scattered right now. Might worthy a refactor to clean up code and to have a better understanding about how it works. cc @oerling , @mbasmanova , @viczhang861 , @bhhari

BTW: isn't memory limit enforcement done in ClusterMemoryManager ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but that's called from the SqlQueryManager.enforceMemoryLimits()

}
}
catch (Throwable e) {
log.error(e, "Error enforcing running task limits");
}

try {
removeExpiredQueries();
}
Expand Down Expand Up @@ -167,6 +188,16 @@ public void expireQuery(QueryId queryId)
.ifPresent(expirationQueue::add);
}

// Cluster-wide running task total as of the last enforceTaskLimits() sweep;
// this is a periodically refreshed snapshot, not a live counter.
public long getRunningTaskCount()
{
    return runningTaskCount.get();
}

// Monotonically increasing count of queries killed by enforceTaskLimits().
public long getQueriesKilledDueToTooManyTask()
{
    return queriesKilledDueToTooManyTask.get();
}

/**
* Enforce query max runtime/execution time limits
*/
Expand All @@ -189,6 +220,39 @@ private void enforceTimeLimits()
}
}

/**
 * Kills the single query with the most running tasks when the cluster is
 * overloaded. Both thresholds must be violated simultaneously: the total
 * running task count across all tracked queries must exceed
 * maxTotalRunningTaskCount AND that largest query must itself exceed
 * maxQueryRunningTaskCount. The per-query limit is therefore a soft limit —
 * a query may exceed it so long as the cluster as a whole stays under the
 * total limit. At most one query is killed per invocation.
 */
private void enforceTaskLimits()
{
    int totalRunningTaskCount = 0;
    int highestRunningTaskCount = 0;
    Optional<T> highestRunningTaskQuery = Optional.empty();
    for (T query : queries.values()) {
        if (query.isDone()) {
            continue;
        }
        // NOTE(review): this local shadows the AtomicInteger field of the same
        // name that is updated after the loop; consider renaming for clarity.
        int runningTaskCount = query.getRunningTaskCount();
        totalRunningTaskCount += runningTaskCount;
        if (runningTaskCount > highestRunningTaskCount) {
            highestRunningTaskCount = runningTaskCount;
            highestRunningTaskQuery = Optional.of(query);
        }
    }

    // Publish the freshly computed total so getRunningTaskCount() (exposed via
    // @Managed in SqlQueryManager) reports an up-to-date snapshot.
    runningTaskCount.set(totalRunningTaskCount);

    if (totalRunningTaskCount > maxTotalRunningTaskCount &&
            highestRunningTaskCount > maxQueryRunningTaskCount &&
            highestRunningTaskQuery.isPresent()) {
        highestRunningTaskQuery.get().fail(new PrestoException(QUERY_HAS_TOO_MANY_STAGES, format(
                "Query killed because the cluster is overloaded with too many tasks (%s) and this query was running with the highest number of tasks (%s). %s Otherwise, please try again later.",
                totalRunningTaskCount, highestRunningTaskCount, TOO_MANY_STAGES_MESSAGE)));
        queriesKilledDueToTooManyTask.incrementAndGet();
    }
}

/**
* Prune extraneous info from old queries
*/
Expand Down Expand Up @@ -292,6 +356,8 @@ public interface TrackedQuery

Optional<DateTime> getEndTime();

int getRunningTaskCount();

void fail(Throwable cause);

// XXX: This should be removed when the client protocol is improved, so that we don't need to hold onto so much query history
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ private SqlQueryExecution(
}
});

this.remoteTaskFactory = new MemoryTrackingRemoteTaskFactory(requireNonNull(remoteTaskFactory, "remoteTaskFactory is null"), stateMachine);
this.remoteTaskFactory = new TrackingRemoteTaskFactory(requireNonNull(remoteTaskFactory, "remoteTaskFactory is null"), stateMachine);
}
}

Expand Down Expand Up @@ -325,6 +325,12 @@ public BasicQueryInfo getBasicQueryInfo()
.orElseGet(() -> stateMachine.getBasicQueryInfo(Optional.ofNullable(queryScheduler.get()).map(SqlQueryScheduler::getBasicStageStats)));
}

@Override
public int getRunningTaskCount()
{
    // Delegates to the query state machine's live per-query running task counter.
    return stateMachine.getCurrentRunningTaskCount();
}

@Override
public void start()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,18 @@ public ThreadPoolExecutorMBean getManagementExecutor()
return queryManagementExecutorMBean;
}

// JMX-exposed snapshot of the cluster-wide running task count, delegated to
// the query tracker (refreshed on each enforcement sweep).
@Managed
public long getRunningTaskCount()
{
    return queryTracker.getRunningTaskCount();
}

// JMX-exposed count of queries the tracker has killed for exceeding the
// running-task limits.
@Managed
public long getQueriesKilledDueToTooManyTask()
{
    return queryTracker.getQueriesKilledDueToTooManyTask();
}

/**
* Enforce memory limits at the query level
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class SqlQueryManagerStats
private final TimeStat queuedTime = new TimeStat(MILLISECONDS);
private final DistributionStat wallInputBytesRate = new DistributionStat();
private final DistributionStat cpuInputByteRate = new DistributionStat();
private final DistributionStat peakRunningTasksStat = new DistributionStat();

public void queryQueued()
{
Expand Down Expand Up @@ -88,6 +89,11 @@ public void queryFinished(QueryInfo info)
cpuInputByteRate.add(rawInputBytes * 1000 / executionCpuMillis);
}

long peakRunningTasks = info.getQueryStats().getPeakRunningTasks();
if (peakRunningTasks > 0) {
peakRunningTasksStat.add(peakRunningTasks);
}

if (info.getErrorCode() != null) {
switch (info.getErrorCode().getType()) {
case USER_ERROR:
Expand Down Expand Up @@ -245,4 +251,11 @@ public DistributionStat getCpuInputByteRate()
{
return cpuInputByteRate;
}

// Distribution of per-query peak running task counts, recorded in
// queryFinished() for queries whose peak was greater than zero.
@Managed(description = "Distribution of query peak running tasks")
@Nested
public DistributionStat getPeakRunningTasksStat()
{
    return peakRunningTasksStat;
}
}
Loading