diff --git a/api/src/main/java/io/grpc/ServerTimeoutManager.java b/api/src/main/java/io/grpc/ServerTimeoutManager.java new file mode 100644 index 00000000000..4629863a2b5 --- /dev/null +++ b/api/src/main/java/io/grpc/ServerTimeoutManager.java @@ -0,0 +1,113 @@ +/* + * Copyright 2014 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +/** A global instance that schedules the timeout tasks. */ +public class ServerTimeoutManager { + private final int timeout; + private final TimeUnit unit; + + private final Consumer logFunction; + + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + /** + * Creates a manager. Please make it a singleton and remember to shut it down. + * + * @param timeout Configurable timeout threshold. A value less than 0 (e.g. 0 or -1) means not to + * check timeout. + * @param unit The unit of the timeout. + * @param logFunction An optional function that can log (e.g. Logger::warn). Through this, + * we avoid depending on a specific logger library. + */ + public ServerTimeoutManager(int timeout, TimeUnit unit, Consumer logFunction) { + this.timeout = timeout; + this.unit = unit; + this.logFunction = logFunction; + } + + /** Please call shutdown() when the application exits. */ + public void shutdown() { + scheduler.shutdownNow(); + } + + /** + * Schedules a timeout and calls the RPC method invocation. + * Invalidates the timeout if the invocation completes in time. + * + * @param invocation The RPC method invocation that processes a request. + */ + public void intercept(Runnable invocation) { + if (timeout <= 0) { + invocation.run(); + return; + } + + TimeoutTask timeoutTask = schedule(Thread.currentThread()); + try { + invocation.run(); + } finally { + // If it completes in time, invalidate the timeout. + timeoutTask.invalidate(); + } + } + + private TimeoutTask schedule(Thread thread) { + TimeoutTask timeoutTask = new TimeoutTask(thread); + if (!scheduler.isShutdown()) { + scheduler.schedule(timeoutTask, timeout, unit); + } + return timeoutTask; + } + + private class TimeoutTask implements Runnable { + /** null thread means the task is invalid and will do nothing */ + private final AtomicReference threadReference = new AtomicReference<>(); + + private TimeoutTask(Thread thread) { + threadReference.set(thread); + } + + @Override + public void run() { + // Ensure the reference is consumed only once. + Thread thread = threadReference.getAndSet(null); + if (thread != null) { + thread.interrupt(); + if (logFunction != null) { + logFunction.accept( + "Interrupted RPC thread " + + thread.getName() + + " for timeout at " + + timeout + + " " + + unit); + } + } + } + + private void invalidate() { + threadReference.set(null); + } + } +} diff --git a/api/src/main/java/io/grpc/TimeoutServerInterceptor.java b/api/src/main/java/io/grpc/TimeoutServerInterceptor.java new file mode 100644 index 00000000000..6c9736c16ac --- /dev/null +++ b/api/src/main/java/io/grpc/TimeoutServerInterceptor.java @@ -0,0 +1,73 @@ +/* + * Copyright 2014 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc; + +/** + * An optional ServerInterceptor that can interrupt server calls that are running for too long time. + * + *

You can add it to your server using the ServerBuilder#intercept(ServerInterceptor) method. + * + *

Limitation: it only applies the timeout to unary calls (streaming calls will run without timeout). + */ +public class TimeoutServerInterceptor implements ServerInterceptor { + + private final ServerTimeoutManager serverTimeoutManager; + + public TimeoutServerInterceptor(ServerTimeoutManager serverTimeoutManager) { + this.serverTimeoutManager = serverTimeoutManager; + } + + @Override + public ServerCall.Listener interceptCall( + ServerCall serverCall, + Metadata metadata, + ServerCallHandler serverCallHandler) { + return new TimeoutServerCallListener<>( + serverCallHandler.startCall(serverCall, metadata), serverCall, serverTimeoutManager); + } + + /** A listener that intercepts the RPC method invocation for timeout control. */ + private static class TimeoutServerCallListener + extends ForwardingServerCallListener.SimpleForwardingServerCallListener { + + private final ServerCall serverCall; + private final ServerTimeoutManager serverTimeoutManager; + + private TimeoutServerCallListener( + ServerCall.Listener delegate, + ServerCall serverCall, + ServerTimeoutManager serverTimeoutManager) { + super(delegate); + this.serverCall = serverCall; + this.serverTimeoutManager = serverTimeoutManager; + } + + /** + * Only intercepts unary calls because the timeout is inapplicable to streaming calls. + * Intercepts onHalfClose() because the RPC method is called in it. See + * io.grpc.stub.ServerCalls.UnaryServerCallHandler.UnaryServerCallListener + */ + @Override + public void onHalfClose() { + if (serverCall.getMethodDescriptor().getType().clientSendsOneMessage()) { + serverTimeoutManager.intercept(super::onHalfClose); + } else { + super.onHalfClose(); + } + } + } +}