Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ private static QueryInfo immediateFailureQueryInfo(
session.getQueryId(),
session.toSessionRepresentation(),
QueryState.FAILED,
false,
self,
ImmutableList.of(),
query,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo)
queryStats.getFailedCumulativeUserMemory(),
queryStats.getStageGcStatistics(),
queryStats.getCompletedDrivers(),
queryInfo.isCompleteInfo(),
queryInfo.isFinalQueryInfo(),
getCpuDistributions(queryInfo),
operatorSummaries.build(),
serializedPlanNodeStatsAndCosts);
Expand Down
21 changes: 7 additions & 14 deletions core/trino-main/src/main/java/io/trino/execution/QueryInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import java.util.Set;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.trino.execution.StageInfo.getAllStages;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

@Immutable
Expand All @@ -51,7 +51,6 @@ public class QueryInfo
private final QueryId queryId;
private final SessionRepresentation session;
private final QueryState state;
private final boolean scheduled;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if scheduled should be kept in QueryInfo and removed from QueryStats as it seems to be about a query's state rather than statistics.
Will let @sopel39 decide this

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wasn’t totally sure and it’s easy enough to keep the field in both places (just redundant)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sopel39 - think you'll have a chance to weigh in on this soon?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if scheduled should be kept in QueryInfo and removed from QueryStats as it seems to be about a query's state rather than statistics.

makes sense.

BTW: do we need scheduled since we have state in query info?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@raunaqmorarka - I took a look at moving scheduled from QueryStats to QueryInfo but unfortunately it's used from within QueryStats#getProgressPercentage which makes extracting it a harder problem, so I propose we leave it in QueryStats since all QueryInfo instances have a QueryStats value they can consult.

BTW: do we need scheduled since we have state in query info?

@sopel39 : We don't "need" it in the sense that you could re-compute it, but the QueryInfo level notion of being scheduled is not a function of the QueryState but rather is set to true when of the query's stages are either done, pending, or running- so it does have semantic meaning on it's own.

private final URI self;
private final List<String> fieldNames;
private final String query;
Expand All @@ -77,7 +76,7 @@ public class QueryInfo
private final List<TrinoWarning> warnings;
private final Set<Input> inputs;
private final Optional<Output> output;
private final boolean completeInfo;
private final boolean finalQueryInfo;
private final Optional<ResourceGroupId> resourceGroupId;
private final Optional<QueryType> queryType;
private final RetryPolicy retryPolicy;
Expand All @@ -87,7 +86,6 @@ public QueryInfo(
@JsonProperty("queryId") QueryId queryId,
@JsonProperty("session") SessionRepresentation session,
@JsonProperty("state") QueryState state,
@JsonProperty("scheduled") boolean scheduled,
@JsonProperty("self") URI self,
@JsonProperty("fieldNames") List<String> fieldNames,
@JsonProperty("query") String query,
Expand All @@ -112,7 +110,7 @@ public QueryInfo(
@JsonProperty("output") Optional<Output> output,
@JsonProperty("referencedTables") List<TableInfo> referencedTables,
@JsonProperty("routines") List<RoutineInfo> routines,
@JsonProperty("completeInfo") boolean completeInfo,
@JsonProperty("finalQueryInfo") boolean finalQueryInfo,
@JsonProperty("resourceGroupId") Optional<ResourceGroupId> resourceGroupId,
@JsonProperty("queryType") Optional<QueryType> queryType,
@JsonProperty("retryPolicy") RetryPolicy retryPolicy)
Expand Down Expand Up @@ -146,7 +144,6 @@ public QueryInfo(
this.queryId = queryId;
this.session = session;
this.state = state;
this.scheduled = scheduled;
this.self = self;
this.fieldNames = ImmutableList.copyOf(fieldNames);
this.query = query;
Expand All @@ -172,7 +169,8 @@ public QueryInfo(
this.output = output;
this.referencedTables = ImmutableList.copyOf(referencedTables);
this.routines = ImmutableList.copyOf(routines);
this.completeInfo = completeInfo;
this.finalQueryInfo = finalQueryInfo;
checkArgument(!finalQueryInfo || state.isDone(), "finalQueryInfo without a terminal query state");
this.resourceGroupId = resourceGroupId;
this.queryType = queryType;
this.retryPolicy = retryPolicy;
Expand All @@ -199,7 +197,7 @@ public QueryState getState()
@JsonProperty
public boolean isScheduled()
{
return scheduled;
return queryStats.isScheduled();
}

@JsonProperty
Expand Down Expand Up @@ -335,7 +333,7 @@ public List<TrinoWarning> getWarnings()
@JsonProperty
public boolean isFinalQueryInfo()
{
return state.isDone() && getAllStages(outputStage).stream().allMatch(StageInfo::isFinalStageInfo);
return finalQueryInfo;
}

@JsonProperty
Expand Down Expand Up @@ -389,9 +387,4 @@ public String toString()
.add("fieldNames", fieldNames)
.toString();
}

public boolean isCompleteInfo()
{
return completeInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -436,19 +436,19 @@ QueryInfo getQueryInfo(Optional<StageInfo> rootStage)
}
}

boolean completeInfo = getAllStages(rootStage).stream().allMatch(StageInfo::isCompleteInfo);
boolean isScheduled = isScheduled(rootStage);
List<StageInfo> allStages = getAllStages(rootStage);
QueryStats queryStats = getQueryStats(rootStage, allStages);
boolean finalInfo = state.isDone() && allStages.stream().allMatch(StageInfo::isFinalStageInfo);

return new QueryInfo(
queryId,
session.toSessionRepresentation(),
state,
isScheduled,
self,
outputManager.getQueryOutputInfo().map(QueryOutputInfo::getColumnNames).orElse(ImmutableList.of()),
query,
preparedQuery,
getQueryStats(rootStage),
queryStats,
Optional.ofNullable(setCatalog.get()),
Optional.ofNullable(setSchema.get()),
Optional.ofNullable(setPath.get()),
Expand All @@ -468,13 +468,13 @@ QueryInfo getQueryInfo(Optional<StageInfo> rootStage)
output.get(),
referencedTables.get(),
routines.get(),
completeInfo,
finalInfo,
Optional.of(resourceGroup),
queryType,
getRetryPolicy(session));
}

