Skip to content

Commit 1245cde

Browse files
committed
Handle long running queries in DirectTrinoClient
When a query being executed by DirectTrinoClient takes longer than the query.client.timeout, the query is then canceled. This client is used in some cases when executing queries from within the coordinator
1 parent 2fb0bbe commit 1245cde

File tree

5 files changed

+120
-4
lines changed

5 files changed

+120
-4
lines changed

core/trino-main/src/main/java/io/trino/client/direct/DirectTrinoClient.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.trino.dispatcher.DispatchQuery;
2222
import io.trino.exchange.DirectExchangeInput;
2323
import io.trino.execution.QueryManager;
24+
import io.trino.execution.QueryManagerConfig;
2425
import io.trino.execution.QueryState;
2526
import io.trino.execution.buffer.PageDeserializer;
2627
import io.trino.memory.context.SimpleLocalMemoryContext;
@@ -40,6 +41,8 @@
4041
import java.util.List;
4142
import java.util.Optional;
4243
import java.util.concurrent.ExecutionException;
44+
import java.util.concurrent.TimeUnit;
45+
import java.util.concurrent.TimeoutException;
4346

4447
import static io.airlift.concurrent.MoreFutures.whenAnyComplete;
4548
import static io.trino.SystemSessionProperties.getRetryPolicy;
@@ -56,13 +59,20 @@ public class DirectTrinoClient
5659
private final QueryManager queryManager;
5760
private final DirectExchangeClientSupplier directExchangeClientSupplier;
5861
private final BlockEncodingSerde blockEncodingSerde;
62+
private final long heartBeatIntervalMillis;
5963

60-
public DirectTrinoClient(DispatchManager dispatchManager, QueryManager queryManager, DirectExchangeClientSupplier directExchangeClientSupplier, BlockEncodingSerde blockEncodingSerde)
64+
public DirectTrinoClient(
65+
DispatchManager dispatchManager,
66+
QueryManager queryManager,
67+
QueryManagerConfig queryManagerConfig,
68+
DirectExchangeClientSupplier directExchangeClientSupplier,
69+
BlockEncodingSerde blockEncodingSerde)
6170
{
6271
this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
6372
this.queryManager = requireNonNull(queryManager, "queryManager is null");
6473
this.directExchangeClientSupplier = requireNonNull(directExchangeClientSupplier, "directExchangeClientSupplier is null");
6574
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
75+
this.heartBeatIntervalMillis = requireNonNull(queryManagerConfig, "queryManagerConfig is null").getClientTimeout().toMillis() / 2;
6676
}
6777

6878
public DispatchQuery execute(SessionContext sessionContext, @Language("SQL") String sql, QueryResultsListener queryResultsListener)
@@ -103,7 +113,27 @@ public DispatchQuery execute(SessionContext sessionContext, @Language("SQL") Str
103113
Page page = pageDeserializer.deserialize(serializedPage);
104114
queryResultsListener.consumeOutputPage(page);
105115
}
106-
getQueryFuture(whenAnyComplete(ImmutableList.of(queryManager.getStateChange(queryId, state), exchangeClient.isBlocked())));
116+
117+
ListenableFuture<Object> anyCompleteFuture = whenAnyComplete(ImmutableList.of(
118+
queryManager.getStateChange(queryId, state),
119+
exchangeClient.isBlocked()));
120+
while (!anyCompleteFuture.isDone()) {
121+
try {
122+
anyCompleteFuture.get(heartBeatIntervalMillis, TimeUnit.MILLISECONDS);
123+
}
124+
catch (TimeoutException e) {
125+
// continue waiting until the query state changes or the exchange client is blocked.
126+
// we need to periodically record the heartbeat to prevent the query from being canceled
127+
dispatchQuery.recordHeartbeat();
128+
}
129+
catch (InterruptedException e) {
130+
Thread.currentThread().interrupt();
131+
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Thread interrupted", e);
132+
}
133+
catch (ExecutionException e) {
134+
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Error processing query", e.getCause());
135+
}
136+
}
107137
}
108138
}
109139

core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,19 @@ public Builder addProperty(String name, String value)
758758
return this;
759759
}
760760

