Bound the coordinator's query-execution thread pool.

* QueryManagerConfig: new `query.executor-pool-size` property (default 1000, @Min(1)).
* CoordinatorModule: the @ForQueryExecution executor changes from an unbounded cached
  thread pool to a ThreadPoolExecutor whose core and maximum size both come from the new
  property, with a 60 second keep-alive (core threads may time out) and a
  LinkedBlockingQueue of capacity 1000.
* AbstractTestQueryFramework: close() additionally asserts that every query on the
  coordinator is done with a final QueryInfo and that every task on every server is done.
* FaultTolerantExecutionConnectorTestHelper: runs with `query.executor-pool-size=10`.
* TestUserImpersonationAccessControl: StatementClient is closed via try-with-resources.

NOTE(review): line breaks, indentation and blank context lines were reconstructed from the
hunk line counts; verify against the target tree (or apply with --ignore-whitespace).

diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java
index 9a86363b5dbe..035114b191d8 100644
--- a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java
+++ b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java
@@ -62,6 +62,7 @@ public class QueryManagerConfig
     private Duration clientTimeout = new Duration(5, TimeUnit.MINUTES);
 
     private int queryManagerExecutorPoolSize = 5;
+    private int queryExecutorPoolSize = 1000;
 
     private Duration remoteTaskMaxErrorDuration = new Duration(5, TimeUnit.MINUTES);
     private int remoteTaskMaxCallbackThreads = 1000;
@@ -261,6 +262,19 @@ public QueryManagerConfig setQueryManagerExecutorPoolSize(int queryManagerExecut
         return this;
     }
 
