Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties-session.rst
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,16 @@ Setting a duration controls how long to cache data.

The value represents the max background fetch threads for refreshing metadata.

Query Manager Properties
------------------------

``query_client_timeout``
^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``Duration``
* **Default value:** ``5m``

This property can be used to configure how long a query runs without contact
from the client application, such as the CLI, before it's abandoned.

The corresponding configuration property is :ref:`admin/properties:\`\`query.client.timeout\`\``.
14 changes: 13 additions & 1 deletion presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1090,4 +1090,16 @@ system will keep logs for the past 15 days.
* **Type:** ``data size``
* **Default value:** ``100MB``

The maximum file size for the log file of the HTTP server.
The maximum file size for the log file of the HTTP server.

Query Manager Properties
------------------------

``query.client.timeout``
^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``Duration``
* **Default value:** ``5m``

This property can be used to configure how long a query runs without contact
from the client application, such as the CLI, before it's abandoned.
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ public final class SystemSessionProperties
public static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled";
public static final String EXPRESSION_OPTIMIZER_NAME = "expression_optimizer_name";
public static final String ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID = "add_exchange_below_partial_aggregation_over_group_id";
public static final String QUERY_CLIENT_TIMEOUT = "query_client_timeout";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all";
Expand Down Expand Up @@ -1890,7 +1891,16 @@ public SystemSessionProperties(
booleanProperty(ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID,
"Enable adding an exchange below partial aggregation over a GroupId node to improve partial aggregation performance",
featuresConfig.getAddExchangeBelowPartialAggregationOverGroupId(),
false));
false),
new PropertyMetadata<>(
QUERY_CLIENT_TIMEOUT,
"Configures how long the query runs without contact from the client application, such as the CLI, before it's abandoned",
VARCHAR,
Duration.class,
queryManagerConfig.getClientTimeout(),
false,
value -> Duration.valueOf((String) value),
Duration::toString));
}

public static boolean isSpoolingOutputBufferEnabled(Session session)
Expand Down Expand Up @@ -3221,4 +3231,9 @@ public static boolean isCanonicalizedJsonExtract(Session session)
{
return session.getSystemProperty(CANONICALIZED_JSON_EXTRACT, Boolean.class);
}

public static Duration getQueryClientTimeout(Session session)
{
return session.getSystemProperty(QUERY_CLIENT_TIMEOUT, Duration.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static com.facebook.presto.SystemSessionProperties.getQueryClientTimeout;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxExecutionTime;
import static com.facebook.presto.SystemSessionProperties.getQueryMaxRunTime;
import static com.facebook.presto.execution.QueryLimit.Source.QUERY;
Expand Down Expand Up @@ -74,8 +75,6 @@ public class QueryTracker<T extends TrackedQuery>
private final ConcurrentMap<QueryId, T> queries = new ConcurrentHashMap<>();
private final Queue<T> expirationQueue = new LinkedBlockingQueue<>();

private final Duration clientTimeout;

private final ScheduledExecutorService queryManagementExecutor;

@GuardedBy("this")
Expand All @@ -88,7 +87,6 @@ public QueryTracker(QueryManagerConfig queryManagerConfig, ScheduledExecutorServ
requireNonNull(queryManagerConfig, "queryManagerConfig is null");
this.minQueryExpireAge = queryManagerConfig.getMinQueryExpireAge();
this.maxQueryHistory = queryManagerConfig.getMaxQueryHistory();
this.clientTimeout = queryManagerConfig.getClientTimeout();
this.maxTotalRunningTaskCountToKillQuery = queryManagerConfig.getMaxTotalRunningTaskCountToKillQuery();
this.maxQueryRunningTaskCount = queryManagerConfig.getMaxQueryRunningTaskCount();

Expand Down Expand Up @@ -384,7 +382,8 @@ private void failAbandonedQueries()

private boolean isAbandoned(T query)
{
long oldestAllowedHeartbeatInMillis = currentTimeMillis() - clientTimeout.toMillis();
Duration queryClientTimeout = getQueryClientTimeout(query.getSession());
long oldestAllowedHeartbeatInMillis = currentTimeMillis() - queryClientTimeout.toMillis();
long lastHeartbeat = query.getLastHeartbeatInMillis();

return lastHeartbeat > 0 && lastHeartbeat < oldestAllowedHeartbeatInMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.tests;

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.cost.StatsAndCosts;
import com.facebook.presto.dispatcher.DispatchManager;
Expand Down Expand Up @@ -63,6 +64,7 @@
import static com.facebook.presto.execution.TestQueryRunnerUtil.waitForQueryState;
import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.getSimpleQueryRunner;
import static com.facebook.presto.operator.BlockedReason.WAITING_FOR_MEMORY;
import static com.facebook.presto.spi.StandardErrorCode.ABANDONED_QUERY;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_CPU_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_GLOBAL_MEMORY_LIMIT;
import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_OUTPUT_POSITIONS_LIMIT;
Expand Down Expand Up @@ -273,6 +275,26 @@ public void testQueryOutputSizeExceeded()
}
}

@Test(timeOut = 60_000L)
public void testQueryClientTimeoutExceeded()
throws Exception
{
Session session = Session.builder(TEST_SESSION)
.setSystemProperty(SystemSessionProperties.QUERY_CLIENT_TIMEOUT, "1s")
.build();

try (DistributedQueryRunner queryRunner = builder().setSingleExtraProperty("query.client.timeout", "3m").build()) {
QueryId queryId = createQuery(queryRunner, session, "SELECT COUNT(*) FROM lineitem");
waitForQueryState(queryRunner, queryId, FAILED);
QueryManager queryManager = queryRunner.getCoordinator().getQueryManager();
BasicQueryInfo queryInfo = queryManager.getQueryInfo(queryId);
assertEquals(queryInfo.getState(), FAILED);
assertEquals(queryInfo.getErrorCode(), ABANDONED_QUERY.toErrorCode());
assertEquals(queryManager.getQuerySession(queryId).getAccessControlContext().getSchema(), session.getSchema());
assertEquals(queryManager.getQuerySession(queryId).getAccessControlContext().getCatalog(), session.getCatalog());
}
}

@Test
public void testQueryCountMetrics()
throws Exception
Expand Down
Loading