Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti

### Upgrade notes

- The custom token-bucket based rate limiter has been replaced with Guava's rate limiter implementation.

### Breaking changes

- The (Before/After)CommitTableEvent has been removed.
Expand Down Expand Up @@ -60,6 +62,8 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti

### Deprecations

- The configuration option `polaris.rate-limiter.token-bucket.window` is no longer supported and should be removed.

### Fixes

### Commits
Expand Down
1 change: 0 additions & 1 deletion helm/polaris/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ ct install --namespace polaris --charts ./helm/polaris
| rateLimiter.tokenBucket | object | `{"requestsPerSecond":9999,"type":"default","window":"PT10S"}` | The configuration for the default rate limiter, which uses the token bucket algorithm with one bucket per realm. |
| rateLimiter.tokenBucket.requestsPerSecond | int | `9999` | The maximum number of requests per second allowed for each realm. |
| rateLimiter.tokenBucket.type | string | `"default"` | The type of the token bucket rate limiter. Only the default type is supported out of the box. |
| rateLimiter.tokenBucket.window | string | `"PT10S"` | The time window. |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this is not a backward compatible change, should we update CHANGELOG.md as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added notes to the CHANGELOG

| rateLimiter.type | string | `"no-op"` | The type of rate limiter filter to use. Two built-in types are supported: default and no-op. |
| readinessProbe | object | `{"failureThreshold":3,"initialDelaySeconds":5,"periodSeconds":10,"successThreshold":1,"timeoutSeconds":10}` | Configures the readiness probe for polaris pods. |
| readinessProbe.failureThreshold | int | `3` | Minimum consecutive failures for the probe to be considered failed after having succeeded. Minimum value is 1. |
Expand Down
1 change: 0 additions & 1 deletion helm/polaris/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ data:
{{- if ne .Values.rateLimiter.type "no-op" -}}
{{- $_ = set $map "polaris.rate-limiter.token-bucket.type" .Values.rateLimiter.tokenBucket.type -}}
{{- $_ = set $map "polaris.rate-limiter.token-bucket.requests-per-second" .Values.rateLimiter.tokenBucket.requestsPerSecond -}}
{{- $_ = set $map "polaris.rate-limiter.token-bucket.window" .Values.rateLimiter.tokenBucket.window -}}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will also need to update https://github.com/apache/polaris/blob/main/helm/polaris/tests/configmap_test.yaml:

./helm/polaris/tests/configmap_test.yaml:      - matchRegex: { path: 'data["application.properties"]', pattern: "polaris.rate-limiter.token-bucket.window=PT10S" }
./helm/polaris/tests/configmap_test.yaml:      - matchRegex: { path: 'data["application.properties"]', pattern: "polaris.rate-limiter.token-bucket.window=PT5S" }

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those are already removed (see below)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I missed to remove the window setting right above in configmap_test tho

{{- end -}}

{{- /* Tasks */ -}}
Expand Down
3 changes: 0 additions & 3 deletions helm/polaris/tests/configmap_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,6 @@ tests:
- matchRegex: { path: 'data["application.properties"]', pattern: "polaris.rate-limiter.filter.type=default" }
- matchRegex: { path: 'data["application.properties"]', pattern: "polaris.rate-limiter.token-bucket.type=default" }
- matchRegex: { path: 'data["application.properties"]', pattern: "polaris.rate-limiter.token-bucket.requests-per-second=9999" }
- matchRegex: { path: 'data["application.properties"]', pattern: "polaris.rate-limiter.token-bucket.window=PT10S" }

- it: should configure rate-limiter with custom token bucket values
set:
Expand All @@ -397,12 +396,10 @@ tests:
tokenBucket:
type: custom
requestsPerSecond: 1234
window: PT5S
asserts:
- matchRegex: { path: 'data["application.properties"]', pattern: "polaris.rate-limiter.filter.type=custom" }
- matchRegex: { path: 'data["application.properties"]', pattern: "polaris.rate-limiter.token-bucket.type=custom" }
- matchRegex: { path: 'data["application.properties"]', pattern: "polaris.rate-limiter.token-bucket.requests-per-second=1234" }
- matchRegex: { path: 'data["application.properties"]', pattern: "polaris.rate-limiter.token-bucket.window=PT5S" }

- it: should not include tasks configuration by default
asserts:
Expand Down
4 changes: 1 addition & 3 deletions helm/polaris/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -747,10 +747,8 @@ rateLimiter:
tokenBucket:
# -- The type of the token bucket rate limiter. Only the default type is supported out of the box.
type: default
# -- The maximum number of requests per second allowed for each realm.
# -- The maximum number of requests (permits) per second allowed for each realm.
requestsPerSecond: 9999
# -- The time window.
window: PT10S

# -- Polaris asynchronous task executor configuration.
tasks:
Expand Down
1 change: 0 additions & 1 deletion runtime/defaults/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ polaris.metrics.tags.application=Polaris
polaris.rate-limiter.filter.type=no-op
polaris.rate-limiter.token-bucket.type=default
polaris.rate-limiter.token-bucket.requests-per-second=9999
polaris.rate-limiter.token-bucket.window=PT10S

