Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server-side timeout mechanism #10360

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open

Conversation

sorra
Copy link

@sorra sorra commented Jul 9, 2023

gRPC Java Server-side Timeout Design

Author: Dongqing Hu

Date: 2023-09-16

References:

Intention

Regarding #9684 , there have been multiple asks in the community for the server-side timeout. Servlets and database connections support timeout, so why gRPC does not provide it?
Our application is having such a problem. The grpc-java server usually runs workers in a ThreadPoolExecutor with a maximum size (infinite size is not better with the problem). If some server calls run infinitely (e.g. in an infinite loop or in a waiting state infinitely), they will occupy some threads. And if this situation persists, eventually all threads in the pool will be occupied and no work can be done anymore, which results in a service downtime.
The client-side timeout only helps the client stop infinite waiting, it does not help the server stop infinite processing.

So the server needs a forced timeout. Per the comment by @ejona86 #9684 (comment) , application developers can do it via a server interceptor. But such a thing is not straightforward enough for application developers to implement on their own. So why not provide a built-in API in the grpc-java framework?

Alternatives

Alternative 1:

Apply AOP (dynamic proxy or byte code weaving) to each application RPC service class, the AOP intercepts each application RPC method with timeout control.

Alternative 2:

Each RPC method explicitly delegates the request handling to another executor that supports timeout control. These methods must remember to pass along the gRPC context to the another executor.

These alternatives are too invasive to the application.

Design

Overall Flow

As we know. After startCall, a server call will go through listenable stages like onReady, onMessage, onHalfClose, and onComplete/onCancel.

A new interceptor is introduced in the util module. It can intercept startCall and create a CancellableContext timeoutContext using the configured timeout, and the timeout context has a cancellation listener to close the server call with Status.CANCELLED. The timeoutContext is attached to each stage, so each stage is able to know if timeout is reached by calling context.isCancelled(). Whether each stage checks context.isCancelled() or not, the server call is eventually closed (is this enough?).
The core code is like:

// In the interceptor
      ServerCall<ReqT, RespT> serializingServerCall = new SerializingServerCall<>(serverCall);
      Context.CancellableContext timeoutContext =
              serverTimeoutManager.startTimeoutContext(serializingServerCall);
      if (timeoutContext != null) {
        return new TimeoutServerCallListener<>(
                serverCallHandler.startCall(serializingServerCall, metadata),
                timeoutContext,
                serverTimeoutManager);
      }

// In the listener
    public void onSomething() {
      Context previous = context.attach();
      try {
        super.onSomething();
      } finally {
        context.detach(previous);
        // call context.close() if the stage is onComplete or onCancel.
      }
    }

Especially, if option shouldInterrupt == true, the unary server call's onHalfClose will have an additional cancellation listener to interrupt the current thread (the thread is in the execution of the application RPC method). Eventually, if timeout is not reached, onComplete/onCancel will normally cancel the timeout context to allow it to be garbage collected.

Notable Details

Serializing:

SerializingServerCall is used to close the server call thread-safely.

Status:

If the timeout is reached, it always results in a Status.CANCELLED with description "server call timeout".

Interruption:

  • Interruption is performed only when timeout is reached and shouldInterrupt == true and the stage is onHalfClose of a unary server call (where the application RPC method is invoked). If interruption has been performed, the interrupt state is always reset when leaving onHalfClose. This is to allow the worker thread to be safely reused for the next task in a ForkJoinPool. For more information, refer to https://bugs.openjdk.org/browse/JDK-8223430.
  • Interruption is useful for stopping excessive IO waits and lock waits (especially those mistakenly having no timeout) so the worker thread can be returned to the pool in time.
  • Interruption is not so useful for SQL queries because JDBC is not interruptible https://bugs.openjdk.org/browse/JDK-6393812. So applications have to set a SQL timeout besides the RPC timeout.

Pending Questions

  1. Should streaming server calls also have a timeout?
  2. Should interruption be also applied to other stages?
  3. If the server call is closed in a stage e.g. onHalfClose, what will be the next stage? onComplete, onCancel or nothing?

@linux-foundation-easycla
Copy link

linux-foundation-easycla bot commented Jul 9, 2023

CLA Signed

