Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore thread starvation tests #12395

Merged
merged 57 commits into from
Oct 31, 2024

Conversation

lorban
Copy link
Contributor

@lorban lorban commented Oct 16, 2024

testReadStarvation are broken in ee9, ee10 and core.

They uncovered a problem with the core API not always correctly using InvocationTypes and using CompletableFuture as a return type for async calls.

@lorban lorban requested review from gregw and sbordet October 16, 2024 21:56
@lorban lorban self-assigned this Oct 16, 2024
@gregw
Copy link
Contributor

gregw commented Oct 17, 2024

@lorban I've pushed a fixed for the read side of servlets in ee10. Thoughts?

@gregw
Copy link
Contributor

gregw commented Oct 17, 2024

@lorban @sbordet we could extend CompletableFuture to an InvocableCompletableFuture that either:

  • intercepts calls to methods like andThen which pass in functional references. If any such method is called, then the invocation type will be blocking, otherwise it is non-blocking
  • intercepts calls to methods like andThen and looks at the invocation type of the functional references passed, which it combines as the invocation type.
  • intercepts calls to methods like andThen and redirects them to andThenAsync. The invocation type can then always be non-blocking.

Note the returned completablefuture might also need to be wrapped (maybe not for the andThenAsync redirection).

@gregw
Copy link
Contributor

gregw commented Oct 17, 2024

@lorban @sbordet would this help:

    class NonBlockingCompletableFuture<V> extends CompletableFuture<V> implements Invocable
    {
        private final Executor _executor;

        public NonBlockingCompletableFuture(Executor executor)
        {
            _executor = executor;
        }

        @Override
        public InvocationType getInvocationType()
        {
            return InvocationType.NON_BLOCKING;
        }

        @Override
        public CompletableFuture<Void> acceptEither(CompletionStage<? extends V> other, Consumer<? super V> action)
        {
            return super.acceptEitherAsync(other, action, _executor);
        }

        @Override
        public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends V> other, Function<? super V, U> fn)
        {
            return super.applyToEitherAsync(other, fn, _executor);
        }

        @Override
        public CompletableFuture<V> exceptionallyCompose(Function<Throwable, ? extends CompletionStage<V>> fn)
        {
            return super.exceptionallyComposeAsync(fn, _executor);
        }

        @Override
        public <U> CompletableFuture<U> handle(BiFunction<? super V, Throwable, ? extends U> fn)
        {
            return super.handleAsync(fn, _executor);
        }

        @Override
        public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
        {
            return super.runAfterBothAsync(other, action, _executor);
        }

        @Override
        public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
        {
            return super.runAfterEitherAsync(other, action, _executor);
        }

        @Override
        public CompletableFuture<Void> thenAccept(Consumer<? super V> action)
        {
            return super.thenAcceptAsync(action, _executor);
        }

        @Override
        public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action)
        {
            return super.thenAcceptBothAsync(other, action, _executor);
        }

        @Override
        public <U> CompletableFuture<U> thenApply(Function<? super V, ? extends U> fn)
        {
            return super.thenApplyAsync(fn, _executor);
        }

        @Override
        public <U, V1> CompletableFuture<V1> thenCombine(CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn)
        {
            return super.thenCombineAsync(other, fn, _executor);
        }

        @Override
        public <U> CompletableFuture<U> thenCompose(Function<? super V, ? extends CompletionStage<U>> fn)
        {
            return super.thenComposeAsync(fn, _executor);
        }

        @Override
        public CompletableFuture<Void> thenRun(Runnable action)
        {
            return super.thenRunAsync(action, _executor);
        }

        @Override
        public CompletableFuture<V> whenComplete(BiConsumer<? super V, ? super Throwable> action)
        {
            return super.whenCompleteAsync(action, _executor);
        }
    }

@sbordet
Copy link
Contributor

sbordet commented Oct 17, 2024

I honestly would not go there.
Changing the semantic of thenRun() into a thenRunAsync() is breaking the least surprise.

I think we can just say in the javadocs to never block on the CF returned, or deprecate the API.

@gregw
Copy link
Contributor

gregw commented Oct 18, 2024

I honestly would not go there. Changing the semantic of thenRun() into a thenRunAsync() is breaking the least surprise.

I think we can just say in the javadocs to never block on the CF returned, or deprecate the API.