private QueryStats getQueryStats(Optional<StageInfo> rootStage)
private QueryStats getQueryStats(Optional<StageInfo> rootStage, List<StageInfo> allStages)
{
int totalTasks = 0;
int runningTasks = 0;
Expand Down Expand Up @@ -535,14 +535,13 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
long physicalWrittenDataSize = 0;
long failedPhysicalWrittenDataSize = 0;

ImmutableList.Builder<StageGcStatistics> stageGcStatistics = ImmutableList.builder();
ImmutableList.Builder<StageGcStatistics> stageGcStatistics = ImmutableList.builderWithExpectedSize(allStages.size());

boolean fullyBlocked = rootStage.isPresent();
Set<BlockedReason> blockedReasons = new HashSet<>();

ImmutableList.Builder<OperatorStats> operatorStatsSummary = ImmutableList.builder();
boolean completeInfo = true;
for (StageInfo stageInfo : getAllStages(rootStage)) {
for (StageInfo stageInfo : allStages) {
StageStats stageStats = stageInfo.getStageStats();
totalTasks += stageStats.getTotalTasks();
runningTasks += stageStats.getRunningTasks();
Expand Down Expand Up @@ -606,7 +605,6 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)

stageGcStatistics.add(stageStats.getGcInfo());

completeInfo = completeInfo && stageInfo.isCompleteInfo();
operatorStatsSummary.addAll(stageInfo.getStageStats().getOperatorSummaries());
}

Expand All @@ -618,7 +616,9 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
failedOutputPositions += outputStageStats.getFailedOutputPositions();
}

boolean isScheduled = isScheduled(rootStage);
boolean isScheduled = rootStage.isPresent() && allStages.stream()
.map(StageInfo::getState)
.allMatch(state -> state == StageState.RUNNING || state == StageState.PENDING || state.isDone());

return new QueryStats(
queryStateTimer.getCreateTime(),
Expand Down Expand Up @@ -1108,16 +1108,6 @@ public Optional<DateTime> getEndTime()
return queryStateTimer.getEndTime();
}

private static boolean isScheduled(Optional<StageInfo> rootStage)
{
if (rootStage.isEmpty()) {
return false;
}
return getAllStages(rootStage).stream()
.map(StageInfo::getState)
.allMatch(state -> state == StageState.RUNNING || state == StageState.PENDING || state.isDone());
}

