Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ private static QueryInfo immediateFailureQueryInfo(
query,
preparedQuery,
immediateFailureQueryStats(),
immediateFailureQueryStats(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public DispatchInfo getDispatchInfo()
{
// observe submitted before getting the state, to ensure a failed query stat is visible
boolean dispatched = submitted.isDone();
BasicQueryInfo queryInfo = stateMachine.getBasicQueryInfo(Optional.empty());
BasicQueryInfo queryInfo = stateMachine.getBasicQueryInfo(Optional.empty(), Optional.empty());

if (queryInfo.getState() == QueryState.FAILED) {
ExecutionFailureInfo failureInfo = stateMachine.getFailureInfo()
Expand Down Expand Up @@ -259,7 +259,7 @@ public BasicQueryInfo getBasicQueryInfo()
{
return tryGetQueryExecution()
.map(QueryExecution::getBasicQueryInfo)
.orElseGet(() -> stateMachine.getBasicQueryInfo(Optional.empty()));
.orElseGet(() -> stateMachine.getBasicQueryInfo(Optional.empty(), Optional.empty()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public DispatchQuery createDispatchQuery(
//
// Note that for immediate and in-order delivery of query events we depend on synchronous nature of
// QueryMonitor and EventListenerManager.
queryMonitor.queryCreatedEvent(stateMachine.getBasicQueryInfo(Optional.empty()));
queryMonitor.queryCreatedEvent(stateMachine.getBasicQueryInfo(Optional.empty(), Optional.empty()));

ListenableFuture<QueryExecution> queryExecutionFuture = executor.submit(() -> {
QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(preparedQuery.getStatement().getClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public BasicQueryInfo getBasicQueryInfo()
{
return stateMachine.getFinalQueryInfo()
.map(BasicQueryInfo::new)
.orElseGet(() -> stateMachine.getBasicQueryInfo(Optional.empty()));
.orElseGet(() -> stateMachine.getBasicQueryInfo(Optional.empty(), Optional.empty()));
}

@Override
Expand Down
10 changes: 10 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/QueryInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class QueryInfo
private final String query;
private final Optional<String> preparedQuery;
private final QueryStats queryStats;
private final QueryStats queryStatsNoFailedTasks;
private final Optional<String> setCatalog;
private final Optional<String> setSchema;
private final Optional<String> setPath;
Expand Down Expand Up @@ -91,6 +92,7 @@ public QueryInfo(
@JsonProperty("query") String query,
@JsonProperty("preparedQuery") Optional<String> preparedQuery,
@JsonProperty("queryStats") QueryStats queryStats,
@JsonProperty("queryStatsNoFailedTasks") QueryStats queryStatsNoFailedTasks,
@JsonProperty("setCatalog") Optional<String> setCatalog,
@JsonProperty("setSchema") Optional<String> setSchema,
@JsonProperty("setPath") Optional<String> setPath,
Expand Down Expand Up @@ -120,6 +122,7 @@ public QueryInfo(
requireNonNull(self, "self is null");
requireNonNull(fieldNames, "fieldNames is null");
requireNonNull(queryStats, "queryStats is null");
requireNonNull(queryStatsNoFailedTasks, "queryStatsNoFailedTasks is null");
requireNonNull(setCatalog, "setCatalog is null");
requireNonNull(setSchema, "setSchema is null");
requireNonNull(setPath, "setPath is null");
Expand Down Expand Up @@ -148,6 +151,7 @@ public QueryInfo(
this.query = query;
this.preparedQuery = preparedQuery;
this.queryStats = queryStats;
this.queryStatsNoFailedTasks = queryStatsNoFailedTasks;
this.setCatalog = setCatalog;
this.setSchema = setSchema;
this.setPath = setPath;
Expand Down Expand Up @@ -227,6 +231,12 @@ public QueryStats getQueryStats()
return queryStats;
}

@JsonProperty
public QueryStats getQueryStatsNoFailedTasks()
{
return queryStatsNoFailedTasks;
}

@JsonProperty
public Optional<String> getSetCatalog()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ public void updateMemoryUsage(
peakTaskTotalMemory.accumulateAndGet(taskTotalMemoryInBytes, Math::max);
}

public BasicQueryInfo getBasicQueryInfo(Optional<BasicStageStats> rootStage)
public BasicQueryInfo getBasicQueryInfo(Optional<BasicStageStats> rootStageStatsOptional, Optional<BasicStageStats> rootStageStatsNoFailedTasksOptional)
{
// Query state must be captured first in order to provide a
// correct view of the query. For example, building this
Expand All @@ -364,7 +364,30 @@ public BasicQueryInfo getBasicQueryInfo(Optional<BasicStageStats> rootStage)
}
}

BasicStageStats stageStats = rootStage.orElse(EMPTY_STAGE_STATS);
BasicStageStats rootStageStats = rootStageStatsOptional.orElse(EMPTY_STAGE_STATS);
BasicStageStats rootStageStatsNoFailedTasks = rootStageStatsNoFailedTasksOptional.orElse(EMPTY_STAGE_STATS);
BasicQueryStats queryStats = getBasicQueryStats(rootStageStats);
BasicQueryStats queryStatsNoFailedTasks = getBasicQueryStats(rootStageStatsNoFailedTasks);

return new BasicQueryInfo(
queryId,
session.toSessionRepresentation(),
Optional.of(resourceGroup),
state,
rootStageStats.isScheduled(),
self,
query,
Optional.ofNullable(updateType.get()),
preparedQuery,
queryStats,
queryStatsNoFailedTasks,
errorCode == null ? null : errorCode.getType(),
errorCode,
queryType);
}

private BasicQueryStats getBasicQueryStats(BasicStageStats stageStats)
{
BasicQueryStats queryStats = new BasicQueryStats(
queryStateTimer.getCreateTime(),
getEndTime().orElse(null),
Expand Down Expand Up @@ -393,21 +416,7 @@ public BasicQueryInfo getBasicQueryInfo(Optional<BasicStageStats> rootStage)
stageStats.isFullyBlocked(),
stageStats.getBlockedReasons(),
stageStats.getProgressPercentage());

return new BasicQueryInfo(
queryId,
session.toSessionRepresentation(),
Optional.of(resourceGroup),
state,
stageStats.isScheduled(),
self,
query,
Optional.ofNullable(updateType.get()),
preparedQuery,
queryStats,
errorCode == null ? null : errorCode.getType(),
errorCode,
queryType);
return queryStats;
}

@VisibleForTesting
Expand Down Expand Up @@ -441,6 +450,7 @@ QueryInfo getQueryInfo(Optional<StageInfo> rootStage)
query,
preparedQuery,
getQueryStats(rootStage),
getQueryStatsNoFailedTasks(rootStage),
Optional.ofNullable(setCatalog.get()),
Optional.ofNullable(setSchema.get()),
Optional.ofNullable(setPath.get()),
Expand All @@ -466,6 +476,16 @@ QueryInfo getQueryInfo(Optional<StageInfo> rootStage)
}

private QueryStats getQueryStats(Optional<StageInfo> rootStage)
{
return getQueryStats(rootStage, true);
}

private QueryStats getQueryStatsNoFailedTasks(Optional<StageInfo> rootStage)
{
return getQueryStats(rootStage, false);
}

private QueryStats getQueryStats(Optional<StageInfo> rootStage, boolean includeFailedTasks)
{
int totalTasks = 0;
int runningTasks = 0;
Expand Down Expand Up @@ -512,7 +532,7 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
ImmutableList.Builder<OperatorStats> operatorStatsSummary = ImmutableList.builder();
boolean completeInfo = true;
for (StageInfo stageInfo : getAllStages(rootStage)) {
StageStats stageStats = stageInfo.getStageStats();
StageStats stageStats = includeFailedTasks ? stageInfo.getStageStats() : stageInfo.getStageStatsNoFailedTasks();
totalTasks += stageStats.getTotalTasks();
runningTasks += stageStats.getRunningTasks();
completedTasks += stageStats.getCompletedTasks();
Expand Down Expand Up @@ -556,11 +576,11 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
stageGcStatistics.add(stageStats.getGcInfo());

completeInfo = completeInfo && stageInfo.isCompleteInfo();
operatorStatsSummary.addAll(stageInfo.getStageStats().getOperatorSummaries());
operatorStatsSummary.addAll(stageStats.getOperatorSummaries());
}

if (rootStage.isPresent()) {
StageStats outputStageStats = rootStage.get().getStageStats();
StageStats outputStageStats = includeFailedTasks ? rootStage.get().getStageStats() : rootStage.get().getStageStatsNoFailedTasks();
outputDataSize += outputStageStats.getOutputDataSize().toBytes();
outputPositions += outputStageStats.getOutputPositions();
}
Expand Down Expand Up @@ -1080,6 +1100,7 @@ public void pruneQueryInfo()
outputStage.isCoordinatorOnly(),
outputStage.getTypes(),
outputStage.getStageStats(),
outputStage.getStageStatsNoFailedTasks(),
ImmutableList.of(), // Remove the tasks
ImmutableList.of(), // Remove the substages
ImmutableMap.of(), // Remove tables
Expand All @@ -1095,6 +1116,7 @@ public void pruneQueryInfo()
queryInfo.getQuery(),
queryInfo.getPreparedQuery(),
pruneQueryStats(queryInfo.getQueryStats()),
pruneQueryStats(queryInfo.getQueryStatsNoFailedTasks()),
queryInfo.getSetCatalog(),
queryInfo.getSetSchema(),
queryInfo.getSetPath(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,12 @@ public BasicQueryInfo getBasicQueryInfo()
{
return stateMachine.getFinalQueryInfo()
.map(BasicQueryInfo::new)
.orElseGet(() -> stateMachine.getBasicQueryInfo(Optional.ofNullable(queryScheduler.get()).map(SqlQueryScheduler::getBasicStageStats)));
.orElseGet(() -> {
Optional<SqlQueryScheduler> sqlQueryScheduler = Optional.ofNullable(queryScheduler.get());
Optional<BasicStageStats> basicStageStats = sqlQueryScheduler.map(SqlQueryScheduler::getBasicStageStats);
Optional<BasicStageStats> basicStageStatsNoFailedTasks = sqlQueryScheduler.map(SqlQueryScheduler::getBasicStageStatsNoFailedTasks);
return stateMachine.getBasicQueryInfo(basicStageStats, basicStageStatsNoFailedTasks);
});
}

@Override
Expand Down
13 changes: 13 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/SqlStage.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ public BasicStageStats getBasicStageStats()
return stateMachine.getBasicStageStats(this::getAllTaskInfo);
}

public BasicStageStats getBasicStageStatsNoFailedTasks()
{
return stateMachine.getBasicStageStats(this::getNoFailedTaskInfo);
}

public StageInfo getStageInfo()
{
return stateMachine.getStageInfo(this::getAllTaskInfo);
Expand All @@ -205,6 +210,14 @@ private Iterable<TaskInfo> getAllTaskInfo()
.collect(toImmutableList());
}

private Iterable<TaskInfo> getNoFailedTaskInfo()
{
return tasks.values().stream()
.map(RemoteTask::getTaskInfo)
.filter(task -> task.getTaskStatus().getState() != TaskState.FAILED) // todo: should we filter out tasks which were cancelled as a reason of other tasks failing?
.collect(toImmutableList());
}

public synchronized Optional<RemoteTask> createTask(
InternalNode node,
int partition,
Expand Down
10 changes: 10 additions & 0 deletions core/trino-main/src/main/java/io/trino/execution/StageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class StageInfo
private final boolean coordinatorOnly;
private final List<Type> types;
private final StageStats stageStats;
private final StageStats stageStatsNoFailedTasks;
private final List<TaskInfo> tasks;
private final List<StageInfo> subStages;
private final ExecutionFailureInfo failureCause;
Expand All @@ -53,6 +54,7 @@ public StageInfo(
@JsonProperty("coordinatorOnly") boolean coordinatorOnly,
@JsonProperty("types") List<Type> types,
@JsonProperty("stageStats") StageStats stageStats,
@JsonProperty("stageStatsNoFailedTasks") StageStats stageStatsNoFailedTasks,
@JsonProperty("tasks") List<TaskInfo> tasks,
@JsonProperty("subStages") List<StageInfo> subStages,
@JsonProperty("tables") Map<PlanNodeId, TableInfo> tables,
Expand All @@ -61,6 +63,7 @@ public StageInfo(
requireNonNull(stageId, "stageId is null");
requireNonNull(state, "state is null");
requireNonNull(stageStats, "stageStats is null");
requireNonNull(stageStatsNoFailedTasks, "stageStatsNoFailedTasks is null");
requireNonNull(tasks, "tasks is null");
requireNonNull(subStages, "subStages is null");
requireNonNull(tables, "tables is null");
Expand All @@ -71,6 +74,7 @@ public StageInfo(
this.coordinatorOnly = coordinatorOnly;
this.types = types;
this.stageStats = stageStats;
this.stageStatsNoFailedTasks = stageStatsNoFailedTasks;
this.tasks = ImmutableList.copyOf(tasks);
this.subStages = subStages;
this.failureCause = failureCause;
Expand Down Expand Up @@ -114,6 +118,12 @@ public StageStats getStageStats()
return stageStats;
}

@JsonProperty
public StageStats getStageStatsNoFailedTasks()
{
return stageStatsNoFailedTasks;
}

@JsonProperty
public List<TaskInfo> getTasks()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,26 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)

List<TaskInfo> taskInfos = ImmutableList.copyOf(taskInfosSupplier.get());

ExecutionFailureInfo failureInfo = null;
if (state == FAILED) {
failureInfo = failureCause.get();
}
return new StageInfo(
stageId,
state,
fragment,
fragment.getPartitioning().isCoordinatorOnly(),
fragment.getTypes(),
getStageStats(taskInfos, true),
getStageStats(taskInfos, false),
taskInfos,
ImmutableList.of(),
tables,
failureInfo);
}

private StageStats getStageStats(List<TaskInfo> taskInfos, boolean includeFailedTasks)
{
int totalTasks = taskInfos.size();
int runningTasks = 0;
int completedTasks = 0;
Expand Down Expand Up @@ -400,6 +420,10 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)

Map<String, OperatorStats> operatorToStats = new HashMap<>();
for (TaskInfo taskInfo : taskInfos) {
if (!includeFailedTasks && taskInfo.getTaskStatus().getState() == TaskState.FAILED) {
continue;
}

TaskState taskState = taskInfo.getTaskStatus().getState();
if (taskState.isDone()) {
completedTasks++;
Expand Down Expand Up @@ -471,7 +495,7 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
}
}

StageStats stageStats = new StageStats(
return new StageStats(
schedulingComplete.get(),
getSplitDistribution.snapshot(),

Expand Down Expand Up @@ -525,22 +549,6 @@ public StageInfo getStageInfo(Supplier<Iterable<TaskInfo>> taskInfosSupplier)
(int) (1.0 * totalFullGcSec / fullGcCount)),

ImmutableList.copyOf(operatorToStats.values()));

ExecutionFailureInfo failureInfo = null;
if (state == FAILED) {
failureInfo = failureCause.get();
}
return new StageInfo(
stageId,
state,
fragment,
fragment.getPartitioning().isCoordinatorOnly(),
fragment.getTypes(),
stageStats,
taskInfos,
ImmutableList.of(),
tables,
failureInfo);
}

public void recordGetSplitTime(long startNanos)
Expand Down
Loading