We need more than javadocs, as we need the Runnable or Callback created from the CF to be Invocable. So how about:

    /**
     * An extension of {@link java.util.concurrent.CompletableFuture} that is an {@link Invocable}.
     * The {@link InvocationType} is initially the type used in construction (default NON_BLOCKING).
     * If a non async method is called, then the invocation type of any passed function is used.
     * @param <V>
     */
    class InvocableCompletableFuture<V> extends java.util.concurrent.CompletableFuture<V> implements Invocable
    {
        private final AtomicReference<InvocationType> _invocationType = new AtomicReference<>();

        public InvocableCompletableFuture()
        {
            this(null);
        }

        public InvocableCompletableFuture(InvocationType invocationType)
        {
            _invocationType.set(Objects.requireNonNullElse(invocationType, InvocationType.NON_BLOCKING));
        }

        @Override
        public InvocationType getInvocationType()
        {
            return _invocationType.get();
        }

        @Override
        public java.util.concurrent.CompletableFuture<Void> acceptEither(CompletionStage<? extends V> other, Consumer<? super V> action)
        {
            _invocationType.set(Invocable.combine(Invocable.getInvocationType(other), Invocable.getInvocationType(action)));
            return super.acceptEither(other, action);
        }

        @Override
        public <U> java.util.concurrent.CompletableFuture<U> applyToEither(CompletionStage<? extends V> other, Function<? super V, U> fn)
        {
            _invocationType.set(Invocable.combine(Invocable.getInvocationType(other), Invocable.getInvocationType(fn)));
            return super.applyToEither(other, fn);
        }

        @Override
        public <U> java.util.concurrent.CompletableFuture<U> handle(BiFunction<? super V, Throwable, ? extends U> fn)
        {
            _invocationType.set(Invocable.getInvocationType(fn));
            return super.handle(fn);
        }

        @Override
        public java.util.concurrent.CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
        {
            _invocationType.set(Invocable.combine(Invocable.getInvocationType(other), Invocable.getInvocationType(action)));
            return super.runAfterBoth(other, action);
        }

        @Override
        public java.util.concurrent.CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
        {
            _invocationType.set(Invocable.combine(Invocable.getInvocationType(other), Invocable.getInvocationType(action)));
            return super.runAfterEither(other, action);
        }

        @Override
        public java.util.concurrent.CompletableFuture<Void> thenAccept(Consumer<? super V> action)
        {
            _invocationType.set(Invocable.getInvocationType(action));
            return super.thenAccept(action);
        }

        @Override
        public <U> java.util.concurrent.CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super V, ? super U> action)
        {
            _invocationType.set(Invocable.combine(Invocable.getInvocationType(other), Invocable.getInvocationType(action)));
            return super.thenAcceptBoth(other, action);
        }

        @Override
        public <U> java.util.concurrent.CompletableFuture<U> thenApply(Function<? super V, ? extends U> fn)
        {
            _invocationType.set(Invocable.getInvocationType(fn));
            return super.thenApply(fn);
        }

        @Override
        public <U, V1> java.util.concurrent.CompletableFuture<V1> thenCombine(CompletionStage<? extends U> other, BiFunction<? super V, ? super U, ? extends V1> fn)
        {
            _invocationType.set(Invocable.combine(Invocable.getInvocationType(other), Invocable.getInvocationType(fn)));
            return super.thenCombine(other, fn);
        }

        @Override
        public <U> java.util.concurrent.CompletableFuture<U> thenCompose(Function<? super V, ? extends CompletionStage<U>> fn)
        {
            _invocationType.set(Invocable.getInvocationType(fn));
            return super.thenCompose(fn);
        }

        @Override
        public java.util.concurrent.CompletableFuture<Void> thenRun(Runnable action)
        {
            _invocationType.set(Invocable.getInvocationType(action));
            return super.thenRun(action);
        }

        @Override
        public java.util.concurrent.CompletableFuture<V> whenComplete(BiConsumer<? super V, ? super Throwable> action)
        {
            _invocationType.set(Invocable.getInvocationType(action));
            return super.whenComplete(action);
        }
    }

@gregw
Copy link
Contributor

gregw commented Oct 18, 2024

That would then be used with:

public abstract class ContentSourceCompletableFuture<X> extends Invocable.InvocableCompletableFuture<X> implements Runnable
{
    private final Content.Source _content;

    public ContentSourceCompletableFuture(Content.Source content)
    {
        _content = content;
    }