public Optional<ExecutionFailureInfo> getFailureInfo()
{
if (queryState.get() != FAILED) {
Expand Down Expand Up @@ -1164,7 +1154,6 @@ public void pruneQueryInfo()
queryInfo.getQueryId(),
queryInfo.getSession(),
queryInfo.getState(),
queryInfo.isScheduled(),
queryInfo.getSelf(),
queryInfo.getFieldNames(),
queryInfo.getQuery(),
Expand All @@ -1189,7 +1178,7 @@ public void pruneQueryInfo()
queryInfo.getOutput(),
queryInfo.getReferencedTables(),
queryInfo.getRoutines(),
queryInfo.isCompleteInfo(),
queryInfo.isFinalQueryInfo(),
queryInfo.getResourceGroupId(),
queryInfo.getQueryType(),
queryInfo.getRetryPolicy());
Expand Down
21 changes: 10 additions & 11 deletions core/trino-main/src/main/java/io/trino/execution/StageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,22 +154,21 @@ public String toString()

public static List<StageInfo> getAllStages(Optional<StageInfo> stageInfo)
{
if (stageInfo.isEmpty()) {
return ImmutableList.of();
}
ImmutableList.Builder<StageInfo> collector = ImmutableList.builder();
addAllStages(stageInfo, collector);
addAllStages(stageInfo.get(), collector);
return collector.build();
}

private static void addAllStages(Optional<StageInfo> stageInfo, ImmutableList.Builder<StageInfo> collector)
private static void addAllStages(@Nullable StageInfo stage, ImmutableList.Builder<StageInfo> collector)
{
stageInfo.ifPresent(stage -> {
if (stage != null) {
collector.add(stage);
stage.getSubStages().stream()
.forEach(subStage -> addAllStages(Optional.ofNullable(subStage), collector));
});
}

public boolean isCompleteInfo()
{
return state.isDone() && tasks.stream().allMatch(taskInfo -> taskInfo.getTaskStatus().getState().isDone());
for (StageInfo subStage : stage.getSubStages()) {
addAllStages(subStage, collector);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public void setAllTasksFinal(Iterable<TaskInfo> finalTaskInfos)
requireNonNull(finalTaskInfos, "finalTaskInfos is null");
checkState(stageState.get().isDone());
StageInfo stageInfo = getStageInfo(() -> finalTaskInfos);
checkArgument(stageInfo.isCompleteInfo(), "finalTaskInfos are not all done");
checkArgument(stageInfo.isFinalStageInfo(), "finalTaskInfos are not all done");
finalStageInfo.compareAndSet(Optional.empty(), Optional.of(stageInfo));
}

Expand Down
64 changes: 32 additions & 32 deletions core/trino-main/src/main/java/io/trino/server/protocol/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.log.Logger;
Expand Down Expand Up @@ -733,11 +734,14 @@ private static StatementStats toStatementStats(QueryInfo queryInfo)
QueryStats queryStats = queryInfo.getQueryStats();
StageInfo outputStage = queryInfo.getOutputStage().orElse(null);

Set<String> globalUniqueNodes = new HashSet<>();
StageStats rootStageStats = toStageStats(outputStage, globalUniqueNodes);

return StatementStats.builder()
.setState(queryInfo.getState().toString())
.setQueued(queryInfo.getState() == QueryState.QUEUED)
.setScheduled(queryInfo.isScheduled())
.setNodes(globalUniqueNodes(outputStage).size())
.setNodes(globalUniqueNodes.size())
.setTotalSplits(queryStats.getTotalDrivers())
.setQueuedSplits(queryStats.getQueuedDrivers())
.setRunningSplits(queryStats.getRunningDrivers() + queryStats.getBlockedDrivers())
Expand All @@ -751,35 +755,23 @@ private static StatementStats toStatementStats(QueryInfo queryInfo)
.setPhysicalInputBytes(queryStats.getPhysicalInputDataSize().toBytes())
.setPeakMemoryBytes(queryStats.getPeakUserMemoryReservation().toBytes())
.setSpilledBytes(queryStats.getSpilledDataSize().toBytes())
.setRootStage(toStageStats(outputStage))
.setRootStage(rootStageStats)
.build();
}

private static StageStats toStageStats(StageInfo stageInfo)
private static StageStats toStageStats(StageInfo stageInfo, Set<String> globalUniqueNodes)
Comment thread
pettyjamesm marked this conversation as resolved.
Outdated
{
if (stageInfo == null) {
return null;
}

io.trino.execution.StageStats stageStats = stageInfo.getStageStats();

ImmutableList.Builder<StageStats> subStages = ImmutableList.builder();
for (StageInfo subStage : stageInfo.getSubStages()) {
subStages.add(toStageStats(subStage));
}

Set<String> uniqueNodes = new HashSet<>();
for (TaskInfo task : stageInfo.getTasks()) {
// todo add nodeId to TaskInfo
URI uri = task.getTaskStatus().getSelf();
uniqueNodes.add(uri.getHost() + ":" + uri.getPort());
}

return StageStats.builder()
// Store current stage details into a builder
StageStats.Builder builder = StageStats.builder()
.setStageId(String.valueOf(stageInfo.getStageId().getId()))
.setState(stageInfo.getState().toString())
.setDone(stageInfo.getState().isDone())
.setNodes(uniqueNodes.size())
.setTotalSplits(stageStats.getTotalDrivers())
.setQueuedSplits(stageStats.getQueuedDrivers())
.setRunningSplits(stageStats.getRunningDrivers() + stageStats.getBlockedDrivers())
Expand All @@ -791,26 +783,34 @@ private static StageStats toStageStats(StageInfo stageInfo)
.setPhysicalInputBytes(stageStats.getPhysicalInputDataSize().toBytes())
.setFailedTasks(stageStats.getFailedTasks())
.setCoordinatorOnly(stageInfo.isCoordinatorOnly())
.setSubStages(subStages.build())
.build();
}
.setNodes(countStageAndAddGlobalUniqueNodes(stageInfo, globalUniqueNodes));

private static Set<String> globalUniqueNodes(StageInfo stageInfo)
{
if (stageInfo == null) {
return ImmutableSet.of();
// Recurse into child stages to create their StageStats
List<StageInfo> subStages = stageInfo.getSubStages();
if (subStages.isEmpty()) {
builder.setSubStages(ImmutableList.of());
}
ImmutableSet.Builder<String> nodes = ImmutableSet.builder();
for (TaskInfo task : stageInfo.getTasks()) {
// todo add nodeId to TaskInfo
URI uri = task.getTaskStatus().getSelf();
nodes.add(uri.getHost() + ":" + uri.getPort());
else {
ImmutableList.Builder<StageStats> subStagesBuilder = ImmutableList.builderWithExpectedSize(subStages.size());
for (StageInfo subStage : subStages) {
subStagesBuilder.add(toStageStats(subStage, globalUniqueNodes));
}
builder.setSubStages(subStagesBuilder.build());
}

for (StageInfo subStage : stageInfo.getSubStages()) {
nodes.addAll(globalUniqueNodes(subStage));
return builder.build();
}

private static int countStageAndAddGlobalUniqueNodes(StageInfo stageInfo, Set<String> globalUniqueNodes)
{
List<TaskInfo> tasks = stageInfo.getTasks();
Set<String> stageUniqueNodes = Sets.newHashSetWithExpectedSize(tasks.size());
for (TaskInfo task : tasks) {
String nodeId = task.getTaskStatus().getNodeId();
stageUniqueNodes.add(nodeId);
globalUniqueNodes.add(nodeId);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better if we did globalUniqueNodes.addAll(stageUniqueNodes) after this loop ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There’s no particular advantage to bulk inserts on HashSet, the implementation just does another loop with another iterator which is probably slightly slower (extra iterator, iterator is over the Set instead of a List, temporal memory locality of access, etc)

}
return nodes.build();
return stageUniqueNodes.size();
}

private static Optional<Integer> findCancelableLeafStage(QueryInfo queryInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,14 @@ public static String textDistributedPlan(
ValuePrinter valuePrinter,
boolean verbose)
{
Map<PlanNodeId, TableInfo> tableInfos = getAllStages(Optional.of(outputStageInfo)).stream()
List<StageInfo> allStages = getAllStages(Optional.of(outputStageInfo));
Map<PlanNodeId, TableInfo> tableInfos = allStages.stream()
.map(StageInfo::getTables)
.map(Map::entrySet)
.flatMap(Collection::stream)
.collect(toImmutableMap(Entry::getKey, Entry::getValue));

StringBuilder builder = new StringBuilder();
List<StageInfo> allStages = getAllStages(Optional.of(outputStageInfo));
List<PlanFragment> allFragments = allStages.stream()
.map(StageInfo::getPlan)
.collect(toImmutableList());
Expand Down
Loading