Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved locking for HttpReceiver.ContentSource. #9007

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public void abort(Throwable failure, Promise<Boolean> promise)
}

if (LOG.isDebugEnabled())
LOG.debug("Failed {}: req={}/rsp={} {}", this, abortRequest, abortResponse, failure);
LOG.debug("Failed {}: req={}/rsp={}", this, abortRequest, abortResponse, failure);

if (!abortRequest && !abortResponse)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.jetty.io.content.ContentSourceTransformer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.SerializedInvoker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -266,6 +267,9 @@ protected void responseHeaders(HttpExchange exchange)
List<Response.ResponseListener> responseListeners = exchange.getConversation().getResponseListeners();
notifier.notifyHeaders(responseListeners, response);

if (exchange.isResponseComplete())
return;

if (HttpStatus.isInterim(response.getStatus()))
{
if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -338,24 +342,18 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask)
if (LOG.isDebugEnabled())
LOG.debug("Invoking responseSuccess on {}", this);

NotifiableContentSource contentSource = this.contentSource;
if (contentSource != null)
{
this.contentSource = null;
contentSource.eof();
}
// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
if (!exchange.responseComplete(null))
return;

invoker.run(() ->
{
if (LOG.isDebugEnabled())
LOG.debug("Executing responseSuccess on {}", this);

// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
if (!exchange.responseComplete(null))
return;

responseState = ResponseState.IDLE;

reset();

HttpResponse response = exchange.getResponse();
Expand Down Expand Up @@ -385,34 +383,26 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask)
protected void responseFailure(Throwable failure, Promise<Boolean> promise)
{
if (LOG.isDebugEnabled())
LOG.debug("Invoking responseFailure with {} on {}", failure, this);

invoker.run(() ->
LOG.debug("Failing with {} on {}", failure, this);

HttpExchange exchange = getHttpExchange();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the comment/test below, it feels like this should be a takeHttpExchange() semantic, or at least the test below could be:

if (exchange == null || !disassociate(exchange))

At the very least, can the comment below explain how the race of exchange becoming null just after the get is handled?

Copy link
Contributor Author

@sbordet sbordet Dec 6, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gregw I don't understand.
The semantic cannot be "take", as the "take" happens later when also the request is completed.
The call exchange.responseComplete() is atomic, so the decision of whether doing something or not is taken there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sbordet It is a general pattern for a race to have code in an async environment that does non atomic test and then act. It may well be that the exchange is atomically "taken" and nulled elsewhere, but a thread here may still have tested the exchange whilst non null and then act on it whilst it is null.

So either this is a race here, or there is some external circumstance that ensures that the responseComplete can never be called between the test and the action. So we need to either fix the race or document why it is not a race.

// In case of a response error, the failure has already been notified
// and it is possible that a further attempt to read in the receive
// loop throws an exception that reenters here but without exchange;
// or, the server could just have timed out the connection.
if (exchange == null)
{
if (LOG.isDebugEnabled())
LOG.debug("Executing responseFailure on {}", this);

HttpExchange exchange = getHttpExchange();
// In case of a response error, the failure has already been notified
// and it is possible that a further attempt to read in the receive
// loop throws an exception that reenters here but without exchange;
// or, the server could just have timed out the connection.
if (exchange == null)
{
promise.succeeded(false);
return;
}

if (LOG.isDebugEnabled())
LOG.debug("Response failure {}", exchange.getResponse(), failure);
promise.succeeded(false);
return;
}

// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
if (exchange.responseComplete(failure))
abort(exchange, failure, promise);
else
promise.succeeded(false);
});
// Mark atomically the response as completed, with respect
// to concurrency between response success and response failure.
boolean completed = exchange.responseComplete(failure);
if (completed)
abort(exchange, failure, promise);
else
promise.succeeded(false);
}

