diff --git a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/BlockingLimiter.java b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/BlockingLimiter.java index a079cc42..1ec11055 100644 --- a/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/BlockingLimiter.java +++ b/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/limiter/BlockingLimiter.java @@ -16,6 +16,7 @@ package com.netflix.concurrency.limits.limiter; import com.netflix.concurrency.limits.Limiter; +import com.netflix.concurrency.limits.internal.Preconditions; import java.time.Duration; import java.time.Instant; @@ -29,38 +30,54 @@ * @param */ public final class BlockingLimiter implements Limiter { + public static final Duration MAX_TIMEOUT = Duration.ofHours(1); + + /** + * Wrap a limiter such that acquire will block up to {@link BlockingLimiter#MAX_TIMEOUT} if the limit was reached + * instead of return an empty listener immediately + * @param delegate Non-blocking limiter to wrap + * @return Wrapped limiter + */ public static BlockingLimiter wrap(Limiter delegate) { - return new BlockingLimiter<>(delegate, Optional.empty()); + return new BlockingLimiter<>(delegate, MAX_TIMEOUT); } + /** + * Wrap a limiter such that acquire will block up to a provided timeout if the limit was reached + * instead of return an empty listener immediately + * + * @param delegate Non-blocking limiter to wrap + * @param timeout Max amount of time to wait for the wait for the limit to be released. Cannot exceed {@link BlockingLimiter#MAX_TIMEOUT} + * @return Wrapped limiter + */ public static BlockingLimiter wrap(Limiter delegate, Duration timeout) { - return new BlockingLimiter<>(delegate, Optional.of(timeout)); + Preconditions.checkArgument(timeout.compareTo(MAX_TIMEOUT) < 0, "Timeout cannot be greater than " + MAX_TIMEOUT); + return new BlockingLimiter<>(delegate, timeout); } private final Limiter delegate; - private final Optional timeout; + private final Duration timeout; /** * Lock used to block and unblock callers as the limit is reached */ private final Object lock = new Object(); - private BlockingLimiter(Limiter limiter, Optional timeout) { + private BlockingLimiter(Limiter limiter, Duration timeout) { this.delegate = limiter; this.timeout = timeout; } private Optional tryAcquire(ContextT context) { - Instant deadline = timeout.map(t -> Instant.now().plus(t)).orElse(Instant.MAX); + final Instant deadline = Instant.now().plus(timeout); synchronized (lock) { while (true) { - Instant now = Instant.now(); + final Instant now = Instant.now(); if (!now.isBefore(deadline)) { return Optional.empty(); } // Try to acquire a token and return immediately if successful - Optional listener; - listener = delegate.acquire(context); + final Optional listener = delegate.acquire(context); if (listener.isPresent()) { return listener; } diff --git a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java index 8de86157..254952d7 100644 --- a/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java +++ b/concurrency-limits-core/src/test/java/com/netflix/concurrency/limits/limiter/BlockingLimiterTest.java @@ -2,12 +2,14 @@ import com.netflix.concurrency.limits.Limiter; import com.netflix.concurrency.limits.limit.SettableLimit; +import org.junit.Assert; import org.junit.Test; import java.time.Duration; import java.time.Instant; import java.util.LinkedList; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -17,7 +19,6 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class BlockingLimiterTest { @@ -62,11 +63,32 @@ public void testTimeout() { Duration timeout = Duration.ofMillis(50); SettableLimit limit = SettableLimit.startingAt(1); BlockingLimiter limiter = BlockingLimiter.wrap(SimpleLimiter.newBuilder().limit(limit).build(), timeout); + + // Acquire first, will succeeed an not block limiter.acquire(null); + + // Second acquire should time out after at least 50 millis Instant before = Instant.now(); - assertEquals(Optional.empty(), limiter.acquire(null)); + Assert.assertFalse(limiter.acquire(null).isPresent()); Instant after = Instant.now(); - Duration interval = Duration.between(before, after); - assertTrue(interval.compareTo(timeout) >= 0); + + Duration delay = Duration.between(before, after); + assertTrue("Delay was " + delay.toMillis() + " millis", delay.compareTo(timeout) >= 0); + } + + @Test(expected=TimeoutException.class) + public void testNoTimeout() throws InterruptedException, ExecutionException, TimeoutException { + SettableLimit limit = SettableLimit.startingAt(1); + BlockingLimiter limiter = BlockingLimiter.wrap(SimpleLimiter.newBuilder().limit(limit).build()); + limiter.acquire(null); + + CompletableFuture> future = CompletableFuture.supplyAsync(() -> limiter.acquire(null)); + future.get(1, TimeUnit.SECONDS); + } + + @Test(expected = IllegalArgumentException.class) + public void failOnHighTimeout() { + SettableLimit limit = SettableLimit.startingAt(1); + BlockingLimiter limiter = BlockingLimiter.wrap(SimpleLimiter.newBuilder().limit(limit).build(), Duration.ofDays(1)); } }