Skip to content

Commit

Permalink
Improved locking for HttpReceiver.ContentSource. (#9007)
Browse files Browse the repository at this point in the history
* Improved locking for HttpReceiver.ContentSource.

Improved response failure code path.
Now either responseFailure() must be called, or exchange.responseComplete() followed by HttpReceiver.abort().

Fixed failAndClose() for HTTP/2 and HTTP/3: the connection must not be closed, stream.reset() is sufficient.

Fixed flaky test HttpClientDemandTest.testTwoListenersWithDifferentDemand().

Fixed DistributionTests.testVirtualThreadPool().

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet authored Dec 6, 2022
1 parent f8c4783 commit d505466
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public void abort(Throwable failure, Promise<Boolean> promise)
}

if (LOG.isDebugEnabled())
LOG.debug("Failed {}: req={}/rsp={} {}", this, abortRequest, abortResponse, failure);
LOG.debug("Failed {}: req={}/rsp={}", this, abortRequest, abortResponse, failure);

if (!abortRequest && !abortResponse)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.jetty.io.content.ContentSourceTransformer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -266,6 +267,9 @@ protected void responseHeaders(HttpExchange exchange)
List<Response.ResponseListener> responseListeners = exchange.getConversation().getResponseListeners();
notifier.notifyHeaders(responseListeners, response);

if (exchange.isResponseComplete())
return;

if (HttpStatus.isInterim(response.getStatus()))
{
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -338,24 +342,18 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask)
if (LOG.isDebugEnabled())
LOG.debug("Invoking responseSuccess on {}", this);

NotifiableContentSource contentSource = this.contentSource;
if (contentSource != null)
{
this.contentSource = null;
contentSource.eof();
}
// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
if (!exchange.responseComplete(null))
return;

invoker.run(() ->
{
if (LOG.isDebugEnabled())
LOG.debug("Executing responseSuccess on {}", this);

// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
if (!exchange.responseComplete(null))
return;

responseState = ResponseState.IDLE;

reset();

HttpResponse response = exchange.getResponse();
Expand Down Expand Up @@ -385,34 +383,26 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask)
protected void responseFailure(Throwable failure, Promise<Boolean> promise)
{
if (LOG.isDebugEnabled())
LOG.debug("Invoking responseFailure with {} on {}", failure, this);

invoker.run(() ->
LOG.debug("Failing with {} on {}", failure, this);

HttpExchange exchange = getHttpExchange();
// In case of a response error, the failure has already been notified
// and it is possible that a further attempt to read in the receive
// loop throws an exception that reenters here but without exchange;
// or, the server could just have timed out the connection.
if (exchange == null)
{
if (LOG.isDebugEnabled())
LOG.debug("Executing responseFailure on {}", this);

HttpExchange exchange = getHttpExchange();
// In case of a response error, the failure has already been notified
// and it is possible that a further attempt to read in the receive
// loop throws an exception that reenters here but without exchange;
// or, the server could just have timed out the connection.
if (exchange == null)
{
promise.succeeded(false);
return;
}

if (LOG.isDebugEnabled())
LOG.debug("Response failure {}", exchange.getResponse(), failure);
promise.succeeded(false);
return;
}

// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
if (exchange.responseComplete(failure))
abort(exchange, failure, promise);
else
promise.succeeded(false);
});
// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
boolean completed = exchange.responseComplete(failure);
if (completed)
abort(exchange, failure, promise);
else
promise.succeeded(false);
}

