Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/bypass filter #194

Merged
merged 8 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions concurrency-limits-core/dependencies.lock
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
},
"testRuntimeClasspath": {
"org.junit.jupiter:junit-jupiter-engine": {
"locked": "5.9.0"
"locked": "5.10.2"
},
"org.junit.vintage:junit-vintage-engine": {
"locked": "5.9.0"
"locked": "5.10.2"
},
"org.slf4j:slf4j-api": {
"locked": "1.7.36"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,41 @@

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

public abstract class AbstractLimiter<ContextT> implements Limiter<ContextT> {
public static final String ID_TAG = "id";
public static final String STATUS_TAG = "status";

/**
* Constructs a new builder with a list of bypass resolvers.
* If the predicate condition in any of the resolver is satisfied,
* the call is bypassed without increasing the limiter inflight count
* and affecting the algorithm.
*/
public abstract static class BypassLimiterBuilder<BuilderT extends BypassLimiterBuilder<BuilderT, ContextT>, ContextT> extends Builder<BuilderT> {

private final Predicate<ContextT> ALWAYS_FALSE = (context) -> false;
private Predicate<ContextT> bypassResolver = ALWAYS_FALSE;

/**
* Add a chainable bypass resolver predicate from context. Multiple resolvers may be added and if any of the
* predicate condition returns true the call is bypassed without increasing the limiter inflight count and
* affecting the algorithm. Will not bypass any calls by default if no resolvers are added.
* @param shouldBypass Predicate condition to bypass limit
* @return Chainable builder
*/
public BuilderT bypassLimitResolver(Predicate<ContextT> shouldBypass) {
if (this.bypassResolver == ALWAYS_FALSE) {
this.bypassResolver = shouldBypass;
} else {
this.bypassResolver = bypassResolver.or(shouldBypass);
}
return self();
}
}

public abstract static class Builder<BuilderT extends Builder<BuilderT>> {
private static final AtomicInteger idCounter = new AtomicInteger();

Expand Down Expand Up @@ -69,6 +98,8 @@ public BuilderT metricRegistry(MetricRegistry registry) {
private final MetricRegistry.Counter droppedCounter;
private final MetricRegistry.Counter ignoredCounter;
private final MetricRegistry.Counter rejectedCounter;
private final MetricRegistry.Counter bypassCounter;
private Predicate<ContextT> bypassResolver = (context) -> false;

private volatile int limit;

Expand All @@ -77,19 +108,47 @@ protected AbstractLimiter(Builder<?> builder) {
this.limitAlgorithm = builder.limit;
this.limit = limitAlgorithm.getLimit();
this.limitAlgorithm.notifyOnChange(this::onNewLimit);

if (builder instanceof BypassLimiterBuilder) {
this.bypassResolver = ((BypassLimiterBuilder) builder).bypassResolver;
}
builder.registry.gauge(MetricIds.LIMIT_NAME, this::getLimit);
this.successCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "success");
this.droppedCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "dropped");
this.ignoredCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "ignored");
this.rejectedCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "rejected");
this.bypassCounter = builder.registry.counter(MetricIds.CALL_NAME, ID_TAG, builder.name, STATUS_TAG, "bypassed");
}

protected boolean shouldBypass(ContextT context){
return bypassResolver.test(context);
}

protected Optional<Listener> createRejectedListener() {
this.rejectedCounter.increment();
return Optional.empty();
}

protected Optional<Listener> createBypassListener() {
this.bypassCounter.increment();
return Optional.of(new Listener() {

@Override
public void onSuccess() {
// Do nothing
}

@Override
public void onIgnore() {
// Do nothing
}

@Override
public void onDropped() {
// Do nothing
}
});
}

protected Listener createListener() {
final long startTime = clock.get();
final int currentInflight = inFlight.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public abstract class AbstractPartitionedLimiter<ContextT> extends AbstractLimit
private static final Logger LOG = LoggerFactory.getLogger(AbstractPartitionedLimiter.class);
private static final String PARTITION_TAG_NAME = "partition";

public abstract static class Builder<BuilderT extends AbstractLimiter.Builder<BuilderT>, ContextT> extends AbstractLimiter.Builder<BuilderT> {
public abstract static class Builder<BuilderT extends AbstractLimiter.BypassLimiterBuilder<BuilderT, ContextT>, ContextT> extends AbstractLimiter.BypassLimiterBuilder<BuilderT, ContextT> {
private List<Function<ContextT, String>> partitionResolvers = new ArrayList<>();
private final Map<String, Partition> partitions = new LinkedHashMap<>();
private int maxDelayedThreads = 100;
Expand Down Expand Up @@ -215,6 +215,9 @@ public Optional<Listener> acquire(ContextT context) {

try {
lock.lock();
if (shouldBypass(context)){
return createBypassListener();
}
if (getInflight() >= getLimit() && partition.isLimitExceeded()) {
lock.unlock();
if (partition.backoffMillis > 0 && delayedThreads.get() < maxDelayedThreads) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@
import java.util.concurrent.Semaphore;

public class SimpleLimiter<ContextT> extends AbstractLimiter<ContextT> {

public static class BypassLimiterBuilder<ContextT> extends AbstractLimiter.BypassLimiterBuilder<BypassLimiterBuilder<ContextT>, ContextT> {
public SimpleLimiter<ContextT> build() {
return new SimpleLimiter<>(this);
}

@Override
protected BypassLimiterBuilder<ContextT> self() {
return this;
}
}

public static class Builder extends AbstractLimiter.Builder<Builder> {
public <ContextT> SimpleLimiter<ContextT> build() {
return new SimpleLimiter<>(this);
Expand All @@ -34,6 +46,10 @@ protected Builder self() {
}
}

public static <ContextT> BypassLimiterBuilder<ContextT> newBypassLimiterBuilder() {
return new BypassLimiterBuilder<>();
}

public static Builder newBuilder() {
return new Builder();
}
Expand All @@ -42,21 +58,22 @@ public static Builder newBuilder() {

public SimpleLimiter(AbstractLimiter.Builder<?> builder) {
super(builder);

this.inflightDistribution = builder.registry.distribution(MetricIds.INFLIGHT_NAME);
this.semaphore = new AdjustableSemaphore(getLimit());
}

@Override
public Optional<Limiter.Listener> acquire(ContextT context) {
Optional<Limiter.Listener> listener;
if (!semaphore.tryAcquire()) {
if (shouldBypass(context)) {
listener = createBypassListener();
}
else if (!semaphore.tryAcquire()) {
listener = createRejectedListener();
}
else {
listener = Optional.of(new Listener(createListener()));
}

inflightDistribution.addSample(getInflight());
return listener;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

public class AbstractPartitionedLimiterTest {
public static class TestPartitionedLimiter extends AbstractPartitionedLimiter<String> {
Expand All @@ -27,6 +28,13 @@ public TestPartitionedLimiter(Builder builder) {
}
}

public static class ShouldBypassPredicate implements Predicate<String> {
@Override
public boolean test(String s) {
return s.contains("admin");
}
}

@Test
public void limitAllocatedToBins() {
AbstractPartitionedLimiter<String> limiter = (AbstractPartitionedLimiter<String>) TestPartitionedLimiter.newBuilder()
Expand Down Expand Up @@ -156,4 +164,67 @@ public void setLimitReservesBusy() {
Assert.assertEquals(1, limiter.getPartition("batch").getInflight());
Assert.assertEquals(1, limiter.getInflight());
}

@Test
public void testBypassPartitionedLimiter() {

AbstractPartitionedLimiter<String> limiter = (AbstractPartitionedLimiter<String>) TestPartitionedLimiter.newBuilder()
.partitionResolver(Function.identity())
.partition("batch", 0.1)
.partition("live", 0.9)
.limit(FixedLimit.of(10))
.bypassLimitResolver(new ShouldBypassPredicate())
.build();

Assert.assertTrue(limiter.acquire("batch").isPresent());
Assert.assertEquals(1, limiter.getPartition("batch").getInflight());
Assert.assertTrue(limiter.acquire("admin").isPresent());

for (int i = 0; i < 9; i++) {
Assert.assertTrue(limiter.acquire("live").isPresent());
Assert.assertEquals(i+1, limiter.getPartition("live").getInflight());
Assert.assertTrue(limiter.acquire("admin").isPresent());
}

// Verify that bypassed requests are able to proceed even when the limiter is full
Assert.assertFalse(limiter.acquire("batch").isPresent());
Assert.assertEquals(1, limiter.getPartition("batch").getInflight());
Assert.assertFalse(limiter.acquire("live").isPresent());
Assert.assertEquals(9, limiter.getPartition("live").getInflight());
Assert.assertEquals(10, limiter.getInflight());
Assert.assertTrue(limiter.acquire("admin").isPresent());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be very clear about the behavior, let's add

Suggested change
}
// Verify that bypassed requests are able to proceed even when the limiter is full
Assert.assertFalse(limiter.acquire("live").isPresent());
Assert.assertEquals(10, limiter.getInflight());
Assert.assertTrue(limiter.acquire("admin").isPresent());
Assert.assertEquals(10, limiter.getInflight());
}

I understand that we've effectively covered that above, but this will be easier to understand for someone new to the codebase. It also helps defend against a setup issue (eg: if the limit is increased).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added extra check. This behavior is also validated for simple limiters in another test.


@Test
public void testBypassSimpleLimiter() {

SimpleLimiter<String> limiter = (SimpleLimiter<String>) TestPartitionedLimiter.newBuilder()
.limit(FixedLimit.of(10))
.bypassLimitResolver(new ShouldBypassPredicate())
.build();

int inflightCount = 0;
for (int i = 0; i < 5; i++) {
Assert.assertTrue(limiter.acquire("request").isPresent());
Assert.assertEquals(i+1, limiter.getInflight());
inflightCount++;
}

for (int i = 0; i < 15; i++) {
Assert.assertTrue(limiter.acquire("admin").isPresent());
Assert.assertEquals(inflightCount, limiter.getInflight());
}

for (int i = 0; i < 5; i++) {
Assert.assertTrue(limiter.acquire("request").isPresent());
Assert.assertEquals(inflightCount+i+1, limiter.getInflight());
}

// Calls with passing bypass condition will return a token
// whereas remaining calls will be throttled since inflight count is greater than the limit
for (int i = 0; i < 10; i++) {
Assert.assertFalse(limiter.acquire("request").isPresent());
Assert.assertTrue(limiter.acquire("admin").isPresent());
}
}
umairk79 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.netflix.concurrency.limits.limiter;

import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.limit.FixedLimit;
import org.junit.Assert;
import org.junit.Test;

import java.util.Optional;

public class SimpleLimiterTest {

@Test
public void useLimiterCapacityUntilTotalLimit() {
SimpleLimiter<String> limiter = SimpleLimiter.newBuilder()
.limit(FixedLimit.of(10))
.build();

for (int i = 0; i < 10; i++) {
Assert.assertTrue(limiter.acquire("live").isPresent());
}

// Rejected call after total limit is utilized
Assert.assertFalse(limiter.acquire("live").isPresent());
Assert.assertEquals(10, limiter.getInflight());
}

@Test
public void testReleaseLimit() {
SimpleLimiter<String> limiter = SimpleLimiter.newBuilder()
.limit(FixedLimit.of(10))
.build();

Optional<Limiter.Listener> completion = limiter.acquire("live");
for (int i = 1; i < 10; i++) {
Assert.assertTrue(limiter.acquire("live").isPresent());
}

Assert.assertEquals(10, limiter.getInflight());
Assert.assertFalse(limiter.acquire("live").isPresent());

// Release token
completion.get().onSuccess();
Assert.assertEquals(9, limiter.getInflight());

Assert.assertTrue(limiter.acquire("live").isPresent());
Assert.assertEquals(10, limiter.getInflight());
}

@Test
public void testSimpleBypassLimiter() {
SimpleLimiter<String> limiter = SimpleLimiter.<String>newBypassLimiterBuilder()
.limit(FixedLimit.of(10))
.bypassLimitResolver((context) -> context.equals("admin"))
.build();

for (int i = 0; i < 10; i++) {
Assert.assertTrue(limiter.acquire("live").isPresent());
Assert.assertEquals(i+1, limiter.getInflight());
}

// Verify calls with passing bypass condition will return a token
// whereas remaining calls will be throttled since inflight count is greater than the limit
for (int i = 0; i < 10; i++) {
Assert.assertFalse(limiter.acquire("live").isPresent());
Assert.assertTrue(limiter.acquire("admin").isPresent());
}
}

@Test
public void testSimpleBypassLimiterDefault() {
SimpleLimiter<String> limiter = SimpleLimiter.<String>newBypassLimiterBuilder()
.limit(FixedLimit.of(10))
.build();

for (int i = 0; i < 10; i++) {
Assert.assertTrue(limiter.acquire("live").isPresent());
Assert.assertEquals(i+1, limiter.getInflight());
}

// Verify that no calls are bypassed by default
Assert.assertFalse(limiter.acquire("live").isPresent());
Assert.assertFalse(limiter.acquire("admin").isPresent());
}

}
Loading
Loading