Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 77 additions & 45 deletions presto-docs/src/main/sphinx/admin/properties.rst

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ public class QueryManagerConfig

private int minColumnarEncodingChannelsToPreferRowWiseEncoding = 1000;

// Global cap on query admissions per second; the default Integer.MAX_VALUE means unlimited (no pacing)
private int maxQueryAdmissionsPerSecond = Integer.MAX_VALUE;

// Number of running queries that must be reached before admission pacing is applied
private int minRunningQueriesForPacing = 30;

@Min(1)
public int getScheduleSplitBatchSize()
{
Expand Down Expand Up @@ -766,6 +770,34 @@ public QueryManagerConfig setMinColumnarEncodingChannelsToPreferRowWiseEncoding(
return this;
}

/**
 * Returns the maximum number of queries that may be admitted per second.
 * The value is validated to be at least 1; the field default is
 * {@code Integer.MAX_VALUE}, which the config description documents as unlimited.
 */
@Min(1)
public int getMaxQueryAdmissionsPerSecond()
{
    return this.maxQueryAdmissionsPerSecond;
}

/**
 * Sets the maximum number of query admissions per second. Bound to the
 * {@code query-manager.query-pacing.max-queries-per-second} property.
 *
 * @param maxQueryAdmissionsPerSecond the new limit; the getter is validated with {@code @Min(1)}
 * @return this config instance, so calls can be chained
 */
@Config("query-manager.query-pacing.max-queries-per-second")
@ConfigDescription("Maximum number of queries that can be admitted per second globally for admission pacing. Default is unlimited (Integer.MAX_VALUE). Set to a lower value (e.g., 1) to pace query admissions to one per second.")
public QueryManagerConfig setMaxQueryAdmissionsPerSecond(int maxQueryAdmissionsPerSecond)
{
this.maxQueryAdmissionsPerSecond = maxQueryAdmissionsPerSecond;
return this;
}

/**
 * Returns the number of running queries required before admission pacing is applied.
 * The value is validated to be non-negative; the field default is 30.
 */
@Min(0)
public int getMinRunningQueriesForPacing()
{
    return this.minRunningQueriesForPacing;
}

/**
 * Sets the running-query threshold for admission pacing. Bound to the
 * {@code query-manager.query-pacing.min-running-queries} property.
 *
 * @param minRunningQueriesForPacing the new threshold; the getter is validated with {@code @Min(0)}
 * @return this config instance, so calls can be chained
 */
@Config("query-manager.query-pacing.min-running-queries")
@ConfigDescription("Minimum number of running queries before admission pacing is applied. Default is 30. Set to a higher value to only pace when cluster is busy.")
public QueryManagerConfig setMinRunningQueriesForPacing(int minRunningQueriesForPacing)
{
this.minRunningQueriesForPacing = minRunningQueriesForPacing;
return this;
}

public enum ExchangeMaterializationStrategy
{
NONE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public class InternalResourceGroup
private final Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate;
private final InternalNodeManager nodeManager;
private final ClusterResourceChecker clusterResourceChecker;
private final QueryPacingContext queryPacingContext;

// Configuration
// =============
Expand Down Expand Up @@ -169,13 +170,15 @@ protected InternalResourceGroup(
Function<ResourceGroupId, Optional<ResourceGroupRuntimeInfo>> additionalRuntimeInfo,
Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate,
InternalNodeManager nodeManager,
ClusterResourceChecker clusterResourceChecker)
ClusterResourceChecker clusterResourceChecker,
QueryPacingContext queryPacingContext)
{
this.parent = requireNonNull(parent, "parent is null");
this.jmxExportListener = requireNonNull(jmxExportListener, "jmxExportListener is null");
this.executor = requireNonNull(executor, "executor is null");
this.nodeManager = requireNonNull(nodeManager, "node manager is null");
this.clusterResourceChecker = requireNonNull(clusterResourceChecker, "clusterResourceChecker is null");
this.queryPacingContext = requireNonNull(queryPacingContext, "queryPacingContext is null");
requireNonNull(name, "name is null");
if (parent.isPresent()) {
id = new ResourceGroupId(parent.get().id, name);
Expand Down Expand Up @@ -676,7 +679,8 @@ public InternalResourceGroup getOrCreateSubGroup(String name, boolean staticSegm
additionalRuntimeInfo,
shouldWaitForResourceManagerUpdate,
nodeManager,
clusterResourceChecker);
clusterResourceChecker,
queryPacingContext);
// Sub group must use query priority to ensure ordering
if (schedulingPolicy == QUERY_PRIORITY) {
subGroup.setSchedulingPolicy(QUERY_PRIORITY);
Expand Down Expand Up @@ -735,12 +739,14 @@ public void run(ManagedQueryExecution query)
}
else {
query.setResourceGroupQueryLimits(perQueryLimits);
if (canRun && queuedQueries.isEmpty()) {
boolean immediateStartCandidate = canRun && queuedQueries.isEmpty();
if (immediateStartCandidate && queryPacingContext.tryAcquireAdmissionSlot()) {
startInBackground(query);
}
else {
enqueueQuery(query);
}

query.addStateChangeListener(state -> {
if (state.isDone()) {
queryFinished(query);
Expand Down Expand Up @@ -807,6 +813,8 @@ private void startInBackground(ManagedQueryExecution query)
group = group.parent.get();
}
updateEligibility();
// Increment global running query counter for pacing
queryPacingContext.onQueryStarted();
executor.execute(query::startWaitingForResources);
group = this;
long lastRunningQueryStartTimeMillis = currentTimeMillis();
Expand Down Expand Up @@ -840,6 +848,8 @@ private void queryFinished(ManagedQueryExecution query)
group.parent.get().descendantRunningQueries--;
group = group.parent.get();
}
// Decrement global running query counter for pacing
queryPacingContext.onQueryFinished();
}
else {
queuedQueries.remove(query);
Expand Down Expand Up @@ -908,8 +918,13 @@ protected boolean internalStartNext()
return false;
}

ManagedQueryExecution query = queuedQueries.poll();
ManagedQueryExecution query = queuedQueries.peek();
if (query != null) {
if (!queryPacingContext.tryAcquireAdmissionSlot()) {
return false;
}

queuedQueries.poll(); // Remove from queue; use query from peek() above
startInBackground(query);
return true;
}
Expand Down Expand Up @@ -1146,7 +1161,8 @@ public RootInternalResourceGroup(
Function<ResourceGroupId, Optional<ResourceGroupRuntimeInfo>> additionalRuntimeInfo,
Predicate<InternalResourceGroup> shouldWaitForResourceManagerUpdate,
InternalNodeManager nodeManager,
ClusterResourceChecker clusterResourceChecker)
ClusterResourceChecker clusterResourceChecker,
QueryPacingContext queryPacingContext)
{
super(Optional.empty(),
name,
Expand All @@ -1156,7 +1172,8 @@ public RootInternalResourceGroup(
additionalRuntimeInfo,
shouldWaitForResourceManagerUpdate,
nodeManager,
clusterResourceChecker);
clusterResourceChecker,
queryPacingContext);
}

public synchronized void updateEligibilityRecursively(InternalResourceGroup group)
Expand All @@ -1172,7 +1189,7 @@ public synchronized void processQueuedQueries()
internalRefreshStats();

while (internalStartNext()) {
// start all the queries we can
// start all the queries we can (subject to limits and pacing)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -90,6 +91,18 @@ public final class InternalResourceGroupManager<C>
private static final String CONFIGURATION_MANAGER_PROPERTY_NAME = "resource-groups.configuration-manager";
private static final int REFRESH_EXECUTOR_POOL_SIZE = 2;

// Admission cap per second, read from QueryManagerConfig in the constructor
private final int maxQueryAdmissionsPerSecond;
// Pacing is bypassed while the running-query counter is below this value
private final int minRunningQueriesForPacing;
// Minimum spacing between paced admissions, in nanoseconds; 0 means pacing is disabled
private final long queryAdmissionIntervalNanos;
// System.nanoTime() reading of the most recent paced admission; 0 is treated as "none recorded yet"
private final AtomicLong lastAdmittedQueryNanos = new AtomicLong(0L);

// Pacing metrics - use AtomicLong/AtomicInteger for lock-free updates to avoid deadlock
// with resource group locks (see tryAcquireAdmissionSlot for details)
// Attempts, grants and denials are only counted when pacing is actually evaluated
private final AtomicLong totalAdmissionAttempts = new AtomicLong(0L);
private final AtomicLong totalAdmissionsGranted = new AtomicLong(0L);
private final AtomicLong totalAdmissionsDenied = new AtomicLong(0L);
// Running-query count maintained by incrementRunningQueries / decrementRunningQueries
private final AtomicInteger totalRunningQueriesCounter = new AtomicInteger(0);

private final ScheduledExecutorService refreshExecutor = newScheduledThreadPool(REFRESH_EXECUTOR_POOL_SIZE, daemonThreadsNamed("resource-group-manager-refresher-%d-" + REFRESH_EXECUTOR_POOL_SIZE));
private final PeriodicTaskExecutor resourceGroupRuntimeExecutor;
private final List<RootInternalResourceGroup> rootGroups = new CopyOnWriteArrayList<>();
Expand All @@ -115,6 +128,7 @@ public final class InternalResourceGroupManager<C>
private final InternalNodeManager nodeManager;
private AtomicBoolean isConfigurationManagerLoaded;
private final ClusterResourceChecker clusterResourceChecker;
private final QueryPacingContext queryPacingContext;

@Inject
public InternalResourceGroupManager(
Expand All @@ -141,7 +155,101 @@ public InternalResourceGroupManager(
this.resourceGroupRuntimeExecutor = new PeriodicTaskExecutor(resourceGroupRuntimeInfoRefreshInterval.toMillis(), refreshExecutor, this::refreshResourceGroupRuntimeInfo);
configurationManagerFactories.putIfAbsent(LegacyResourceGroupConfigurationManager.NAME, new LegacyResourceGroupConfigurationManager.Factory());
this.isConfigurationManagerLoaded = new AtomicBoolean(false);
this.clusterResourceChecker = clusterResourceChecker;
this.clusterResourceChecker = requireNonNull(clusterResourceChecker, "clusterResourceChecker is null");
this.maxQueryAdmissionsPerSecond = queryManagerConfig.getMaxQueryAdmissionsPerSecond();
this.minRunningQueriesForPacing = queryManagerConfig.getMinRunningQueriesForPacing();
this.queryAdmissionIntervalNanos = (maxQueryAdmissionsPerSecond == Integer.MAX_VALUE)
? 0L
: 1_000_000_000L / maxQueryAdmissionsPerSecond;
this.queryPacingContext = new QueryPacingContext()
{
@Override
public boolean tryAcquireAdmissionSlot()
{
return InternalResourceGroupManager.this.tryAcquireAdmissionSlot();
}

@Override
public void onQueryStarted()
{
incrementRunningQueries();
}

@Override
public void onQueryFinished()
{
decrementRunningQueries();
}
};
}

/**
 * Decides whether one more query may be admitted right now under the global pacing limit.
 * <p>
 * Admission is always granted, without touching the pacing metrics, when pacing is disabled
 * (an admission interval of zero) or when the running-query counter is below
 * {@code minRunningQueriesForPacing}. Otherwise at most one admission is granted per
 * {@code queryAdmissionIntervalNanos}, measured with {@link System#nanoTime()}.
 *
 * @return true if the query can be admitted, false if the rate limit is exceeded
 */
boolean tryAcquireAdmissionSlot()
{
    // Fast paths: pacing switched off, or too few running queries for pacing to apply.
    // The running-query counter is only consulted when pacing is enabled.
    boolean pacingDisabled = queryAdmissionIntervalNanos == 0L;
    if (pacingDisabled || getTotalRunningQueries() < minRunningQueriesForPacing) {
        return true;
    }

    totalAdmissionAttempts.incrementAndGet();

    // Several threads may race here (for example, one per root resource group, each holding
    // only its own root lock), so the timestamp is advanced with compare-and-set. The number
    // of tries is bounded so a caller never spins indefinitely under heavy contention.
    final int maxCasAttempts = 10;
    for (int remaining = maxCasAttempts; remaining > 0; remaining--) {
        long now = System.nanoTime();
        long previous = lastAdmittedQueryNanos.get();

        // A stored value of 0 means no paced admission has been recorded yet
        boolean tooSoon = previous != 0L && (now - previous) < queryAdmissionIntervalNanos;
        if (tooSoon) {
            break;
        }

        // Claim the slot only if nobody else moved the timestamp in the meantime
        if (lastAdmittedQueryNanos.compareAndSet(previous, now)) {
            totalAdmissionsGranted.incrementAndGet();
            return true;
        }
    }

    // Either the interval has not elapsed yet or every compare-and-set attempt lost the race
    totalAdmissionsDenied.incrementAndGet();
    return false;
}

/**
 * Reports how many queries are currently counted as running.
 * The value comes from an atomic counter that is adjusted through
 * {@code incrementRunningQueries} and {@code decrementRunningQueries}, so no
 * resource group lock is taken here.
 */
private int getTotalRunningQueries()
{
    int runningNow = totalRunningQueriesCounter.get();
    return runningNow;
}

/**
 * Adds one to the global running-query counter.
 * Invoked through the pacing context's {@code onQueryStarted} callback.
 */
public void incrementRunningQueries()
{
    // Only the side effect matters; the returned value is discarded
    totalRunningQueriesCounter.getAndIncrement();
}

/**
 * Subtracts one from the global running-query counter.
 * Invoked through the pacing context's {@code onQueryFinished} callback.
 */
public void decrementRunningQueries()
{
    // Only the side effect matters; the returned value is discarded
    totalRunningQueriesCounter.getAndDecrement();
}

@Override
Expand Down Expand Up @@ -406,7 +514,15 @@ private synchronized void createGroupIfNecessary(SelectionContext<C> context, Ex
else {
RootInternalResourceGroup root;
if (!isResourceManagerEnabled) {
root = new RootInternalResourceGroup(id.getSegments().get(0), this::exportGroup, executor, ignored -> Optional.empty(), rg -> false, nodeManager, clusterResourceChecker);
root = new RootInternalResourceGroup(
id.getSegments().get(0),
this::exportGroup,
executor,
ignored -> Optional.empty(),
rg -> false,
nodeManager,
clusterResourceChecker,
queryPacingContext);
}
else {
root = new RootInternalResourceGroup(
Expand All @@ -420,7 +536,8 @@ private synchronized void createGroupIfNecessary(SelectionContext<C> context, Ex
lastUpdatedResourceGroupRuntimeInfo::get,
concurrencyThreshold),
nodeManager,
clusterResourceChecker);
clusterResourceChecker,
queryPacingContext);
}
group = root;
rootGroups.add(root);
Expand Down Expand Up @@ -500,6 +617,57 @@ public long getLastSchedulingCycleRuntimeDelayMs()
return lastSchedulingCycleRunTimeMs.get() == 0L ? lastSchedulingCycleRunTimeMs.get() : currentTimeMillis() - lastSchedulingCycleRunTimeMs.get();
}

/**
 * Exposes the admission cap per second that this manager read from its configuration.
 */
@Managed
public int getMaxQueryAdmissionsPerSecond()
{
    return this.maxQueryAdmissionsPerSecond;
}

/**
 * Exposes the number of admission attempts that were subject to pacing.
 * Attempts that bypass pacing are not counted.
 */
@Managed
public long getTotalAdmissionAttempts()
{
    long attempts = totalAdmissionAttempts.get();
    return attempts;
}

/**
 * Exposes the number of paced admission attempts that were granted.
 */
@Managed
public long getTotalAdmissionsGranted()
{
    long granted = totalAdmissionsGranted.get();
    return granted;
}

/**
 * Exposes the number of paced admission attempts that were denied.
 */
@Managed
public long getTotalAdmissionsDenied()
{
    long denied = totalAdmissionsDenied.get();
    return denied;
}

/**
 * Exposes the running-query threshold below which admission pacing is bypassed.
 */
@Managed
public int getMinRunningQueriesForPacing()
{
    return this.minRunningQueriesForPacing;
}

/**
 * Returns the fraction of paced admission attempts that were granted.
 *
 * @return a value in {@code [0.0, 1.0]}, or {@code 0.0} when no paced attempt has been recorded
 */
@Managed
public double getAdmissionGrantRate()
{
    // Read the numerator before the denominator. tryAcquireAdmissionSlot increments the
    // attempt counter before the granted counter, so reading in this order guarantees
    // granted <= attempts even while admissions are happening concurrently; the reverse
    // order could briefly report a rate above 1.0.
    long granted = totalAdmissionsGranted.get();
    long attempts = totalAdmissionAttempts.get();
    return attempts > 0 ? (double) granted / attempts : 0.0;
}

/**
 * Returns the fraction of paced admission attempts that were denied.
 *
 * @return a value in {@code [0.0, 1.0]}, or {@code 0.0} when no paced attempt has been recorded
 */
@Managed
public double getAdmissionDenyRate()
{
    // Read the numerator before the denominator. tryAcquireAdmissionSlot increments the
    // attempt counter before the denied counter, so reading in this order guarantees
    // denied <= attempts even while admissions are happening concurrently; the reverse
    // order could briefly report a rate above 1.0.
    long denied = totalAdmissionsDenied.get();
    long attempts = totalAdmissionAttempts.get();
    return attempts > 0 ? (double) denied / attempts : 0.0;
}

/**
 * Reports how long ago the last paced admission was granted.
 *
 * @return elapsed milliseconds since the stored admission timestamp, or {@code -1}
 *         when the stored timestamp is still 0 (no paced admission recorded)
 */
@Managed
public long getMillisSinceLastAdmission()
{
    long lastAdmission = lastAdmittedQueryNanos.get();
    if (lastAdmission == 0L) {
        return -1L;
    }
    long elapsedNanos = System.nanoTime() - lastAdmission;
    return elapsedNanos / 1_000_000;
}

private int getQueriesQueuedOnInternal(InternalResourceGroup resourceGroup)
{
if (resourceGroup.subGroups().isEmpty()) {
Expand Down
Loading
Loading