Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes HttpClient Content.Source reads from arbitrary threads #12203

Merged
merged 8 commits into from
Aug 30, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ public void onSuccess(Response response)
{
// The request may still be sending content, stop it.
Request request = response.getRequest();
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
if (request.getBody() != null)
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public void onSuccess(Response response)
{
// The request may still be sending content, stop it.
Request request = response.getRequest();
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
if (request.getBody() != null)
request.abort(new HttpRequestException("Aborting request after receiving a %d response".formatted(response.getStatus()), request));
}

@Override
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class should have the SerializedInvoker assertions of #12143

Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ protected void responseContentAvailable(HttpExchange exchange)
* This method directly invokes the demand callback, assuming the caller
* is already serialized with other events.
*/
// TODO: remove this after FCGI fix.
protected void responseContentAvailable()
{
contentSource.onDataAvailable();
Expand Down Expand Up @@ -720,6 +721,9 @@ public Content.Chunk read()

current = HttpReceiver.this.read(false);

if (LOG.isDebugEnabled())
LOG.debug("Read {} from {}", current, this);

try (AutoLock ignored = lock.lock())
{
if (currentChunk != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,18 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
{
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP.class);

private final Runnable receiveNext = this::receiveNext;
private final LongAdder inMessages = new LongAdder();
private final HttpParser parser;
private final ByteBufferPool byteBufferPool;
private RetainableByteBuffer networkBuffer;
private boolean shutdown;
private boolean complete;
private State state = State.STATUS;
private boolean unsolicited;
private String method;
private int status;
private String method;
private Content.Chunk chunk;
private Runnable action;
private boolean shutdown;
private boolean disposed;

public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
{
Expand All @@ -73,7 +74,7 @@ void receive()
{
if (!hasContent())
{
boolean setFillInterest = parseAndFill();
boolean setFillInterest = parseAndFill(true);
if (!hasContent() && setFillInterest)
fillInterested();
}
Expand All @@ -97,10 +98,8 @@ protected void reset()
super.reset();
parser.reset();
if (chunk != null)
{
chunk.release();
chunk = null;
}
chunk = null;
}

@Override
Expand All @@ -109,10 +108,9 @@ protected void dispose()
super.dispose();
parser.close();
if (chunk != null)
{
chunk.release();
chunk = null;
}
chunk = null;
disposed = true;
}

@Override
Expand All @@ -124,7 +122,7 @@ public Content.Chunk read(boolean fillInterestIfNeeded)
Content.Chunk chunk = consumeChunk();
if (chunk != null)
return chunk;
boolean needFillInterest = parseAndFill();
boolean needFillInterest = parseAndFill(false);
if (LOG.isDebugEnabled())
LOG.debug("ParseAndFill needFillInterest {} in {}", needFillInterest, this);
chunk = consumeChunk();
Expand Down Expand Up @@ -236,7 +234,7 @@ protected ByteBuffer onUpgradeFrom()
* If this method depletes the buffer, it will always try to re-fill until fill generates 0 byte.
* @return true if no bytes were filled.
*/
private boolean parseAndFill()
private boolean parseAndFill(boolean notifyContentAvailable)
{
HttpConnectionOverHTTP connection = getHttpConnection();
EndPoint endPoint = connection.getEndPoint();
Expand All @@ -246,23 +244,22 @@ private boolean parseAndFill()
acquireNetworkBuffer();
while (true)
{
if (LOG.isDebugEnabled())
LOG.debug("Parsing {} in {}", BufferUtil.toDetailString(networkBuffer.getByteBuffer()), this);
// Always parse even empty buffers to advance the parser.
if (parse())
boolean stopParsing = parse(notifyContentAvailable);
if (LOG.isDebugEnabled())
LOG.debug("Parsed stop={} in {}", stopParsing, this);
if (stopParsing)
{
// Return immediately, as this thread may be in a race
// with e.g. another thread demanding more content.
return false;
}
if (LOG.isDebugEnabled())
LOG.debug("Parser willing to advance in {}", this);

// Connection may be closed in a parser callback.
if (connection.isClosed())
if (connection.isClosed() || isShutdown())
{
if (LOG.isDebugEnabled())
LOG.debug("Closed {} in {}", connection, this);
LOG.debug("Closed/Shutdown {} in {}", connection, this);
releaseNetworkBuffer();
return false;
}
Expand All @@ -286,9 +283,9 @@ else if (read == 0)
}
else
{
releaseNetworkBuffer();
shutdown();
return false;
// Loop around to parse again to advance the parser,
// for example for HTTP/1.0 connection-delimited content.
}
}
}
Expand All @@ -307,62 +304,80 @@ else if (read == 0)
*
* @return true to indicate that parsing should be interrupted (and will be resumed by another thread).
*/
private boolean parse()
private boolean parse(boolean notifyContentAvailable)
{
// HttpParser is not reentrant, so we cannot invoke the
// application from the parser event callbacks.
// However, the mechanism in general (and this method)
// is reentrant: it notifies the application which may
// read response content, which reenters here.

ByteBuffer byteBuffer = networkBuffer.getByteBuffer();
while (true)
{
boolean handle = parser.parseNext(networkBuffer.getByteBuffer());
boolean handle = parser.parseNext(byteBuffer);
if (LOG.isDebugEnabled())
LOG.debug("Parse result={} on {}", handle, this);
Runnable action = getAndSetAction(null);
if (action != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Executing action after parser returned: {} on {}", action, this);
action.run();
if (LOG.isDebugEnabled())
LOG.debug("Action executed after Parse result={} on {}", handle, this);
}
if (handle)
{
// When the receiver is aborted, the parser is closed in dispose() which changes
// its state to State.CLOSE; so checking parser.isClose() is just a way to check
// if the receiver was aborted or not.
return !parser.isClose();
}
LOG.debug("Parse state={} result={} {} {} on {}", state, handle, BufferUtil.toDetailString(byteBuffer), parser, this);
if (!handle)
return false;

boolean complete = this.complete;
this.complete = false;
if (LOG.isDebugEnabled())
LOG.debug("Parse complete={}, {} {} in {}", complete, networkBuffer, parser, this);
HttpExchange exchange = getHttpExchange();
if (exchange == null)
throw new IllegalStateException("No exchange");

if (complete)
switch (state)
{
int status = this.status;
this.status = 0;
// Connection upgrade due to 101, bail out.
if (status == HttpStatus.SWITCHING_PROTOCOLS_101)
return true;
// Connection upgrade due to CONNECT + 200, bail out.
String method = this.method;
this.method = null;
if (getHttpChannel().isTunnel(method, status))
return true;

if (!networkBuffer.hasRemaining())
return false;

if (!HttpStatus.isInformational(status))
case HEADERS -> responseHeaders(exchange);
case CONTENT ->
{
if (LOG.isDebugEnabled())
LOG.debug("Discarding unexpected content after response {}: {} in {}", status, networkBuffer, this);
networkBuffer.clear();
if (notifyContentAvailable)
responseContentAvailable(exchange);
}
return false;
case COMPLETE ->
{
boolean isUpgrade = status == HttpStatus.SWITCHING_PROTOCOLS_101;
boolean isTunnel = getHttpChannel().isTunnel(method, status);

Runnable task = isUpgrade || isTunnel ? null : this.receiveNext;
responseSuccess(exchange, task);

// Connection upgrade, bail out.
if (isUpgrade || isTunnel)
return true;

if (byteBuffer.hasRemaining())
{
if (HttpStatus.isInterim(status))
{
// There may be multiple interim responses in
// the same network buffer, continue parsing.
continue;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Discarding unexpected content after response {}: {} in {}", status, BufferUtil.toDetailString(byteBuffer), this);
BufferUtil.clear(byteBuffer);
return false;
}
}

// Continue to read from the network.
return false;
}
default -> throw new IllegalStateException("Invalid state " + state);
}

if (!networkBuffer.hasRemaining())
// The application may have aborted the request.
if (disposed)
{
BufferUtil.clear(byteBuffer);
return false;
}

// The application has been invoked,
// and it is now driving the parsing.
return true;
}
}

Expand All @@ -386,7 +401,6 @@ private void shutdown()
// header, the connection will be closed at exchange termination
// thanks to the flag we have set above.
parser.atEOF();
parser.parseNext(BufferUtil.EMPTY_BUFFER);
}

protected boolean isShutdown()
Expand All @@ -406,6 +420,7 @@ public void startResponse(HttpVersion version, int status, String reason)
this.status = status;
parser.setHeadResponse(HttpMethod.HEAD.is(method) || getHttpChannel().isTunnel(method, status));
exchange.getResponse().version(version).status(status).reason(reason);
state = State.STATUS;

responseBegin(exchange);
}
Expand All @@ -432,10 +447,7 @@ public boolean headerComplete()
// Store the EndPoint is case of upgrades, tunnels, etc.
exchange.getRequest().getConversation().setAttribute(EndPoint.class.getName(), getHttpConnection().getEndPoint());
getHttpConnection().onResponseHeaders(exchange);
if (LOG.isDebugEnabled())
LOG.debug("Setting action to responseHeaders(exchange, boolean) on {}", this);
if (getAndSetAction(() -> responseHeaders(exchange)) != null)
throw new IllegalStateException();
state = State.HEADERS;
return true;
}

