Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Content.Source reads from within naked threads #12143

Closed
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b51829a
add SerializedInvoker assertion
lorban Aug 6, 2024
9f3549b
add SerializedInvoker assertion
lorban Aug 6, 2024
dede122
make invoker assertion per-thread + name invokers to help debugging
lorban Aug 7, 2024
4e95323
Merge remote-tracking branch 'origin/jetty-12.0.x' into experiment/je…
lorban Aug 7, 2024
e7e6a4b
fix merge
lorban Aug 8, 2024
5f4ca3e
fix test
lorban Aug 8, 2024
93f58cc
responseContentAvailable can be called for a pending demand while inv…
lorban Aug 12, 2024
7766942
fix checkstyle
lorban Aug 12, 2024
3302b39
always invoke
lorban Aug 13, 2024
6c470d3
fix race condition
lorban Aug 14, 2024
48c9d14
improve serviceability by recording stack traces and adding the abili…
lorban Aug 14, 2024
31b2b9b
explicitly tell the receiver when to use the invoker
lorban Aug 14, 2024
27704f2
fix checkstyle
lorban Aug 14, 2024
3a88068
improve serviceability by recording stack traces and adding the abili…
lorban Aug 14, 2024
e3b4581
document why we need to either invoke or call
lorban Aug 15, 2024
b8c07a4
Merge remote-tracking branch 'origin/experiment/jetty-12.0.x/serializ…
lorban Aug 15, 2024
93b400c
Merge remote-tracking branch 'origin/jetty-12.0.x' into experiment/je…
lorban Aug 15, 2024
9e055df
fix bad merge
lorban Aug 15, 2024
63d3126
see what tests fail when not running
lorban Aug 16, 2024
7807703
revert change
lorban Aug 16, 2024
6e9d184
add comments
lorban Aug 16, 2024
7d98c47
Merge remote-tracking branch 'origin/jetty-12.0.x' into experiment/je…
lorban Aug 26, 2024
ffd8409
Merge remote-tracking branch 'origin/jetty-12.0.x' into experiment/je…
lorban Aug 26, 2024
1736903
Merge remote-tracking branch 'origin/jetty-12.0.x' into experiment/je…
lorban Aug 26, 2024
32000ac
add javadoc + rename ctor variable
lorban Aug 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public abstract class HttpReceiver
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiver.class);

private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(HttpReceiver.class);
private final HttpChannel channel;
private ResponseState responseState = ResponseState.IDLE;
private NotifiableContentSource contentSource;
Expand Down Expand Up @@ -317,34 +317,47 @@ protected void responseHeaders(HttpExchange exchange)
* This method takes care of ensuring the {@link Content.Source} passed to
* {@link Response.ContentSourceListener#onContentSource(Response, Content.Source)}
* calls the demand callback.
* The call to the demand callback is serialized with other events.
* @param exchange the HTTP exchange
*/
protected void responseContentAvailable(HttpExchange exchange)
{
if (LOG.isDebugEnabled())
LOG.debug("Invoking responseContentAvailable on {}", this);

invoker.run(() ->
// Given:
// - This method must always call onDataAvailable() as the ContentSource is the only source of truth knowing if there is a
// pending demand or not, and that must always be done by the invoker.
// - This method is called sometimes from the context of the invoker (e.g.: read() from within a demand callback) and
// sometimes not (e.g.: read() from a naked thread).
// Then:
// If the consuming loop is a read-demand one (i.e.: demand is used to loop) then read() enqueues a onDataAvailable() invocation
// then demand() enqueues a processDemand() invocation before control is returned to the invoker.
// onDataAvailable() runs the demand callback that enqueues another onDataAvailable() invocation because of read() then another
// processDemand() invocation because of demand().
// There are then two processDemand() invocations enqueued, and since they call HttpReceiver.read(true) when no content is readily available,
// they can both try to register fill interest and eventually throw a ReadPendingException.
// So onDataAvailable() must always be executed from the invoker but also immediately.
// The root cause is that for H1/FCGI the parser calls this method not knowing if it is reacting to a read() or a demand().
// Make sure these 3 tests pass when modifying this method:
// - ForwardProxyWithDynamicTransportTest.testProxyConcurrentLoad()
// - ConnectionPoolTest.testConnectionPoolFactory()
// - HttpClientTest.testContentSourceListenerDemandInSpawnedThread()

Runnable runnable = () ->
{
if (LOG.isDebugEnabled())
LOG.debug("Executing responseContentAvailable on {}", this);

if (exchange.isResponseCompleteOrTerminated())
return;

responseContentAvailable();
});
}
contentSource.onDataAvailable();
};

/**
* Method to be invoked when response content is available to be read.
* <p>
* This method directly invokes the demand callback, assuming the caller
* is already serialized with other events.
*/
protected void responseContentAvailable()
{
contentSource.onDataAvailable();
if (LOG.isDebugEnabled())
LOG.debug("{} responseContentAvailable on {}", invoker.isCurrentThreadInvoking() ? "Invoking" : "Calling", this);

if (invoker.isCurrentThreadInvoking())
runnable.run(); // This is needed by H2, but it could be just return for h1/fcgi. ForwardProxyWithDynamicTransportTest.testProxyConcurrentLoad fails when we do not run the runnable here.
else
invoker.run(runnable);
}

