Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1924,6 +1924,8 @@ public void schedule()
return;
}

stateMachine.transitionToRunning();
Comment thread
findepi marked this conversation as resolved.
Outdated

try (SetThreadName ignored = new SetThreadName("Query-%s", queryStateMachine.getQueryId())) {
List<ListenableFuture<Void>> blockedStages = new ArrayList<>();
while (!isFinishingOrDone(queryStateMachine) && !stateMachine.getState().isDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,49 @@
*/
package io.trino.testing;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.trino.Session;
import io.trino.execution.QueryManager;
import io.trino.server.BasicQueryInfo;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.time.ZonedDateTime;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.regex.Pattern;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.FeaturesConfig.JoinDistributionType.BROADCAST;
import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING;
import static io.trino.execution.QueryState.RUNNING;
import static io.trino.testing.assertions.Assert.assertEventually;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static java.lang.String.format;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public abstract class AbstractDistributedEngineOnlyQueries
extends AbstractTestEngineOnlyQueries
{
private ExecutorService executorService;

@BeforeClass
public void setUp()
{
executorService = newCachedThreadPool();
}

@AfterClass(alwaysRun = true)
public void shutdown()
{
executorService.shutdownNow();
}

/**
* Ensure the tests are run with {@link io.trino.testing.DistributedQueryRunner}. E.g. {@link io.trino.testing.LocalQueryRunner} takes some
* shortcuts, not exercising certain aspects.
Expand Down Expand Up @@ -294,4 +320,30 @@ public void testImplicitCastToRowWithFieldsRequiringDelimitation()
// run INSERT to verify that field names in generated CAST expressions are properly delimited
assertUpdate("INSERT INTO target_table SELECT * from source_table", 0);
}

@Test(timeOut = 10_000)
public void testQueryTransitionsToRunningState()
{
String query = format(
// use random marker in query for unique matching below
"SELECT count(*) c_%s FROM lineitem CROSS JOIN lineitem CROSS JOIN lineitem",
randomTableSuffix());
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
ListenableFuture<?> queryFuture = Futures.submit(
() -> queryRunner.execute(getSession(), query), executorService);

QueryManager queryManager = queryRunner.getCoordinator().getQueryManager();
assertEventually(() -> {
List<BasicQueryInfo> queryInfos = queryManager.getQueries().stream()
.filter(q -> q.getQuery().equals(query))
.collect(toImmutableList());

assertThat(queryInfos).hasSize(1);
assertThat(queryInfos.get(0).getState()).isEqualTo(RUNNING);
// we are good. Let's kill the query
queryManager.cancelQuery(queryInfos.get(0).getQueryId());
});

assertThatThrownBy(queryFuture::get).hasMessageContaining("Query was canceled");
}
}