diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java index 4d702603d702..1f036318cf8b 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java @@ -1924,6 +1924,8 @@ public void schedule() return; } + stateMachine.transitionToRunning(); + try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) { List> blockedStages = new ArrayList<>(); while (!isFinishingOrDone(queryStateMachine) && !stateMachine.getState().isDone()) { diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java index 9756ff0e222c..cdeef3731c5e 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java @@ -13,23 +13,49 @@ */ package io.trino.testing; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import io.trino.Session; +import io.trino.execution.QueryManager; +import io.trino.server.BasicQueryInfo; import org.intellij.lang.annotations.Language; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.time.ZonedDateTime; +import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.regex.Pattern; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.FeaturesConfig.JoinDistributionType.BROADCAST; import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING; +import static io.trino.execution.QueryState.RUNNING; import static io.trino.testing.assertions.Assert.assertEventually; import static io.trino.testing.sql.TestTable.randomTableSuffix; +import static java.lang.String.format; +import static java.util.concurrent.Executors.newCachedThreadPool; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; public abstract class AbstractDistributedEngineOnlyQueries extends AbstractTestEngineOnlyQueries { + private ExecutorService executorService; + + @BeforeClass + public void setUp() + { + executorService = newCachedThreadPool(); + } + + @AfterClass(alwaysRun = true) + public void shutdown() + { + executorService.shutdownNow(); + } + /** * Ensure the tests are run with {@link io.trino.testing.DistributedQueryRunner}. E.g. {@link io.trino.testing.LocalQueryRunner} takes some * shortcuts, not exercising certain aspects. @@ -294,4 +320,30 @@ public void testImplicitCastToRowWithFieldsRequiringDelimitation() // run INSERT to verify that field names in generated CAST expressions are properly delimited assertUpdate("INSERT INTO target_table SELECT * from source_table", 0); } + + @Test(timeOut = 10_000) + public void testQueryTransitionsToRunningState() + { + String query = format( + // use random marker in query for unique matching below + "SELECT count(*) c_%s FROM lineitem CROSS JOIN lineitem CROSS JOIN lineitem", + randomTableSuffix()); + DistributedQueryRunner queryRunner = getDistributedQueryRunner(); + ListenableFuture queryFuture = Futures.submit( + () -> queryRunner.execute(getSession(), query), executorService); + + QueryManager queryManager = queryRunner.getCoordinator().getQueryManager(); + assertEventually(() -> { + List queryInfos = queryManager.getQueries().stream() + .filter(q -> q.getQuery().equals(query)) + .collect(toImmutableList()); + + assertThat(queryInfos).hasSize(1); + assertThat(queryInfos.get(0).getState()).isEqualTo(RUNNING); + // we are good. Let's kill the query + queryManager.cancelQuery(queryInfos.get(0).getQueryId()); + }); + + assertThatThrownBy(queryFuture::get).hasMessageContaining("Query was canceled"); + } }