diff --git a/concurrency-limits-grpc/build.gradle b/concurrency-limits-grpc/build.gradle index 0938fe09..5d259967 100644 --- a/concurrency-limits-grpc/build.gradle +++ b/concurrency-limits-grpc/build.gradle @@ -10,7 +10,8 @@ dependencies { compile "io.grpc:grpc-core:1.9.0" testCompile project(":concurrency-limits-spectator") - + + testCompile "org.mockito:mockito-core:1.+" testCompile "io.grpc:grpc-netty:1.9.0" testCompile "io.grpc:grpc-stub:1.9.0" testCompile "junit:junit-dep:4.10" diff --git a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java index 2adf79c7..61e8505a 100644 --- a/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java +++ b/concurrency-limits-grpc/src/test/java/com/netflix/concurrency/limits/grpc/server/ConcurrencyLimitServerInterceptorTest.java @@ -1,16 +1,19 @@ package com.netflix.concurrency.limits.grpc.server; import com.netflix.concurrency.limits.Limit; +import com.netflix.concurrency.limits.Limiter; import com.netflix.concurrency.limits.grpc.StringMarshaller; import com.netflix.concurrency.limits.grpc.client.ConcurrencyLimitClientInterceptor; import com.netflix.concurrency.limits.grpc.client.GrpcClientLimiterBuilder; import com.netflix.concurrency.limits.limit.FixedLimit; +import com.netflix.concurrency.limits.limit.Gradient2Limit; import com.netflix.concurrency.limits.limit.VegasLimit; import com.netflix.concurrency.limits.spectator.SpectatorMetricRegistry; import com.netflix.spectator.api.DefaultRegistry; import com.netflix.spectator.api.Registry; import io.grpc.CallOptions; import io.grpc.Channel; +import io.grpc.ClientCall; import io.grpc.Metadata; import io.grpc.Metadata.Key; import io.grpc.MethodDescriptor; @@ -18,16 +21,23 @@ import io.grpc.Server; import io.grpc.ServerInterceptors; import io.grpc.ServerServiceDefinition; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.netty.NettyChannelBuilder; import io.grpc.netty.NettyServerBuilder; import io.grpc.stub.ClientCalls; import io.grpc.stub.MetadataUtils; import io.grpc.stub.ServerCalls; import io.grpc.stub.StreamObserver; +import org.junit.Assert; +import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -42,105 +52,91 @@ public class ConcurrencyLimitServerInterceptorTest { .setResponseMarshaller(StringMarshaller.INSTANCE) .build(); - private static final Key ID_HEADER = Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER); + final Limiter limiter = Mockito.mock(Limiter.class); + final Limiter.Listener listener = Mockito.mock(Limiter.Listener.class); - Registry spectatorRegistry = new DefaultRegistry(); - SpectatorMetricRegistry serverRegistry = new SpectatorMetricRegistry(spectatorRegistry, spectatorRegistry.createId("grpc.server.call.limter")); - SpectatorMetricRegistry clientRegistry = new SpectatorMetricRegistry(spectatorRegistry, spectatorRegistry.createId("grpc.client.call.limter")); - - @Test - @Ignore - public void simulation() throws IOException, InterruptedException { - - Server server = NettyServerBuilder.forPort(0) - .addService(ServerInterceptors.intercept(ServerServiceDefinition.builder("service") - .addMethod(METHOD_DESCRIPTOR, ServerCalls.asyncUnaryCall((req, observer) -> { - try { - TimeUnit.MILLISECONDS.sleep(100); - } catch (InterruptedException e) { - } - - observer.onNext("response"); - observer.onCompleted(); - })) - .build(), ConcurrencyLimitServerInterceptor.newBuilder( - new GrpcServerLimiterBuilder() - .limit(FixedLimit.of(50)) - .partitionByHeader(ID_HEADER) - .partition("0", 0.1) - .partition("1", 0.2) - .partition("2", 0.7) - .build()) + @Before + public void beforeEachTest() { + Mockito.when(limiter.acquire(Mockito.isA(GrpcServerRequestContext.class))).thenReturn(Optional.of(listener)); + } + + private Server startServer(ServerCalls.UnaryMethod method) { + try { + return NettyServerBuilder.forPort(0) + .addService(ServerInterceptors.intercept( + ServerServiceDefinition.builder("service") + .addMethod(METHOD_DESCRIPTOR, ServerCalls.asyncUnaryCall(method)) + .build(), + ConcurrencyLimitServerInterceptor.newBuilder(limiter) + .build()) + ) .build() - )) - .build() - .start(); - - Limit clientLimit0 = VegasLimit.newBuilder() - .metricRegistry(clientRegistry) - .build(); - Limit clientLimit1 = VegasLimit.newBuilder() - .metricRegistry(clientRegistry) - .build(); - Limit clientLimit2 = VegasLimit.newBuilder() - .metricRegistry(clientRegistry) + .start(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void releaseOnSuccess() { + // Setup server + final Server server = startServer((req, observer) -> { + observer.onNext("response"); + observer.onCompleted(); + }); + + // Make Client call + final Channel channel = NettyChannelBuilder.forAddress("localhost", server.getPort()) + .usePlaintext(true) .build(); - - AtomicLongArray counters = new AtomicLongArray(3); - AtomicLong drops = new AtomicLong(0); - Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { - System.out.println("" + counters.getAndSet(0, 0) + ", " + counters.getAndSet(1, 0)+ ", " + counters.getAndSet(2, 0)); - }, 1, 1, TimeUnit.SECONDS); - - Executor executor = Executors.newCachedThreadPool(); - executor.execute(() -> simulateClient(0, counters, drops, server.getPort(), clientLimit0)); - executor.execute(() -> simulateClient(1, counters, drops, server.getPort(), clientLimit1)); - executor.execute(() -> simulateClient(2, counters, drops, server.getPort(), clientLimit2)); - - TimeUnit.SECONDS.sleep(100); - - spectatorRegistry.distributionSummaries().forEach(sum -> sum.measure().forEach(System.out::println)); - spectatorRegistry.gauges().forEach(guage -> guage.measure().forEach(System.out::println)); + ClientCalls.blockingUnaryCall(channel, METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); + Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class)); + Mockito.verify(listener, Mockito.times(1)).onSuccess(); } - private void simulateClient(int id, AtomicLongArray counters, AtomicLong drops, int port, Limit limit) { - Metadata headers = new Metadata(); - headers.put(ID_HEADER, "" + id); + @Test + public void releaseOnError() { + // Setup server + final Server server = startServer((req, observer) -> { + observer.onError(Status.INVALID_ARGUMENT.asRuntimeException()); + }); - Channel channel = NettyChannelBuilder.forTarget("localhost:" + port) + // Make Client call + final Channel channel = NettyChannelBuilder.forAddress("localhost", server.getPort()) .usePlaintext(true) - .intercept(MetadataUtils.newAttachHeadersInterceptor(headers)) - .intercept(new ConcurrencyLimitClientInterceptor(new GrpcClientLimiterBuilder() - .metricRegistry(clientRegistry) - .limit(limit) - .blockOnLimit(true) - .build())) .build(); try { - while (true) { - TimeUnit.MICROSECONDS.sleep(100); - ClientCalls.asyncUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT.withWaitForReady()), "request", - new StreamObserver() { - @Override - public void onNext(String value) { - } + ClientCalls.blockingUnaryCall(channel, METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); + Assert.fail("Should have failed with UNKNOWN error"); + } catch (StatusRuntimeException e) { + // Verify + Assert.assertEquals(Status.Code.INVALID_ARGUMENT, e.getStatus().getCode()); + Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class)); + Mockito.verify(listener, Mockito.times(1)).onIgnore(); + } + } - @Override - public void onError(Throwable t) { - drops.incrementAndGet(); - } + @Test + public void releaseOnUncaughtException() throws IOException { + // Setup server + final Server server = startServer((req, observer) -> { + throw new RuntimeException("failure"); + }); - @Override - public void onCompleted() { - counters.incrementAndGet(id); - } - - }); - } - } catch (Exception e) { - e.printStackTrace(); + // Make Client call + final Channel channel = NettyChannelBuilder.forAddress("localhost", server.getPort()) + .usePlaintext(true) + .build(); + + try { + ClientCalls.blockingUnaryCall(channel, METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo"); + Assert.fail("Should have failed with UNKNOWN error"); + } catch (StatusRuntimeException e) { + // Verify + Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class)); + Mockito.verify(listener, Mockito.times(1)).onIgnore(); } } }