Skip to content

Commit

Permalink
Transient timeouts. Fixes #10234 and #10277
Browse files Browse the repository at this point in the history
ongoing review
  • Loading branch information
gregw committed Nov 7, 2023
1 parent a27c26c commit d901e55
Show file tree
Hide file tree
Showing 18 changed files with 90 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ default void onContentSource(Response response, Content.Source contentSource)
if (Content.Chunk.isFailure(chunk))
{
response.abort(chunk.getFailure());
if (!chunk.isLast())
contentSource.fail(chunk.getFailure());
return;
}
if (chunk.isLast() && !chunk.hasRemaining())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.content.ContentSourceTransformer;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.thread.AutoLock;
Expand Down Expand Up @@ -587,7 +588,11 @@ protected Content.Chunk transform(Content.Chunk inputChunk)
if (_chunk == null)
return null;
if (Content.Chunk.isFailure(_chunk))
return _chunk;
{
Content.Chunk failure = _chunk;
_chunk = Content.Chunk.next(failure);
return failure;
}

// Retain the input chunk because its ByteBuffer will be referenced by the Inflater.
if (retain)
Expand Down Expand Up @@ -788,7 +793,13 @@ public boolean error(Throwable failure)
try (AutoLock ignored = lock.lock())
{
if (Content.Chunk.isFailure(currentChunk))
{
Throwable cause = currentChunk.getFailure();
if (!currentChunk.isLast())
currentChunk = Content.Chunk.from(cause, true);
ExceptionUtil.addSuppressedIfNotAssociated(cause, failure);
return false;
}
if (currentChunk != null)
currentChunk.release();
currentChunk = Content.Chunk.from(failure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,11 @@ protected Action process() throws Throwable
}

if (Content.Chunk.isFailure(chunk))
throw chunk.getFailure();
{
Content.Chunk failure = chunk;
chunk = Content.Chunk.next(failure);
throw failure.getFailure();
}

ByteBuffer buffer = chunk.getByteBuffer();
contentBuffer = buffer.asReadOnlyBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -679,10 +680,18 @@ public void fail(Throwable failure)
if (LOG.isDebugEnabled())
LOG.debug("Content source #{} fail while current chunk is {}", index, currentChunk);
if (Content.Chunk.isFailure(currentChunk))
return;
if (currentChunk != null && currentChunk != ALREADY_READ_CHUNK)
currentChunk.release();
this.chunk = Content.Chunk.from(failure);
{
Throwable cause = currentChunk.getFailure();
if (!currentChunk.isLast())
chunk = Content.Chunk.from(cause, true);
ExceptionUtil.addSuppressedIfNotAssociated(cause, failure);
}
else
{
if (currentChunk != null && currentChunk != ALREADY_READ_CHUNK)
currentChunk.release();
this.chunk = Content.Chunk.from(failure);
}
onDemandCallback();
registerFailure(this, failure);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ public void onComplete()
{
if (_chunk == null)
_chunk = Content.Chunk.EOF;
else if (!_chunk.isLast() && !(Content.Chunk.isFailure(_chunk)))
else if (Content.Chunk.isFailure(_chunk, false))
_chunk = Content.Chunk.from(_chunk.getFailure(), true);
else if (!_chunk.isLast())
throw new IllegalStateException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ public void run()
if (Chunk.isFailure(chunk))
{
completeExceptionally(chunk.getFailure());
if (!chunk.isLast())
_source.fail(chunk.getFailure());
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,17 @@ public void parse()
}
if (Content.Chunk.isFailure(chunk))
{
if (!chunk.isLast() && onTransientFailure(chunk.getFailure()))
continue;
completeExceptionally(chunk.getFailure());
if (chunk.isLast())
{
completeExceptionally(chunk.getFailure());
}
else
{
if (onTransientFailure(chunk.getFailure()))
continue;
completeExceptionally(chunk.getFailure());
_content.fail(chunk.getFailure());
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ public int read(byte[] b, int off, int len) throws IOException
{
if (Content.Chunk.isFailure(chunk))
{
Content.Chunk c = chunk;
chunk = Content.Chunk.next(c);
throw IO.rethrow(c.getFailure());
Content.Chunk failure = chunk;
chunk = Content.Chunk.next(failure);
throw IO.rethrow(failure.getFailure());
}

ByteBuffer byteBuffer = chunk.getByteBuffer();
Expand Down Expand Up @@ -125,9 +125,11 @@ public void close() throws IOException
// Handle a failure as read would
if (Content.Chunk.isFailure(chunk))
{
Content.Chunk c = chunk;
chunk = Content.Chunk.next(c);
throw IO.rethrow(c.getFailure());
Content.Chunk failure = chunk;
chunk = Content.Chunk.next(failure);
if (!failure.isLast())
content.fail(failure.getFailure());
throw IO.rethrow(failure.getFailure());
}

contentSkipped = chunk.hasRemaining();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ private void process()
{
terminate();
subscriber.onError(chunk.getFailure());
if (!chunk.isLast())
content.fail(chunk.getFailure());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ public Content.Chunk read()
}

if (Content.Chunk.isFailure(rawChunk))
return rawChunk;
{
Content.Chunk failure = rawChunk;
rawChunk = Content.Chunk.next(rawChunk);
needsRawRead = rawChunk == null;
return failure;
}

if (Content.Chunk.isFailure(transformedChunk))
return transformedChunk;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public void run()
if (Content.Chunk.isFailure(chunk))
{
promise.failed(chunk.getFailure());
if (!chunk.isLast())
source.fail(chunk.getFailure());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public void run()
if (Content.Chunk.isFailure(chunk))
{
callback.failed(chunk.getFailure());
if (!chunk.isLast())
source.fail(chunk.getFailure());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public void convert()
if (Content.Chunk.isFailure(chunk))
{
promise.failed(chunk.getFailure());
if (!chunk.isLast())
content.fail(chunk.getFailure());
return;
}
text.append(chunk.getByteBuffer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ protected Content.Chunk transform(Content.Chunk inputChunk)
if (_chunk == null)
return null;
if (Content.Chunk.isFailure(_chunk))
return _chunk;
{
Content.Chunk failure = _chunk;
_chunk = Content.Chunk.next(failure);
return failure;
}
if (_chunk.isLast() && !_chunk.hasRemaining())
return Content.Chunk.EOF;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,17 @@ public void earlyEOF()
BadMessageException bad = new BadMessageException("Early EOF");

if (Content.Chunk.isFailure(stream._chunk))
stream._chunk.getFailure().addSuppressed(bad);
{
if (stream._chunk.isLast())
{
stream._chunk.getFailure().addSuppressed(bad);
}
else
{
bad.addSuppressed(stream._chunk.getFailure());
stream._chunk = Content.Chunk.from(bad);
}
}
else
{
if (stream._chunk != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ public Content.Chunk read()
if (Content.Chunk.isFailure(chunk))
{
onClientRequestFailure(request, proxyRequest, response, chunk.getFailure());
if (!chunk.isLast())
super.fail(chunk.getFailure(), true);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import jakarta.servlet.ReadListener;
import jakarta.servlet.ServletInputStream;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Context;
import org.eclipse.jetty.util.thread.AutoLock;
Expand Down Expand Up @@ -348,9 +347,6 @@ public void run()
Throwable failure = chunk.getFailure();
if (LOG.isDebugEnabled())
LOG.debug("running failure={} {}", failure, this);
// TODO is this necessary to add here?
if (chunk.isLast())
_servletChannel.getServletContextResponse().getHeaders().add(HttpFields.CONNECTION_CLOSE);
_readListener.onError(failure);
}
else if (chunk.isLast() && !chunk.hasRemaining())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ public Content.Chunk read()
if (Content.Chunk.isFailure(chunk))
{
onClientRequestFailure(request, proxyRequest, response, chunk.getFailure());
if (!chunk.isLast())
super.fail(chunk.getFailure());
}
else
{
Expand Down

0 comments on commit d901e55

Please sign in to comment.