+    @Min(1)
+    public int getQueryExecutorPoolSize()
+    {
+        return queryExecutorPoolSize;
+    }
+
+    @Config("query.executor-pool-size")
+    public QueryManagerConfig setQueryExecutorPoolSize(int queryExecutorPoolSize)
+    {
+        this.queryExecutorPoolSize = queryExecutorPoolSize;
+        return this;
+    }
+
     @Deprecated
     public Duration getRemoteTaskMinErrorDuration()
     {
diff --git a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java
index 0df472660f2c..e70b2028b5ec 100644
--- a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java
+++ b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java
@@ -49,6 +49,7 @@
 import io.trino.execution.QueryExecutionMBean;
 import io.trino.execution.QueryIdGenerator;
 import io.trino.execution.QueryManager;
+import io.trino.execution.QueryManagerConfig;
 import io.trino.execution.QueryPerformanceFetcher;
 import io.trino.execution.QueryPreparer;
 import io.trino.execution.RemoteTaskFactory;
@@ -119,7 +120,9 @@
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import static com.google.inject.multibindings.MapBinder.newMapBinder;
 import static com.google.inject.multibindings.Multibinder.newSetBinder;
@@ -305,8 +308,15 @@ protected void setup(Binder binder)
                 .toInstance(newSingleThreadScheduledExecutor(threadsNamed("stage-scheduler")));
 
         // query execution
-        binder.bind(ExecutorService.class).annotatedWith(ForQueryExecution.class)
-                .toInstance(newCachedThreadPool(threadsNamed("query-execution-%s")));
+        QueryManagerConfig queryManagerConfig = buildConfigObject(QueryManagerConfig.class);
+        ThreadPoolExecutor queryExecutor = new ThreadPoolExecutor(
+                queryManagerConfig.getQueryExecutorPoolSize(),
+                queryManagerConfig.getQueryExecutorPoolSize(),
+                60, SECONDS,
+                new LinkedBlockingQueue<>(1000),
+                threadsNamed("query-execution-%s"));
+        queryExecutor.allowCoreThreadTimeOut(true);
+        binder.bind(ExecutorService.class).annotatedWith(ForQueryExecution.class).toInstance(queryExecutor);
         binder.bind(QueryExecutionMBean.class).in(Scopes.SINGLETON);
         newExporter(binder).export(QueryExecutionMBean.class)
                 .as(generator -> generator.generatedNameOf(QueryExecution.class));
diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java
index f9983d981a64..54febc8d5bd7 100644
--- a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java
+++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java
@@ -51,6 +51,7 @@ public void testDefaults()
                 .setMaxQueuedQueries(5000)
                 .setHashPartitionCount(100)
                 .setQueryManagerExecutorPoolSize(5)
+                .setQueryExecutorPoolSize(1000)
                 .setRemoteTaskMinErrorDuration(new Duration(5, MINUTES))
                 .setRemoteTaskMaxErrorDuration(new Duration(5, MINUTES))
                 .setRemoteTaskMaxCallbackThreads(1000)
@@ -95,6 +96,7 @@ public void testExplicitPropertyMappings()
                 .put("query.max-queued-queries", "15")
                 .put("query.hash-partition-count", "16")
                 .put("query.manager-executor-pool-size", "11")
+                .put("query.executor-pool-size", "111")
                 .put("query.remote-task.min-error-duration", "30s")
                 .put("query.remote-task.max-error-duration", "60s")
                 .put("query.remote-task.max-callback-threads", "10")
@@ -136,6 +138,7 @@ public void testExplicitPropertyMappings()
                 .setMaxQueuedQueries(15)
                 .setHashPartitionCount(16)
                 .setQueryManagerExecutorPoolSize(11)
+                .setQueryExecutorPoolSize(111)
                 .setRemoteTaskMinErrorDuration(new Duration(60, SECONDS))
                 .setRemoteTaskMaxErrorDuration(new Duration(60, SECONDS))
                 .setRemoteTaskMaxCallbackThreads(10)
diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java
index 2245a32846bb..eab92f71d62e 100644
--- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java
+++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java
@@ -19,8 +19,13 @@
 import com.google.errorprone.annotations.CanIgnoreReturnValue;
 import io.airlift.units.Duration;
 import io.trino.Session;
+import io.trino.execution.QueryInfo;
+import io.trino.execution.QueryManager;
 import io.trino.execution.QueryState;
 import io.trino.execution.QueryStats;
+import io.trino.execution.SqlTaskManager;
+import io.trino.execution.TaskId;
+import io.trino.execution.TaskInfo;
 import io.trino.execution.warnings.WarningCollector;
 import io.trino.memory.LocalMemoryManager;
 import io.trino.memory.MemoryPool;
@@ -56,6 +61,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.function.Consumer;
@@ -106,6 +112,8 @@ public final void close()
     {
         try (afterClassCloser) {
             checkQueryMemoryReleased();
+            checkQueryInfosFinal();
+            checkTasksDone();
         }
         finally {
             queryRunner = null;
@@ -116,27 +124,20 @@ public final void close()
 
     private void checkQueryMemoryReleased()
     {
-        if (queryRunner == null) {
-            return;
-        }
-        if (!(queryRunner instanceof DistributedQueryRunner)) {
-            return;
-        }
-        DistributedQueryRunner distributedQueryRunner = (DistributedQueryRunner) queryRunner;
-        assertEventually(
+        tryGetDistributedQueryRunner().ifPresent(runner -> assertEventually(
                 new Duration(30, SECONDS),
                 new Duration(1, SECONDS),
                 () -> {
-                    List<TestingTrinoServer> servers = distributedQueryRunner.getServers();
+                    List<TestingTrinoServer> servers = runner.getServers();
                     for (int serverId = 0; serverId < servers.size(); ++serverId) {
                         TestingTrinoServer server = servers.get(serverId);
-                        assertMemoryPoolReleased(distributedQueryRunner.getCoordinator(), server, serverId);
+                        assertMemoryPoolReleased(runner.getCoordinator(), server, serverId);
                     }
 
-                    assertThat(distributedQueryRunner.getCoordinator().getClusterMemoryManager().getClusterTotalMemoryReservation())
+                    assertThat(runner.getCoordinator().getClusterMemoryManager().getClusterTotalMemoryReservation())
                             .describedAs("cluster memory reservation")
                             .isZero();
-                });
+                }));
     }
 
     private void assertMemoryPoolReleased(TestingTrinoServer coordinator, TestingTrinoServer server, long serverId)
@@ -178,6 +179,55 @@ private String describeMemoryPool(TestingTrinoServer coordinator, TestingTrinoSe
         return result.toString();
     }
 
+    private void checkQueryInfosFinal()
+    {
+        tryGetDistributedQueryRunner().ifPresent(runner -> assertEventually(
+                new Duration(30, SECONDS),
+                new Duration(1, SECONDS),
+                () -> {
+                    TestingTrinoServer coordinator = runner.getCoordinator();
+                    QueryManager queryManager = coordinator.getQueryManager();
+                    for (BasicQueryInfo basicQueryInfo : queryManager.getQueries()) {
+                        QueryId queryId = basicQueryInfo.getQueryId();
+                        if (!basicQueryInfo.getState().isDone()) {
+                            fail("query is expected to be in done state: " + basicQueryInfo.getQuery());
+                        }
+                        QueryInfo queryInfo = queryManager.getFullQueryInfo(queryId);
+                        if (!queryInfo.isFinalQueryInfo()) {
+                            fail("QueryInfo is expected to be final: " + basicQueryInfo.getQuery());
+                        }
+                    }
+                }));
+    }
+
+    private void checkTasksDone()
+    {
+        tryGetDistributedQueryRunner().ifPresent(runner -> assertEventually(
+                new Duration(30, SECONDS),
+                new Duration(1, SECONDS),
+                () -> {
+                    QueryManager queryManager = runner.getCoordinator().getQueryManager();
+                    List<TestingTrinoServer> servers = runner.getServers();
+                    for (TestingTrinoServer server : servers) {
+                        SqlTaskManager taskManager = server.getTaskManager();
+                        List<TaskInfo> taskInfos = taskManager.getAllTaskInfo();
+                        for (TaskInfo taskInfo : taskInfos) {
+                            TaskId taskId = taskInfo.getTaskStatus().getTaskId();
+                            QueryId queryId = taskId.getQueryId();
+                            String query = "unknown";
+                            try {
+                                query = queryManager.getQueryInfo(queryId).getQuery();
+                            }
+                            catch (NoSuchElementException ignored) {
+                            }
+                            if (!taskInfo.getTaskStatus().getState().isDone()) {
+                                fail("Task is expected to be in done state. TaskId: %s, QueryId: %s, Query: %s ".formatted(taskId, queryId, query));
+                            }
+                        }
+                    }
+                }));
+    }
+
     @Test
     public void ensureTestNamingConvention()
     {
@@ -536,6 +586,14 @@ protected final DistributedQueryRunner getDistributedQueryRunner()
         return (DistributedQueryRunner) queryRunner;
     }
 
+    private Optional<DistributedQueryRunner> tryGetDistributedQueryRunner()
+    {
+        if (queryRunner instanceof DistributedQueryRunner runner) {
+            return Optional.of(runner);
+        }
+        return Optional.empty();
+    }
+
     protected Session noJoinReordering()
     {
         return noJoinReordering(JoinDistributionType.PARTITIONED);
diff --git a/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java b/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java
index 63641c66f160..8755046c4f1b 100644
--- a/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java
+++ b/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java
@@ -33,6 +33,8 @@ public static Map<String, String> getExtraProperties()
                 // to trigger spilling
                 .put("exchange.deduplication-buffer-size", "1kB")
                 .put("fault-tolerant-execution-task-memory", "1GB")
+                // limit number of threads to detect potential thread leaks
+                .put("query.executor-pool-size", "10")
                 .buildOrThrow();
     }
 }
diff --git a/testing/trino-tests/src/test/java/io/trino/execution/TestUserImpersonationAccessControl.java b/testing/trino-tests/src/test/java/io/trino/execution/TestUserImpersonationAccessControl.java
index dc0b3204e590..a4ae993975a2 100644
--- a/testing/trino-tests/src/test/java/io/trino/execution/TestUserImpersonationAccessControl.java
+++ b/testing/trino-tests/src/test/java/io/trino/execution/TestUserImpersonationAccessControl.java
@@ -95,14 +95,14 @@ private QueryError trySelectQuery(String assumedUser)
                     .build();
 
             // start query
-            StatementClient client = newStatementClient(httpClient, clientSession, "SELECT * FROM tpch.tiny.nation");
+            try (StatementClient client = newStatementClient(httpClient, clientSession, "SELECT * FROM tpch.tiny.nation")) {
+                // wait for query to be fully scheduled
+                while (client.isRunning() && !client.currentStatusInfo().getStats().isScheduled()) {
+                    client.advance();
+                }
 
-            // wait for query to be fully scheduled
-            while (client.isRunning() && !client.currentStatusInfo().getStats().isScheduled()) {
-                client.advance();
+                return client.currentStatusInfo().getError();
             }
-
-            return client.currentStatusInfo().getError();
         }
         finally {
             // close the client since, query is not managed by the client protocol