The committers listed above are authorized under a signed CLA.

@linux-foundation-easycla
Copy link

linux-foundation-easycla bot commented Jul 9, 2023

CLA Signed

The committers listed above are authorized under a signed CLA.

@sorra
Copy link
Author

sorra commented Jul 17, 2023

Previously, a task was cancelled using TimeoutTask#invalidate()

    /** null thread means the task is invalid and will do nothing */
    private final AtomicReference<Thread> threadReference = new AtomicReference<>();
......
threadReference.set(null);

But error-prone asks me not to ignore the Future returned by ScheduledThreadPoolExecutor:

/home/runner/work/grpc-java/grpc-java/api/src/main/java/io/grpc/ServerTimeoutManager.java:78: warning: [FutureReturnValueIgnored] Return value of methods returning Future must be checked. Ignoring returned Futures suppresses exceptions thrown from the code that completes the Future.
      scheduler.schedule(timeoutTask, timeout, unit);
                        ^
    (see https://errorprone.info/bugpattern/FutureReturnValueIgnored)
  Did you mean 'var unused = scheduler.schedule(timeoutTask, timeout, unit);' or to remove this line?

So I now use Future for cancellation. The difference is:

  • invalidating a TimeoutTask only makes it no-op
  • canceling a Future not only makes it no-op but also removes it from the ScheduledThreadPoolExecutor's queue (as I tested, the queue removal does not affect performance).

@sanjaypujare
Copy link
Contributor

@sorra the approach in this PR is quite different from @ejona86 #9684 (comment) : specifically "creating a Context with your deadline and then adding a cancellationListener for that Context on when it closes and call ServerCall.close()" etc.

Also note the second part of the comment "However, ServerCall will need its methods to become synchronized and you'll want to ignore calls to sendMessage(), close(), and the like after the deadline expires." This either needs a much bigger change in gRPC or we can say this is really something for the application to do.

@sorra
Copy link
Author

sorra commented Jul 18, 2023

@sanjaypujare thank you.

  1. I think cancellationListener and ServerCall.close() can only close the stream and cannot stop the application's RPC method execution.
  2. My approach simply interrupts the RPC method execution which should result in an exception thrown from the execution if its runner thread is in a waiting state (io-wait, lock-wait, sleep) or its code path is actively checking interruption during a loop, so it can naturally jump to exception handling. I think there is no need to do any changes to ServerCall.

@sanjaypujare
Copy link
Contributor

  1. I think cancellationListener and ServerCall.close() can only close the stream and cannot stop the application's RPC method execution.

cancellationListener is for you (the user) to register your own listener and in that listener you can kill/abort the server handler thread.

  1. My approach simply interrupts the RPC method execution which should result in an exception thrown from the execution if its runner thread is in a waiting state (io-wait, lock-wait, sleep) or its code path is actively checking interruption during a loop, so it can naturally jump to exception handling. I think there is no need to do any changes to ServerCall.

I think the cancellationListener approach can achieve the same thing.

@sorra
Copy link
Author

sorra commented Jul 23, 2023

Sorry too busy these days.
@sanjaypujare I need a way to capture the application's execution thread so I can interrupt it.

  1. I should check if cancellationListener is run in the same thread; if so, that will work. I have taken a quick look and think "no, it is run in a random thread", according to
    notifyAndClearListeners();
    and
    void deliver() {
      try {
        executor.execute(this);
      } catch (Throwable t) {
        log.log(Level.INFO, "Exception notifying context listener", t);
      }
    }
  1. It is necessary to capture that thread because ServerCall.close() can only close the stream according to
    private void internalClose(Status internalError) {
  /**
   * Close the {@link ServerStream} because an internal error occurred. Allow the application to
   * run until completion, but silently ignore interactions with the {@link ServerStream} from now
   * on.
   */
  private void internalClose(Status internalError) {
    log.log(Level.WARNING, "Cancelling the stream with status {0}", new Object[] {internalError});
    stream.cancel(internalError);
    serverCallTracer.reportCallEnded(internalError.isOk()); // error so always false
  }

@sorra
Copy link
Author

sorra commented Jul 29, 2023

It is difficult to test threading behavior in unit tests.
So I have created a project https://github.com/sorra/grpc-timeout (you can run Server.java and Client.java) to test my approach and the alternative cancellationListener approach. Did I implement the alternative approach correctly? Could you help review it?

@ejona86
Copy link
Member

ejona86 commented Aug 8, 2023

Cancellation listener is exactly what you want for stopping the application. The RPC can be cancelled for many reasons, like Deadline, the client explicitly cancelled, or I/O failures. It'd look like:

    @Override
    public void onHalfClose() {
      Context context = Context.current();
      Thread currentThread = Thread.currentThread();
      Context.CancellationListener cancelled = c -> currentThread.interrupt();
      context.addListener(cancelled, Executors.directExecutor());
      try {
        super.onHalfClose();
      } finally {
         // You must remove; you don't want to interrupt the wrong thread
        context.removeListener(cancelled);
      }
    }

@sanjaypujare
Copy link
Contributor

Cancellation listener is exactly what you want for stopping the application. The RPC can be cancelled for many reasons, like Deadline, the client explicitly cancelled, or I/O failures. It'd look like:
...

Does this depend on receiving the HalfClose from the client? If the sever wants to time out without having to depend on a halfClose from the client (say the client has died or connectivity is lost) do we need to do something more?

@sorra
Copy link
Author

sorra commented Aug 9, 2023

Thank you. This is my updated approach using CancellableContext and CancellationListener:

    try (Context.CancellableContext context = Context.current()
        .withDeadline(Deadline.after(timeout, unit), scheduler)) {
      Thread thread = Thread.currentThread();
      Context.CancellationListener cancelled = c -> {
        if (c.cancellationCause() == null) {
          return;
        }
        thread.interrupt();
        // logging ......
      };
      context.addListener(cancelled, MoreExecutors.directExecutor());
      context.run(invocation);
      return true;
    }

Ordinary Context is not cancellable, so I add a CancellableContext with a deadline, and its API requires a user-provided ScheduledExecutorService scheduler.
The try-with-resources auto-closes the context (close() calls context.cancel(cause: null), so the listener should handle the null cause case).

@sorra
Copy link
Author

sorra commented Aug 10, 2023

Yesterday it worked in production 👏🏻

2023-08-09 05:19:16.467+0000 WARN 280 --- [pool-16-thread-1] ServerTimeoutManager.accept : Interrupted RPC thread pool-19-thread-13 for timeout at 60 SECONDS

@ejona86
Copy link
Member

ejona86 commented Aug 10, 2023

Does this depend on receiving the HalfClose from the client?

It is assuming that the RPC is unary or server-streaming. To extend it to more cases, you'd add the listener to more callbacks.

This is my updated approach using CancellableContext and CancellationListener

Note that your approach does not close the RPC. So the RPC is still consuming memory in gRPC when your application returns and the client is left hanging. If your application is handling the interruption by cancelling the RPC, then it'd work fine, although would be fragile.

This is looking in a state where it is useful to you, but needs serious changes to be accepted into gRPC. It it is too specialized and error-prone at the moment. For gRPC, we'd want the interruption handling to be a separate interceptor from the Context/Deadline handling, since they are separate features. The Deadline handling should also create the new Context within interceptCall and use Contexts.interceptCall() to set it on the thread for each callback, as interceptors expect a consistent Context. And when the deadline expires, the RPC would need to be closed. Closing the RPC is pretty annoying to make thread-safe, but we could share code with TransmitStatusRuntimeExceptionInterceptor.SerializingServerCall.

@sorra
Copy link
Author

sorra commented Aug 11, 2023

@ejona86 Thank you for the review comment.

Note that your approach does not close the RPC. So the RPC is still consuming memory in gRPC when your application returns and the client is left hanging. If your application is handling the interruption by cancelling the RPC, then it'd work fine, although would be fragile.

My approach only tries to stop the application RPC method invocation (not to stop other stages because there should be other mechanisms to handle other stages properly) and is based on the assumption that ThreadPoolExecutor will auto-clear the interrupted state on completing each execution, so following executions in the same thread will not be affected. But this does not apply to ForkJoinPool. To be safe, we can more actively ensure the auto-clear. I think this mechanism is strong enough to protect the system.

What about this: let ServerTimeoutManager#withTimeout() auto-clear the interrupted state when each application RPC method invocation is completed.

  1. If the application RPC method decides to handle the interruption (e.g. handles exceptions like InterruptedException or IOException, or actively checks Thread.interrupted()), it should be able to handle it correctly e.g. return an error response. This is like handling other types of Exception. Everything works fine.
  2. Else the application RPC method does not handle the interruption (e.g. not catch exceptions, or busy looping without checking Thread.interrupted()), we cannot really stop its execution, so we can restore the interrupted state when the execution is eventually completed as if we never interrupted it. The worst thing is only that the timeout does not take effect, but not worse because nothing is broken.

In both conditions, RPC can be completed at a determined state with memory freed (except that the application is not interrupted and runs infinitely, which is the original problem that I want to solve with this PR).

If the application does not know how to deal with interruption, it can simply do nothing about it.


For gRPC, we'd want the interruption handling to be a separate interceptor from the Context/Deadline handling, since they are separate features.

Sorry what does this mean?


The Deadline handling should also create the new Context within interceptCall and use Contexts.interceptCall() to set it on the thread for each callback, as interceptors expect a consistent Context.

Is this a statement about the status quo, or an ask for improvement?

@sorra
Copy link
Author

sorra commented Aug 20, 2023

@ejona86 I think I get most of your point after more learning.


The Deadline handling should also create the new Context within interceptCall and use Contexts.interceptCall() to set it on the thread for each callback, as interceptors expect a consistent Context.

Such a context is shared by each callback, so a holistic timeout is applied to the whole lifecycle, which is better than a halfClose-only timeout, right?


And when the deadline expires, the RPC would need to be closed.

It ensures the client will not hang even if the application forgets to send a response on timeout (though I think it should not happen because the application should either do it correctly or just not do it so a Status.UNKNOWN will be returned).
I agree it could be useful. And it is also useful for cascading cancellation.


For gRPC, we'd want the interruption handling to be a separate interceptor from the Context/Deadline handling, since they are separate features.

But I still do not know what this means. How to ensure the correct thread is interrupted if interruption handling is a separate interceptor? Could you please explain more about the design you expect?
Thank you.

@sorra
Copy link
Author

sorra commented Sep 3, 2023

My recent changes on September 3:

  1. Mark the API as experimental and introduce a builder for ServerTimeoutManager.
  2. Thread interruption behavior is now opt-in via the shouldInterrupt parameter (defaults to false). The derived context is always cancelled, while the thread is only interrupted when shouldInterrupt is true.
  3. If an interruption has been performed, clear the thread interruption flag after the application RPC method execution, so the worker thread can be safely reused by the next RPC callback when the executor is ForkJoinPool. More information:

* access by serializing everything on an executor.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2189")
class SerializingServerCall<ReqT, RespT> extends
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is extracted from TransmitStatusRuntimeExceptionInterceptor.

@sorra sorra force-pushed the server-side-timeout branch 2 times, most recently from d7a917e to 0c0d2a9 Compare September 17, 2023 12:01
@sorra
Copy link
Author

sorra commented Sep 17, 2023

September 16 update:
Hi @ejona86,

  1. The context is now applied to every listenable stage not limited to halfClose. I use SerializingServerCall to close the server call and it always returns a Status.CANCELLED with description "server call timeout".
  2. The SerializingServerCall class is extracted from TransmitStatusRuntimeExceptionInterceptor for sharing code. But it is lack of code coverage. The current low code coverage is all caused by it and I propose to handle it a little later.
  3. I have written a quick design summary in the PR description.

@sanjaypujare sanjaypujare removed their request for review December 26, 2023 17:50
@sorra
Copy link
Author

sorra commented Jan 22, 2024

Hi @ejona86 @sanjaypujare , would you like to review this PR?

@sorra
Copy link
Author

sorra commented Apr 2, 2024

I know you are busy reviewing so many PRs.
This PR is becoming old, could you please let me know the top concern in your mind, or you think the code changes need further refinement? @ejona86 @sanjaypujare

@kuntsali
Copy link

Hello!
If there are any plans to add this feature?
We'd love to use this feature on our services

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants