diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index bb9720984dd..ba08139b716 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -48,7 +48,7 @@ private ServerCalls() { */ public static ServerCallHandler asyncUnaryCall( UnaryMethod method) { - return new UnaryServerCallHandler<>(method); + return new UnaryServerCallHandler<>(method, false); } /** @@ -58,7 +58,7 @@ public static ServerCallHandler asyncUnaryCall( */ public static ServerCallHandler asyncServerStreamingCall( ServerStreamingMethod method) { - return new UnaryServerCallHandler<>(method); + return new UnaryServerCallHandler<>(method, true); } /** @@ -68,7 +68,7 @@ public static ServerCallHandler asyncServerStreamingC */ public static ServerCallHandler asyncClientStreamingCall( ClientStreamingMethod method) { - return new StreamingServerCallHandler<>(method); + return new StreamingServerCallHandler<>(method, false); } /** @@ -78,7 +78,7 @@ public static ServerCallHandler asyncClientStreamingC */ public static ServerCallHandler asyncBidiStreamingCall( BidiStreamingMethod method) { - return new StreamingServerCallHandler<>(method); + return new StreamingServerCallHandler<>(method, true); } /** @@ -113,10 +113,12 @@ private static final class UnaryServerCallHandler implements ServerCallHandler { private final UnaryRequestMethod method; + private final boolean serverStreaming; // Non private to avoid synthetic class - UnaryServerCallHandler(UnaryRequestMethod method) { + UnaryServerCallHandler(UnaryRequestMethod method, boolean serverStreaming) { this.method = method; + this.serverStreaming = serverStreaming; } @Override @@ -125,7 +127,7 @@ public ServerCall.Listener startCall(ServerCall call, Metadat call.getMethodDescriptor().getType().clientSendsOneMessage(), "asyncUnaryRequestCall is only for clientSendsOneMessage methods"); ServerCallStreamObserverImpl responseObserver = - new ServerCallStreamObserverImpl<>(call); + new ServerCallStreamObserverImpl<>(call, serverStreaming); // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client // sends more than 1 requests, ServerCall will catch it. Note that disabling auto // inbound flow control has no effect on unary calls. @@ -189,9 +191,11 @@ public void onHalfClose() { @Override public void onCancel() { - responseObserver.cancelled = true; if (responseObserver.onCancelHandler != null) { responseObserver.onCancelHandler.run(); + } else { + // Only trigger exceptions if unable to provide notification via a callback + responseObserver.cancelled = true; } } @@ -209,16 +213,18 @@ private static final class StreamingServerCallHandler implements ServerCallHandler { private final StreamingRequestMethod method; + private final boolean bidi; // Non private to avoid synthetic class - StreamingServerCallHandler(StreamingRequestMethod method) { + StreamingServerCallHandler(StreamingRequestMethod method, boolean bidi) { this.method = method; + this.bidi = bidi; } @Override public ServerCall.Listener startCall(ServerCall call, Metadata headers) { ServerCallStreamObserverImpl responseObserver = - new ServerCallStreamObserverImpl<>(call); + new ServerCallStreamObserverImpl<>(call, bidi); StreamObserver requestObserver = method.invoke(responseObserver); responseObserver.freeze(); if (responseObserver.autoRequestEnabled) { @@ -262,14 +268,19 @@ public void onHalfClose() { @Override public void onCancel() { - responseObserver.cancelled = true; if (responseObserver.onCancelHandler != null) { responseObserver.onCancelHandler.run(); + } else { + // Only trigger exceptions if unable to provide notification via a callback. Even though + // onError would provide notification to the server, we still throw an error since there + // isn't a guaranteed callback available. If the cancellation happened in a different + // order the service could be surprised to see the exception. + responseObserver.cancelled = true; } if (!halfClosed) { requestObserver.onError( Status.CANCELLED - .withDescription("cancelled before receiving half close") + .withDescription("client cancelled") .asRuntimeException()); } } @@ -300,6 +311,7 @@ private interface StreamingRequestMethod { private static final class ServerCallStreamObserverImpl extends ServerCallStreamObserver { final ServerCall call; + private final boolean serverStreamingOrBidi; volatile boolean cancelled; private boolean frozen; private boolean autoRequestEnabled = true; @@ -310,8 +322,9 @@ private static final class ServerCallStreamObserverImpl private boolean completed = false; // Non private to avoid synthetic class - ServerCallStreamObserverImpl(ServerCall call) { + ServerCallStreamObserverImpl(ServerCall call, boolean serverStreamingOrBidi) { this.call = call; + this.serverStreamingOrBidi = serverStreamingOrBidi; } private void freeze() { @@ -331,10 +344,17 @@ public void setCompression(String compression) { @Override public void onNext(RespT response) { if (cancelled) { - if (onCancelHandler == null) { - throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException(); + if (serverStreamingOrBidi) { + throw Status.CANCELLED + .withDescription("call already cancelled. " + + "Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception") + .asRuntimeException(); + } else { + // We choose not to throw for unary responses. The exception is intended to stop servers + // from continuing processing, but for unary responses there is no further processing + // so throwing an exception would not provide a benefit and would increase application + // complexity. } - return; } checkState(!aborted, "Stream was terminated by error, no further calls are allowed"); checkState(!completed, "Stream is already completed, no further calls are allowed"); @@ -357,14 +377,8 @@ public void onError(Throwable t) { @Override public void onCompleted() { - if (cancelled) { - if (onCancelHandler == null) { - throw Status.CANCELLED.withDescription("call already cancelled").asRuntimeException(); - } - } else { - call.close(Status.OK, new Metadata()); - completed = true; - } + call.close(Status.OK, new Metadata()); + completed = true; } @Override diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java index 29981932fb7..f9ceb82166d 100644 --- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java +++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java @@ -195,12 +195,8 @@ public StreamObserver invoke(StreamObserver responseObserver) } catch (StatusRuntimeException expected) { // Expected } - try { - callObserver.get().onCompleted(); - fail("Expected cancellation exception when onCallHandler not set"); - } catch (StatusRuntimeException expected) { - // Expected - } + // No exception + callObserver.get().onCompleted(); } @Test