Skip to content

Commit

Permalink
Merge pull request #81 from elandau/bugfix/exception
Browse files Browse the repository at this point in the history
Fix releasing the limit on upstream interceptor failures
  • Loading branch information
elandau authored Sep 18, 2018
2 parents c5af767 + 6c2ba55 commit 35f89f1
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 206 deletions.
3 changes: 2 additions & 1 deletion concurrency-limits-grpc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ dependencies {
compile "io.grpc:grpc-core:1.9.0"

testCompile project(":concurrency-limits-spectator")


testCompile "org.mockito:mockito-core:1.+"
testCompile "io.grpc:grpc-netty:1.9.0"
testCompile "io.grpc:grpc-stub:1.9.0"
testCompile "junit:junit-dep:4.10"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.google.common.base.Preconditions;
import com.netflix.concurrency.limits.Limiter;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
Expand All @@ -29,17 +28,15 @@
import io.grpc.Status;
import io.grpc.Status.Code;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.Nullable;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* ClientInterceptor that enforces per service and/or per method concurrent request limits and returns
* a Status.UNAVAILABLE when that limit has been reached.
*/
public class ConcurrencyLimitClientInterceptor implements ClientInterceptor {
private static final Status LIMIT_EXCEEDED_STATUS = Status.UNAVAILABLE.withDescription("Concurrency limit reached");
private static final Status LIMIT_EXCEEDED_STATUS = Status.UNAVAILABLE.withDescription("Client concurrency limit reached");

private final Limiter<GrpcClientRequestContext> grpcLimiter;

Expand All @@ -51,81 +48,78 @@ public ConcurrencyLimitClientInterceptor(final Limiter<GrpcClientRequestContext>
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(final MethodDescriptor<ReqT, RespT> method,
final CallOptions callOptions, final Channel next) {
final Optional<Limiter.Listener> listener = grpcLimiter.acquire(new GrpcClientRequestContext() {
@Override
public MethodDescriptor<?, ?> getMethod() {
return method;
}

@Override
public CallOptions getCallOptions() {
return callOptions;
}
});

if (!listener.isPresent()) {
return new ClientCall<ReqT, RespT>() {
@Override
public void start(io.grpc.ClientCall.Listener<RespT> responseListener, Metadata headers) {
responseListener.onClose(LIMIT_EXCEEDED_STATUS, new Metadata());
}

@Override
public void request(int numMessages) {
}
return grpcLimiter
.acquire(new GrpcClientRequestContext() {
@Override
public MethodDescriptor<?, ?> getMethod() {
return method;
}

@Override
public void cancel(String message, Throwable cause) {
}
@Override
public CallOptions getCallOptions() {
return callOptions;
}
})
// Perform the operation and release the limiter once done.
.map(listener -> (ClientCall<ReqT, RespT>) new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
final AtomicBoolean done = new AtomicBoolean(false);

@Override
public void halfClose() {
}
@Override
public void start(final Listener<RespT> responseListener, final Metadata headers) {
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(final Status status, final Metadata trailers) {
try {
super.onClose(status, trailers);
} finally {
if (done.compareAndSet(false, true)) {
if (status.isOk()) {
listener.onSuccess();
} else if (Code.UNAVAILABLE == status.getCode()) {
listener.onDropped();
} else {
listener.onIgnore();
}
}
}
}
}, headers);
}

@Override
public void sendMessage(ReqT message) {
}
};
}

// Perform the operation and release the limiter once done.
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
final AtomicBoolean done = new AtomicBoolean(false);

@Override
public void start(final Listener<RespT> responseListener, final Metadata headers) {
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(final Status status, final Metadata trailers) {
try {
super.onClose(status, trailers);
} finally {
listener.ifPresent(l -> {
if (done.compareAndSet(false, true)) {
if (status.isOk()) {
l.onSuccess();
} else if (Code.UNAVAILABLE == status.getCode()) {
l.onDropped();
} else {
l.onIgnore();
@Override
public void cancel(final @Nullable String message, final @Nullable Throwable cause) {
try {
super.cancel(message, cause);
} finally {
if (done.compareAndSet(false, true)) {
listener.onIgnore();
}
}
});
}
}
}
}, headers);
}

@Override
public void cancel(final @Nullable String message, final @Nullable Throwable cause) {
try {
super.cancel(message, cause);
} finally {
if (done.compareAndSet(false, true)) {
listener.ifPresent(Limiter.Listener::onIgnore);
}
}
}
};
)
.orElseGet(() -> new ClientCall<ReqT, RespT>() {
@Override
public void start(io.grpc.ClientCall.Listener<RespT> responseListener, Metadata headers) {
responseListener.onClose(LIMIT_EXCEEDED_STATUS, new Metadata());
}

@Override
public void request(int numMessages) {
}

@Override
public void cancel(String message, Throwable cause) {
}

@Override
public void halfClose() {
}

@Override
public void sendMessage(ReqT message) {
}
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import com.netflix.concurrency.limits.Limiter;
Expand All @@ -29,13 +31,17 @@
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link ServerInterceptor} that enforces per service and/or per method concurrent request limits and returns
* a Status.UNAVAILABLE when that limit has been reached.
*/
public class ConcurrencyLimitServerInterceptor implements ServerInterceptor {
private static final Status LIMIT_EXCEEDED_STATUS = Status.UNAVAILABLE.withDescription("Concurrency limit reached");
private static final Logger LOG = LoggerFactory.getLogger(ConcurrencyLimitServerInterceptor.class);

private static final Status LIMIT_EXCEEDED_STATUS = Status.UNAVAILABLE.withDescription("Server concurrency limit reached");

private final Limiter<GrpcServerRequestContext> grpcLimiter;

Expand Down Expand Up @@ -109,54 +115,89 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT>
final Metadata headers,
final ServerCallHandler<ReqT, RespT> next) {

final Optional<Limiter.Listener> listener = grpcLimiter.acquire(new GrpcServerRequestContext() {
@Override
public ServerCall<?, ?> getCall() {
return call;
}
return grpcLimiter
.acquire(new GrpcServerRequestContext() {
@Override
public ServerCall<?, ?> getCall() {
return call;
}

@Override
public Metadata getHeaders() {
return headers;
}
});

if (!listener.isPresent()) {
call.close(statusSupplier.get(), trailerSupplier.get());
return new ServerCall.Listener<ReqT>() {};
}
@Override
public Metadata getHeaders() {
return headers;
}
})
.map(new Function<Limiter.Listener, Listener<ReqT>>() {
final AtomicBoolean done = new AtomicBoolean(false);

final AtomicBoolean done = new AtomicBoolean(false);
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall(
new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void close(Status status, Metadata trailers) {
void safeComplete(Runnable action) {
if (done.compareAndSet(false, true)) {
try {
super.close(status, trailers);
} finally {
if (done.compareAndSet(false, true)) {
if (status.isOk()) {
listener.get().onSuccess();
} else {
listener.get().onIgnore();
}
}
action.run();
} catch (Throwable t) {
LOG.error("Critical error releasing limit", t);
}
}
},
headers)) {
@Override
public void onCancel() {
try {
super.onCancel();
} finally {
if (done.compareAndSet(false, true)) {
listener.get().onIgnore();
}
};

@Override
public Listener<ReqT> apply(Limiter.Listener listener) {
return (ServerCall.Listener<ReqT>) new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall(
new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void close(Status status, Metadata trailers) {
try {
super.close(status, trailers);
} finally {
safeComplete(() -> {
if (status.isOk()) {
listener.onSuccess();
} else {
listener.onIgnore();
}
});
}
}
},
headers)) {

@Override
public void onMessage(ReqT message) {
try {
super.onMessage(message);
} catch (Throwable t) {
LOG.error("Uncaught exception. Force releasing limit. ", t);
safeComplete(listener::onIgnore);
throw t;
}
}

@Override
public void onHalfClose() {
try {
super.onHalfClose();
} catch (Throwable t) {
LOG.error("Uncaught exception. Force releasing limit. ", t);
safeComplete(listener::onIgnore);
throw t;
}
}

@Override
public void onCancel() {
try {
super.onCancel();
} finally {
safeComplete(listener::onIgnore);
}
}
};
}
}
};
})
.orElseGet(() -> {
call.close(statusSupplier.get(), trailerSupplier.get());
return new ServerCall.Listener<ReqT>() {};
});
}

}
Loading

0 comments on commit 35f89f1

Please sign in to comment.