private void terminateResponse(HttpExchange exchange)
Expand Down Expand Up @@ -482,10 +472,14 @@ public void abort(HttpExchange exchange, Throwable failure, Promise<Boolean> pro
if (LOG.isDebugEnabled())
LOG.debug("Invoking abort with {} on {}", failure, this);

// This method should be called only after calling HttpExchange.responseComplete().
if (!exchange.isResponseComplete())
throw new IllegalStateException();

invoker.run(() ->
{
if (LOG.isDebugEnabled())
LOG.debug("Executing abort on {}", this);
LOG.debug("Executing abort with {} on {}", failure, this);

if (responseState == ResponseState.FAILURE)
{
Expand All @@ -496,7 +490,7 @@ public void abort(HttpExchange exchange, Throwable failure, Promise<Boolean> pro
responseState = ResponseState.FAILURE;
this.failure = failure;
if (contentSource != null)
contentSource.fail(failure);
contentSource.error(failure);
dispose();

HttpResponse response = exchange.getResponse();
Expand Down Expand Up @@ -557,7 +551,7 @@ private enum ResponseState

private interface NotifiableContentSource extends Content.Source
{
void eof();
boolean error(Throwable failure);

void onDataAvailable();
}
Expand All @@ -577,12 +571,6 @@ public DecodingContentSource(NotifiableContentSource rawSource, ContentDecoder d
_decoder = decoder;
}

@Override
public void eof()
{
_rawSource.eof();
}

@Override
public void onDataAvailable()
{
Expand Down Expand Up @@ -643,6 +631,15 @@ protected Content.Chunk transform(Content.Chunk inputChunk)
}
}
}

@Override
public boolean error(Throwable failure)
{
if (_chunk != null)
_chunk.release();
_chunk = null;
return _rawSource.error(failure);
}
}

/**
Expand All @@ -654,42 +651,37 @@ private class ContentSource implements NotifiableContentSource
private static final Logger LOG = LoggerFactory.getLogger(ContentSource.class);

private final AtomicReference<Runnable> demandCallbackRef = new AtomicReference<>();
private volatile Content.Chunk currentChunk;
private final AutoLock lock = new AutoLock();
private Content.Chunk currentChunk;

@Override
public Content.Chunk read()
{
if (LOG.isDebugEnabled())
LOG.debug("Reading from {}", this);
Content.Chunk chunk = consumeCurrentChunk();
if (chunk != null)
return chunk;
currentChunk = HttpReceiver.this.read(false);
return consumeCurrentChunk();
}

@Override
public void eof()
{
if (LOG.isDebugEnabled())
LOG.debug("Setting EOF on {}", this);
if (currentChunk != null)
throw new IllegalStateException();
currentChunk = Content.Chunk.EOF;
Content.Chunk current;
try (AutoLock ignored = lock.lock())
{
current = currentChunk;
currentChunk = Content.Chunk.next(current);
if (current != null)
return current;
}

Runnable demandCallback = demandCallbackRef.getAndSet(null);
if (LOG.isDebugEnabled())
LOG.debug("Calling demand callback on {}", this);
if (demandCallback != null)
current = HttpReceiver.this.read(false);

try (AutoLock ignored = lock.lock())
{
try
if (currentChunk != null)
{
demandCallback.run();
}
catch (Throwable x)
{
fail(x);
// There was a concurrent call to fail().
if (current != null)
current.release();
return currentChunk;
}
currentChunk = Content.Chunk.next(current);
return current;
}
}

Expand All @@ -703,15 +695,6 @@ public void onDataAvailable()
invokeDemandCallback(true);
}

private Content.Chunk consumeCurrentChunk()
{
if (LOG.isDebugEnabled())
LOG.debug("Consuming current chunk from {}", this);
Content.Chunk chunk = currentChunk;
currentChunk = Content.Chunk.next(chunk);
return chunk;
}

@Override
public void demand(Runnable demandCallback)
{
Expand All @@ -731,12 +714,30 @@ private void processDemand()
if (LOG.isDebugEnabled())
LOG.debug("Processing demand on {}", this);

if (currentChunk == null)
Content.Chunk current;
try (AutoLock ignored = lock.lock())
{
currentChunk = HttpReceiver.this.read(true);
if (currentChunk == null)
current = currentChunk;
}

if (current == null)
{
current = HttpReceiver.this.read(true);
if (current == null)
return;

try (AutoLock ignored = lock.lock())
{
if (currentChunk != null)
{
// There was a concurrent call to fail().
current.release();
return;
}
currentChunk = current;
}
}

// The processDemand method is only ever called by the
// invoker so there is no need to use the latter here.
invokeDemandCallback(false);
Expand Down Expand Up @@ -768,17 +769,40 @@ public void fail(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Failing {}", this);
if (currentChunk != null)
currentChunk.release();
if (currentChunk == null || !(currentChunk instanceof Content.Chunk.Error))
boolean failed = error(failure);
if (failed)
HttpReceiver.this.failAndClose(failure);
currentChunk = Content.Chunk.from(failure);
invokeDemandCallback(true);
}

@Override
public boolean error(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Erroring {}", this);
try (AutoLock ignored = lock.lock())
{
if (currentChunk instanceof Content.Chunk.Error)
return false;
if (currentChunk != null)
currentChunk.release();
currentChunk = Content.Chunk.from(failure);
}
return true;
}

private Content.Chunk chunk()
{
try (AutoLock ignored = lock.lock())
{
return currentChunk;
}
}

@Override
public String toString()
{
return String.format("%s@%x{c=%s,d=%s}", getClass().getSimpleName(), hashCode(), currentChunk, demandCallbackRef);
return String.format("%s@%x{c=%s,d=%s}", getClass().getSimpleName(), hashCode(), chunk(), demandCallbackRef);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,9 @@ public void failAndClose(Throwable failure)
Stream stream = getHttpChannel().getStream();
responseFailure(failure, Promise.from(failed ->
{
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
getHttpChannel().getHttpConnection().close(failure);
}, x ->
{
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
getHttpChannel().getHttpConnection().close(failure);
}));
if (failed)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
}, x -> stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP)));
}

@Override
Expand Down Expand Up @@ -127,15 +123,11 @@ private void onResponse(Stream stream, HeadersFrame frame)
httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason());

responseBegin(exchange);
if (exchange.isResponseComplete())
return;

HttpFields headers = response.getFields();
for (HttpField header : headers)
{
responseHeader(exchange, header);
if (exchange.isResponseComplete())
return;
}

HttpRequest httpRequest = exchange.getRequest();
Expand Down
Loading

0 comments on commit d505466

Please sign in to comment.