    /**
     * <p>Initiates the parsing of the {@link Content.Source}.</p>
     * <p>For every valid chunk that is read, {@link #parse(Content.Chunk)}
     * is called, until a result is produced that is used to
     * complete this {@link CompletableFuture}.</p>
     * <p>Internally, this method is called multiple times to progress
     * the parsing in response to {@link Content.Source#demand(Runnable)}
     * calls.</p>
     * <p>Exceptions thrown during parsing result in this
     * {@link CompletableFuture} to be completed exceptionally.</p>
     */
    public void parse()
    {
        while (true)
        {
            Content.Chunk chunk = _content.read();
            if (chunk == null)
            {
                _content.demand(this);
                return;
            }
            if (Content.Chunk.isFailure(chunk))
            {
                if (chunk.isLast())
                {
                    completeExceptionally(chunk.getFailure());
                }
                else
                {
                    if (onTransientFailure(chunk.getFailure()))
                        continue;
                    _content.fail(chunk.getFailure());
                    completeExceptionally(chunk.getFailure());
                }
                return;
            }

            try
            {
                X x = parse(chunk);
                if (x != null)
                {
                    complete(x);
                    return;
                }
            }
            catch (Throwable failure)
            {
                completeExceptionally(failure);
                return;
            }
            finally
            {
                chunk.release();
            }

            if (chunk.isLast())
            {
                completeExceptionally(new EOFException());
                return;
            }
        }
    }

    @Override
    public void run()
    {
        parse();
    }

    /**
     * <p>Called by {@link #parse()} to parse a {@link org.eclipse.jetty.io.Content.Chunk}.</p>
     *
     * @param chunk The chunk containing content to parse. The chunk will never be {@code null} nor a
     *              {@link org.eclipse.jetty.io.Content.Chunk#isFailure(Content.Chunk) failure chunk}.
     *              If the chunk is stored away to be used later beyond the scope of this call,
     *              then implementations must call {@link Content.Chunk#retain()} and
     *              {@link Content.Chunk#release()} as appropriate.
     * @return The parsed {@code X} result instance or {@code null} if parsing is not yet complete
     * @throws Throwable If there is an error parsing
     */
    protected abstract X parse(Content.Chunk chunk) throws Throwable;

    /**
     * <p>Callback method that informs the parsing about how to handle transient failures.</p>
     *
     * @param cause A transient failure obtained by reading a {@link Content.Chunk#isLast() non-last}
     *             {@link org.eclipse.jetty.io.Content.Chunk#isFailure(Content.Chunk) failure chunk}
     * @return {@code true} if the transient failure can be ignored, {@code false} otherwise
     */
    protected boolean onTransientFailure(Throwable cause)
    {
        return false;
    }
}

@gregw
Copy link
Contributor

gregw commented Oct 18, 2024

@sbordet @lorban I've used the approach above to extend Invocable.InvocableCompletableFuture for ContentSourceCompletableFuture.
I've tested this with a new form based starvation test, that fails with the current future and passes with this fix.

@gregw
Copy link
Contributor

gregw commented Oct 18, 2024

I honestly would not go there. Changing the semantic of thenRun() into a thenRunAsync() is breaking the least surprise.

I think there is no problem changing thenRun() into thenRunAsync(), as the user of a CF has no control of what thread calls them in the first place, so the differences will be transparent to them. I don't think anybody would be surprised.

But, the current approach doesn't need this.

Signed-off-by: Ludovic Orban <[email protected]>
@lorban
Copy link
Contributor Author

lorban commented Oct 18, 2024

IMHO the CompletableFuture API is orthogonal to the main problem, so we should not distract ourselves too much with this side problem.

That being said, I think the main problem I have with that API is that it makes it so easy to write broken code that I'm tempted to say it encourages. This simple example brings back the original problem in a non-obvious way:

String s = Content.Source.asStringAsync(source, charset).thenRun(() -> { /* whatever */ }).get();

The above is my main motivation to say we should deprecate all our CompletableFuture API.

Back to the main problem, I remember a few months ago I said that we made a mistake when we designed the Content.Source.demand() call to take a Runnable: it should have taken a Invocable.Task instead to make it apparent that InvocationType is meaningful. I wonder if we could do that change in 12.1 without breaking backward compatibility, as it's source-compatible but not binary compatible?

