diff --git a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java index c3d48857ba54..77d4f4a822a9 100644 --- a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java +++ b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java @@ -214,7 +214,6 @@ private static QueryInfo immediateFailureQueryInfo( session.getQueryId(), session.toSessionRepresentation(), QueryState.FAILED, - false, self, ImmutableList.of(), query, diff --git a/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java b/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java index 2b1717f51872..a00271dfa078 100644 --- a/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java +++ b/core/trino-main/src/main/java/io/trino/event/QueryMonitor.java @@ -297,7 +297,7 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo) queryStats.getFailedCumulativeUserMemory(), queryStats.getStageGcStatistics(), queryStats.getCompletedDrivers(), - queryInfo.isCompleteInfo(), + queryInfo.isFinalQueryInfo(), getCpuDistributions(queryInfo), operatorSummaries.build(), serializedPlanNodeStatsAndCosts); diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryInfo.java b/core/trino-main/src/main/java/io/trino/execution/QueryInfo.java index 522757c4e2a0..4edd215ea386 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryInfo.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryInfo.java @@ -42,7 +42,7 @@ import java.util.Set; import static com.google.common.base.MoreObjects.toStringHelper; -import static io.trino.execution.StageInfo.getAllStages; +import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @Immutable @@ -51,7 +51,6 @@ public class QueryInfo private final QueryId queryId; private final SessionRepresentation session; private final QueryState state; - private final boolean scheduled; private final URI self; private final List fieldNames; private final String query; @@ -77,7 +76,7 @@ public class QueryInfo private final List warnings; private final Set inputs; private final Optional output; - private final boolean completeInfo; + private final boolean finalQueryInfo; private final Optional resourceGroupId; private final Optional queryType; private final RetryPolicy retryPolicy; @@ -87,7 +86,6 @@ public QueryInfo( @JsonProperty("queryId") QueryId queryId, @JsonProperty("session") SessionRepresentation session, @JsonProperty("state") QueryState state, - @JsonProperty("scheduled") boolean scheduled, @JsonProperty("self") URI self, @JsonProperty("fieldNames") List fieldNames, @JsonProperty("query") String query, @@ -112,7 +110,7 @@ public QueryInfo( @JsonProperty("output") Optional output, @JsonProperty("referencedTables") List referencedTables, @JsonProperty("routines") List routines, - @JsonProperty("completeInfo") boolean completeInfo, + @JsonProperty("finalQueryInfo") boolean finalQueryInfo, @JsonProperty("resourceGroupId") Optional resourceGroupId, @JsonProperty("queryType") Optional queryType, @JsonProperty("retryPolicy") RetryPolicy retryPolicy) @@ -146,7 +144,6 @@ public QueryInfo( this.queryId = queryId; this.session = session; this.state = state; - this.scheduled = scheduled; this.self = self; this.fieldNames = ImmutableList.copyOf(fieldNames); this.query = query; @@ -172,7 +169,8 @@ public QueryInfo( this.output = output; this.referencedTables = ImmutableList.copyOf(referencedTables); this.routines = ImmutableList.copyOf(routines); - this.completeInfo = completeInfo; + this.finalQueryInfo = finalQueryInfo; + checkArgument(!finalQueryInfo || state.isDone(), "finalQueryInfo without a terminal query state"); this.resourceGroupId = resourceGroupId; this.queryType = queryType; this.retryPolicy = retryPolicy; @@ -199,7 +197,7 @@ public QueryState getState() @JsonProperty public boolean isScheduled() { - return scheduled; + return queryStats.isScheduled(); } @JsonProperty @@ -335,7 +333,7 @@ public List getWarnings() @JsonProperty public boolean isFinalQueryInfo() { - return state.isDone() && getAllStages(outputStage).stream().allMatch(StageInfo::isFinalStageInfo); + return finalQueryInfo; } @JsonProperty @@ -389,9 +387,4 @@ public String toString() .add("fieldNames", fieldNames) .toString(); } - - public boolean isCompleteInfo() - { - return completeInfo; - } } diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java index 518e8d5d375c..1fb5efcedfee 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java @@ -436,19 +436,19 @@ QueryInfo getQueryInfo(Optional rootStage) } } - boolean completeInfo = getAllStages(rootStage).stream().allMatch(StageInfo::isCompleteInfo); - boolean isScheduled = isScheduled(rootStage); + List allStages = getAllStages(rootStage); + QueryStats queryStats = getQueryStats(rootStage, allStages); + boolean finalInfo = state.isDone() && allStages.stream().allMatch(StageInfo::isFinalStageInfo); return new QueryInfo( queryId, session.toSessionRepresentation(), state, - isScheduled, self, outputManager.getQueryOutputInfo().map(QueryOutputInfo::getColumnNames).orElse(ImmutableList.of()), query, preparedQuery, - getQueryStats(rootStage), + queryStats, Optional.ofNullable(setCatalog.get()), Optional.ofNullable(setSchema.get()), Optional.ofNullable(setPath.get()), @@ -468,13 +468,13 @@ QueryInfo getQueryInfo(Optional rootStage) output.get(), referencedTables.get(), routines.get(), - completeInfo, + finalInfo, Optional.of(resourceGroup), queryType, getRetryPolicy(session)); } - private QueryStats getQueryStats(Optional rootStage) + private QueryStats getQueryStats(Optional rootStage, List allStages) { int totalTasks = 0; int runningTasks = 0; @@ -535,14 +535,13 @@ private QueryStats getQueryStats(Optional rootStage) long physicalWrittenDataSize = 0; long failedPhysicalWrittenDataSize = 0; - ImmutableList.Builder stageGcStatistics = ImmutableList.builder(); + ImmutableList.Builder stageGcStatistics = ImmutableList.builderWithExpectedSize(allStages.size()); boolean fullyBlocked = rootStage.isPresent(); Set blockedReasons = new HashSet<>(); ImmutableList.Builder operatorStatsSummary = ImmutableList.builder(); - boolean completeInfo = true; - for (StageInfo stageInfo : getAllStages(rootStage)) { + for (StageInfo stageInfo : allStages) { StageStats stageStats = stageInfo.getStageStats(); totalTasks += stageStats.getTotalTasks(); runningTasks += stageStats.getRunningTasks(); @@ -606,7 +605,6 @@ private QueryStats getQueryStats(Optional rootStage) stageGcStatistics.add(stageStats.getGcInfo()); - completeInfo = completeInfo && stageInfo.isCompleteInfo(); operatorStatsSummary.addAll(stageInfo.getStageStats().getOperatorSummaries()); } @@ -618,7 +616,9 @@ private QueryStats getQueryStats(Optional rootStage) failedOutputPositions += outputStageStats.getFailedOutputPositions(); } - boolean isScheduled = isScheduled(rootStage); + boolean isScheduled = rootStage.isPresent() && allStages.stream() + .map(StageInfo::getState) + .allMatch(state -> state == StageState.RUNNING || state == StageState.PENDING || state.isDone()); return new QueryStats( queryStateTimer.getCreateTime(), @@ -1108,16 +1108,6 @@ public Optional getEndTime() return queryStateTimer.getEndTime(); } - private static boolean isScheduled(Optional rootStage) - { - if (rootStage.isEmpty()) { - return false; - } - return getAllStages(rootStage).stream() - .map(StageInfo::getState) - .allMatch(state -> state == StageState.RUNNING || state == StageState.PENDING || state.isDone()); - } - public Optional getFailureInfo() { if (queryState.get() != FAILED) { @@ -1164,7 +1154,6 @@ public void pruneQueryInfo() queryInfo.getQueryId(), queryInfo.getSession(), queryInfo.getState(), - queryInfo.isScheduled(), queryInfo.getSelf(), queryInfo.getFieldNames(), queryInfo.getQuery(), @@ -1189,7 +1178,7 @@ public void pruneQueryInfo() queryInfo.getOutput(), queryInfo.getReferencedTables(), queryInfo.getRoutines(), - queryInfo.isCompleteInfo(), + queryInfo.isFinalQueryInfo(), queryInfo.getResourceGroupId(), queryInfo.getQueryType(), queryInfo.getRetryPolicy()); diff --git a/core/trino-main/src/main/java/io/trino/execution/StageInfo.java b/core/trino-main/src/main/java/io/trino/execution/StageInfo.java index ffd7172ab6bf..679a1482b997 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageInfo.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageInfo.java @@ -154,22 +154,21 @@ public String toString() public static List getAllStages(Optional stageInfo) { + if (stageInfo.isEmpty()) { + return ImmutableList.of(); + } ImmutableList.Builder collector = ImmutableList.builder(); - addAllStages(stageInfo, collector); + addAllStages(stageInfo.get(), collector); return collector.build(); } - private static void addAllStages(Optional stageInfo, ImmutableList.Builder collector) + private static void addAllStages(@Nullable StageInfo stage, ImmutableList.Builder collector) { - stageInfo.ifPresent(stage -> { + if (stage != null) { collector.add(stage); - stage.getSubStages().stream() - .forEach(subStage -> addAllStages(Optional.ofNullable(subStage), collector)); - }); - } - - public boolean isCompleteInfo() - { - return state.isDone() && tasks.stream().allMatch(taskInfo -> taskInfo.getTaskStatus().getState().isDone()); + for (StageInfo subStage : stage.getSubStages()) { + addAllStages(subStage, collector); + } + } } } diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java index 62a57ac66e4c..1ac8e0e564d5 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java @@ -195,7 +195,7 @@ public void setAllTasksFinal(Iterable finalTaskInfos) requireNonNull(finalTaskInfos, "finalTaskInfos is null"); checkState(stageState.get().isDone()); StageInfo stageInfo = getStageInfo(() -> finalTaskInfos); - checkArgument(stageInfo.isCompleteInfo(), "finalTaskInfos are not all done"); + checkArgument(stageInfo.isFinalStageInfo(), "finalTaskInfos are not all done"); finalStageInfo.compareAndSet(Optional.empty(), Optional.of(stageInfo)); } diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/Query.java b/core/trino-main/src/main/java/io/trino/server/protocol/Query.java index 2f18d147419b..1aa2258494af 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/Query.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/Query.java @@ -17,6 +17,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.airlift.log.Logger; @@ -733,11 +734,14 @@ private static StatementStats toStatementStats(QueryInfo queryInfo) QueryStats queryStats = queryInfo.getQueryStats(); StageInfo outputStage = queryInfo.getOutputStage().orElse(null); + Set globalUniqueNodes = new HashSet<>(); + StageStats rootStageStats = toStageStats(outputStage, globalUniqueNodes); + return StatementStats.builder() .setState(queryInfo.getState().toString()) .setQueued(queryInfo.getState() == QueryState.QUEUED) .setScheduled(queryInfo.isScheduled()) - .setNodes(globalUniqueNodes(outputStage).size()) + .setNodes(globalUniqueNodes.size()) .setTotalSplits(queryStats.getTotalDrivers()) .setQueuedSplits(queryStats.getQueuedDrivers()) .setRunningSplits(queryStats.getRunningDrivers() + queryStats.getBlockedDrivers()) @@ -751,11 +755,11 @@ private static StatementStats toStatementStats(QueryInfo queryInfo) .setPhysicalInputBytes(queryStats.getPhysicalInputDataSize().toBytes()) .setPeakMemoryBytes(queryStats.getPeakUserMemoryReservation().toBytes()) .setSpilledBytes(queryStats.getSpilledDataSize().toBytes()) - .setRootStage(toStageStats(outputStage)) + .setRootStage(rootStageStats) .build(); } - private static StageStats toStageStats(StageInfo stageInfo) + private static StageStats toStageStats(StageInfo stageInfo, Set globalUniqueNodes) { if (stageInfo == null) { return null; @@ -763,23 +767,11 @@ private static StageStats toStageStats(StageInfo stageInfo) io.trino.execution.StageStats stageStats = stageInfo.getStageStats(); - ImmutableList.Builder subStages = ImmutableList.builder(); - for (StageInfo subStage : stageInfo.getSubStages()) { - subStages.add(toStageStats(subStage)); - } - - Set uniqueNodes = new HashSet<>(); - for (TaskInfo task : stageInfo.getTasks()) { - // todo add nodeId to TaskInfo - URI uri = task.getTaskStatus().getSelf(); - uniqueNodes.add(uri.getHost() + ":" + uri.getPort()); - } - - return StageStats.builder() + // Store current stage details into a builder + StageStats.Builder builder = StageStats.builder() .setStageId(String.valueOf(stageInfo.getStageId().getId())) .setState(stageInfo.getState().toString()) .setDone(stageInfo.getState().isDone()) - .setNodes(uniqueNodes.size()) .setTotalSplits(stageStats.getTotalDrivers()) .setQueuedSplits(stageStats.getQueuedDrivers()) .setRunningSplits(stageStats.getRunningDrivers() + stageStats.getBlockedDrivers()) @@ -791,26 +783,34 @@ private static StageStats toStageStats(StageInfo stageInfo) .setPhysicalInputBytes(stageStats.getPhysicalInputDataSize().toBytes()) .setFailedTasks(stageStats.getFailedTasks()) .setCoordinatorOnly(stageInfo.isCoordinatorOnly()) - .setSubStages(subStages.build()) - .build(); - } + .setNodes(countStageAndAddGlobalUniqueNodes(stageInfo, globalUniqueNodes)); - private static Set globalUniqueNodes(StageInfo stageInfo) - { - if (stageInfo == null) { - return ImmutableSet.of(); + // Recurse into child stages to create their StageStats + List subStages = stageInfo.getSubStages(); + if (subStages.isEmpty()) { + builder.setSubStages(ImmutableList.of()); } - ImmutableSet.Builder nodes = ImmutableSet.builder(); - for (TaskInfo task : stageInfo.getTasks()) { - // todo add nodeId to TaskInfo - URI uri = task.getTaskStatus().getSelf(); - nodes.add(uri.getHost() + ":" + uri.getPort()); + else { + ImmutableList.Builder subStagesBuilder = ImmutableList.builderWithExpectedSize(subStages.size()); + for (StageInfo subStage : subStages) { + subStagesBuilder.add(toStageStats(subStage, globalUniqueNodes)); + } + builder.setSubStages(subStagesBuilder.build()); } - for (StageInfo subStage : stageInfo.getSubStages()) { - nodes.addAll(globalUniqueNodes(subStage)); + return builder.build(); + } + + private static int countStageAndAddGlobalUniqueNodes(StageInfo stageInfo, Set globalUniqueNodes) + { + List tasks = stageInfo.getTasks(); + Set stageUniqueNodes = Sets.newHashSetWithExpectedSize(tasks.size()); + for (TaskInfo task : tasks) { + String nodeId = task.getTaskStatus().getNodeId(); + stageUniqueNodes.add(nodeId); + globalUniqueNodes.add(nodeId); } - return nodes.build(); + return stageUniqueNodes.size(); } private static Optional findCancelableLeafStage(QueryInfo queryInfo) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java index bd28dd577ba6..f3518d73214b 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java @@ -269,14 +269,14 @@ public static String textDistributedPlan( ValuePrinter valuePrinter, boolean verbose) { - Map tableInfos = getAllStages(Optional.of(outputStageInfo)).stream() + List allStages = getAllStages(Optional.of(outputStageInfo)); + Map tableInfos = allStages.stream() .map(StageInfo::getTables) .map(Map::entrySet) .flatMap(Collection::stream) .collect(toImmutableMap(Entry::getKey, Entry::getValue)); StringBuilder builder = new StringBuilder(); - List allStages = getAllStages(Optional.of(outputStageInfo)); List allFragments = allStages.stream() .map(StageInfo::getPlan) .collect(toImmutableList()); diff --git a/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java b/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java index 808b8e1f7f57..c67facc54508 100644 --- a/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java +++ b/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java @@ -158,7 +158,6 @@ public QueryInfo getFullQueryInfo() new QueryId("test"), session.toSessionRepresentation(), state, - !state.isDone(), URI.create("http://test"), ImmutableList.of(), "SELECT 1", @@ -201,7 +200,7 @@ public QueryInfo getFullQueryInfo() DataSize.ofBytes(25), DataSize.ofBytes(26), - true, + !state.isDone(), new Duration(20, NANOSECONDS), new Duration(21, NANOSECONDS), new Duration(22, NANOSECONDS), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java new file mode 100644 index 000000000000..bdda26f6a07c --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java @@ -0,0 +1,127 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import io.airlift.json.JsonCodec; +import io.trino.operator.RetryPolicy; +import io.trino.spi.QueryId; +import io.trino.spi.TrinoWarning; +import io.trino.spi.WarningCode; +import io.trino.spi.resourcegroups.QueryType; +import io.trino.spi.resourcegroups.ResourceGroupId; +import io.trino.spi.security.SelectedRole; +import io.trino.sql.planner.plan.PlanFragmentId; +import io.trino.sql.planner.plan.PlanNodeId; +import io.trino.transaction.TransactionId; +import org.testng.annotations.Test; + +import java.net.URI; +import java.util.Optional; + +import static io.trino.SessionTestUtils.TEST_SESSION; +import static io.trino.execution.QueryState.FINISHED; +import static org.testng.Assert.assertEquals; + +public class TestQueryInfo +{ + @Test + public void testQueryInfoRoundTrip() + { + JsonCodec codec = JsonCodec.jsonCodec(QueryInfo.class); + QueryInfo expected = createQueryInfo(); + QueryInfo actual = codec.fromJson(codec.toJsonBytes(expected)); + + assertEquals(actual.getQueryId(), expected.getQueryId()); + // Note: SessionRepresentation.equals? + assertEquals(actual.getState(), expected.getState()); + assertEquals(actual.isScheduled(), expected.isScheduled()); + + assertEquals(actual.getSelf(), expected.getSelf()); + assertEquals(actual.getFieldNames(), expected.getFieldNames()); + assertEquals(actual.getQuery(), expected.getQuery()); + assertEquals(actual.getPreparedQuery(), expected.getPreparedQuery()); + // Assert all of queryStats + TestQueryStats.assertExpectedQueryStats(actual.getQueryStats()); + + assertEquals(actual.getSetCatalog(), expected.getSetCatalog()); + assertEquals(actual.getSetSchema(), expected.getSetSchema()); + assertEquals(actual.getSetPath(), expected.getSetPath()); + assertEquals(actual.getSetSessionProperties(), expected.getSetSessionProperties()); + assertEquals(actual.getResetSessionProperties(), expected.getResetSessionProperties()); + assertEquals(actual.getSetRoles(), expected.getSetRoles()); + assertEquals(actual.getAddedPreparedStatements(), expected.getAddedPreparedStatements()); + assertEquals(actual.getDeallocatedPreparedStatements(), expected.getDeallocatedPreparedStatements()); + + assertEquals(actual.getStartedTransactionId(), expected.getStartedTransactionId()); + assertEquals(actual.isClearTransactionId(), expected.isClearTransactionId()); + + assertEquals(actual.getUpdateType(), expected.getUpdateType()); + assertEquals(actual.getOutputStage(), expected.getOutputStage()); + + assertEquals(actual.getFailureInfo(), expected.getFailureInfo()); + assertEquals(actual.getErrorCode(), expected.getErrorCode()); + assertEquals(actual.getWarnings(), expected.getWarnings()); + + assertEquals(actual.getInputs(), expected.getInputs()); + assertEquals(actual.getOutput(), expected.getOutput()); + + assertEquals(actual.getReferencedTables(), expected.getReferencedTables()); + assertEquals(actual.getRoutines(), expected.getRoutines()); + + assertEquals(actual.isFinalQueryInfo(), expected.isFinalQueryInfo()); + + assertEquals(actual.getResourceGroupId(), expected.getResourceGroupId()); + assertEquals(actual.getQueryType(), expected.getQueryType()); + assertEquals(actual.getRetryPolicy(), expected.getRetryPolicy()); + } + + private static QueryInfo createQueryInfo() + { + return new QueryInfo( + new QueryId("0"), + TEST_SESSION.toSessionRepresentation(), + FINISHED, + URI.create("1"), + ImmutableList.of("number"), + "SELECT 1 as number", + Optional.of("prepared_query"), + TestQueryStats.EXPECTED, + Optional.of("set_catalog"), + Optional.of("set_schema"), + Optional.of("set_path"), + ImmutableMap.of("set_property", "set_value"), + ImmutableSet.of("reset_property"), + ImmutableMap.of("set_roles", new SelectedRole(SelectedRole.Type.ROLE, Optional.of("role"))), + ImmutableMap.of("added_prepared_statement", "statement"), + ImmutableSet.of("deallocated_prepared_statement", "statement"), + Optional.of(TransactionId.create()), + true, + "42", + Optional.empty(), + null, + null, + ImmutableList.of(new TrinoWarning(new WarningCode(1, "name"), "message")), + ImmutableSet.of(new Input("catalog", "schema", "talble", Optional.empty(), ImmutableList.of(new Column("name", "type")), new PlanFragmentId("id"), new PlanNodeId("1"))), + Optional.empty(), + ImmutableList.of(), + ImmutableList.of(), + true, + Optional.of(new ResourceGroupId("groupId")), + Optional.of(QueryType.SELECT), + RetryPolicy.TASK); + } +} diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlStage.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlStage.java index 1d565218d1a8..9ad9d5a90d53 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlStage.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlStage.java @@ -156,7 +156,7 @@ private void testFinalStageInfoInternal() // once the final stage info is available, verify that it is complete StageInfo stageInfo = finalStageInfo.get(1, MINUTES); assertFalse(stageInfo.getTasks().isEmpty()); - assertTrue(stageInfo.isCompleteInfo()); + assertTrue(stageInfo.isFinalStageInfo()); assertSame(stage.getStageInfo(), stageInfo); // cancel the background thread adding tasks diff --git a/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java b/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java index 135bde5e4899..d77c9dd8cf77 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java +++ b/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java @@ -39,6 +39,7 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public class TestBasicQueryInfo { @@ -50,7 +51,6 @@ public void testConstructor() new QueryId("0"), TEST_SESSION.toSessionRepresentation(), RUNNING, - false, URI.create("1"), ImmutableList.of("2", "3"), "SELECT 4", @@ -160,7 +160,7 @@ public void testConstructor() assertEquals(basicInfo.getQueryId().getId(), "0"); assertEquals(basicInfo.getState(), RUNNING); - assertEquals(basicInfo.isScheduled(), false); + assertTrue(basicInfo.isScheduled()); // from query stats assertEquals(basicInfo.getQuery(), "SELECT 4"); assertEquals(basicInfo.getQueryType().get(), QueryType.SELECT); diff --git a/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java b/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java index 35b7d6ac4aa7..d489c6916b37 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java +++ b/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java @@ -101,7 +101,6 @@ private QueryInfo createQueryInfo(String queryId, QueryState state, String query new QueryId(queryId), TEST_SESSION.toSessionRepresentation(), state, - true, URI.create("1"), ImmutableList.of("2", "3"), query, diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestStatementStats.java b/testing/trino-tests/src/test/java/io/trino/execution/TestStatementStats.java new file mode 100644 index 000000000000..289b3f7ba76a --- /dev/null +++ b/testing/trino-tests/src/test/java/io/trino/execution/TestStatementStats.java @@ -0,0 +1,60 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution; + +import com.google.common.collect.ImmutableMap; +import io.trino.client.StageStats; +import io.trino.client.StatementStats; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.MaterializedResult; +import io.trino.tests.tpch.TpchQueryRunnerBuilder; +import org.testng.annotations.Test; + +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +public class TestStatementStats +{ + @Test + public void testUniqueNodeCounts() + throws Exception + { + try (DistributedQueryRunner queryRunner = TpchQueryRunnerBuilder.builder() + .setCoordinatorProperties(ImmutableMap.builder() + .put("query-manager.required-workers", "2") // wait for both nodes before starting the query + .buildOrThrow()) + .setNodeCount(2) + .build()) { + MaterializedResult result = queryRunner.execute(testSessionBuilder().setCatalog("tpch").setSchema("tiny").build(), "SELECT COUNT(*) from lineitem LIMIT 10"); + + assertTrue(result.getStatementStats().isPresent()); + + StatementStats stats = result.getStatementStats().get(); + // two unique nodes across all stages + assertEquals(stats.getNodes(), 2); + + StageStats rootStage = stats.getRootStage(); + assertNotNull(rootStage); + // root stage should be a single node gather + assertEquals(rootStage.getNodes(), 1); + + // one child stage + assertEquals(rootStage.getSubStages().size(), 1); + // child stage has two unique nodes + assertEquals(rootStage.getSubStages().get(0).getNodes(), 2); + } + } +}