Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class QueryManagerConfig
private Duration clientTimeout = new Duration(5, TimeUnit.MINUTES);

private int queryManagerExecutorPoolSize = 5;
private int queryExecutorPoolSize = 1000;

private Duration remoteTaskMaxErrorDuration = new Duration(5, TimeUnit.MINUTES);
private int remoteTaskMaxCallbackThreads = 1000;
Expand Down Expand Up @@ -261,6 +262,19 @@ public QueryManagerConfig setQueryManagerExecutorPoolSize(int queryManagerExecut
return this;
}

@Min(1)
public int getQueryExecutorPoolSize()
{
return queryExecutorPoolSize;
}

@Config("query.executor-pool-size")
public QueryManagerConfig setQueryExecutorPoolSize(int queryExecutorPoolSize)
{
this.queryExecutorPoolSize = queryExecutorPoolSize;
return this;
}

@Deprecated
public Duration getRemoteTaskMinErrorDuration()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.trino.execution.QueryExecutionMBean;
import io.trino.execution.QueryIdGenerator;
import io.trino.execution.QueryManager;
import io.trino.execution.QueryManagerConfig;
import io.trino.execution.QueryPerformanceFetcher;
import io.trino.execution.QueryPreparer;
import io.trino.execution.RemoteTaskFactory;
Expand Down Expand Up @@ -119,7 +120,9 @@

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import static com.google.inject.multibindings.MapBinder.newMapBinder;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
Expand Down Expand Up @@ -305,8 +308,15 @@ protected void setup(Binder binder)
.toInstance(newSingleThreadScheduledExecutor(threadsNamed("stage-scheduler")));