Expand All @@ -451,17 +463,13 @@ public boolean content(ByteBuffer buffer)

if (chunk != null)
throw new IllegalStateException("Content generated with unconsumed content left");
if (getHttpConnection().isFillInterested())
throw new IllegalStateException("Fill interested while parsing for content");

// Retain the chunk because it is stored for later use.
networkBuffer.retain();
chunk = Content.Chunk.asChunk(buffer, false, networkBuffer);

if (LOG.isDebugEnabled())
LOG.debug("Setting action to responseContentAvailable on {}", this);
if (getAndSetAction(this::responseContentAvailable) != null)
throw new IllegalStateException();
if (getHttpConnection().isFillInterested())
throw new IllegalStateException();
state = State.CONTENT;
return true;
}

Expand Down Expand Up @@ -491,28 +499,20 @@ public boolean messageComplete()
if (exchange == null || unsolicited)
{
// We received an unsolicited response from the server.
networkBuffer.clear();
getHttpConnection().close();
return false;
}

int status = exchange.getResponse().getStatus();
if (!HttpStatus.isInterim(status))
{
inMessages.increment();
complete = true;
}

if (chunk != null)
throw new IllegalStateException();
chunk = Content.Chunk.EOF;

boolean isUpgrade = status == HttpStatus.SWITCHING_PROTOCOLS_101;
boolean isTunnel = getHttpChannel().isTunnel(method, status);
Runnable task = isUpgrade || isTunnel ? null : this::receiveNext;
if (LOG.isDebugEnabled())
LOG.debug("Message complete, calling response success with task {} in {}", task, this);
responseSuccess(exchange, task);
return false;
state = State.COMPLETE;
return true;
}

private void receiveNext()
Expand All @@ -524,7 +524,7 @@ private void receiveNext()

if (LOG.isDebugEnabled())
LOG.debug("Receiving next request in {}", this);
boolean setFillInterest = parseAndFill();
boolean setFillInterest = parseAndFill(true);
if (!hasContent() && setFillInterest)
fillInterested();
}
Expand Down Expand Up @@ -556,13 +556,6 @@ public void badMessage(HttpException failure)
}
}

private Runnable getAndSetAction(Runnable action)
{
Runnable r = this.action;
this.action = action;
return r;
}

long getMessagesIn()
{
return inMessages.longValue();
Expand All @@ -573,4 +566,9 @@ public String toString()
{
return String.format("%s[%s]", super.toString(), parser);
}

private enum State
{
STATUS, HEADERS, CONTENT, COMPLETE
}
}
Loading
Loading