Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public class QueryManagerConfig
private int globalQueryRetryFailureLimit = 150;
private Duration globalQueryRetryFailureWindow = new Duration(5, MINUTES);

private long rateLimiterBucketMaxSize = 100;
private int rateLimiterCacheLimit = 1000;
private int rateLimiterCacheWindowMinutes = 5;

@Min(1)
public int getScheduleSplitBatchSize()
{
Expand Down Expand Up @@ -622,6 +626,45 @@ public QueryManagerConfig setGlobalQueryRetryFailureWindow(Duration globalQueryR
return this;
}

/**
 * Returns the configured token bucket size of the per-query rate limiter,
 * expressed as a number of permits per second.
 */
public long getRateLimiterBucketMaxSize()
{
    return this.rateLimiterBucketMaxSize;
}

/**
 * Sets the token bucket size of the per-query rate limiter (permits per second).
 *
 * @param rateLimiterBucketMaxSize the new bucket size
 * @return this config instance, so that setters can be chained
 */
@Config("query-manager.rate-limiter-bucket-max-size")
@ConfigDescription("rate limiter token bucket max size, number of permits per second")
public QueryManagerConfig setRateLimiterBucketMaxSize(long rateLimiterBucketMaxSize)
{
    this.rateLimiterBucketMaxSize = rateLimiterBucketMaxSize;

    return this;
}

/**
 * Returns the configured size limit of the rate limiter cache.
 */
public int getRateLimiterCacheLimit()
{
    return this.rateLimiterCacheLimit;
}

/**
 * Sets the size limit of the rate limiter cache.
 *
 * @param rateLimiterCacheLimit the new cache size limit
 * @return this config instance, so that setters can be chained
 */
@Config("query-manager.rate-limiter-cache-limit")
@ConfigDescription("rate limiter cache size limit, used together with rateLimiterCacheWindowMinutes")
public QueryManagerConfig setRateLimiterCacheLimit(int rateLimiterCacheLimit)
{
    this.rateLimiterCacheLimit = rateLimiterCacheLimit;

    return this;
}

/**
 * Returns the configured rate limiter cache window, in minutes.
 */
public int getRateLimiterCacheWindowMinutes()
{
    return this.rateLimiterCacheWindowMinutes;
}

/**
 * Sets the rate limiter cache window, in minutes.
 *
 * @param rateLimiterCacheWindowMinutes the new window size in minutes
 * @return this config instance, so that setters can be chained
 */
@Config("query-manager.rate-limiter-cache-window-minutes")
@ConfigDescription("rate limiter cache window size in minutes, used together with rateLimiterCacheLimit")
public QueryManagerConfig setRateLimiterCacheWindowMinutes(int rateLimiterCacheWindowMinutes)
{
    this.rateLimiterCacheWindowMinutes = rateLimiterCacheWindowMinutes;

    return this;
}

public enum ExchangeMaterializationStrategy
{
NONE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import com.facebook.presto.resourcemanager.ResourceManagerProxy;
import com.facebook.presto.server.protocol.ExecutingStatementResource;
import com.facebook.presto.server.protocol.LocalQueryProvider;
import com.facebook.presto.server.protocol.QueryBlockingRateLimiter;
import com.facebook.presto.server.protocol.QueuedStatementResource;
import com.facebook.presto.server.protocol.RetryCircuitBreaker;
import com.facebook.presto.server.remotetask.HttpRemoteTaskFactory;
Expand Down Expand Up @@ -188,6 +189,9 @@ protected void setup(Binder binder)
binder.bind(RetryCircuitBreaker.class).in(Scopes.SINGLETON);
newExporter(binder).export(RetryCircuitBreaker.class).withGeneratedName();

binder.bind(QueryBlockingRateLimiter.class).in(Scopes.SINGLETON);
newExporter(binder).export(QueryBlockingRateLimiter.class).withGeneratedName();

binder.bind(LocalQueryProvider.class).in(Scopes.SINGLETON);

jaxrsBinder(binder).bind(TaskInfoResource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@
package com.facebook.presto.server.protocol;

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.airlift.stats.TimeStat;
import com.facebook.presto.client.QueryResults;
import com.facebook.presto.server.ForStatementResource;
import com.facebook.presto.server.ServerConfig;
import com.facebook.presto.spi.QueryId;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.annotation.security.RolesAllowed;
import javax.inject.Inject;
Expand All @@ -44,6 +48,7 @@
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.net.HttpHeaders.X_FORWARDED_PROTO;
import static com.google.common.util.concurrent.Futures.transform;
import static com.google.common.util.concurrent.Futures.transformAsync;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static java.util.Objects.requireNonNull;
Expand All @@ -61,16 +66,26 @@ public class ExecutingStatementResource
private final BoundedExecutor responseExecutor;
private final LocalQueryProvider queryProvider;
private final boolean compressionEnabled;
private final QueryBlockingRateLimiter queryRateLimiter;

@Inject
public ExecutingStatementResource(
@ForStatementResource BoundedExecutor responseExecutor,
LocalQueryProvider queryProvider,
ServerConfig serverConfig)
ServerConfig serverConfig,
QueryBlockingRateLimiter queryRateLimiter)
{
this.responseExecutor = requireNonNull(responseExecutor, "responseExecutor is null");
this.queryProvider = requireNonNull(queryProvider, "queryProvider is null");
this.compressionEnabled = requireNonNull(serverConfig, "serverConfig is null").isQueryResultsCompressionEnabled();
this.queryRateLimiter = requireNonNull(queryRateLimiter, "queryRateLimiter is null");
}

/**
 * Exposes the block-time statistic held by the injected query rate limiter
 * as a managed, nested attribute of this resource.
 */
@Managed
@Nested
public TimeStat getRateLimiterBlockTime()
{
    TimeStat blockTime = queryRateLimiter.getRateLimiterBlockTime();
    return blockTime;
}

@GET
Expand Down Expand Up @@ -98,8 +113,18 @@ public void getQueryResults(
}

Query query = queryProvider.getQuery(queryId, slug);
ListenableFuture<Double> acquirePermitAsync = queryRateLimiter.acquire(queryId);
String effectiveFinalProto = proto;
DataSize effectiveFinalTargetResultSize = targetResultSize;
ListenableFuture<QueryResults> waitForResultsAsync = transformAsync(
acquirePermitAsync,
acquirePermitTimeSeconds -> {
queryRateLimiter.addRateLimiterBlockTime(new Duration(acquirePermitTimeSeconds, SECONDS));
return query.waitForResults(token, uriInfo, effectiveFinalProto, wait, effectiveFinalTargetResultSize);
},
responseExecutor);
ListenableFuture<Response> queryResultsFuture = transform(
query.waitForResults(token, uriInfo, proto, wait, targetResultSize),
waitForResultsAsync,
results -> toResponse(query, results, compressionEnabled),
directExecutor());
bindAsyncResponse(asyncResponse, queryResultsFuture, responseExecutor);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server.protocol;

import com.facebook.airlift.stats.CounterStat;
import com.facebook.airlift.stats.TimeStat;
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.spi.QueryId;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.RateLimiter;
import com.google.inject.Inject;
import io.airlift.units.Duration;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.annotation.PreDestroy;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static java.util.Objects.requireNonNull;

/*
* Per-query rate limiting using a token bucket.
* Rate = rateLimiterBucketMaxSize permits per second.
* When enough tokens are available, the request is answered immediately.
* When not enough tokens are available, the request is processed after a delay.
*/
public class QueryBlockingRateLimiter
{
private final long rateLimiterBucketMaxSize;
private final ListeningExecutorService rateLimiterExecutorService;
private final LoadingCache<QueryId, RateLimiter> rateLimiterCache;
private final CounterStat rateLimiterTriggeredCounter = new CounterStat();
private final TimeStat rateLimiterBlockTime = new TimeStat();

/**
 * Creates the rate limiter from the query manager configuration.
 *
 * @param queryManagerConfig supplies the bucket size (permits per second) as well as
 *        the size limit and access-expiry window (minutes) of the per-query limiter cache
 * @throws NullPointerException if {@code queryManagerConfig} is null
 */
@Inject
public QueryBlockingRateLimiter(QueryManagerConfig queryManagerConfig)
{
    requireNonNull(queryManagerConfig, "queryManagerConfig is null");
    this.rateLimiterBucketMaxSize = queryManagerConfig.getRateLimiterBucketMaxSize();
    rateLimiterExecutorService = listeningDecorator(createRateLimiterExecutor());
    // One RateLimiter per query id, created on first lookup and dropped once the
    // cache size limit is exceeded or the entry has not been accessed for the window.
    rateLimiterCache = CacheBuilder.newBuilder()
            .maximumSize(queryManagerConfig.getRateLimiterCacheLimit())
            .expireAfterAccess(queryManagerConfig.getRateLimiterCacheWindowMinutes(), TimeUnit.MINUTES)
            .build(CacheLoader.from(key -> RateLimiter.create(rateLimiterBucketMaxSize)));
}

// Builds the pool that runs blocking permit acquisitions: up to 10 threads, none kept alive while idle.
private static ExecutorService createRateLimiterExecutor()
{
    // A ThreadPoolExecutor only grows past its core size when the work queue rejects an offer,
    // which an unbounded LinkedBlockingQueue never does. With core=1/max=10 the pool would
    // therefore stay at a single thread and serialize every blocked query behind it.
    // Using core=10 with core-thread timeout gives the intended behavior: threads are started
    // on demand (up to 10) and released after 60 seconds of idleness.
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10,
            10,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            daemonThreadsNamed("rate-limiter-listener"));
    executor.allowCoreThreadTimeOut(true);
    return executor;
}

/*
* Guards against accidental, bug-caused DoS: requests are slowed down through delayed processing, even when the client has no back-off logic implemented.
* Optimized so that normal usage never blocks: tryAcquire is attempted first.
* Only when no permit is immediately available do we fall back to a blocking acquire, which runs in a separate thread pool.
* The internal Guava rate limiter returns the time spent sleeping to enforce the rate, in seconds (0.0 if not rate-limited); we wrap that value in a future.
*/
public ListenableFuture<Double> acquire(QueryId queryId)
{ // if rateLimitBucketMaxSize < 0, we disable rate limiting by returning immediately
if (rateLimiterBucketMaxSize < 0) {
return immediateFuture(0.0);
}
if (queryId == null) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: use checkArgument instead and move to the first line of this function

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed with @tdcmeehan earlier, seems returning future is more consistent for this function

return immediateFailedFuture(new IllegalArgumentException("queryId should not be null"));
}
RateLimiter rateLimiter = rateLimiterCache.getUnchecked(queryId);
if (rateLimiter.tryAcquire()) {
return immediateFuture(0.0);
}
ListenableFuture<Double> asyncTask = rateLimiterExecutorService.submit(() -> rateLimiter.acquire());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you like to use a TimeStat so we can measure length of time spent blocking (if any?)

Copy link
Copy Markdown
Contributor Author

@ericyuliu ericyuliu May 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not very certain about TimeStat usage pattern, but just updated the PR based on GlueMetastoreStats example, will do more research in testing phase.

rateLimiterTriggeredCounter.update(1);
return asyncTask;
}

/**
 * Returns the counter that is incremented each time a request could not get a
 * permit immediately and had to fall back to a blocking acquire.
 */
@Managed
@Nested
public CounterStat getRateLimiterTriggeredCounter()
{
    return this.rateLimiterTriggeredCounter;
}

/**
 * Returns the statistic that accumulates the durations reported through
 * {@code addRateLimiterBlockTime}.
 */
public TimeStat getRateLimiterBlockTime()
{
    return this.rateLimiterBlockTime;
}

/**
 * Records one observation in the block-time statistic.
 *
 * @param duration the amount of time to record
 */
public void addRateLimiterBlockTime(Duration duration)
{
    this.rateLimiterBlockTime.add(duration);
}

/**
 * Lifecycle hook: shuts down the executor used for blocking permit acquisition
 * via {@code shutdownNow()}.
 */
@PreDestroy
public void destroy()
{
    this.rateLimiterExecutorService.shutdownNow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.server.protocol;

import com.facebook.airlift.log.Logger;
import com.facebook.airlift.stats.TimeStat;
import com.facebook.presto.client.QueryError;
import com.facebook.presto.client.QueryResults;
import com.facebook.presto.client.StatementStats;
Expand All @@ -37,6 +38,8 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
Expand Down Expand Up @@ -123,6 +126,9 @@ public class QueuedStatementResource
private final TracerProvider tracerProvider;
private final SessionPropertyManager sessionPropertyManager; // We may need some system default session property values at early query stage even before session is created.

private final QueryBlockingRateLimiter queryRateLimiter;
private final TimeStat queuedRateLimiterBlockTime = new TimeStat();

@Inject
public QueuedStatementResource(
DispatchManager dispatchManager,
Expand All @@ -131,7 +137,8 @@ public QueuedStatementResource(
SqlParserOptions sqlParserOptions,
ServerConfig serverConfig,
TracerProvider tracerProvider,
SessionPropertyManager sessionPropertyManager)
SessionPropertyManager sessionPropertyManager,
QueryBlockingRateLimiter queryRateLimiter)
{
this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null");
this.queryResultsProvider = queryResultsProvider;
Expand All @@ -143,6 +150,8 @@ public QueuedStatementResource(
this.tracerProvider = requireNonNull(tracerProvider, "tracerProvider is null");
this.sessionPropertyManager = sessionPropertyManager;

this.queryRateLimiter = requireNonNull(queryRateLimiter, "queryRateLimiter is null");

queryPurger.scheduleWithFixedDelay(
() -> {
try {
Expand All @@ -159,6 +168,13 @@ public QueuedStatementResource(
MILLISECONDS);
}

/**
 * Exposes the block-time statistic held by the injected query rate limiter
 * as a managed, nested attribute of this resource.
 */
@Managed
@Nested
public TimeStat getRateLimiterBlockTime()
{
    TimeStat blockTime = queryRateLimiter.getRateLimiterBlockTime();
    return blockTime;
}

@PreDestroy
public void stop()
{
Expand Down Expand Up @@ -242,14 +258,20 @@ public void getStatus(
@Suspended AsyncResponse asyncResponse)
{
Query query = getQuery(queryId, slug);

ListenableFuture<Double> acquirePermitAsync = queryRateLimiter.acquire(queryId);
ListenableFuture<?> waitForDispatchedAsync = transformAsync(
acquirePermitAsync,
acquirePermitTimeSeconds -> {
queryRateLimiter.addRateLimiterBlockTime(new Duration(acquirePermitTimeSeconds, SECONDS));
return query.waitForDispatched();
},
responseExecutor);
// wait for query to be dispatched, up to the wait timeout
ListenableFuture<?> futureStateChange = addTimeout(
query.waitForDispatched(),
waitForDispatchedAsync,
() -> null,
WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait),
timeoutExecutor);

// when state changes, fetch the next result
ListenableFuture<Response> queryResultsFuture = transformAsync(
futureStateChange,
Expand Down Expand Up @@ -437,7 +459,6 @@ private ListenableFuture<?> waitForDispatched()
return querySubmissionFuture;
}
}

// otherwise, wait for the query to finish
return dispatchManager.waitForDispatched(queryId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ public void testDefaults()
.setPerQueryRetryLimit(0)
.setPerQueryRetryMaxExecutionTime(new Duration(5, MINUTES))
.setGlobalQueryRetryFailureLimit(150)
.setGlobalQueryRetryFailureWindow(new Duration(5, MINUTES)));
.setGlobalQueryRetryFailureWindow(new Duration(5, MINUTES))
.setRateLimiterBucketMaxSize(100)
.setRateLimiterCacheLimit(1000)
.setRateLimiterCacheWindowMinutes(5));
}

@Test
Expand Down Expand Up @@ -120,6 +123,9 @@ public void testExplicitPropertyMappings()
.put("per-query-retry-max-execution-time", "1h")
.put("global-query-retry-failure-limit", "200")
.put("global-query-retry-failure-window", "1h")
.put("query-manager.rate-limiter-bucket-max-size", "200")
.put("query-manager.rate-limiter-cache-limit", "10000")
.put("query-manager.rate-limiter-cache-window-minutes", "60")
.build();

QueryManagerConfig expected = new QueryManagerConfig()
Expand Down Expand Up @@ -161,8 +167,10 @@ public void testExplicitPropertyMappings()
.setPerQueryRetryLimit(10)
.setPerQueryRetryMaxExecutionTime(new Duration(1, HOURS))
.setGlobalQueryRetryFailureLimit(200)
.setGlobalQueryRetryFailureWindow(new Duration(1, HOURS));

.setGlobalQueryRetryFailureWindow(new Duration(1, HOURS))
.setRateLimiterBucketMaxSize(200)
.setRateLimiterCacheLimit(10000)
.setRateLimiterCacheWindowMinutes(60);
ConfigAssertions.assertFullMapping(properties, expected);
}
}
Loading