From e9d0e7cbf39b0f0f3e2685908e5daa3d01d6ecb0 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Sat, 9 Mar 2024 11:55:31 +0200 Subject: [PATCH 1/2] rpc grpc-stub: request timeouts support --- .../com/jauntsdn/rsocket/RpcMessageCodec.java | 258 +++++++++++++++++- 1 file changed, 257 insertions(+), 1 deletion(-) diff --git a/rsocket-rpc-grpc/src/main/java/com/jauntsdn/rsocket/RpcMessageCodec.java b/rsocket-rpc-grpc/src/main/java/com/jauntsdn/rsocket/RpcMessageCodec.java index 5e6239d..bc15389 100644 --- a/rsocket-rpc-grpc/src/main/java/com/jauntsdn/rsocket/RpcMessageCodec.java +++ b/rsocket-rpc-grpc/src/main/java/com/jauntsdn/rsocket/RpcMessageCodec.java @@ -20,6 +20,10 @@ import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.ServerCallStreamObserver; import io.grpc.stub.StreamObserver; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import javax.annotation.Nullable; @@ -255,6 +259,25 @@ public static StreamObserver decode( return new RequestResponseObserver<>(observer, decoder, instrumentationListener); } + public static StreamObserver decode( + StreamObserver observer, + Function decoder, + @Nullable RpcInstrumentation.Listener instrumentationListener, + @Nullable ScheduledExecutorService scheduler, + long timeoutMillis) { + if (observer instanceof ClientResponseObserver) { + ClientResponseObserver clientResponseObserver = + (ClientResponseObserver) observer; + if (timeoutMillis <= 0 || scheduler == null) { + return new RequestResponseCancellableObserver<>( + clientResponseObserver, decoder, instrumentationListener); + } + return new RequestResponseTimeoutObserver<>( + clientResponseObserver, decoder, instrumentationListener, scheduler, timeoutMillis); + } + return new RequestResponseObserver<>(observer, decoder, instrumentationListener); + } + static class RequestResponseObserver implements StreamObserver { final StreamObserver observer; final Function decoder; @@ -301,6 +324,128 @@ public void onCompleted() { } } + static final class RequestResponseTimeoutObserver + extends RequestResponseObserver + implements ClientResponseObserver { + final long timeoutMillis; + final ScheduledExecutorService scheduler; + volatile ScheduledFuture timeoutHandle; + boolean timeoutCancelled; + + RequestResponseTimeoutObserver( + ClientResponseObserver observer, + Function decoder, + @Nullable RpcInstrumentation.Listener instrumentationListener, + ScheduledExecutorService scheduler, + long timeoutMillis) { + super(observer, decoder, instrumentationListener); + this.scheduler = scheduler; + this.timeoutMillis = timeoutMillis; + } + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + timeoutHandle = + scheduler.schedule( + () -> { + timeoutCancelled = true; + TimeoutException e = + new TimeoutException("timed out after " + timeoutMillis + " millis"); + requestStream.cancel(null, e); + onError(e); + }, + timeoutMillis, + TimeUnit.MILLISECONDS); + + ((ClientResponseObserver) observer) + .beforeStart( + new ClientCallStreamObserver() { + @Override + public void cancel(@Nullable String message, @Nullable Throwable cause) { + RpcInstrumentation.Listener l = instrumentationListener; + if (l != null) { + l.onCancel(); + } + requestStream.cancel(message, cause); + cancelTimeout(); + } + + @Override + public boolean isReady() { + return requestStream.isReady(); + } + + @Override + public void setOnReadyHandler(Runnable onReadyHandler) { + requestStream.setOnReadyHandler(onReadyHandler); + } + + @Override + public void request(int count) { + requestStream.request(count); + } + + @Override + public void setMessageCompression(boolean enable) { + requestStream.setMessageCompression(enable); + } + + @Override + public void disableAutoInboundFlowControl() { + requestStream.disableAutoInboundFlowControl(); + } + + @Override + public void disableAutoRequestWithInitial(int request) { + requestStream.disableAutoRequestWithInitial(request); + } + + @Override + public void onNext(ReqT value) { + /*protobuf is not refcounted - ignore*/ + } + + @Override + public void onError(Throwable t) { + requestStream.onError(t); + } + + @Override + public void onCompleted() { + requestStream.onCompleted(); + } + }); + } + + @Override + public void onNext(Message message) { + cancelTimeout(); + super.onNext(message); + } + + @Override + public void onError(Throwable t) { + cancelTimeout(); + super.onError(t); + } + + @Override + public void onCompleted() { + cancelTimeout(); + super.onCompleted(); + } + + void cancelTimeout() { + if (!timeoutCancelled) { + timeoutCancelled = true; + ScheduledFuture h = timeoutHandle; + if (h != null) { + h.cancel(true); + } + } + } + } + static final class RequestResponseCancellableObserver extends RequestResponseObserver implements ClientResponseObserver { @@ -397,6 +542,32 @@ public static StreamObserver decode( return new RequestChannelObserver<>(observer, decoder, instrumentationListener); } + public static StreamObserver decode( + StreamObserver observer, + Encoder encoder, + Function decoder, + @Nullable RpcInstrumentation.Listener instrumentationListener, + @Nullable ScheduledExecutorService scheduler, + long timeoutMillis) { + if (observer instanceof ClientResponseObserver) { + if (timeoutMillis <= 0 || scheduler == null) { + return new RequestChannelEncoderObserver<>( + (ClientResponseObserver) observer, + encoder, + decoder, + instrumentationListener); + } + return new RequestChannelTimeoutObserver<>( + (ClientResponseObserver) observer, + encoder, + decoder, + instrumentationListener, + scheduler, + timeoutMillis); + } + return new RequestChannelObserver<>(observer, decoder, instrumentationListener); + } + static class RequestChannelObserver implements StreamObserver { final StreamObserver observer; final Function decoder; @@ -460,7 +631,76 @@ static final class RequestChannelEncoderObserver @Override public void beforeStart(ClientCallStreamObserver requestStream) { ((ClientResponseObserver) observer) - .beforeStart(encoder.encodeStream(requestStream)); + .beforeStart(encoder.encodeStream(requestStream, null)); + } + } + + static final class RequestChannelTimeoutObserver + extends RequestChannelObserver + implements ClientResponseObserver { + final Encoder encoder; + final ScheduledExecutorService scheduler; + final long timeoutMillis; + volatile ScheduledFuture timeoutHandle; + boolean timeoutCancelled; + + RequestChannelTimeoutObserver( + ClientResponseObserver observer, + Encoder encoder, + Function decoder, + @Nullable RpcInstrumentation.Listener instrumentationListener, + ScheduledExecutorService scheduler, + long timeoutMillis) { + super(observer, decoder, instrumentationListener); + this.encoder = encoder; + this.scheduler = scheduler; + this.timeoutMillis = timeoutMillis; + } + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + ScheduledFuture h = + timeoutHandle = + scheduler.schedule( + () -> { + timeoutCancelled = true; + TimeoutException e = + new TimeoutException("timed out after " + timeoutMillis + " millis"); + requestStream.cancel(null, e); + onError(e); + }, + timeoutMillis, + TimeUnit.MILLISECONDS); + ((ClientResponseObserver) observer) + .beforeStart(encoder.encodeStream(requestStream, h)); + } + + @Override + public void onNext(Message message) { + cancelTimeout(); + super.onNext(message); + } + + @Override + public void onError(Throwable t) { + cancelTimeout(); + super.onError(t); + } + + @Override + public void onCompleted() { + cancelTimeout(); + super.onCompleted(); + } + + void cancelTimeout() { + if (!timeoutCancelled) { + timeoutCancelled = true; + ScheduledFuture h = timeoutHandle; + if (h != null) { + h.cancel(true); + } + } } } @@ -474,6 +714,11 @@ public Encoder(@Nullable RpcInstrumentation.Listener instrumentationListener) public abstract Message onNext(ReqT message); public ClientCallStreamObserver encodeStream(StreamObserver observer) { + return encodeStream(observer, null); + } + + public ClientCallStreamObserver encodeStream( + StreamObserver observer, @Nullable ScheduledFuture timeoutHandle) { if (!(observer instanceof ClientCallStreamObserver)) { throw new IllegalStateException( "messageStream.requestChannel() returned value is not ClientCallStreamObserver"); @@ -488,6 +733,9 @@ public void cancel(@Nullable String message, @Nullable Throwable cause) { if (listener != null) { listener.onCancel(); } + if (timeoutHandle != null) { + timeoutHandle.cancel(true); + } callObserver.cancel(message, cause); } @@ -541,6 +789,14 @@ public void onCompleted() { } } + public static ScheduledExecutorService timeoutScheduler( + MessageStreams messageStreams, long timeoutMillis) { + if (timeoutMillis <= 0) { + return null; + } + return messageStreams.scheduler().orElse(null); + } + abstract static class AbstractServerEncode extends ServerCallStreamObserver { final ServerCallStreamObserver upstream; final RpcInstrumentation.Listener instrumentationListener; From 58c7be50b991ac1c3a85961155be7c15beaab064 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Sun, 10 Mar 2024 15:28:31 +0200 Subject: [PATCH 2/2] rpc futures: request timeouts support --- .../java/com/jauntsdn/rsocket/RpcService.java | 70 +++++++++++++++++-- 1 file changed, 65 insertions(+), 5 deletions(-) diff --git a/rsocket-rpc-futures/src/main/java/com/jauntsdn/rsocket/RpcService.java b/rsocket-rpc-futures/src/main/java/com/jauntsdn/rsocket/RpcService.java index c6d4ab3..b6d4cb7 100644 --- a/rsocket-rpc-futures/src/main/java/com/jauntsdn/rsocket/RpcService.java +++ b/rsocket-rpc-futures/src/main/java/com/jauntsdn/rsocket/RpcService.java @@ -22,6 +22,9 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionStage; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import javax.annotation.Nullable; @@ -36,11 +39,11 @@ interface Factory { T withLifecycle(Closeable requester); } - final class ResponseListener implements BiConsumer { - private final Future cancelHandle; - private final BiConsumer listener; + class ResponseListener implements BiConsumer { + final Future cancelHandle; + final BiConsumer listener; - private ResponseListener( + ResponseListener( CompletionStage response, @Nullable BiConsumer listener) { this.cancelHandle = response.toCompletableFuture(); this.listener = listener; @@ -48,13 +51,30 @@ private ResponseListener( public static ResponseListener create( CompletionStage response, BiConsumer listener) { - return new ResponseListener(response, listener); + return new ResponseListener<>(response, listener); + } + + public static ResponseListener create( + CompletionStage response, + BiConsumer listener, + @Nullable ScheduledExecutorService timeoutScheduler, + long timeoutMillis) { + return new ResponseTimeoutListener(response, listener) + .scheduleTimeout(timeoutScheduler, timeoutMillis); } public static ResponseListener create(CompletionStage response) { return new ResponseListener<>(response, null); } + public static ResponseListener create( + CompletionStage response, + @Nullable ScheduledExecutorService timeoutScheduler, + long timeoutMillis) { + return new ResponseTimeoutListener(response, null) + .scheduleTimeout(timeoutScheduler, timeoutMillis); + } + @Override public void accept(T t, Throwable throwable) { if (throwable instanceof CancellationException) { @@ -67,6 +87,46 @@ public void accept(T t, Throwable throwable) { } } + final class ResponseTimeoutListener extends ResponseListener { + volatile ScheduledFuture timeoutHandle; + + ResponseTimeoutListener( + CompletionStage response, @Nullable BiConsumer listener) { + super(response, listener); + } + + ResponseListener scheduleTimeout( + @Nullable ScheduledExecutorService timeoutScheduler, long timeoutMillis) { + if (timeoutMillis > 0 && timeoutScheduler != null) { + timeoutHandle = + timeoutScheduler.schedule( + () -> { + cancelHandle.cancel(true); + }, + timeoutMillis, + TimeUnit.MILLISECONDS); + } + return this; + } + + @Override + public void accept(T t, Throwable throwable) { + ScheduledFuture h = timeoutHandle; + if (h != null) { + h.cancel(true); + } + super.accept(t, throwable); + } + } + + static ScheduledExecutorService timeoutScheduler( + MessageStreams messageStreams, long timeoutMillis) { + if (timeoutMillis <= 0) { + return null; + } + return messageStreams.scheduler().orElse(null); + } + @SuppressWarnings("all") abstract class ServerFactory implements RpcService.Factory { private final Object service;