diff --git a/presto-docs/src/main/sphinx/admin/properties-session.rst b/presto-docs/src/main/sphinx/admin/properties-session.rst index 1f55bd6939b8e..ab0efbc74a363 100644 --- a/presto-docs/src/main/sphinx/admin/properties-session.rst +++ b/presto-docs/src/main/sphinx/admin/properties-session.rst @@ -406,3 +406,16 @@ Setting a duration controls how long to cache data. The value represents the max background fetch threads for refreshing metadata. +Query Manager Properties +------------------------ + +``query_client_timeout`` +^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``Duration`` +* **Default value:** ``5m`` + +This property can be used to configure how long a query runs without contact +from the client application, such as the CLI, before it's abandoned. + +The corresponding configuration property is :ref:`admin/properties:\`\`query.client.timeout\`\``. \ No newline at end of file diff --git a/presto-docs/src/main/sphinx/admin/properties.rst b/presto-docs/src/main/sphinx/admin/properties.rst index ba9d30e758b08..997515dae4620 100644 --- a/presto-docs/src/main/sphinx/admin/properties.rst +++ b/presto-docs/src/main/sphinx/admin/properties.rst @@ -1090,4 +1090,16 @@ system will keep logs for the past 15 days. * **Type:** ``data size`` * **Default value:** ``100MB`` -The maximum file size for the log file of the HTTP server. \ No newline at end of file +The maximum file size for the log file of the HTTP server. + +Query Manager Properties +------------------------ + +``query.client.timeout`` +^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``Duration`` +* **Default value:** ``5m`` + +This property can be used to configure how long a query runs without contact +from the client application, such as the CLI, before it's abandoned. \ No newline at end of file diff --git a/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java index bfbd888ad8c44..f1e94ae0debd3 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -330,6 +330,7 @@ public final class SystemSessionProperties public static final String SINGLE_NODE_EXECUTION_ENABLED = "single_node_execution_enabled"; public static final String EXPRESSION_OPTIMIZER_NAME = "expression_optimizer_name"; public static final String ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID = "add_exchange_below_partial_aggregation_over_group_id"; + public static final String QUERY_CLIENT_TIMEOUT = "query_client_timeout"; // TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future. public static final String NATIVE_AGGREGATION_SPILL_ALL = "native_aggregation_spill_all"; @@ -1890,7 +1891,16 @@ public SystemSessionProperties( booleanProperty(ADD_EXCHANGE_BELOW_PARTIAL_AGGREGATION_OVER_GROUP_ID, "Enable adding an exchange below partial aggregation over a GroupId node to improve partial aggregation performance", featuresConfig.getAddExchangeBelowPartialAggregationOverGroupId(), - false)); + false), + new PropertyMetadata<>( + QUERY_CLIENT_TIMEOUT, + "Configures how long the query runs without contact from the client application, such as the CLI, before it's abandoned", + VARCHAR, + Duration.class, + queryManagerConfig.getClientTimeout(), + false, + value -> Duration.valueOf((String) value), + Duration::toString)); } public static boolean isSpoolingOutputBufferEnabled(Session session) @@ -3221,4 +3231,9 @@ public static boolean isCanonicalizedJsonExtract(Session session) { return session.getSystemProperty(CANONICALIZED_JSON_EXTRACT, Boolean.class); } + + public static Duration getQueryClientTimeout(Session session) + { + return session.getSystemProperty(QUERY_CLIENT_TIMEOUT, Duration.class); + } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/QueryTracker.java b/presto-main-base/src/main/java/com/facebook/presto/execution/QueryTracker.java index b52c07c759eaf..d6290b664cf2a 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/QueryTracker.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/QueryTracker.java @@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import static com.facebook.presto.SystemSessionProperties.getQueryClientTimeout; import static com.facebook.presto.SystemSessionProperties.getQueryMaxExecutionTime; import static com.facebook.presto.SystemSessionProperties.getQueryMaxRunTime; import static com.facebook.presto.execution.QueryLimit.Source.QUERY; @@ -74,8 +75,6 @@ public class QueryTracker private final ConcurrentMap queries = new ConcurrentHashMap<>(); private final Queue expirationQueue = new LinkedBlockingQueue<>(); - private final Duration clientTimeout; - private final ScheduledExecutorService queryManagementExecutor; @GuardedBy("this") @@ -88,7 +87,6 @@ public QueryTracker(QueryManagerConfig queryManagerConfig, ScheduledExecutorServ requireNonNull(queryManagerConfig, "queryManagerConfig is null"); this.minQueryExpireAge = queryManagerConfig.getMinQueryExpireAge(); this.maxQueryHistory = queryManagerConfig.getMaxQueryHistory(); - this.clientTimeout = queryManagerConfig.getClientTimeout(); this.maxTotalRunningTaskCountToKillQuery = queryManagerConfig.getMaxTotalRunningTaskCountToKillQuery(); this.maxQueryRunningTaskCount = queryManagerConfig.getMaxQueryRunningTaskCount(); @@ -384,7 +382,8 @@ private void failAbandonedQueries() private boolean isAbandoned(T query) { - long oldestAllowedHeartbeatInMillis = currentTimeMillis() - clientTimeout.toMillis(); + Duration queryClientTimeout = getQueryClientTimeout(query.getSession()); + long oldestAllowedHeartbeatInMillis = currentTimeMillis() - queryClientTimeout.toMillis(); long lastHeartbeat = query.getLastHeartbeatInMillis(); return lastHeartbeat > 0 && lastHeartbeat < oldestAllowedHeartbeatInMillis; diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryManager.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryManager.java index 815546f729f75..d64195dc2bae2 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryManager.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryManager.java @@ -14,6 +14,7 @@ package com.facebook.presto.tests; import com.facebook.presto.Session; +import com.facebook.presto.SystemSessionProperties; import com.facebook.presto.common.RuntimeStats; import com.facebook.presto.cost.StatsAndCosts; import com.facebook.presto.dispatcher.DispatchManager; @@ -63,6 +64,7 @@ import static com.facebook.presto.execution.TestQueryRunnerUtil.waitForQueryState; import static com.facebook.presto.execution.resourceGroups.db.H2TestUtil.getSimpleQueryRunner; import static com.facebook.presto.operator.BlockedReason.WAITING_FOR_MEMORY; +import static com.facebook.presto.spi.StandardErrorCode.ABANDONED_QUERY; import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_CPU_LIMIT; import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_GLOBAL_MEMORY_LIMIT; import static com.facebook.presto.spi.StandardErrorCode.EXCEEDED_OUTPUT_POSITIONS_LIMIT; @@ -273,6 +275,26 @@ public void testQueryOutputSizeExceeded() } } + @Test(timeOut = 60_000L) + public void testQueryClientTimeoutExceeded() + throws Exception + { + Session session = Session.builder(TEST_SESSION) + .setSystemProperty(SystemSessionProperties.QUERY_CLIENT_TIMEOUT, "1s") + .build(); + + try (DistributedQueryRunner queryRunner = builder().setSingleExtraProperty("query.client.timeout", "3m").build()) { + QueryId queryId = createQuery(queryRunner, session, "SELECT COUNT(*) FROM lineitem"); + waitForQueryState(queryRunner, queryId, FAILED); + QueryManager queryManager = queryRunner.getCoordinator().getQueryManager(); + BasicQueryInfo queryInfo = queryManager.getQueryInfo(queryId); + assertEquals(queryInfo.getState(), FAILED); + assertEquals(queryInfo.getErrorCode(), ABANDONED_QUERY.toErrorCode()); + assertEquals(queryManager.getQuerySession(queryId).getAccessControlContext().getSchema(), session.getSchema()); + assertEquals(queryManager.getQuerySession(queryId).getAccessControlContext().getCatalog(), session.getCatalog()); + } + } + @Test public void testQueryCountMetrics() throws Exception