diff --git a/presto-docs/src/main/sphinx/admin/properties.rst b/presto-docs/src/main/sphinx/admin/properties.rst index 941a7ae53ceb2..56c0fa5a7439d 100644 --- a/presto-docs/src/main/sphinx/admin/properties.rst +++ b/presto-docs/src/main/sphinx/admin/properties.rst @@ -2,12 +2,12 @@ Presto Configuration Properties =============================== -This section describes configuration properties that may be used to tune +This section describes configuration properties that may be used to tune Presto or alter its behavior when required. -The following is not a complete list of all configuration properties +The following is not a complete list of all configuration properties available in Presto, and does not include any connector-specific -catalog configuration properties. +catalog configuration properties. For information on catalog configuration properties, see the :doc:`connector documentation `. @@ -40,9 +40,9 @@ only need to fit in distributed memory across all nodes. When set to ``AUTOMATIC Presto will make a cost based decision as to which distribution type is optimal. It will also consider switching the left and right inputs to the join. In ``AUTOMATIC`` mode, Presto will default to hash distributed joins if no cost could be computed, such as if -the tables do not have statistics. +the tables do not have statistics. -The corresponding session property is :ref:`admin/properties-session:\`\`join_distribution_type\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`join_distribution_type\`\``. ``redistribute-writes`` @@ -55,7 +55,7 @@ This property enables redistribution of data before writing. This can eliminate the performance impact of data skew when writing by hashing it across nodes in the cluster. It can be disabled when it is known that the output data set is not skewed in order to avoid the overhead of hashing and -redistributing all the data across the network. +redistributing all the data across the network. 
The corresponding session property is :ref:`admin/properties-session:\`\`redistribute_writes\`\``. @@ -124,8 +124,8 @@ session properties are included. * **Minimum value:** ``0`` * **Default value:** ``0`` -The number of times that a query is automatically retried in the case of a transient query or communications failure. -The default value ``0`` means that retries are disabled. +The number of times that a query is automatically retried in the case of a transient query or communications failure. +The default value ``0`` means that retries are disabled. ``http-server.max-request-header-size`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -133,10 +133,10 @@ The default value ``0`` means that retries are disabled. * **Type:** ``data size`` * **Default value:** ``8 kB`` -The maximum size of the request header from the HTTP server. +The maximum size of the request header from the HTTP server. -Note: The default value can cause errors when large session properties -or other large session information is involved. +Note: The default value can cause errors when large session properties +or other large session information is involved. See :ref:`troubleshoot/query:\`\`Request Header Fields Too Large\`\``. ``offset-clause-enabled`` @@ -147,7 +147,7 @@ See :ref:`troubleshoot/query:\`\`Request Header Fields Too Large\`\``. To enable the ``OFFSET`` clause in SQL query expressions, set this property to ``true``. -The corresponding session property is :ref:`admin/properties-session:\`\`offset_clause_enabled\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`offset_clause_enabled\`\``. ``max-serializable-object-size`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -178,7 +178,7 @@ The corresponding session property is :ref:`admin/properties-session:\`\`max_pre * **Type:** ``string`` * **Default value:** (none) -An optional identifier for the cluster. When set, this tag is included in the response from the +An optional identifier for the cluster. 
When set, this tag is included in the response from the ``/v1/cluster`` REST API endpoint, allowing clients to identify which cluster provided the response. Memory Management Properties @@ -279,7 +279,7 @@ for a list of operations that support spilling. Be aware that this is an experimental feature and should be used with care. -The corresponding session property is :ref:`admin/properties-session:\`\`spill_enabled\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`spill_enabled\`\``. ``experimental.join-spill-enabled`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -290,7 +290,7 @@ The corresponding session property is :ref:`admin/properties-session:\`\`spill_e When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for joins to avoid exceeding memory limits for the query. -The corresponding session property is :ref:`admin/properties-session:\`\`join_spill_enabled\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`join_spill_enabled\`\``. ``experimental.aggregation-spill-enabled`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -301,7 +301,7 @@ The corresponding session property is :ref:`admin/properties-session:\`\`join_sp When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for aggregations to avoid exceeding memory limits for the query. -The corresponding session property is :ref:`admin/properties-session:\`\`aggregation_spill_enabled\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`aggregation_spill_enabled\`\``. 
``experimental.distinct-aggregation-spill-enabled`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -312,7 +312,7 @@ The corresponding session property is :ref:`admin/properties-session:\`\`aggrega When ``aggregation_spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for distinct aggregations to avoid exceeding memory limits for the query. -The corresponding session property is :ref:`admin/properties-session:\`\`distinct_aggregation_spill_enabled\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`distinct_aggregation_spill_enabled\`\``. ``experimental.order-by-aggregation-spill-enabled`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -323,7 +323,7 @@ The corresponding session property is :ref:`admin/properties-session:\`\`distinc When ``aggregation_spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for order by aggregations to avoid exceeding memory limits for the query. -The corresponding session property is :ref:`admin/properties-session:\`\`order_by_aggregation_spill_enabled\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`order_by_aggregation_spill_enabled\`\``. ``experimental.window-spill-enabled`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -334,7 +334,7 @@ The corresponding session property is :ref:`admin/properties-session:\`\`order_b When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for window functions to avoid exceeding memory limits for the query. -The corresponding session property is :ref:`admin/properties-session:\`\`window_spill_enabled\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`window_spill_enabled\`\``. 
``experimental.order-by-spill-enabled`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -345,7 +345,7 @@ The corresponding session property is :ref:`admin/properties-session:\`\`window_ When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for order by to avoid exceeding memory limits for the query. -The corresponding session property is :ref:`admin/properties-session:\`\`order_by_spill_enabled\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`order_by_spill_enabled\`\``. ``experimental.spiller.task-spilling-strategy`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -471,7 +471,7 @@ Max spill space to be used by a single query on a single node. Limit for memory used for unspilling a single aggregation operator instance. -The corresponding session property is :ref:`admin/properties-session:\`\`aggregation_operator_unspill_memory_limit\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`aggregation_operator_unspill_memory_limit\`\``. ``experimental.spill-compression-codec`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -598,16 +598,16 @@ network has high latency or if there are many nodes in the cluster. * **Type:** ``boolean`` * **Default value:** ``false`` -Enables the use of custom connector-provided serialization codecs for handles. +Enables the use of custom connector-provided serialization codecs for handles. This feature allows connectors to use their own serialization format for handle objects (such as table handles, column handles, and splits) instead of standard JSON serialization. -When enabled, connectors that provide a ``ConnectorCodecProvider`` with -appropriate codecs will have their handles serialized using custom binary -formats, which are then Base64-encoded for transport. Connectors without -codec support automatically fall back to standard JSON serialization. 
-Internal Presto handles (prefixed with ``$``) always use JSON serialization +When enabled, connectors that provide a ``ConnectorCodecProvider`` with +appropriate codecs will have their handles serialized using custom binary +formats, which are then Base64-encoded for transport. Connectors without +codec support automatically fall back to standard JSON serialization. +Internal Presto handles (prefixed with ``$``) always use JSON serialization regardless of this setting. .. _task-properties: @@ -628,9 +628,9 @@ resource utilization. Lower values are better for clusters that run many queries concurrently because the cluster will already be utilized by all the running queries, so adding more concurrency will result in slow downs due to context switching and other overhead. Higher values are better for clusters that only run -one or a few queries at a time. +one or a few queries at a time. -The corresponding session property is :ref:`admin/properties-session:\`\`task_concurrency\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`task_concurrency\`\``. ``task.http-response-threads`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -721,9 +721,9 @@ The number of concurrent writer threads per worker per query. Increasing this va increase write speed, especially when a query is not I/O bound and can take advantage of additional CPU for parallel writes (some connectors can be bottlenecked on CPU when writing due to compression or other factors). Setting this too high may cause the cluster -to become overloaded due to excessive resource utilization. +to become overloaded due to excessive resource utilization. -The corresponding session property is :ref:`admin/properties-session:\`\`task_writer_count\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`task_writer_count\`\``. 
``task.interrupt-runaway-splits-timeout`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -839,9 +839,9 @@ Optimizer Properties * **Type:** ``boolean`` * **Default value:** ``false`` -Enables optimization for aggregations on dictionaries. +Enables optimization for aggregations on dictionaries. -The corresponding session property is :ref:`admin/properties-session:\`\`dictionary_aggregation\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`dictionary_aggregation\`\``. ``optimizer.optimize-hash-generation`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -853,12 +853,12 @@ Compute hash codes for distribution, joins, and aggregations early during execut allowing result to be shared between operations later in the query. This can reduce CPU usage by avoiding computing the same hash multiple times, but at the cost of additional network transfer for the hashes. In most cases it will decrease overall -query processing time. +query processing time. It is often helpful to disable this property when using :doc:`/sql/explain` in order to make the query plan easier to read. -The corresponding session property is :ref:`admin/properties-session:\`\`optimize_hash_generation\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`optimize_hash_generation\`\``. ``optimizer.optimize-metadata-queries`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -904,9 +904,9 @@ over an outer join. For example:: Enabling this optimization can substantially speed up queries by reducing the amount of data that needs to be processed by the join. However, it may slow down some -queries that have very selective joins. +queries that have very selective joins. -The corresponding session property is :ref:`admin/properties-session:\`\`push_aggregation_through_join\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`push_aggregation_through_join\`\``. 
``optimizer.push-table-write-through-union`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -918,9 +918,9 @@ Parallelize writes when using ``UNION ALL`` in queries that write data. This imp speed of writing output tables in ``UNION ALL`` queries because these writes do not require additional synchronization when collecting results. Enabling this optimization can improve ``UNION ALL`` speed when write speed is not yet saturated. However, it may slow down queries -in an already heavily loaded system. +in an already heavily loaded system. -The corresponding session property is :ref:`admin/properties-session:\`\`push_table_write_through_union\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`push_table_write_through_union\`\``. ``optimizer.join-reordering-strategy`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -934,9 +934,9 @@ query. ``ELIMINATE_CROSS_JOINS`` reorders joins to eliminate cross joins where otherwise maintains the original query order. When reordering joins it also strives to maintain the original table order as much as possible. ``AUTOMATIC`` enumerates possible orders and uses statistics-based cost estimation to determine the least cost order. If stats are not available or if -for any reason a cost could not be computed, the ``ELIMINATE_CROSS_JOINS`` strategy is used. +for any reason a cost could not be computed, the ``ELIMINATE_CROSS_JOINS`` strategy is used. -The corresponding session property is :ref:`admin/properties-session:\`\`join_reordering_strategy\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`join_reordering_strategy\`\``. ``optimizer.max-reordered-joins`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -1019,7 +1019,7 @@ Enable broadcasting based on the confidence of the statistics that are being use broadcasting the side of a joinNode which has the highest (``HIGH`` or ``FACT``) confidence statistics. 
If both sides have the same confidence statistics, then the original behavior will be followed. -The corresponding session property is :ref:`admin/properties-session:\`\`confidence_based_broadcast\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`confidence_based_broadcast\`\``. ``optimizer.treat-low-confidence-zero-estimation-as-unknown`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -1027,9 +1027,9 @@ The corresponding session property is :ref:`admin/properties-session:\`\`confide * **Type:** ``boolean`` * **Default value:** ``false`` -Enable treating ``LOW`` confidence, zero estimations as ``UNKNOWN`` during joins. +Enable treating ``LOW`` confidence, zero estimations as ``UNKNOWN`` during joins. -The corresponding session property is :ref:`admin/properties-session:\`\`treat-low-confidence-zero-estimation-as-unknown\`\``. +The corresponding session property is :ref:`admin/properties-session:\`\`treat-low-confidence-zero-estimation-as-unknown\`\``. ``optimizer.retry-query-with-history-based-optimization`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -1037,7 +1037,7 @@ The corresponding session property is :ref:`admin/properties-session:\`\`treat-l * **Type:** ``boolean`` * **Default value:** ``false`` -Enable retry for failed queries who can potentially be helped by HBO. +Enable retry for failed queries who can potentially be helped by HBO. The corresponding session property is :ref:`admin/properties-session:\`\`retry-query-with-history-based-optimization\`\``. @@ -1247,6 +1247,38 @@ Use to configure how long a query can be queued before it is terminated. The corresponding session property is :ref:`admin/properties-session:\`\`query_max_queued_time\`\``. 
+``query-manager.query-pacing.max-queries-per-second`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``integer`` +* **Minimum value:** ``1`` +* **Default value:** ``2147483647`` (unlimited) + +Maximum number of queries that can be admitted per second globally across +all resource groups. This property enables query admission pacing to prevent +worker overload when many queries start simultaneously. Pacing only activates +when the number of running queries exceeds the threshold configured by +``query-manager.query-pacing.min-running-queries``. + +Set to a lower value such as ``10`` to limit query admission rate during +periods of high cluster load. The default value effectively disables pacing. + +``query-manager.query-pacing.min-running-queries`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``integer`` +* **Minimum value:** ``0`` +* **Default value:** ``30`` + +Minimum number of running queries required before query admission pacing +is applied. When the total number of running queries is below this threshold, +queries are admitted immediately without rate limiting, regardless of the +``query-manager.query-pacing.max-queries-per-second`` setting. + +This allows the cluster to quickly ramp up when idle while still providing +protection against overload when the cluster is busy. Set to ``0`` to always +apply pacing when ``max-queries-per-second`` is configured. 
+ Query Retry Properties ---------------------- diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java b/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java index 8eb4ced151805..838f69f9fad87 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java @@ -102,6 +102,10 @@ public class QueryManagerConfig private int minColumnarEncodingChannelsToPreferRowWiseEncoding = 1000; + private int maxQueryAdmissionsPerSecond = Integer.MAX_VALUE; + + private int minRunningQueriesForPacing = 30; + @Min(1) public int getScheduleSplitBatchSize() { @@ -766,6 +770,34 @@ public QueryManagerConfig setMinColumnarEncodingChannelsToPreferRowWiseEncoding( return this; } + @Min(1) + public int getMaxQueryAdmissionsPerSecond() + { + return maxQueryAdmissionsPerSecond; + } + + @Config("query-manager.query-pacing.max-queries-per-second") + @ConfigDescription("Maximum number of queries that can be admitted per second globally for admission pacing. Default is unlimited (Integer.MAX_VALUE). Set to a lower value (e.g., 1) to pace query admissions to one per second.") + public QueryManagerConfig setMaxQueryAdmissionsPerSecond(int maxQueryAdmissionsPerSecond) + { + this.maxQueryAdmissionsPerSecond = maxQueryAdmissionsPerSecond; + return this; + } + + @Min(0) + public int getMinRunningQueriesForPacing() + { + return minRunningQueriesForPacing; + } + + @Config("query-manager.query-pacing.min-running-queries") + @ConfigDescription("Minimum number of running queries before admission pacing is applied. Default is 30. 
Set to a higher value to only pace when cluster is busy.") + public QueryManagerConfig setMinRunningQueriesForPacing(int minRunningQueriesForPacing) + { + this.minRunningQueriesForPacing = minRunningQueriesForPacing; + return this; + } + public enum ExchangeMaterializationStrategy { NONE, diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java b/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java index 82274c04b3550..9bd91021d34cc 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java @@ -98,6 +98,7 @@ public class InternalResourceGroup private final Predicate shouldWaitForResourceManagerUpdate; private final InternalNodeManager nodeManager; private final ClusterResourceChecker clusterResourceChecker; + private final QueryPacingContext queryPacingContext; // Configuration // ============= @@ -169,13 +170,15 @@ protected InternalResourceGroup( Function> additionalRuntimeInfo, Predicate shouldWaitForResourceManagerUpdate, InternalNodeManager nodeManager, - ClusterResourceChecker clusterResourceChecker) + ClusterResourceChecker clusterResourceChecker, + QueryPacingContext queryPacingContext) { this.parent = requireNonNull(parent, "parent is null"); this.jmxExportListener = requireNonNull(jmxExportListener, "jmxExportListener is null"); this.executor = requireNonNull(executor, "executor is null"); this.nodeManager = requireNonNull(nodeManager, "node manager is null"); this.clusterResourceChecker = requireNonNull(clusterResourceChecker, "clusterResourceChecker is null"); + this.queryPacingContext = requireNonNull(queryPacingContext, "queryPacingContext is null"); requireNonNull(name, "name is null"); if (parent.isPresent()) { id = new ResourceGroupId(parent.get().id, name); @@ -676,7 +679,8 @@ 
public InternalResourceGroup getOrCreateSubGroup(String name, boolean staticSegm additionalRuntimeInfo, shouldWaitForResourceManagerUpdate, nodeManager, - clusterResourceChecker); + clusterResourceChecker, + queryPacingContext); // Sub group must use query priority to ensure ordering if (schedulingPolicy == QUERY_PRIORITY) { subGroup.setSchedulingPolicy(QUERY_PRIORITY); @@ -735,12 +739,14 @@ public void run(ManagedQueryExecution query) } else { query.setResourceGroupQueryLimits(perQueryLimits); - if (canRun && queuedQueries.isEmpty()) { + boolean immediateStartCandidate = canRun && queuedQueries.isEmpty(); + if (immediateStartCandidate && queryPacingContext.tryAcquireAdmissionSlot()) { startInBackground(query); } else { enqueueQuery(query); } + query.addStateChangeListener(state -> { if (state.isDone()) { queryFinished(query); @@ -807,6 +813,8 @@ private void startInBackground(ManagedQueryExecution query) group = group.parent.get(); } updateEligibility(); + // Increment global running query counter for pacing + queryPacingContext.onQueryStarted(); executor.execute(query::startWaitingForResources); group = this; long lastRunningQueryStartTimeMillis = currentTimeMillis(); @@ -840,6 +848,8 @@ private void queryFinished(ManagedQueryExecution query) group.parent.get().descendantRunningQueries--; group = group.parent.get(); } + // Decrement global running query counter for pacing + queryPacingContext.onQueryFinished(); } else { queuedQueries.remove(query); @@ -908,8 +918,13 @@ protected boolean internalStartNext() return false; } - ManagedQueryExecution query = queuedQueries.poll(); + ManagedQueryExecution query = queuedQueries.peek(); if (query != null) { + if (!queryPacingContext.tryAcquireAdmissionSlot()) { + return false; + } + + queuedQueries.poll(); // Remove from queue; use query from peek() above startInBackground(query); return true; } @@ -1146,7 +1161,8 @@ public RootInternalResourceGroup( Function> additionalRuntimeInfo, Predicate 
shouldWaitForResourceManagerUpdate, InternalNodeManager nodeManager, - ClusterResourceChecker clusterResourceChecker) + ClusterResourceChecker clusterResourceChecker, + QueryPacingContext queryPacingContext) { super(Optional.empty(), name, @@ -1156,7 +1172,8 @@ public RootInternalResourceGroup( additionalRuntimeInfo, shouldWaitForResourceManagerUpdate, nodeManager, - clusterResourceChecker); + clusterResourceChecker, + queryPacingContext); } public synchronized void updateEligibilityRecursively(InternalResourceGroup group) @@ -1172,7 +1189,7 @@ public synchronized void processQueuedQueries() internalRefreshStats(); while (internalStartNext()) { - // start all the queries we can + // start all the queries we can (subject to limits and pacing) } } diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java b/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java index f760cfa19b031..806e82e1e64d9 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java @@ -59,6 +59,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; @@ -90,6 +91,18 @@ public final class InternalResourceGroupManager private static final String CONFIGURATION_MANAGER_PROPERTY_NAME = "resource-groups.configuration-manager"; private static final int REFRESH_EXECUTOR_POOL_SIZE = 2; + private final int maxQueryAdmissionsPerSecond; + private final int minRunningQueriesForPacing; + private final long queryAdmissionIntervalNanos; + 
private final AtomicLong lastAdmittedQueryNanos = new AtomicLong(0L); + + // Pacing metrics - use AtomicLong/AtomicInteger for lock-free updates to avoid deadlock + // with resource group locks (see tryAcquireAdmissionSlot for details) + private final AtomicLong totalAdmissionAttempts = new AtomicLong(0L); + private final AtomicLong totalAdmissionsGranted = new AtomicLong(0L); + private final AtomicLong totalAdmissionsDenied = new AtomicLong(0L); + private final AtomicInteger totalRunningQueriesCounter = new AtomicInteger(0); + private final ScheduledExecutorService refreshExecutor = newScheduledThreadPool(REFRESH_EXECUTOR_POOL_SIZE, daemonThreadsNamed("resource-group-manager-refresher-%d-" + REFRESH_EXECUTOR_POOL_SIZE)); private final PeriodicTaskExecutor resourceGroupRuntimeExecutor; private final List rootGroups = new CopyOnWriteArrayList<>(); @@ -115,6 +128,7 @@ public final class InternalResourceGroupManager private final InternalNodeManager nodeManager; private AtomicBoolean isConfigurationManagerLoaded; private final ClusterResourceChecker clusterResourceChecker; + private final QueryPacingContext queryPacingContext; @Inject public InternalResourceGroupManager( @@ -141,7 +155,101 @@ public InternalResourceGroupManager( this.resourceGroupRuntimeExecutor = new PeriodicTaskExecutor(resourceGroupRuntimeInfoRefreshInterval.toMillis(), refreshExecutor, this::refreshResourceGroupRuntimeInfo); configurationManagerFactories.putIfAbsent(LegacyResourceGroupConfigurationManager.NAME, new LegacyResourceGroupConfigurationManager.Factory()); this.isConfigurationManagerLoaded = new AtomicBoolean(false); - this.clusterResourceChecker = clusterResourceChecker; + this.clusterResourceChecker = requireNonNull(clusterResourceChecker, "clusterResourceChecker is null"); + this.maxQueryAdmissionsPerSecond = queryManagerConfig.getMaxQueryAdmissionsPerSecond(); + this.minRunningQueriesForPacing = queryManagerConfig.getMinRunningQueriesForPacing(); + this.queryAdmissionIntervalNanos = 
(maxQueryAdmissionsPerSecond == Integer.MAX_VALUE) + ? 0L + : 1_000_000_000L / maxQueryAdmissionsPerSecond; + this.queryPacingContext = new QueryPacingContext() + { + @Override + public boolean tryAcquireAdmissionSlot() + { + return InternalResourceGroupManager.this.tryAcquireAdmissionSlot(); + } + + @Override + public void onQueryStarted() + { + incrementRunningQueries(); + } + + @Override + public void onQueryFinished() + { + decrementRunningQueries(); + } + }; + } + + /** + * Global rate limiter for query admissions. Enforces maxQueryAdmissionsPerSecond + * when running queries exceed minRunningQueriesForPacing threshold. + * + * @return true if query can be admitted, false if rate limit exceeded + */ + boolean tryAcquireAdmissionSlot() + { + // Pacing disabled - return early without tracking metrics + if (queryAdmissionIntervalNanos == 0L) { + return true; + } + + // Running queries below threshold - bypass pacing + int currentRunningQueries = getTotalRunningQueries(); + if (currentRunningQueries < minRunningQueriesForPacing) { + return true; + } + + totalAdmissionAttempts.incrementAndGet(); + + // Atomic update for global rate limiting. With multiple root resource groups, + // concurrent threads may call this method simultaneously (each holding their + // own root group's lock). Compare-and-swap ensures correctness in that scenario. + // With a single root group, the root lock serializes access, making the atomic + // update redundant but harmless. 
+ for (int attempt = 0; attempt < 10; attempt++) { + long now = System.nanoTime(); + long last = lastAdmittedQueryNanos.get(); + + // Check if enough time has elapsed since last admission + if (last != 0L && (now - last) < queryAdmissionIntervalNanos) { + totalAdmissionsDenied.incrementAndGet(); + return false; + } + + // Atomically update timestamp if unchanged; retry if another thread won + if (lastAdmittedQueryNanos.compareAndSet(last, now)) { + totalAdmissionsGranted.incrementAndGet(); + return true; + } + } + + // Exhausted retries - deny to prevent starvation under extreme contention + totalAdmissionsDenied.incrementAndGet(); + return false; + } + + /** + * Returns total running queries across all resource groups. + * Uses atomic counter updated via callbacks to avoid locking resource groups. + */ + private int getTotalRunningQueries() + { + return totalRunningQueriesCounter.get(); + } + + /** Called by InternalResourceGroup when a query starts execution. */ + public void incrementRunningQueries() + { + totalRunningQueriesCounter.incrementAndGet(); + } + + /** Called by InternalResourceGroup when a query finishes execution. 
*/ + public void decrementRunningQueries() + { + totalRunningQueriesCounter.decrementAndGet(); } @Override @@ -406,7 +514,15 @@ private synchronized void createGroupIfNecessary(SelectionContext context, Ex else { RootInternalResourceGroup root; if (!isResourceManagerEnabled) { - root = new RootInternalResourceGroup(id.getSegments().get(0), this::exportGroup, executor, ignored -> Optional.empty(), rg -> false, nodeManager, clusterResourceChecker); + root = new RootInternalResourceGroup( + id.getSegments().get(0), + this::exportGroup, + executor, + ignored -> Optional.empty(), + rg -> false, + nodeManager, + clusterResourceChecker, + queryPacingContext); } else { root = new RootInternalResourceGroup( @@ -420,7 +536,8 @@ private synchronized void createGroupIfNecessary(SelectionContext context, Ex lastUpdatedResourceGroupRuntimeInfo::get, concurrencyThreshold), nodeManager, - clusterResourceChecker); + clusterResourceChecker, + queryPacingContext); } group = root; rootGroups.add(root); @@ -500,6 +617,57 @@ public long getLastSchedulingCycleRuntimeDelayMs() return lastSchedulingCycleRunTimeMs.get() == 0L ? lastSchedulingCycleRunTimeMs.get() : currentTimeMillis() - lastSchedulingCycleRunTimeMs.get(); } + @Managed + public int getMaxQueryAdmissionsPerSecond() + { + return maxQueryAdmissionsPerSecond; + } + + @Managed + public long getTotalAdmissionAttempts() + { + return totalAdmissionAttempts.get(); + } + + @Managed + public long getTotalAdmissionsGranted() + { + return totalAdmissionsGranted.get(); + } + + @Managed + public long getTotalAdmissionsDenied() + { + return totalAdmissionsDenied.get(); + } + + @Managed + public int getMinRunningQueriesForPacing() + { + return minRunningQueriesForPacing; + } + + @Managed + public double getAdmissionGrantRate() + { + long attempts = totalAdmissionAttempts.get(); + return attempts > 0 ? 
(double) totalAdmissionsGranted.get() / attempts : 0.0; + } + + @Managed + public double getAdmissionDenyRate() + { + long attempts = totalAdmissionAttempts.get(); + return attempts > 0 ? (double) totalAdmissionsDenied.get() / attempts : 0.0; + } + + @Managed + public long getMillisSinceLastAdmission() + { + long last = lastAdmittedQueryNanos.get(); + return last == 0L ? -1L : (System.nanoTime() - last) / 1_000_000; + } + private int getQueriesQueuedOnInternal(InternalResourceGroup resourceGroup) { if (resourceGroup.subGroups().isEmpty()) { diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/QueryPacingContext.java b/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/QueryPacingContext.java new file mode 100644 index 0000000000000..02eca0b4ac607 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/QueryPacingContext.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.execution.resourceGroups; + +/** + * Context for query admission pacing. Provides a single interface for + * global rate limiting and running query tracking to prevent worker overload. + *

+ * This interface consolidates the pacing-related callbacks that are shared + * across all resource groups, keeping resource group objects smaller. + */ +public interface QueryPacingContext +{ + /** + * A no-op implementation that allows all queries and tracks nothing. + */ + QueryPacingContext NOOP = new QueryPacingContext() + { + @Override + public boolean tryAcquireAdmissionSlot() + { + return true; + } + + @Override + public void onQueryStarted() + { + } + + @Override + public void onQueryFinished() + { + } + }; + + /** + * Attempts to acquire an admission slot for starting a new query. + * Enforces global rate limiting when running queries exceed threshold. + * + * @return true if query can be admitted, false if rate limit exceeded + */ + boolean tryAcquireAdmissionSlot(); + + /** + * Called when a query starts running. Used to track global running query count. + */ + void onQueryStarted(); + + /** + * Called when a query finishes (success or failure). Used to track global running query count. 
+ */ + void onQueryFinished(); +} diff --git a/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java b/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java index dd2527f0435fd..109a1be212e3c 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java +++ b/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java @@ -85,7 +85,9 @@ public void testDefaults() .setRateLimiterCacheLimit(1000) .setRateLimiterCacheWindowMinutes(5) .setEnableWorkerIsolation(false) - .setMinColumnarEncodingChannelsToPreferRowWiseEncoding(1000)); + .setMinColumnarEncodingChannelsToPreferRowWiseEncoding(1000) + .setMaxQueryAdmissionsPerSecond(Integer.MAX_VALUE) + .setMinRunningQueriesForPacing(30)); } @Test @@ -141,6 +143,8 @@ public void testExplicitPropertyMappings() .put("query.cte-partitioning-provider-catalog", "hive") .put("query-manager.enable-worker-isolation", "true") .put("min-columnar-encoding-channels-to-prefer-row-wise-encoding", "123") + .put("query-manager.query-pacing.max-queries-per-second", "10") + .put("query-manager.query-pacing.min-running-queries", "5") .build(); QueryManagerConfig expected = new QueryManagerConfig() @@ -193,7 +197,9 @@ public void testExplicitPropertyMappings() .setRateLimiterCacheWindowMinutes(60) .setCtePartitioningProviderCatalog("hive") .setEnableWorkerIsolation(true) - .setMinColumnarEncodingChannelsToPreferRowWiseEncoding(123); + .setMinColumnarEncodingChannelsToPreferRowWiseEncoding(123) + .setMaxQueryAdmissionsPerSecond(10) + .setMinRunningQueriesForPacing(5); ConfigAssertions.assertFullMapping(properties, expected); } } diff --git a/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/BenchmarkResourceGroup.java b/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/BenchmarkResourceGroup.java index 1ab8ba0bc30e7..7fba90f5fdc63 100644 --- 
a/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/BenchmarkResourceGroup.java +++ b/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/BenchmarkResourceGroup.java @@ -77,7 +77,7 @@ public static class BenchmarkData @Setup public void setup() { - root = new RootInternalResourceGroup("root", (group, export) -> {}, executor, ignored -> Optional.empty(), rg -> false, new InMemoryNodeManager(), createClusterResourceChecker()); + root = new RootInternalResourceGroup("root", (group, export) -> {}, executor, ignored -> Optional.empty(), rg -> false, new InMemoryNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(queries); root.setHardConcurrencyLimit(queries); diff --git a/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestInternalResourceGroupManager.java b/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestInternalResourceGroupManager.java index 257d957f8e578..b54e1984fcec2 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestInternalResourceGroupManager.java +++ b/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestInternalResourceGroupManager.java @@ -30,6 +30,11 @@ import org.weakref.jmx.MBeanExporter; import org.weakref.jmx.testing.TestingMBeanServer; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + public class TestInternalResourceGroupManager { @Test(expectedExceptions = PrestoException.class, expectedExceptionsMessageRegExp = ".*Presto server is still initializing.*") @@ -48,4 +53,218 @@ public void testQuerySucceedsWhenConfigurationManagerLoaded() internalResourceGroupManager.loadConfigurationManager(); 
internalResourceGroupManager.submit(new MockManagedQueryExecution(0), new SelectionContext<>(new ResourceGroupId("global"), ImmutableMap.of()), command -> {}); } + + // Tests that admission always succeeds when pacing is disabled (default config) + @Test + public void testAdmissionPacingUnlimited() + { + // When maxQueryAdmissionsPerSecond is Integer.MAX_VALUE (default), admission should always succeed + QueryManagerConfig config = new QueryManagerConfig(); + InternalResourceGroupManager<?> manager = new InternalResourceGroupManager<>( + (poolId, listener) -> {}, + config, + new NodeInfo("test"), + new MBeanExporter(new TestingMBeanServer()), + () -> null, + new ServerConfig(), + new InMemoryNodeManager(), + new ClusterResourceChecker(new CpuMemoryOverloadPolicy(new ClusterOverloadConfig()), new ClusterOverloadConfig(), new InMemoryNodeManager())); + + // Multiple consecutive calls should all succeed + assertTrue(manager.tryAcquireAdmissionSlot()); + assertTrue(manager.tryAcquireAdmissionSlot()); + assertTrue(manager.tryAcquireAdmissionSlot()); + } + + // Tests that admission respects 1 query/second rate limit + @Test + public void testAdmissionPacingOnePerSecond() + throws InterruptedException + { + // When maxQueryAdmissionsPerSecond is 1, verify admission succeeds after waiting + QueryManagerConfig config = new QueryManagerConfig().setMaxQueryAdmissionsPerSecond(1); + InternalResourceGroupManager<?> manager = new InternalResourceGroupManager<>( + (poolId, listener) -> {}, + config, + new NodeInfo("test"), + new MBeanExporter(new TestingMBeanServer()), + () -> null, + new ServerConfig(), + new InMemoryNodeManager(), + new ClusterResourceChecker(new CpuMemoryOverloadPolicy(new ClusterOverloadConfig()), new ClusterOverloadConfig(), new InMemoryNodeManager())); + + // First admission should succeed + assertTrue(manager.tryAcquireAdmissionSlot()); + + // Wait for 1 second (required interval) and verify next admission succeeds + Thread.sleep(1100); + 
assertTrue(manager.tryAcquireAdmissionSlot()); + } + + // Tests that admission respects 10 queries/second rate limit + @Test + public void testAdmissionPacingMultiplePerSecond() + throws InterruptedException + { + // When maxQueryAdmissionsPerSecond is 10, verify admission succeeds after waiting appropriate interval + QueryManagerConfig config = new QueryManagerConfig().setMaxQueryAdmissionsPerSecond(10); + InternalResourceGroupManager<?> manager = new InternalResourceGroupManager<>( + (poolId, listener) -> {}, + config, + new NodeInfo("test"), + new MBeanExporter(new TestingMBeanServer()), + () -> null, + new ServerConfig(), + new InMemoryNodeManager(), + new ClusterResourceChecker(new CpuMemoryOverloadPolicy(new ClusterOverloadConfig()), new ClusterOverloadConfig(), new InMemoryNodeManager())); + + // First admission should succeed + assertTrue(manager.tryAcquireAdmissionSlot()); + + // Wait for 150ms (more than the 100ms interval required for 10 queries/sec) and verify next admission succeeds + Thread.sleep(150); + assertTrue(manager.tryAcquireAdmissionSlot()); + } + + // Tests that pacing is bypassed when running queries are below threshold + @Test + public void testAdmissionPacingBypassedBelowRunningQueryThreshold() + throws Exception + { + // Configure pacing with a threshold of 5 running queries + // When running queries are below threshold, pacing should be bypassed + QueryManagerConfig config = new QueryManagerConfig() + .setMaxQueryAdmissionsPerSecond(1) // Very slow pacing: 1 per second + .setMinRunningQueriesForPacing(5); // Threshold of 5 running queries + + InternalResourceGroupManager<?> manager = new InternalResourceGroupManager<>( + (poolId, listener) -> {}, + config, + new NodeInfo("test"), + new MBeanExporter(new TestingMBeanServer()), + () -> null, + new ServerConfig(), + new InMemoryNodeManager(), + new ClusterResourceChecker(new CpuMemoryOverloadPolicy(new ClusterOverloadConfig()), new ClusterOverloadConfig(), new InMemoryNodeManager())); + + 
manager.loadConfigurationManager(); + + // Create a resource group with some running queries (but below threshold) + MockManagedQueryExecution query1 = new MockManagedQueryExecution(0); + MockManagedQueryExecution query2 = new MockManagedQueryExecution(0); + manager.submit(query1, new SelectionContext<>(new ResourceGroupId("global"), ImmutableMap.of()), directExecutor()); + manager.submit(query2, new SelectionContext<>(new ResourceGroupId("global"), ImmutableMap.of()), directExecutor()); + + // With only 2 running queries (below threshold of 5), pacing should be bypassed + // Multiple rapid admissions should all succeed without waiting + assertTrue(manager.tryAcquireAdmissionSlot()); + assertTrue(manager.tryAcquireAdmissionSlot()); + assertTrue(manager.tryAcquireAdmissionSlot()); + + // Verify metrics are NOT tracked when pacing is bypassed + assertEquals(manager.getTotalAdmissionAttempts(), 0); + assertEquals(manager.getTotalAdmissionsGranted(), 0); + assertEquals(manager.getTotalAdmissionsDenied(), 0); + } + + // Tests that pacing is enforced when running queries exceed threshold + @Test + public void testAdmissionPacingAppliedAboveRunningQueryThreshold() + throws Exception + { + // Configure pacing with a threshold of 2 running queries + QueryManagerConfig config = new QueryManagerConfig() + .setMaxQueryAdmissionsPerSecond(1) // 1 per second + .setMinRunningQueriesForPacing(2); // Threshold of 2 running queries + + InternalResourceGroupManager<?> manager = new InternalResourceGroupManager<>( + (poolId, listener) -> {}, + config, + new NodeInfo("test"), + new MBeanExporter(new TestingMBeanServer()), + () -> null, + new ServerConfig(), + new InMemoryNodeManager(), + new ClusterResourceChecker(new CpuMemoryOverloadPolicy(new ClusterOverloadConfig()), new ClusterOverloadConfig(), new InMemoryNodeManager())); + + manager.loadConfigurationManager(); + + // Create resource groups with enough running queries to exceed threshold + MockManagedQueryExecution query1 = new 
MockManagedQueryExecution(0); + MockManagedQueryExecution query2 = new MockManagedQueryExecution(0); + MockManagedQueryExecution query3 = new MockManagedQueryExecution(0); + manager.submit(query1, new SelectionContext<>(new ResourceGroupId("global"), ImmutableMap.of()), directExecutor()); + manager.submit(query2, new SelectionContext<>(new ResourceGroupId("global"), ImmutableMap.of()), directExecutor()); + manager.submit(query3, new SelectionContext<>(new ResourceGroupId("global"), ImmutableMap.of()), directExecutor()); + + // Wait for rate limit window to expire after query submissions (which internally call tryAcquireAdmissionSlot) + Thread.sleep(1100); + + // With 3 running queries (above threshold of 2), pacing should be applied + // First admission should succeed + assertTrue(manager.tryAcquireAdmissionSlot()); + + // Immediate second attempt should be denied (need to wait 1 second) + assertFalse(manager.tryAcquireAdmissionSlot()); + + // Verify metrics ARE tracked when pacing is applied + // Note: Query 3's submission also triggered pacing (running queries = 2 at submission time), + // so we have 3 total attempts: 1 from query3 submission + 2 from explicit calls + assertEquals(manager.getTotalAdmissionAttempts(), 3); + assertEquals(manager.getTotalAdmissionsGranted(), 2); + assertEquals(manager.getTotalAdmissionsDenied(), 1); + } + + // Tests that pacing turns off when running queries drop below the threshold + @Test + public void testAdmissionPacingTurnsOffWhenRunningQueriesDropBelowThreshold() + throws Exception + { + // Configure pacing with a threshold of 2 running queries and a slow rate + QueryManagerConfig config = new QueryManagerConfig() + .setMaxQueryAdmissionsPerSecond(1) // 1 per second, so pacing should be visible + .setMinRunningQueriesForPacing(2); // Threshold of 2 running queries + + InternalResourceGroupManager<?> manager = new InternalResourceGroupManager<>( + (poolId, listener) -> {}, + config, + new NodeInfo("test"), + new MBeanExporter(new 
TestingMBeanServer()), + () -> null, + new ServerConfig(), + new InMemoryNodeManager(), + new ClusterResourceChecker(new CpuMemoryOverloadPolicy(new ClusterOverloadConfig()), new ClusterOverloadConfig(), new InMemoryNodeManager())); + + // Simulate being above the threshold by incrementing running queries counter + manager.incrementRunningQueries(); + manager.incrementRunningQueries(); + + // With 2 running queries (at threshold), pacing should be applied + // First admission should succeed and set the lastAdmittedQueryNanos timestamp + assertTrue(manager.tryAcquireAdmissionSlot()); + + // Immediate second attempt should be denied (need to wait 1 second) + assertFalse(manager.tryAcquireAdmissionSlot()); + + // Verify metrics are tracked when pacing is applied + assertEquals(manager.getTotalAdmissionAttempts(), 2); + assertEquals(manager.getTotalAdmissionsGranted(), 1); + assertEquals(manager.getTotalAdmissionsDenied(), 1); + + // Now simulate queries finishing so that we drop below the threshold + manager.decrementRunningQueries(); + manager.decrementRunningQueries(); + + // With 0 running queries (below threshold of 2), pacing should be bypassed + // Multiple rapid admissions should all succeed without waiting + assertTrue(manager.tryAcquireAdmissionSlot()); + assertTrue(manager.tryAcquireAdmissionSlot()); + assertTrue(manager.tryAcquireAdmissionSlot()); + + // Verify metrics did NOT increase when pacing was bypassed + // (should still be the same as before the decrement) + assertEquals(manager.getTotalAdmissionAttempts(), 2); + assertEquals(manager.getTotalAdmissionsGranted(), 1); + assertEquals(manager.getTotalAdmissionsDenied(), 1); + } } diff --git a/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java b/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java index 5890a210c44e1..43e9ba467e92b 100644 --- 
a/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java +++ b/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java @@ -72,7 +72,7 @@ public class TestResourceGroups @Test(timeOut = 10_000) public void testQueueFull() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(1); root.setHardConcurrencyLimit(1); @@ -94,7 +94,7 @@ public void testQueueFull() @Test(timeOut = 10_000) public void testFairEligibility() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(4); root.setHardConcurrencyLimit(1); @@ -154,7 +154,7 @@ public void testFairEligibility() @Test public void testSetSchedulingPolicy() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> 
false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(4); root.setHardConcurrencyLimit(1); @@ -200,7 +200,7 @@ public void testSetSchedulingPolicy() @Test(timeOut = 10_000) public void testFairQueuing() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(4); root.setHardConcurrencyLimit(1); @@ -246,7 +246,7 @@ public void testFairQueuing() @Test(timeOut = 10_000) public void testMemoryLimit() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, BYTE)); root.setMaxQueuedQueries(4); root.setHardConcurrencyLimit(3); @@ -274,7 +274,7 @@ public void testMemoryLimit() @Test public void testSubgroupMemoryLimit() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, 
createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(10, BYTE)); root.setMaxQueuedQueries(4); root.setHardConcurrencyLimit(3); @@ -307,7 +307,7 @@ public void testSubgroupMemoryLimit() @Test(timeOut = 10_000) public void testSoftCpuLimit() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, BYTE)); root.setSoftCpuLimit(new Duration(1, SECONDS)); root.setHardCpuLimit(new Duration(2, SECONDS)); @@ -344,7 +344,7 @@ public void testSoftCpuLimit() @Test(timeOut = 10_000) public void testPerWorkerQueryLimit() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setWorkersPerQueryLimit(5); root.setMaxQueuedQueries(2); root.setHardConcurrencyLimit(2); @@ -377,7 +377,7 @@ public void testPerWorkerQueryLimit() @Test(timeOut = 10_000) public void testPerWorkerQueryLimitMultipleGroups() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, 
directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setWorkersPerQueryLimit(5); root.setMaxQueuedQueries(5); root.setHardConcurrencyLimit(2); @@ -420,7 +420,7 @@ public void testPerWorkerQueryLimitMultipleGroups() @Test(timeOut = 10_000) public void testHardCpuLimit() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, BYTE)); root.setHardCpuLimit(new Duration(1, SECONDS)); root.setCpuQuotaGenerationMillisPerSecond(2000); @@ -447,7 +447,7 @@ public void testHardCpuLimit() @Test(timeOut = 10_000) public void testPriorityScheduling() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(100); // Start with zero capacity, so that nothing starts running until we've added all the queries @@ -497,7 +497,7 @@ public void testPriorityScheduling() @Test(timeOut = 20_000) public void testWeightedScheduling() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), 
createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(4); // Start with zero capacity, so that nothing starts running until we've added all the queries @@ -546,7 +546,7 @@ public void testWeightedScheduling() @Test(timeOut = 30_000) public void testWeightedFairScheduling() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(50); // Start with zero capacity, so that nothing starts running until we've added all the queries @@ -589,7 +589,7 @@ public void testWeightedFairScheduling() @Test(timeOut = 10_000) public void testWeightedFairSchedulingEqualWeights() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(50); // Start with zero capacity, so that nothing starts running until we've added all the queries @@ -648,7 +648,7 @@ public void 
testWeightedFairSchedulingEqualWeights() @Test(timeOut = 20_000) public void testWeightedFairSchedulingNoStarvation() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(50); // Start with zero capacity, so that nothing starts running until we've added all the queries @@ -689,7 +689,7 @@ public void testWeightedFairSchedulingNoStarvation() @Test public void testGetInfo() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(40); // Start with zero capacity, so that nothing starts running until we've added all the queries @@ -779,7 +779,7 @@ public void testGetInfo() @Test public void testGetResourceGroupStateInfo() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), 
QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, GIGABYTE)); root.setMaxQueuedQueries(40); root.setHardConcurrencyLimit(10); @@ -847,7 +847,7 @@ public void testGetResourceGroupStateInfo() @Test public void testGetStaticResourceGroupInfo() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, GIGABYTE)); root.setMaxQueuedQueries(100); root.setHardConcurrencyLimit(10); @@ -924,7 +924,7 @@ private Optional getResourceGroupInfoForId(InternalResourceGr @Test public void testGetBlockedQueuedQueries() { - RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker()); + RootInternalResourceGroup root = new RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, createNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(40); // Start with zero capacity, so that nothing starts running until we've added all the queries diff --git a/presto-main-base/src/test/java/com/facebook/presto/server/TestQueryStateInfo.java b/presto-main-base/src/test/java/com/facebook/presto/server/TestQueryStateInfo.java index f38af02192ce5..f14a66b5a2629 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/server/TestQueryStateInfo.java +++ b/presto-main-base/src/test/java/com/facebook/presto/server/TestQueryStateInfo.java @@ -22,6 +22,7 @@ import 
com.facebook.presto.execution.QueryState; import com.facebook.presto.execution.QueryStats; import com.facebook.presto.execution.resourceGroups.InternalResourceGroup; +import com.facebook.presto.execution.resourceGroups.QueryPacingContext; import com.facebook.presto.execution.scheduler.clusterOverload.ClusterOverloadPolicy; import com.facebook.presto.execution.scheduler.clusterOverload.ClusterResourceChecker; import com.facebook.presto.metadata.InMemoryNodeManager; @@ -62,7 +63,7 @@ public class TestQueryStateInfo @Test public void testQueryStateInfo() { - InternalResourceGroup.RootInternalResourceGroup root = new InternalResourceGroup.RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, new InMemoryNodeManager(), createClusterResourceChecker()); + InternalResourceGroup.RootInternalResourceGroup root = new InternalResourceGroup.RootInternalResourceGroup("root", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, new InMemoryNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); root.setMaxQueuedQueries(40); root.setHardConcurrencyLimit(0); diff --git a/presto-resource-group-managers/src/test/java/com/facebook/presto/resourceGroups/reloading/TestReloadingResourceGroupConfigurationManager.java b/presto-resource-group-managers/src/test/java/com/facebook/presto/resourceGroups/reloading/TestReloadingResourceGroupConfigurationManager.java index 065fbfd740daf..760cd1a0b25ae 100644 --- a/presto-resource-group-managers/src/test/java/com/facebook/presto/resourceGroups/reloading/TestReloadingResourceGroupConfigurationManager.java +++ b/presto-resource-group-managers/src/test/java/com/facebook/presto/resourceGroups/reloading/TestReloadingResourceGroupConfigurationManager.java @@ -17,6 +17,7 @@ import com.facebook.airlift.units.Duration; import com.facebook.presto.execution.ClusterOverloadConfig; import 
com.facebook.presto.execution.resourceGroups.InternalResourceGroup; +import com.facebook.presto.execution.resourceGroups.QueryPacingContext; import com.facebook.presto.execution.scheduler.clusterOverload.ClusterOverloadPolicy; import com.facebook.presto.execution.scheduler.clusterOverload.ClusterResourceChecker; import com.facebook.presto.metadata.InMemoryNodeManager; @@ -78,7 +79,7 @@ public void testConfiguration() DbManagerSpecProvider dbManagerSpecProvider = new DbManagerSpecProvider(daoProvider.get(), ENVIRONMENT, new ReloadingResourceGroupConfig()); ReloadingResourceGroupConfigurationManager manager = new ReloadingResourceGroupConfigurationManager((poolId, listener) -> {}, new ReloadingResourceGroupConfig(), dbManagerSpecProvider); AtomicBoolean exported = new AtomicBoolean(); - InternalResourceGroup global = new InternalResourceGroup.RootInternalResourceGroup("global", (group, export) -> exported.set(export), directExecutor(), ignored -> Optional.empty(), rg -> false, new InMemoryNodeManager(), createClusterResourceChecker()); + InternalResourceGroup global = new InternalResourceGroup.RootInternalResourceGroup("global", (group, export) -> exported.set(export), directExecutor(), ignored -> Optional.empty(), rg -> false, new InMemoryNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); manager.configure(global, new SelectionContext<>(global.getId(), new VariableMap(ImmutableMap.of("USER", "user")))); assertEqualsResourceGroup(global, "1MB", 1000, 100, 100, WEIGHTED, DEFAULT_WEIGHT, true, new Duration(1, HOURS), new Duration(1, DAYS), new ResourceGroupQueryLimits(Optional.of(new Duration(1, HOURS)), Optional.of(new DataSize(1, MEGABYTE)), Optional.of(new Duration(1, HOURS)))); exported.set(false); @@ -101,7 +102,7 @@ public void testMissing() dao.insertSelector(2, 1, null, null, null, null, null, null); DbManagerSpecProvider dbManagerSpecProvider = new DbManagerSpecProvider(daoProvider.get(), ENVIRONMENT, new ReloadingResourceGroupConfig()); 
ReloadingResourceGroupConfigurationManager manager = new ReloadingResourceGroupConfigurationManager((poolId, listener) -> {}, new ReloadingResourceGroupConfig(), dbManagerSpecProvider); - InternalResourceGroup missing = new InternalResourceGroup.RootInternalResourceGroup("missing", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, new InMemoryNodeManager(), createClusterResourceChecker()); + InternalResourceGroup missing = new InternalResourceGroup.RootInternalResourceGroup("missing", (group, export) -> {}, directExecutor(), ignored -> Optional.empty(), rg -> false, new InMemoryNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); manager.configure(missing, new SelectionContext<>(missing.getId(), new VariableMap(ImmutableMap.of("USER", "user")))); } @@ -122,7 +123,7 @@ public void testReconfig() ReloadingResourceGroupConfigurationManager manager = new ReloadingResourceGroupConfigurationManager((poolId, listener) -> {}, new ReloadingResourceGroupConfig(), dbManagerSpecProvider); manager.start(); AtomicBoolean exported = new AtomicBoolean(); - InternalResourceGroup global = new InternalResourceGroup.RootInternalResourceGroup("global", (group, export) -> exported.set(export), directExecutor(), ignored -> Optional.empty(), rg -> false, new InMemoryNodeManager(), createClusterResourceChecker()); + InternalResourceGroup global = new InternalResourceGroup.RootInternalResourceGroup("global", (group, export) -> exported.set(export), directExecutor(), ignored -> Optional.empty(), rg -> false, new InMemoryNodeManager(), createClusterResourceChecker(), QueryPacingContext.NOOP); manager.configure(global, new SelectionContext<>(global.getId(), new VariableMap(ImmutableMap.of("USER", "user")))); InternalResourceGroup globalSub = global.getOrCreateSubGroup("sub", true); manager.configure(globalSub, new SelectionContext<>(globalSub.getId(), new VariableMap(ImmutableMap.of("USER", "user"))));