/**
Expand Down Expand Up @@ -700,7 +713,6 @@ private class ContentSource implements NotifiableContentSource

private final AtomicReference<Runnable> demandCallbackRef = new AtomicReference<>();
private final AutoLock lock = new AutoLock();
private final Runnable processDemand = this::processDemand;
private Content.Chunk currentChunk;

@Override
Expand Down Expand Up @@ -739,6 +751,8 @@ public void onDataAvailable()
{
if (LOG.isDebugEnabled())
LOG.debug("onDataAvailable on {}", this);
if (!invoker.isCurrentThreadInvoking())
throw new IllegalStateException();
// The onDataAvailable() method is only ever called
// by the invoker so avoid using the invoker again.
invokeDemandCallback(false);
Expand All @@ -755,14 +769,17 @@ public void demand(Runnable demandCallback)
throw new IllegalStateException();
// The processDemand method may call HttpReceiver.read(boolean)
// so it must be called by the invoker.
invoker.run(processDemand);
invoker.run(this::processDemand);
}

private void processDemand()
{
if (LOG.isDebugEnabled())
LOG.debug("Processing demand on {}", this);

if (!invoker.isCurrentThreadInvoking())
throw new IllegalStateException();

Content.Chunk current;
try (AutoLock ignored = lock.lock())
{
Expand Down Expand Up @@ -802,9 +819,15 @@ private void invokeDemandCallback(boolean invoke)
try
{
if (invoke)
{
invoker.run(demandCallback);
}
else
{
if (!invoker.isCurrentThreadInvoking())
throw new IllegalStateException();
demandCallback.run();
}
}
catch (Throwable x)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ public boolean content(ByteBuffer buffer)

if (LOG.isDebugEnabled())
LOG.debug("Setting action to responseContentAvailable on {}", this);
if (getAndSetAction(this::responseContentAvailable) != null)
if (getAndSetAction(() -> responseContentAvailable(exchange)) != null)
lorban marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalStateException();
if (getHttpConnection().isFillInterested())
throw new IllegalStateException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void content(Content.Chunk chunk)
// Retain the chunk because it is stored for later reads.
chunk.retain();
this.chunk = chunk;
responseContentAvailable();
responseContentAvailable(exchange);
}

void end(HttpExchange exchange)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ public String toString()
public abstract static class AbstractContentSource implements Content.Source, Closeable
{
private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(AbstractContentSource.class);
private final Queue<Part> parts = new ArrayDeque<>();
private final String boundary;
private final ByteBuffer firstBoundary;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public String toString()
};

private final AutoLock.WithCondition lock = new AutoLock.WithCondition();
private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(AsyncContent.class);
private final Queue<Content.Chunk> chunks = new ArrayDeque<>();
private Content.Chunk persistentFailure;
private boolean readClosed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
public class ByteBufferContentSource implements Content.Source
{
private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(ByteBufferContentSource.class);
private final long length;
private final Collection<ByteBuffer> byteBuffers;
private Iterator<ByteBuffer> iterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class ChunksContentSource implements Content.Source
{
private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(ChunksContentSource.class);
private final long length;
private final Collection<Content.Chunk> chunks;
private Iterator<Content.Chunk> iterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public abstract class ContentSourceTransformer implements Content.Source

protected ContentSourceTransformer(Content.Source rawSource)
{
this(rawSource, new SerializedInvoker());
this(rawSource, new SerializedInvoker(ContentSourceTransformer.class));
}

protected ContentSourceTransformer(Content.Source rawSource, SerializedInvoker invoker)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
public class InputStreamContentSource implements Content.Source
{
private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(InputStreamContentSource.class);
private final InputStream inputStream;
private ByteBufferPool.Sized bufferPool;
private Runnable demandCallback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class PathContentSource implements Content.Source
// TODO in 12.1.x reimplement this class based on ByteChannelContentSource

private final AutoLock lock = new AutoLock();
private final SerializedInvoker invoker = new SerializedInvoker();
private final SerializedInvoker invoker = new SerializedInvoker(PathContentSource.class);
private final Path path;
private final long length;
private final ByteBufferPool byteBufferPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public class ByteChannelContentSource implements Content.Source
{
private final AutoLock lock = new AutoLock();
private final SerializedInvoker _invoker = new SerializedInvoker();
private final SerializedInvoker _invoker = new SerializedInvoker(ByteChannelContentSource.class);
private final ByteBufferPool.Sized _byteBufferPool;
private ByteChannel _byteChannel;
private final long _offset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ public HttpChannelState(ConnectionMetaData connectionMetaData)
{
_connectionMetaData = connectionMetaData;
// The SerializedInvoker is used to prevent infinite recursion of callbacks calling methods calling callbacks etc.
_readInvoker = new HttpChannelSerializedInvoker();
_writeInvoker = new HttpChannelSerializedInvoker();
_readInvoker = new HttpChannelSerializedInvoker(HttpChannelState.class.getSimpleName() + "#readInvoker");
_writeInvoker = new HttpChannelSerializedInvoker(HttpChannelState.class.getSimpleName() + "#writeInvoker");
}

@Override
Expand Down Expand Up @@ -1813,6 +1813,11 @@ private void completing()

private class HttpChannelSerializedInvoker extends SerializedInvoker
{
public HttpChannelSerializedInvoker(String name)
{
super(name);
}

@Override
protected void onError(Runnable task, Throwable failure)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*/
public class SerializedExecutor implements Executor
{
private final SerializedInvoker _invoker = new SerializedInvoker()
private final SerializedInvoker _invoker = new SerializedInvoker(SerializedExecutor.class)
{
@Override
protected void onError(Runnable task, Throwable t)
Expand Down
Loading
Loading