Skip to content

Commit

Permalink
Intercept all stages and close server call using serializing execution
Browse files Browse the repository at this point in the history
  • Loading branch information
sorra committed Sep 17, 2023
1 parent a7f5cc1 commit 1200f35
Show file tree
Hide file tree
Showing 5 changed files with 401 additions and 311 deletions.
167 changes: 167 additions & 0 deletions core/src/main/java/io/grpc/util/SerializingServerCall.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package io.grpc.util;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.internal.SerializingExecutor;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;

/**
* A {@link ServerCall} that wraps around a non thread safe delegate and provides thread safe
* access by serializing everything on an executor.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2189")
class SerializingServerCall<ReqT, RespT> extends
ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT> {
private static final String ERROR_MSG = "Encountered error during serialized access";
private final SerializingExecutor serializingExecutor =
new SerializingExecutor(MoreExecutors.directExecutor());
private boolean closeCalled = false;

SerializingServerCall(ServerCall<ReqT, RespT> delegate) {
super(delegate);
}

@Override
public void sendMessage(final RespT message) {
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
SerializingServerCall.super.sendMessage(message);
}
});
}

@Override
public void request(final int numMessages) {
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
SerializingServerCall.super.request(numMessages);
}
});
}

@Override
public void sendHeaders(final Metadata headers) {
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
SerializingServerCall.super.sendHeaders(headers);
}
});
}

@Override
public void close(final Status status, final Metadata trailers) {
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
if (!closeCalled) {
closeCalled = true;

SerializingServerCall.super.close(status, trailers);
}
}
});
}

@Override
public boolean isReady() {
final SettableFuture<Boolean> retVal = SettableFuture.create();
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
retVal.set(SerializingServerCall.super.isReady());
}
});
try {
return retVal.get();
} catch (InterruptedException e) {
throw new RuntimeException(ERROR_MSG, e);
} catch (ExecutionException e) {
throw new RuntimeException(ERROR_MSG, e);
}
}

@Override
public boolean isCancelled() {
final SettableFuture<Boolean> retVal = SettableFuture.create();
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
retVal.set(SerializingServerCall.super.isCancelled());
}
});
try {
return retVal.get();
} catch (InterruptedException e) {
throw new RuntimeException(ERROR_MSG, e);
} catch (ExecutionException e) {
throw new RuntimeException(ERROR_MSG, e);
}
}

@Override
public void setMessageCompression(final boolean enabled) {
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
SerializingServerCall.super.setMessageCompression(enabled);
}
});
}

@Override
public void setCompression(final String compressor) {
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
SerializingServerCall.super.setCompression(compressor);
}
});
}

@Override
public Attributes getAttributes() {
final SettableFuture<Attributes> retVal = SettableFuture.create();
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
retVal.set(SerializingServerCall.super.getAttributes());
}
});
try {
return retVal.get();
} catch (InterruptedException e) {
throw new RuntimeException(ERROR_MSG, e);
} catch (ExecutionException e) {
throw new RuntimeException(ERROR_MSG, e);
}
}

@Nullable
@Override
public String getAuthority() {
final SettableFuture<String> retVal = SettableFuture.create();
serializingExecutor.execute(new Runnable() {
@Override
public void run() {
retVal.set(SerializingServerCall.super.getAuthority());
}
});
try {
return retVal.get();
} catch (InterruptedException e) {
throw new RuntimeException(ERROR_MSG, e);
} catch (ExecutionException e) {
throw new RuntimeException(ERROR_MSG, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@
* limitations under the License.
*/

package io.grpc;
package io.grpc.util;

import io.grpc.Context;
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

/**
* An optional ServerInterceptor that can interrupt server calls that are running for too long time.
Expand All @@ -41,33 +49,82 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCallHandler<ReqT, RespT> serverCallHandler) {
// Only intercepts unary calls because the timeout is inapplicable to streaming calls.
if (serverCall.getMethodDescriptor().getType().clientSendsOneMessage()) {
return new TimeoutServerCallListener<>(
serverCallHandler.startCall(serverCall, metadata), serverTimeoutManager);
} else {
return serverCallHandler.startCall(serverCall, metadata);
ServerCall<ReqT, RespT> serializingServerCall = new SerializingServerCall<>(serverCall);
Context.CancellableContext timeoutContext =
serverTimeoutManager.startTimeoutContext(serializingServerCall);
if (timeoutContext != null) {
return new TimeoutServerCallListener<>(
serverCallHandler.startCall(serializingServerCall, metadata),
timeoutContext,
serverTimeoutManager);
}
}
return serverCallHandler.startCall(serverCall, metadata);
}

/** A listener that intercepts the RPC method invocation for timeout control. */
private static class TimeoutServerCallListener<ReqT>
/** A listener that intercepts RPC callbacks for timeout control. */
static class TimeoutServerCallListener<ReqT>
extends ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {

private final Context.CancellableContext context;
private final ServerTimeoutManager serverTimeoutManager;

private TimeoutServerCallListener(
ServerCall.Listener<ReqT> delegate,
Context.CancellableContext context,
ServerTimeoutManager serverTimeoutManager) {
super(delegate);
this.context = context;
this.serverTimeoutManager = serverTimeoutManager;
}

@Override
public void onMessage(ReqT message) {
Context previous = context.attach();
try {
super.onMessage(message);
} finally {
context.detach(previous);
}
}

/**
* Intercepts onHalfClose() because the RPC method is called in it. See
* Intercepts onHalfClose() because the application RPC method is called in it. See
* io.grpc.stub.ServerCalls.UnaryServerCallHandler.UnaryServerCallListener
*/
@Override
public void onHalfClose() {
serverTimeoutManager.withTimeout(super::onHalfClose);
serverTimeoutManager.withInterruption(context, super::onHalfClose);
}

@Override
public void onCancel() {
Context previous = context.attach();
try {
super.onCancel();
} finally {
context.detach(previous);
}
}

@Override
public void onComplete() {
Context previous = context.attach();
try {
super.onComplete();
} finally {
context.detach(previous);
}
}

@Override
public void onReady() {
Context previous = context.attach();
try {
super.onReady();
} finally {
context.detach(previous);
}
}
}
}
Loading

0 comments on commit 1200f35

Please sign in to comment.