Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/trino-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@
<!-- for testing -->
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-exchange</artifactId>
<artifactId>trino-exchange-filesystem</artifactId>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.trino.execution.scheduler.SplitSchedulerStats;
import io.trino.execution.scheduler.SqlQueryScheduler;
import io.trino.execution.scheduler.TaskDescriptorStorage;
import io.trino.execution.scheduler.TaskExecutionStats;
import io.trino.execution.scheduler.TaskSourceFactory;
import io.trino.execution.scheduler.policy.ExecutionPolicy;
import io.trino.execution.warnings.WarningCollector;
Expand Down Expand Up @@ -103,6 +104,7 @@ public class SqlQueryExecution
private final NodeScheduler nodeScheduler;
private final NodeAllocatorService nodeAllocatorService;
private final PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory;
private final TaskExecutionStats taskExecutionStats;
private final List<PlanOptimizer> planOptimizers;
private final PlanFragmenter planFragmenter;
private final RemoteTaskFactory remoteTaskFactory;
Expand Down Expand Up @@ -138,6 +140,7 @@ private SqlQueryExecution(
NodeScheduler nodeScheduler,
NodeAllocatorService nodeAllocatorService,
PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory,
TaskExecutionStats taskExecutionStats,
List<PlanOptimizer> planOptimizers,
PlanFragmenter planFragmenter,
RemoteTaskFactory remoteTaskFactory,
Expand Down Expand Up @@ -167,6 +170,7 @@ private SqlQueryExecution(
this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null");
this.nodeAllocatorService = requireNonNull(nodeAllocatorService, "nodeAllocatorService is null");
this.partitionMemoryEstimatorFactory = requireNonNull(partitionMemoryEstimatorFactory, "partitionMemoryEstimatorFactory is null");
this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null");
this.planOptimizers = requireNonNull(planOptimizers, "planOptimizers is null");
this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null");
this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
Expand Down Expand Up @@ -507,6 +511,7 @@ private void planDistribution(PlanRoot plan)
nodeScheduler,
nodeAllocatorService,
partitionMemoryEstimatorFactory,
taskExecutionStats,
remoteTaskFactory,
plan.isSummarizeTaskInfos(),
scheduleSplitBatchSize,
Expand Down Expand Up @@ -710,6 +715,7 @@ public static class SqlQueryExecutionFactory
private final NodeScheduler nodeScheduler;
private final NodeAllocatorService nodeAllocatorService;
private final PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory;
private final TaskExecutionStats taskExecutionStats;
private final List<PlanOptimizer> planOptimizers;
private final PlanFragmenter planFragmenter;
private final RemoteTaskFactory remoteTaskFactory;
Expand Down Expand Up @@ -738,6 +744,7 @@ public static class SqlQueryExecutionFactory
NodeScheduler nodeScheduler,
NodeAllocatorService nodeAllocatorService,
PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory,
TaskExecutionStats taskExecutionStats,
PlanOptimizersFactory planOptimizersFactory,
PlanFragmenter planFragmenter,
RemoteTaskFactory remoteTaskFactory,
Expand Down Expand Up @@ -767,6 +774,7 @@ public static class SqlQueryExecutionFactory
this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null");
this.nodeAllocatorService = requireNonNull(nodeAllocatorService, "nodeAllocatorService is null");
this.partitionMemoryEstimatorFactory = requireNonNull(partitionMemoryEstimatorFactory, "partitionMemoryEstimatorFactory is null");
this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null");
this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null");
this.remoteTaskFactory = requireNonNull(remoteTaskFactory, "remoteTaskFactory is null");
this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null");
Expand Down Expand Up @@ -808,6 +816,7 @@ public QueryExecution createQueryExecution(
nodeScheduler,
nodeAllocatorService,
partitionMemoryEstimatorFactory,
taskExecutionStats,
planOptimizers,
planFragmenter,
remoteTaskFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class FaultTolerantStageScheduler
private final NodeAllocator nodeAllocator;
private final TaskDescriptorStorage taskDescriptorStorage;
private final PartitionMemoryEstimator partitionMemoryEstimator;
private final TaskExecutionStats taskExecutionStats;
private final int maxRetryAttemptsPerTask;
private final int maxTasksWaitingForNodePerStage;

Expand Down Expand Up @@ -181,6 +182,7 @@ public FaultTolerantStageScheduler(
NodeAllocator nodeAllocator,
TaskDescriptorStorage taskDescriptorStorage,
PartitionMemoryEstimator partitionMemoryEstimator,
TaskExecutionStats taskExecutionStats,
TaskLifecycleListener taskLifecycleListener,
DelayedFutureCompletor futureCompletor,
Ticker ticker,
Expand All @@ -202,6 +204,7 @@ public FaultTolerantStageScheduler(
this.nodeAllocator = requireNonNull(nodeAllocator, "nodeAllocator is null");
this.taskDescriptorStorage = requireNonNull(taskDescriptorStorage, "taskDescriptorStorage is null");
this.partitionMemoryEstimator = requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null");
this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null");
this.taskLifecycleListener = requireNonNull(taskLifecycleListener, "taskLifecycleListener is null");
this.futureCompletor = requireNonNull(futureCompletor, "futureCompletor is null");
this.sinkExchange = requireNonNull(sinkExchange, "sinkExchange is null");
Expand Down Expand Up @@ -410,6 +413,7 @@ private void startTask(int partition, NodeAllocator.NodeLease nodeLease)
taskLifecycleListener.taskCreated(stage.getFragment().getId(), task);

task.addStateChangeListener(taskStatus -> updateTaskStatus(taskStatus, exchangeSinkInstanceHandle));
task.addFinalTaskInfoListener(taskExecutionStats::update);
task.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public class SqlQueryScheduler
private final NodeScheduler nodeScheduler;
private final NodeAllocatorService nodeAllocatorService;
private final PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory;
private final TaskExecutionStats taskExecutionStats;
private final int splitBatchSize;
private final ExecutorService executor;
private final ScheduledExecutorService schedulerExecutor;
Expand Down Expand Up @@ -222,6 +223,7 @@ public SqlQueryScheduler(
NodeScheduler nodeScheduler,
NodeAllocatorService nodeAllocatorService,
PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory,
TaskExecutionStats taskExecutionStats,
RemoteTaskFactory remoteTaskFactory,
boolean summarizeTaskInfo,
int splitBatchSize,
Expand All @@ -245,6 +247,7 @@ public SqlQueryScheduler(
this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null");
this.nodeAllocatorService = requireNonNull(nodeAllocatorService, "nodeAllocatorService is null");
this.partitionMemoryEstimatorFactory = requireNonNull(partitionMemoryEstimatorFactory, "partitionMemoryEstimatorFactory is null");
this.taskExecutionStats = requireNonNull(taskExecutionStats, "taskExecutionStats is null");
this.splitBatchSize = splitBatchSize;
this.executor = requireNonNull(queryExecutor, "queryExecutor is null");
this.schedulerExecutor = requireNonNull(schedulerExecutor, "schedulerExecutor is null");
Expand Down Expand Up @@ -363,7 +366,8 @@ private synchronized Optional<DistributedStagesScheduler> createDistributedStage
schedulerExecutor,
schedulerStats,
nodeAllocatorService,
partitionMemoryEstimatorFactory);
partitionMemoryEstimatorFactory,
taskExecutionStats);
break;
case QUERY:
case NONE:
Expand Down Expand Up @@ -1766,7 +1770,8 @@ public static FaultTolerantDistributedStagesScheduler create(
ScheduledExecutorService scheduledExecutorService,
SplitSchedulerStats schedulerStats,
NodeAllocatorService nodeAllocatorService,
PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory)
PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory,
TaskExecutionStats taskExecutionStats)
{
taskDescriptorStorage.initialize(queryStateMachine.getQueryId());
queryStateMachine.addStateChangeListener(state -> {
Expand Down Expand Up @@ -1831,6 +1836,7 @@ public static FaultTolerantDistributedStagesScheduler create(
nodeAllocator,
taskDescriptorStorage,
partitionMemoryEstimatorFactory.createPartitionMemoryEstimator(),
taskExecutionStats,
taskLifecycleListener,
(future, delay) -> scheduledExecutorService.schedule(() -> future.set(null), delay.toMillis(), MILLISECONDS),
systemTicker(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.execution.scheduler;

import io.airlift.log.Logger;
import io.airlift.stats.DistributionStat;
import io.airlift.stats.TimeStat;
import io.trino.execution.ExecutionFailureInfo;
import io.trino.execution.TaskInfo;
import io.trino.execution.TaskState;
import io.trino.operator.TaskStats;
import io.trino.spi.ErrorCode;
import io.trino.spi.ErrorType;
import io.trino.spi.TrinoException;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import java.util.Optional;

import static io.trino.spi.ErrorType.INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.util.Failures.toFailure;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
 * Accumulates per-task execution statistics, bucketed by the state a task ended in.
 * <p>
 * Three top-level buckets are kept: finished tasks, aborted tasks (which also
 * covers canceled tasks) and failed tasks. Failed tasks are further split by the
 * {@link ErrorType} of the first reported failure. Every bucket is exposed through
 * {@code @Managed @Nested} getters.
 */
public class TaskExecutionStats
{
    private static final Logger log = Logger.get(TaskExecutionStats.class);

    private final ExecutionStats finishedTasks = new ExecutionStats();
    private final ExecutionStats abortedTasks = new ExecutionStats();
    private final FailedTasksStats failedTasks = new FailedTasksStats();

    /**
     * Records the statistics of a task into the bucket matching its state.
     * <p>
     * {@code FINISHED} goes to the finished bucket, {@code FAILED} to the failed
     * bucket, and both {@code CANCELED} and {@code ABORTED} to the aborted bucket.
     * Any other state ({@code PLANNED}, {@code RUNNING}, {@code FLUSHING} or an
     * unknown value) is not recorded; it is only logged as an error.
     *
     * @param info task info whose status supplies the state and whose stats are recorded
     */
    public void update(TaskInfo info)
    {
        TaskState state = info.getTaskStatus().getState();
        switch (state) {
            case FINISHED:
                finishedTasks.update(info.getStats());
                break;
            case FAILED:
                // needs the whole TaskInfo: the failure list decides the sub-bucket
                failedTasks.update(info);
                break;
            case CANCELED:
            case ABORTED:
                abortedTasks.update(info.getStats());
                break;
            case PLANNED:
            case RUNNING:
            case FLUSHING:
            default:
                // these states are listed explicitly but handled like any unknown state
                log.error("Unexpected task state: %s", state);
        }
    }

    /**
     * @return statistics of tasks recorded in the {@code FINISHED} state
     */
    @Managed
    @Nested
    public ExecutionStats getFinishedTasks()
    {
        return finishedTasks;
    }

    /**
     * @return statistics of tasks recorded in the {@code CANCELED} or {@code ABORTED} state
     */
    @Managed
    @Nested
    public ExecutionStats getAbortedTasks()
    {
        return abortedTasks;
    }

    /**
     * @return statistics of tasks recorded in the {@code FAILED} state, split by error type
     */
    @Managed
    @Nested
    public FailedTasksStats getFailedTasks()
    {
        return failedTasks;
    }

    /**
     * One bucket of task statistics: five time distributions and one peak-memory
     * distribution. All {@link TimeStat} instances are created with {@code MILLISECONDS}.
     */
    public static class ExecutionStats
    {
        private final TimeStat elapsedTime = new TimeStat(MILLISECONDS);
        private final TimeStat scheduledTime = new TimeStat(MILLISECONDS);
        private final TimeStat cpuTime = new TimeStat(MILLISECONDS);
        private final TimeStat inputBlockedTime = new TimeStat(MILLISECONDS);
        private final TimeStat outputBlockedTime = new TimeStat(MILLISECONDS);
        private final DistributionStat peakMemoryReservationInBytes = new DistributionStat();

        /**
         * Adds one task's statistics to this bucket.
         *
         * @param stats task statistics; elapsed, total scheduled, total CPU, input-blocked
         *         and output-blocked times are added to the matching time stats, and the
         *         peak user memory reservation is added in bytes
         */
        public void update(TaskStats stats)
        {
            elapsedTime.add(stats.getElapsedTime());
            scheduledTime.add(stats.getTotalScheduledTime());
            cpuTime.add(stats.getTotalCpuTime());
            inputBlockedTime.add(stats.getInputBlockedTime());
            outputBlockedTime.add(stats.getOutputBlockedTime());
            // note: this tracks the peak *user* memory reservation specifically
            peakMemoryReservationInBytes.add(stats.getPeakUserMemoryReservation().toBytes());
        }

        @Managed
        @Nested
        public TimeStat getElapsedTime()
        {
            return elapsedTime;
        }

        @Managed
        @Nested
        public TimeStat getScheduledTime()
        {
            return scheduledTime;
        }

        @Managed
        @Nested
        public TimeStat getCpuTime()
        {
            return cpuTime;
        }

        @Managed
        @Nested
        public TimeStat getInputBlockedTime()
        {
            return inputBlockedTime;
        }

        @Managed
        @Nested
        public TimeStat getOutputBlockedTime()
        {
            return outputBlockedTime;
        }

        @Managed
        @Nested
        public DistributionStat getPeakMemoryReservationInBytes()
        {
            return peakMemoryReservationInBytes;
        }
    }

    /**
     * Statistics of failed tasks, with one {@link ExecutionStats} bucket per
     * {@link ErrorType}: user error, internal error, external error and
     * insufficient resources.
     */
    public static class FailedTasksStats
    {
        private final ExecutionStats userError = new ExecutionStats();
        private final ExecutionStats internalError = new ExecutionStats();
        private final ExecutionStats externalError = new ExecutionStats();
        private final ExecutionStats insufficientResources = new ExecutionStats();

        /**
         * Records a failed task into the bucket chosen by the error type of its
         * first reported failure.
         * <p>
         * When the task reports no failure, or the failure carries no error code,
         * the task is counted as an {@code INTERNAL_ERROR}. An error type outside
         * the four known ones is not recorded; it is only logged as an error.
         *
         * @param info task info supplying the failure list and the stats to record
         */
        public void update(TaskInfo info)
        {
            // orElseGet keeps the placeholder lazy: the fallback exception is only
            // built when the task reports no failure at all
            ExecutionFailureInfo failureInfo = info.getTaskStatus().getFailures().stream()
                    .findFirst()
                    .orElseGet(() -> toFailure(new TrinoException(GENERIC_INTERNAL_ERROR, "A task failed for an unknown reason")));
            // the error code is treated as nullable; a missing code counts as an internal error
            ErrorType errorType = Optional.ofNullable(failureInfo.getErrorCode())
                    .map(ErrorCode::getType)
                    .orElse(INTERNAL_ERROR);
            TaskStats stats = info.getStats();
            switch (errorType) {
                case USER_ERROR:
                    userError.update(stats);
                    break;
                case INTERNAL_ERROR:
                    internalError.update(stats);
                    break;
                case EXTERNAL:
                    externalError.update(stats);
                    break;
                case INSUFFICIENT_RESOURCES:
                    insufficientResources.update(stats);
                    break;
                default:
                    log.error("Unexpected error type: %s", errorType);
            }
        }

        @Managed
        @Nested
        public ExecutionStats getUserError()
        {
            return userError;
        }

        @Managed
        @Nested
        public ExecutionStats getInternalError()
        {
            return internalError;
        }

        @Managed
        @Nested
        public ExecutionStats getExternalError()
        {
            return externalError;
        }

        @Managed
        @Nested
        public ExecutionStats getInsufficientResources()
        {
            return insufficientResources;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import io.trino.execution.scheduler.SplitSchedulerStats;
import io.trino.execution.scheduler.StageTaskSourceFactory;
import io.trino.execution.scheduler.TaskDescriptorStorage;
import io.trino.execution.scheduler.TaskExecutionStats;
import io.trino.execution.scheduler.TaskSourceFactory;
import io.trino.execution.scheduler.policy.AllAtOnceExecutionPolicy;
import io.trino.execution.scheduler.policy.ExecutionPolicy;
Expand Down Expand Up @@ -326,6 +327,9 @@ protected void setup(Binder binder)
binder.bind(TaskDescriptorStorage.class).in(Scopes.SINGLETON);
newExporter(binder).export(TaskDescriptorStorage.class).withGeneratedName();

binder.bind(TaskExecutionStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(TaskExecutionStats.class).withGeneratedName();

MapBinder<String, ExecutionPolicy> executionPolicyBinder = newMapBinder(binder, String.class, ExecutionPolicy.class);
executionPolicyBinder.addBinding("all-at-once").to(AllAtOnceExecutionPolicy.class);
executionPolicyBinder.addBinding("legacy-phased").to(LegacyPhasedExecutionPolicy.class);
Expand Down
Loading