// query execution
binder.bind(ExecutorService.class).annotatedWith(ForQueryExecution.class)
.toInstance(newCachedThreadPool(threadsNamed("query-execution-%s")));
QueryManagerConfig queryManagerConfig = buildConfigObject(QueryManagerConfig.class);
ThreadPoolExecutor queryExecutor = new ThreadPoolExecutor(
queryManagerConfig.getQueryExecutorPoolSize(),
queryManagerConfig.getQueryExecutorPoolSize(),
60, SECONDS,
new LinkedBlockingQueue<>(1000),
threadsNamed("query-execution-%s"));
queryExecutor.allowCoreThreadTimeOut(true);
binder.bind(ExecutorService.class).annotatedWith(ForQueryExecution.class).toInstance(queryExecutor);
binder.bind(QueryExecutionMBean.class).in(Scopes.SINGLETON);
newExporter(binder).export(QueryExecutionMBean.class)
.as(generator -> generator.generatedNameOf(QueryExecution.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void testDefaults()
.setMaxQueuedQueries(5000)
.setHashPartitionCount(100)
.setQueryManagerExecutorPoolSize(5)
.setQueryExecutorPoolSize(1000)
.setRemoteTaskMinErrorDuration(new Duration(5, MINUTES))
.setRemoteTaskMaxErrorDuration(new Duration(5, MINUTES))
.setRemoteTaskMaxCallbackThreads(1000)
Expand Down Expand Up @@ -95,6 +96,7 @@ public void testExplicitPropertyMappings()
.put("query.max-queued-queries", "15")
.put("query.hash-partition-count", "16")
.put("query.manager-executor-pool-size", "11")
.put("query.executor-pool-size", "111")
.put("query.remote-task.min-error-duration", "30s")
.put("query.remote-task.max-error-duration", "60s")
.put("query.remote-task.max-callback-threads", "10")
Expand Down Expand Up @@ -136,6 +138,7 @@ public void testExplicitPropertyMappings()
.setMaxQueuedQueries(15)
.setHashPartitionCount(16)
.setQueryManagerExecutorPoolSize(11)
.setQueryExecutorPoolSize(111)
.setRemoteTaskMinErrorDuration(new Duration(60, SECONDS))
.setRemoteTaskMaxErrorDuration(new Duration(60, SECONDS))
.setRemoteTaskMaxCallbackThreads(10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.execution.QueryInfo;
import io.trino.execution.QueryManager;
import io.trino.execution.QueryState;
import io.trino.execution.QueryStats;
import io.trino.execution.SqlTaskManager;
import io.trino.execution.TaskId;
import io.trino.execution.TaskInfo;
import io.trino.execution.warnings.WarningCollector;
import io.trino.memory.LocalMemoryManager;
import io.trino.memory.MemoryPool;
Expand Down Expand Up @@ -56,6 +61,7 @@

import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Consumer;
Expand Down Expand Up @@ -106,6 +112,8 @@ public final void close()
{
try (afterClassCloser) {
checkQueryMemoryReleased();
checkQueryInfosFinal();
checkTasksDone();
}
finally {
queryRunner = null;
Expand All @@ -116,27 +124,20 @@ public final void close()

private void checkQueryMemoryReleased()
{
if (queryRunner == null) {
return;
}
if (!(queryRunner instanceof DistributedQueryRunner)) {
return;
}
DistributedQueryRunner distributedQueryRunner = (DistributedQueryRunner) queryRunner;
assertEventually(
tryGetDistributedQueryRunner().ifPresent(runner -> assertEventually(
new Duration(30, SECONDS),
new Duration(1, SECONDS),
() -> {
List<TestingTrinoServer> servers = distributedQueryRunner.getServers();
List<TestingTrinoServer> servers = runner.getServers();
for (int serverId = 0; serverId < servers.size(); ++serverId) {
TestingTrinoServer server = servers.get(serverId);
assertMemoryPoolReleased(distributedQueryRunner.getCoordinator(), server, serverId);
assertMemoryPoolReleased(runner.getCoordinator(), server, serverId);
}

assertThat(distributedQueryRunner.getCoordinator().getClusterMemoryManager().getClusterTotalMemoryReservation())
assertThat(runner.getCoordinator().getClusterMemoryManager().getClusterTotalMemoryReservation())
.describedAs("cluster memory reservation")
.isZero();
});
}));
}

private void assertMemoryPoolReleased(TestingTrinoServer coordinator, TestingTrinoServer server, long serverId)
Expand Down Expand Up @@ -178,6 +179,55 @@ private String describeMemoryPool(TestingTrinoServer coordinator, TestingTrinoSe
return result.toString();
}

private void checkQueryInfosFinal()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there may be some tests which schedule queries in the background and do not necessarily wait for their completion. But we can address that if we see this becoming flaky.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{
tryGetDistributedQueryRunner().ifPresent(runner -> assertEventually(
new Duration(30, SECONDS),
new Duration(1, SECONDS),
() -> {
TestingTrinoServer coordinator = runner.getCoordinator();
QueryManager queryManager = coordinator.getQueryManager();
for (BasicQueryInfo basicQueryInfo : queryManager.getQueries()) {
QueryId queryId = basicQueryInfo.getQueryId();
if (!basicQueryInfo.getState().isDone()) {
fail("query is expected to be in done state: " + basicQueryInfo.getQuery());
}
QueryInfo queryInfo = queryManager.getFullQueryInfo(queryId);
if (!queryInfo.isFinalQueryInfo()) {
fail("QueryInfo is expected to be final: " + basicQueryInfo.getQuery());
}
}
}));
}

private void checkTasksDone()
{
tryGetDistributedQueryRunner().ifPresent(runner -> assertEventually(
new Duration(30, SECONDS),
new Duration(1, SECONDS),
() -> {
QueryManager queryManager = runner.getCoordinator().getQueryManager();
List<TestingTrinoServer> servers = runner.getServers();
for (TestingTrinoServer server : servers) {
SqlTaskManager taskManager = server.getTaskManager();
List<TaskInfo> taskInfos = taskManager.getAllTaskInfo();
for (TaskInfo taskInfo : taskInfos) {
TaskId taskId = taskInfo.getTaskStatus().getTaskId();
QueryId queryId = taskId.getQueryId();
String query = "unknown";
try {
query = queryManager.getQueryInfo(queryId).getQuery();
}
catch (NoSuchElementException ignored) {
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move to previous line

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I move it to the previous line it doesn't pass the style check

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I have checkstyle set up in IntelliJ and it did not complain. 🤷

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, my Intellij doesn't complain either (though it is trying to format it with a new line when I try to autoformat). But then the check during the build fails.

if (!taskInfo.getTaskStatus().getState().isDone()) {
fail("Task is expected to be in done state. TaskId: %s, QueryId: %s, Query: %s ".formatted(taskId, queryId, query));
}
}
}
}));
}

@Test
public void ensureTestNamingConvention()
{
Expand Down Expand Up @@ -536,6 +586,14 @@ protected final DistributedQueryRunner getDistributedQueryRunner()
return (DistributedQueryRunner) queryRunner;
}

private Optional<DistributedQueryRunner> tryGetDistributedQueryRunner()
{
if (queryRunner != null && queryRunner instanceof DistributedQueryRunner runner) {
return Optional.of(runner);
}
return Optional.empty();
}

protected Session noJoinReordering()
{
return noJoinReordering(JoinDistributionType.PARTITIONED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public static Map<String, String> getExtraProperties()
// to trigger spilling
.put("exchange.deduplication-buffer-size", "1kB")
.put("fault-tolerant-execution-task-memory", "1GB")
// limit number of threads to detect potential thread leaks
.put("query.executor-pool-size", "10")
.buildOrThrow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ private QueryError trySelectQuery(String assumedUser)
.build();

// start query
StatementClient client = newStatementClient(httpClient, clientSession, "SELECT * FROM tpch.tiny.nation");
try (StatementClient client = newStatementClient(httpClient, clientSession, "SELECT * FROM tpch.tiny.nation")) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: look unrelated - separate commit?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the statement is not closed the query never finishes and doesn't receive a final TaskInfo making the newly introduced check fail

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still should be a preparatory commit. So adding checks and making project actually pass the checks are separate commits. (fixing makes sense even if we do not have checks).

// wait for query to be fully scheduled
while (client.isRunning() && !client.currentStatusInfo().getStats().isScheduled()) {
client.advance();
}

// wait for query to be fully scheduled
while (client.isRunning() && !client.currentStatusInfo().getStats().isScheduled()) {
client.advance();
return client.currentStatusInfo().getError();
}

return client.currentStatusInfo().getError();
}
finally {
// close the client since, query is not managed by the client protocol
Expand Down