Signed-off-by: Ludovic Orban <[email protected]>
Signed-off-by: Ludovic Orban <[email protected]>
Signed-off-by: Ludovic Orban <[email protected]>
Signed-off-by: Ludovic Orban <[email protected]>
@sbordet
Copy link
Contributor

sbordet commented Oct 18, 2024

@lorban unfortunately it's not a compatible change: we have several occurrences of allocating a Runnable anonymous, and then calling demand(this) inside that Runnable.

@sbordet
Copy link
Contributor

sbordet commented Oct 18, 2024

I think there is no problem changing thenRun() into thenRunAsync(), as the user of a CF has no control of what thread calls them in the first place, so the differences will be transparent to them. I don't think anybody would be surprised.

They will be, as thenRunAsync() will occupy a common pool thread, and after N of them they won't be available, so tasks won't be run.
In the other case I would block some other thread, but not the precious ones in the common pool.

Furthermore, it would not solve the problem of a double composition, e.g. async().thenApply(...).thenRun(<JDBC>).

I really would not go into trying to put another smartness to fix this, but rather just document clearly the API or deprecate it.

…ehavior in case of close.

Signed-off-by: Simone Bordet <[email protected]>
@sbordet
Copy link
Contributor

sbordet commented Oct 29, 2024

TODO:

  • Make Promise implement Invocable.
  • Remove Promise.Invocable
  • Remove Invocable.InvocableCompletableFuture.
  • Use Invocable.ReadyTask in more places where Invocable.Task is used.

@gregw
Copy link
Contributor

gregw commented Oct 29, 2024

TODO:

  • Make Promise implement Invocable.

I don't think we should. Not all Promises are Invocable, and this makes it clear in the signature when the caller has to think about InvocationType or not.

  • Remove Promise.Invocable

Keep it.

  • Remove Invocable.InvocableCompletableFuture.

I'm 50:50 on this one. It has helped me a couple of times when I got the wrong invocation type during development.

  • Use Invocable.ReadyTask in more places where Invocable.Task is used.

I find the name ReadyTask meaningless and it does very little. Directly implementing Invocable.Task is more readable. Also, we end up creating Runnable lambdas just to pass into the constructor of ReadyTask, so might as well just create a nested class anyway.

@gregw
Copy link
Contributor

gregw commented Oct 29, 2024

  • Remove Invocable.InvocableCompletableFuture.

I'm 50:50 on this one. It has helped me a couple of times when I got the wrong invocation type during development.

I've got a compromise. I've removed InvocableCompletableFuture, but moved its methods to ContentSourceCompletableFuture, which was the only use anyway. We have not yet deprecated CSCF as we do not have an alternative, so the extra protection in using it is worthwhile I think.

@gregw
Copy link
Contributor

gregw commented Oct 29, 2024

@sbordet Another compromise.... How about we introduce Invocable.AbstractTask that implements the getInvocationType, but keeps run abstract so we don't have to create a runnable lambda just to pass into the ReadyTask constructor?

sbordet and others added 2 commits October 30, 2024 00:25
Improved ThreadStarvationTest.

Signed-off-by: Simone Bordet <[email protected]>
 + refined more DemandTask implementations
 + Added InvocableType.runWithoutBlocking (name is WIP)
@gregw gregw self-requested a review October 30, 2024 03:52
Copy link
Contributor

@sbordet sbordet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to tackle the 2 renamings will be good, otherwise LGTM.

@gregw gregw requested a review from sbordet October 30, 2024 22:28
@gregw gregw merged commit 94c3f9d into jetty-12.0.x Oct 31, 2024
10 checks passed
@gregw gregw deleted the fix/12.0.x/12214-restore-thread-starvation-tests branch October 31, 2024 01:01
@lorban lorban linked an issue Nov 4, 2024 that may be closed by this pull request
sbordet added a commit that referenced this pull request Nov 5, 2024
Restored synchronous behavior for `HttpConnection.close()`.
This allows to send a response for suspended requests while the server is being stopped.

See also #12435.

Signed-off-by: Simone Bordet <[email protected]>
sbordet added a commit that referenced this pull request Nov 5, 2024
Restored synchronous behavior for `HttpConnection.close()`.
This allows to send a response for suspended requests while the server is being stopped.

See also #12435.

Signed-off-by: Simone Bordet <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Bug For general bugs on Jetty side High Priority
Projects
No open projects
Status: ✅ Done
Development

Successfully merging this pull request may close these issues.

ThreadStarvation Testing
4 participants