# Polaris authentication settings
polaris.authentication.type=internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.time.Clock;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.polaris.core.context.RealmContext;
Expand All @@ -32,30 +30,20 @@
public class DefaultTokenBucketFactory implements TokenBucketFactory {

private final long requestsPerSecond;
private final Duration window;
private final Clock clock;
private final Map<String, TokenBucket> perRealmBuckets = new ConcurrentHashMap<>();

@Inject
public DefaultTokenBucketFactory(TokenBucketConfiguration configuration, Clock clock) {
this(configuration.requestsPerSecond(), configuration.window(), clock);
public DefaultTokenBucketFactory(TokenBucketConfiguration configuration) {
this(configuration.requestsPerSecond());
}

public DefaultTokenBucketFactory(long requestsPerSecond, Duration window, Clock clock) {
public DefaultTokenBucketFactory(long requestsPerSecond) {
this.requestsPerSecond = requestsPerSecond;
this.window = window;
this.clock = clock;
}

@Override
public TokenBucket getOrCreateTokenBucket(RealmContext realmContext) {
String realmId = realmContext.getRealmIdentifier();
return perRealmBuckets.computeIfAbsent(
realmId,
k ->
new TokenBucket(
requestsPerSecond,
Math.multiplyExact(requestsPerSecond, window.toSeconds()),
clock));
return perRealmBuckets.computeIfAbsent(realmId, k -> new TokenBucket(requestsPerSecond));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,18 @@
*/
package org.apache.polaris.service.ratelimiter;

import java.time.InstantSource;
import com.google.common.util.concurrent.RateLimiter;

/**
* General-purpose Token bucket implementation. Acquires tokens at a fixed rate and has a maximum
* amount of tokens. Each successful "tryAcquire" costs 1 token.
* General-purpose Token bucket implementation around Guava's {@link RateLimiter}. Acquires tokens
* at a fixed rate and has a maximum amount of tokens. Each successful "tryAcquire" costs 1 token.
*/
@SuppressWarnings("UnstableApiUsage")
public class TokenBucket {
private final double tokensPerMilli;
private final long maxTokens;
private final InstantSource instantSource;
private final RateLimiter rateLimiter;

private long tokens;
private long lastTokenGenerationMillis;

public TokenBucket(long tokensPerSecond, long maxTokens, InstantSource instantSource) {
this.tokensPerMilli = tokensPerSecond / 1000D;
this.maxTokens = maxTokens;
this.instantSource = instantSource;

tokens = maxTokens;
lastTokenGenerationMillis = instantSource.millis();
public TokenBucket(double permitsPerSecond) {
this.rateLimiter = RateLimiter.create(permitsPerSecond);
}

/**
Expand All @@ -47,17 +38,6 @@ public TokenBucket(long tokensPerSecond, long maxTokens, InstantSource instantSo
* @return whether a token was successfully acquired and spent
*/
public synchronized boolean tryAcquire() {
// Grant tokens for the time that has passed since our last tryAcquire()
long t = instantSource.millis();
long millisPassed = Math.subtractExact(t, lastTokenGenerationMillis);
lastTokenGenerationMillis = t;
tokens = Math.min(maxTokens, tokens + ((long) (millisPassed * tokensPerMilli)));

// Take a token if they have one available
if (tokens >= 1) {
tokens--;
return true;
}
return false;
return rateLimiter.tryAcquire();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@

import io.smallrye.config.ConfigMapping;
import java.time.Duration;
import java.util.Optional;

@ConfigMapping(prefix = "polaris.rate-limiter.token-bucket")
public interface TokenBucketConfiguration {

/**
* Number of allowed requests per second per realm. The value <em>must</em> be greater than zero.
*/
long requestsPerSecond();

Duration window();
Copy link
Contributor

@dimas-b dimas-b Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a breaking change if older deployments defined a config value for it in application.properties (which is introspected and validated by Quarkus, IIRC). A missing "receiver" for those configs could be a runtime error (and config is under strict guarantees in our standing evolution guidelines).

Could we keep a deprecated (in code and in CHANGELOG) receiver just to improve backward compatibility?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-added the window option as "deprecated for removal" and added a note to the CHANGELOG.

/** This setting is no longer used and will be removed in a future release. */
@Deprecated(since = "1.3.0", forRemoval = true)
Optional<Duration> window();

/**
* The type of the token bucket factory. Must be a registered {@link
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.service.ratelimiter;

import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.RequestScoped;

@Identifier("mock")
@RequestScoped
public class MockRateLimiter implements RateLimiter {
public static volatile boolean allowProceed = false;

@Override
public boolean canProceed() {
return allowProceed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,17 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Alternative;
import jakarta.inject.Inject;
import java.time.Instant;
import java.time.ZoneOffset;
import org.threeten.extra.MutableClock;

/** TokenBucketFactory with a mock clock */
@Alternative
@ApplicationScoped
public class MockTokenBucketFactory extends DefaultTokenBucketFactory {
public static MutableClock CLOCK = MutableClock.of(Instant.now(), ZoneOffset.UTC);

public MockTokenBucketFactory() {
super(0, null, CLOCK);
super(5);
}

@Inject
public MockTokenBucketFactory(TokenBucketConfiguration configuration) {
super(configuration, CLOCK);
super(configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.quarkus.test.junit.TestProfile;
import jakarta.inject.Inject;
import jakarta.ws.rs.core.Response.Status;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
Expand All @@ -46,7 +45,6 @@
import org.hawkular.agent.prometheus.types.MetricFamily;
import org.hawkular.agent.prometheus.types.Summary;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -72,12 +70,8 @@ public Set<Class<?>> getEnabledAlternatives() {
@Override
public Map<String, String> getConfigOverrides() {
return ImmutableMap.<String, String>builder()
.put("polaris.rate-limiter.filter.type", "default")
.put("polaris.rate-limiter.filter.type", "mock")
.put("polaris.rate-limiter.token-bucket.type", "default")
.put(
"polaris.rate-limiter.token-bucket.requests-per-second",
String.valueOf(REQUESTS_PER_SECOND))
.put("polaris.rate-limiter.token-bucket.window", WINDOW.toString())
.put("polaris.metrics.tags.environment", "prod")
.put("polaris.metrics.realm-id-tag.enable-in-api-metrics", "true")
.put("polaris.metrics.realm-id-tag.enable-in-http-metrics", "true")
Expand All @@ -89,9 +83,6 @@ public Map<String, String> getConfigOverrides() {
}
}

private static final long REQUESTS_PER_SECOND = 5;
private static final Duration WINDOW = Duration.ofSeconds(10);

@Inject PolarisIntegrationTestHelper helper;
@Inject MeterRegistry meterRegistry;
@Inject PolarisEventListener polarisEventListener;
Expand All @@ -101,6 +92,7 @@ public Map<String, String> getConfigOverrides() {

@BeforeAll
public void createFixture(TestEnvironment testEnv, TestInfo testInfo) {
MockRateLimiter.allowProceed = true;
this.testEnv = testEnv;
fixture = helper.createFixture(testEnv, testInfo);
}
Expand All @@ -112,15 +104,9 @@ public void destroyFixture() {
}
}

@BeforeEach
@AfterEach
public void resetRateLimiter() {
MockTokenBucketFactory.CLOCK.add(
WINDOW.multipliedBy(2)); // Clear any counters from before/after this test
}

@BeforeEach
public void resetMeterRegistry() {
MockRateLimiter.allowProceed = true;
meterRegistry.clear();
}

Expand All @@ -129,12 +115,15 @@ public void testRateLimiter() {
Consumer<Status> requestAsserter =
TestUtil.constructRequestAsserter(testEnv, fixture, fixture.realm);

for (int i = 0; i < REQUESTS_PER_SECOND * WINDOW.toSeconds(); i++) {
for (int i = 0; i < 3; i++) {
MockRateLimiter.allowProceed = true;
requestAsserter.accept(Status.OK);
MockRateLimiter.allowProceed = false;
requestAsserter.accept(Status.TOO_MANY_REQUESTS);
}
requestAsserter.accept(Status.TOO_MANY_REQUESTS);

// Ensure that a different realm identifier gets a separate limit
MockRateLimiter.allowProceed = true;
Consumer<Status> requestAsserter2 =
TestUtil.constructRequestAsserter(testEnv, fixture, fixture.realm + "2");
requestAsserter2.accept(Status.OK);
Expand All @@ -145,10 +134,12 @@ public void testMetricsAreEmittedWhenRateLimiting() {
Consumer<Status> requestAsserter =
TestUtil.constructRequestAsserter(testEnv, fixture, fixture.realm);

for (int i = 0; i < REQUESTS_PER_SECOND * WINDOW.toSeconds(); i++) {
for (int i = 0; i < 3; i++) {
MockRateLimiter.allowProceed = true;
requestAsserter.accept(Status.OK);
MockRateLimiter.allowProceed = false;
requestAsserter.accept(Status.TOO_MANY_REQUESTS);
}
requestAsserter.accept(Status.TOO_MANY_REQUESTS);

PolarisEvent event =
((TestPolarisEventListener) polarisEventListener)
Expand Down Expand Up @@ -182,7 +173,7 @@ public void testMetricsAreEmittedWhenRateLimiting() {
assertThat(metric)
.asInstanceOf(type(Summary.class))
.extracting(Summary::getSampleCount)
.isEqualTo(1L);
.isEqualTo(3L);
});

assertThat(metrics.get("polaris_principal_roles_listPrincipalRoles_seconds").getMetrics())
Expand All @@ -200,7 +191,7 @@ public void testMetricsAreEmittedWhenRateLimiting() {
assertThat(metric)
.asInstanceOf(type(Summary.class))
.extracting(Summary::getSampleCount)
.isEqualTo(REQUESTS_PER_SECOND * WINDOW.toSeconds());
.isEqualTo(3L);
});
}
}
Loading