Skip to content

Commit

Permalink
Add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
elandau committed Sep 18, 2018
1 parent 536d1ed commit 6c2ba55
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 87 deletions.
3 changes: 2 additions & 1 deletion concurrency-limits-grpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ dependencies {
compile "io.grpc:grpc-core:1.9.0"

testCompile project(":concurrency-limits-spectator")


testCompile "org.mockito:mockito-core:1.+"
testCompile "io.grpc:grpc-netty:1.9.0"
testCompile "io.grpc:grpc-stub:1.9.0"
testCompile "junit:junit-dep:4.10"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,43 @@
package com.netflix.concurrency.limits.grpc.server;

import com.netflix.concurrency.limits.Limit;
import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.grpc.StringMarshaller;
import com.netflix.concurrency.limits.grpc.client.ConcurrencyLimitClientInterceptor;
import com.netflix.concurrency.limits.grpc.client.GrpcClientLimiterBuilder;
import com.netflix.concurrency.limits.limit.FixedLimit;
import com.netflix.concurrency.limits.limit.Gradient2Limit;
import com.netflix.concurrency.limits.limit.VegasLimit;
import com.netflix.concurrency.limits.spectator.SpectatorMetricRegistry;
import com.netflix.spectator.api.DefaultRegistry;
import com.netflix.spectator.api.Registry;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.Metadata.Key;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Server;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.ServerCalls;
import io.grpc.stub.StreamObserver;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.Mockito;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,105 +52,91 @@ public class ConcurrencyLimitServerInterceptorTest {
.setResponseMarshaller(StringMarshaller.INSTANCE)
.build();

private static final Key<String> ID_HEADER = Metadata.Key.of("id", Metadata.ASCII_STRING_MARSHALLER);
final Limiter<GrpcServerRequestContext> limiter = Mockito.mock(Limiter.class);
final Limiter.Listener listener = Mockito.mock(Limiter.Listener.class);

Registry spectatorRegistry = new DefaultRegistry();
SpectatorMetricRegistry serverRegistry = new SpectatorMetricRegistry(spectatorRegistry, spectatorRegistry.createId("grpc.server.call.limter"));
SpectatorMetricRegistry clientRegistry = new SpectatorMetricRegistry(spectatorRegistry, spectatorRegistry.createId("grpc.client.call.limter"));

@Test
@Ignore
public void simulation() throws IOException, InterruptedException {

Server server = NettyServerBuilder.forPort(0)
.addService(ServerInterceptors.intercept(ServerServiceDefinition.builder("service")
.addMethod(METHOD_DESCRIPTOR, ServerCalls.asyncUnaryCall((req, observer) -> {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}

observer.onNext("response");
observer.onCompleted();
}))
.build(), ConcurrencyLimitServerInterceptor.newBuilder(
new GrpcServerLimiterBuilder()
.limit(FixedLimit.of(50))
.partitionByHeader(ID_HEADER)
.partition("0", 0.1)
.partition("1", 0.2)
.partition("2", 0.7)
.build())
@Before
public void beforeEachTest() {
Mockito.when(limiter.acquire(Mockito.isA(GrpcServerRequestContext.class))).thenReturn(Optional.of(listener));
}

private Server startServer(ServerCalls.UnaryMethod<String, String> method) {
try {
return NettyServerBuilder.forPort(0)
.addService(ServerInterceptors.intercept(
ServerServiceDefinition.builder("service")
.addMethod(METHOD_DESCRIPTOR, ServerCalls.asyncUnaryCall(method))
.build(),
ConcurrencyLimitServerInterceptor.newBuilder(limiter)
.build())
)
.build()
))
.build()
.start();

Limit clientLimit0 = VegasLimit.newBuilder()
.metricRegistry(clientRegistry)
.build();
Limit clientLimit1 = VegasLimit.newBuilder()
.metricRegistry(clientRegistry)
.build();
Limit clientLimit2 = VegasLimit.newBuilder()
.metricRegistry(clientRegistry)
.start();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Test
public void releaseOnSuccess() {
// Setup server
final Server server = startServer((req, observer) -> {
observer.onNext("response");
observer.onCompleted();
});

// Make Client call
final Channel channel = NettyChannelBuilder.forAddress("localhost", server.getPort())
.usePlaintext(true)
.build();

AtomicLongArray counters = new AtomicLongArray(3);
AtomicLong drops = new AtomicLong(0);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
System.out.println("" + counters.getAndSet(0, 0) + ", " + counters.getAndSet(1, 0)+ ", " + counters.getAndSet(2, 0));
}, 1, 1, TimeUnit.SECONDS);

Executor executor = Executors.newCachedThreadPool();

executor.execute(() -> simulateClient(0, counters, drops, server.getPort(), clientLimit0));
executor.execute(() -> simulateClient(1, counters, drops, server.getPort(), clientLimit1));
executor.execute(() -> simulateClient(2, counters, drops, server.getPort(), clientLimit2));

TimeUnit.SECONDS.sleep(100);

spectatorRegistry.distributionSummaries().forEach(sum -> sum.measure().forEach(System.out::println));
spectatorRegistry.gauges().forEach(guage -> guage.measure().forEach(System.out::println));
ClientCalls.blockingUnaryCall(channel, METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo");
Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class));
Mockito.verify(listener, Mockito.times(1)).onSuccess();
}

private void simulateClient(int id, AtomicLongArray counters, AtomicLong drops, int port, Limit limit) {
Metadata headers = new Metadata();
headers.put(ID_HEADER, "" + id);
@Test
public void releaseOnError() {
// Setup server
final Server server = startServer((req, observer) -> {
observer.onError(Status.INVALID_ARGUMENT.asRuntimeException());
});

Channel channel = NettyChannelBuilder.forTarget("localhost:" + port)
// Make Client call
final Channel channel = NettyChannelBuilder.forAddress("localhost", server.getPort())
.usePlaintext(true)
.intercept(MetadataUtils.newAttachHeadersInterceptor(headers))
.intercept(new ConcurrencyLimitClientInterceptor(new GrpcClientLimiterBuilder()
.metricRegistry(clientRegistry)
.limit(limit)
.blockOnLimit(true)
.build()))
.build();

try {
while (true) {
TimeUnit.MICROSECONDS.sleep(100);
ClientCalls.asyncUnaryCall(channel.newCall(METHOD_DESCRIPTOR, CallOptions.DEFAULT.withWaitForReady()), "request",
new StreamObserver<String>() {
@Override
public void onNext(String value) {
}
ClientCalls.blockingUnaryCall(channel, METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo");
Assert.fail("Should have failed with UNKNOWN error");
} catch (StatusRuntimeException e) {
// Verify
Assert.assertEquals(Status.Code.INVALID_ARGUMENT, e.getStatus().getCode());
Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class));
Mockito.verify(listener, Mockito.times(1)).onIgnore();
}
}

@Override
public void onError(Throwable t) {
drops.incrementAndGet();
}
@Test
public void releaseOnUncaughtException() throws IOException {
// Setup server
final Server server = startServer((req, observer) -> {
throw new RuntimeException("failure");
});

@Override
public void onCompleted() {
counters.incrementAndGet(id);
}

});
}
} catch (Exception e) {
e.printStackTrace();
// Make Client call
final Channel channel = NettyChannelBuilder.forAddress("localhost", server.getPort())
.usePlaintext(true)
.build();

try {
ClientCalls.blockingUnaryCall(channel, METHOD_DESCRIPTOR, CallOptions.DEFAULT, "foo");
Assert.fail("Should have failed with UNKNOWN error");
} catch (StatusRuntimeException e) {
// Verify
Mockito.verify(limiter, Mockito.times(1)).acquire(Mockito.isA(GrpcServerRequestContext.class));
Mockito.verify(listener, Mockito.times(1)).onIgnore();
}
}
}

0 comments on commit 6c2ba55

Please sign in to comment.