761+
public Builder overrideProperties(Map<String, String> properties)
762+
{
763+
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
764+
this.properties.forEach((k, v) -> {
765+
if (!properties.containsKey(k)) {
766+
builder.put(k, v);
767+
}
768+
});
769+
builder.putAll(properties);
770+
this.properties = builder.buildOrThrow();
771+
return this;
772+
}
773+
761774
public Builder setProperties(Map<String, String> properties)
762775
{
763776
this.properties = ImmutableMap.copyOf(requireNonNull(properties, "properties is null"));

core/trino-main/src/main/java/io/trino/testing/StandaloneQueryRunner.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.trino.Session;
2323
import io.trino.cost.StatsCalculator;
2424
import io.trino.execution.FailureInjector.InjectedFailureType;
25+
import io.trino.execution.QueryManagerConfig;
2526
import io.trino.execution.warnings.WarningCollector;
2627
import io.trino.metadata.FunctionBundle;
2728
import io.trino.metadata.MetadataUtil;
@@ -91,6 +92,7 @@ public StandaloneQueryRunner(Session defaultSession, Consumer<TestingTrinoServer
9192
this.trinoClient = new TestingDirectTrinoClient(
9293
server.getDispatchManager(),
9394
server.getQueryManager(),
95+
server.getInstance(Key.get(QueryManagerConfig.class)),
9496
server.getInstance(Key.get(DirectExchangeClientSupplier.class)),
9597
server.getInstance(Key.get(BlockEncodingSerde.class)));
9698
}

core/trino-main/src/main/java/io/trino/testing/TestingDirectTrinoClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.trino.dispatcher.DispatchQuery;
2121
import io.trino.execution.QueryInfo;
2222
import io.trino.execution.QueryManager;
23+
import io.trino.execution.QueryManagerConfig;
2324
import io.trino.operator.DirectExchangeClientSupplier;
2425
import io.trino.server.ResultQueryInfo;
2526
import io.trino.server.SessionContext;
@@ -49,9 +50,9 @@ public class TestingDirectTrinoClient
4950
{
5051
private final DirectTrinoClient directTrinoClient;
5152

52-
public TestingDirectTrinoClient(DispatchManager dispatchManager, QueryManager queryManager, DirectExchangeClientSupplier directExchangeClientSupplier, BlockEncodingSerde blockEncodingSerde)
53+
public TestingDirectTrinoClient(DispatchManager dispatchManager, QueryManager queryManager, QueryManagerConfig queryManagerConfig, DirectExchangeClientSupplier directExchangeClientSupplier, BlockEncodingSerde blockEncodingSerde)
5354
{
54-
directTrinoClient = new DirectTrinoClient(dispatchManager, queryManager, directExchangeClientSupplier, blockEncodingSerde);
55+
directTrinoClient = new DirectTrinoClient(dispatchManager, queryManager, queryManagerConfig, directExchangeClientSupplier, blockEncodingSerde);
5556
}
5657

5758
public Result execute(Session session, @Language("SQL") String sql)
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright Starburst Data, Inc. All rights reserved.
3+
*
4+
* THIS IS UNPUBLISHED PROPRIETARY SOURCE CODE OF STARBURST DATA.
5+
* The copyright notice above does not evidence any
6+
* actual or intended publication of such source code.
7+
*
8+
* Redistribution of this material is strictly prohibited.
9+
*/
10+
package io.trino.client.direct;
11+
12+
import com.google.common.collect.ImmutableMap;
13+
import io.trino.plugin.blackhole.BlackHolePlugin;
14+
import io.trino.testing.QueryFailedException;
15+
import io.trino.testing.QueryRunner;
16+
import io.trino.testing.StandaloneQueryRunner;
17+
import org.junit.jupiter.api.BeforeAll;
18+
import org.junit.jupiter.api.Test;
19+
import org.junit.jupiter.api.TestInstance;
20+
import org.junit.jupiter.api.parallel.Execution;
21+
22+
import java.util.Map;
23+
24+
import static io.trino.SessionTestUtils.TEST_SESSION;
25+
import static java.lang.String.format;
26+
import static org.assertj.core.api.Fail.fail;
27+
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
28+
import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
29+
30+
@TestInstance(PER_CLASS)
31+
@Execution(CONCURRENT)
32+
public class TestDirectTrinoClient
33+
{
34+
private QueryRunner queryRunner;
35+
36+
@BeforeAll
37+
public void setup()
38+
throws Exception
39+
{
40+
queryRunner = new StandaloneQueryRunner(
41+
TEST_SESSION,
42+
builder -> builder.overrideProperties(ImmutableMap.of(
43+
"query.client.timeout", "1s")));
44+
queryRunner.installPlugin(new BlackHolePlugin());
45+
queryRunner.createCatalog("blackhole", "blackhole");
46+
queryRunner.execute("CREATE TABLE blackhole.test_table (col1 VARCHAR, col2 INTEGER)" +
47+
"WITH (" +
48+
" split_count = 1, " +
49+
" pages_per_split = 1, " +
50+
" rows_per_page = 1, " +
51+
" page_processing_delay = '10s'" +
52+
")");
53+
}
54+
55+
@Test
56+
public void testDirectTrinoClientLongQuery()
57+
{
58+
String sql = "SELECT * FROM blackhole.test_table";
59+
try {
60+
queryRunner.execute(TEST_SESSION, sql);
61+
}
62+
catch (QueryFailedException e) {
63+
fail(format("Expected query %s to succeed: %s", e.getQueryId(), sql), e);
64+
}
65+
catch (RuntimeException e) {
66+
fail("Expected query to succeed", e);
67+
}
68+
}
69+
}
70+

0 commit comments

Comments
 (0)