private void terminateResponse(HttpExchange exchange)
Expand Down Expand Up @@ -482,10 +472,12 @@ public void abort(HttpExchange exchange, Throwable failure, Promise<Boolean> pro
if (LOG.isDebugEnabled())
LOG.debug("Invoking abort with {} on {}", failure, this);

assert exchange.isResponseComplete();
sbordet marked this conversation as resolved.
Show resolved Hide resolved

invoker.run(() ->
{
if (LOG.isDebugEnabled())
LOG.debug("Executing abort on {}", this);
LOG.debug("Executing abort with {} on {}", failure, this);

if (responseState == ResponseState.FAILURE)
{
Expand All @@ -496,7 +488,7 @@ public void abort(HttpExchange exchange, Throwable failure, Promise<Boolean> pro
responseState = ResponseState.FAILURE;
this.failure = failure;
if (contentSource != null)
contentSource.fail(failure);
contentSource.error(failure);
dispose();

HttpResponse response = exchange.getResponse();
Expand Down Expand Up @@ -557,7 +549,7 @@ private enum ResponseState

private interface NotifiableContentSource extends Content.Source
{
void eof();
boolean error(Throwable failure);

void onDataAvailable();
}
Expand All @@ -577,12 +569,6 @@ public DecodingContentSource(NotifiableContentSource rawSource, ContentDecoder d
_decoder = decoder;
}

@Override
public void eof()
{
_rawSource.eof();
}

@Override
public void onDataAvailable()
{
Expand Down Expand Up @@ -643,6 +629,15 @@ protected Content.Chunk transform(Content.Chunk inputChunk)
}
}
}

@Override
public boolean error(Throwable failure)
{
if (_chunk != null)
_chunk.release();
_chunk = null;
return _rawSource.error(failure);
}
}

/**
Expand All @@ -654,42 +649,36 @@ private class ContentSource implements NotifiableContentSource
private static final Logger LOG = LoggerFactory.getLogger(ContentSource.class);

private final AtomicReference<Runnable> demandCallbackRef = new AtomicReference<>();
private volatile Content.Chunk currentChunk;
private final AutoLock lock = new AutoLock();
private Content.Chunk currentChunk;

@Override
public Content.Chunk read()
{
if (LOG.isDebugEnabled())
LOG.debug("Reading from {}", this);
Content.Chunk chunk = consumeCurrentChunk();
if (chunk != null)
return chunk;
currentChunk = HttpReceiver.this.read(false);
return consumeCurrentChunk();
}

@Override
public void eof()
{
if (LOG.isDebugEnabled())
LOG.debug("Setting EOF on {}", this);
if (currentChunk != null)
throw new IllegalStateException();
currentChunk = Content.Chunk.EOF;
Content.Chunk current;
try (AutoLock ignored = lock.lock())
{
current = currentChunk;
currentChunk = Content.Chunk.next(current);
if (current != null)
return current;
}

Runnable demandCallback = demandCallbackRef.getAndSet(null);
if (LOG.isDebugEnabled())
LOG.debug("Calling demand callback on {}", this);
if (demandCallback != null)
current = HttpReceiver.this.read(false);

try (AutoLock ignored = lock.lock())
{
try
{
demandCallback.run();
}
catch (Throwable x)
if (currentChunk != null)
{
fail(x);
// There was a concurrent call to fail().
current.release();
sbordet marked this conversation as resolved.
Show resolved Hide resolved
return currentChunk;
}
currentChunk = Content.Chunk.next(current);
return current;
}
}

Expand All @@ -703,15 +692,6 @@ public void onDataAvailable()
invokeDemandCallback(true);
}

private Content.Chunk consumeCurrentChunk()
{
if (LOG.isDebugEnabled())
LOG.debug("Consuming current chunk from {}", this);
Content.Chunk chunk = currentChunk;
currentChunk = Content.Chunk.next(chunk);
return chunk;
}

@Override
public void demand(Runnable demandCallback)
{
Expand All @@ -731,12 +711,30 @@ private void processDemand()
if (LOG.isDebugEnabled())
LOG.debug("Processing demand on {}", this);

if (currentChunk == null)
Content.Chunk current;
try (AutoLock ignored = lock.lock())
{
current = currentChunk;
}

if (current == null)
{
currentChunk = HttpReceiver.this.read(true);
if (currentChunk == null)
current = HttpReceiver.this.read(true);
if (current == null)
lorban marked this conversation as resolved.
Show resolved Hide resolved
return;

try (AutoLock ignored = lock.lock())
{
if (currentChunk != null)
{
// There was a concurrent call to fail().
current.release();
return;
}
currentChunk = current;
}
}

// The processDemand method is only ever called by the
// invoker so there is no need to use the latter here.
invokeDemandCallback(false);
Expand Down Expand Up @@ -765,20 +763,43 @@ private void invokeDemandCallback(boolean invoke)

@Override
public void fail(Throwable failure)
{
boolean failed = error(failure);
if (failed)
HttpReceiver.this.failAndClose(failure);
invokeDemandCallback(true);
lorban marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public boolean error(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Failing {}", this);
sbordet marked this conversation as resolved.
Show resolved Hide resolved
if (currentChunk != null)
currentChunk.release();
if (currentChunk == null || !(currentChunk instanceof Content.Chunk.Error))
HttpReceiver.this.failAndClose(failure);
currentChunk = Content.Chunk.from(failure);

try (AutoLock ignored = lock.lock())
{
if (currentChunk instanceof Content.Chunk.Error)
return false;
if (currentChunk != null)
currentChunk.release();
currentChunk = Content.Chunk.from(failure);
}

return true;
}

private Content.Chunk chunk()
{
try (AutoLock ignored = lock.lock())
{
return currentChunk;
}
}

@Override
public String toString()
{
return String.format("%s@%x{c=%s,d=%s}", getClass().getSimpleName(), hashCode(), currentChunk, demandCallbackRef);
return String.format("%s@%x{c=%s,d=%s}", getClass().getSimpleName(), hashCode(), chunk(), demandCallbackRef);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,9 @@ public void failAndClose(Throwable failure)
Stream stream = getHttpChannel().getStream();
responseFailure(failure, Promise.from(failed ->
{
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
getHttpChannel().getHttpConnection().close(failure);
}, x ->
{
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
getHttpChannel().getHttpConnection().close(failure);
}));
if (failed)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP);
}, x -> stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP)));
}

@Override
Expand Down Expand Up @@ -127,15 +123,11 @@ private void onResponse(Stream stream, HeadersFrame frame)
httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason());

responseBegin(exchange);
if (exchange.isResponseComplete())
return;

HttpFields headers = response.getFields();
for (HttpField header : headers)
{
responseHeader(exchange, header);
if (exchange.isResponseComplete())
return;
}

HttpRequest httpRequest = exchange.getRequest();
Expand Down
Loading