Skip to content

Commit

Permalink
Fix jetty 12.0.x transient timeouts (#10844)
Browse files Browse the repository at this point in the history
Fixes #10234

* Introduced transient failures in reads where a failure chunk has last=false.
* Transient failure now do not fail the handler callback.
* Improve eeN ContentProducer to more carefully assert transient and terminal errors + enable HttpInputIntegrationTest
* Do not add connection: close to the response when the error is transient
* Rework ChunksContentSource to support null chunks
* Added tests to verify the new transient failure cases
* Review all code that handles failure, and handling correctly transient failure, either by making them fatal, and/or by failing Content.Source.

Signed-off-by: Ludovic Orban <[email protected]>
Signed-off-by: Olivier Lamy <[email protected]>
Signed-off-by: Simone Bordet <[email protected]>
Co-authored-by: Ludovic Orban <[email protected]>
Co-authored-by: Olivier Lamy <[email protected]>
Co-authored-by: Joakim Erdfelt <[email protected]>
Co-authored-by: Chad Wilson <[email protected]>
Co-authored-by: Simone Bordet <[email protected]>
  • Loading branch information
6 people authored Nov 23, 2023
1 parent b9bd3f2 commit 7dcab84
Show file tree
Hide file tree
Showing 67 changed files with 3,732 additions and 627 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,22 +234,39 @@ The high-level abstraction that Jetty offers to read bytes is `org.eclipse.jetty
A `Content.Chunk` groups the following information:

* A `ByteBuffer` with the bytes that have been read; it may be empty.
* Whether the read reached end-of-file.
* A failure that might have happened during the read.
* Whether the read reached end-of-file, via its `last` flag.
* A failure that might have happened during the read, via its `getFailure()` method.

The ``Content.Chunk``'s `ByteBuffer` is typically a slice of a different `ByteBuffer` that has been read by a lower layer.
There may be multiple layers between the bottom layer (where the initial read typically happens) and the application layer.
The `Content.Chunk` returned from `Content.Source.read()` can either be a _normal_ chunk (a chunk containing a `ByteBuffer` and a `null` failure), or a _failure_ chunk (a chunk containing an empty `ByteBuffer` and a non-`null` failure).

By slicing the `ByteBuffer` (rather than copying its bytes), there is no copy of the bytes between the layers.
A failure chunk also indicates (via the `last` flag) whether the failure is a fatal (when `last=true`) or transient (when `last=false`) failure.

A transient failure is a temporary failure that happened during the read, it may be ignored, and it is recoverable: it is possible to call `read()` again and obtain a normal chunk (or a `null` chunk).
Typical cases of transient failures are idle timeout failures, where the read timed out, but the application may decide to insist reading until some other event happens.
The application may convert a transient failure into a fatal failure by calling `Content.Source.fail(Throwable)`.

A `Content.Source` must be fully consumed by reading all its content, or failed by calling `Content.Source.fail(Throwable)` to signal that the reader is not interested in reading anymore, otherwise it may leak underlying resources.

Fully consuming a `Content.Source` means reading from it until it returns a `Content.Chunk` whose `last` flag is `true`.
Reading or demanding from an already fully consumed `Content.Source` is always immediately serviced with the last state of the `Content.Source`: a `Content.Chunk` with the `last` flag set to `true`, either an end-of-file chunk, or a failure chunk.

Once failed, a `Content.Source` is considered fully consumed.
Further attempts to read from a failed `Content.Source` return a failure chunk whose `getFailure()` method returns the exception passed to `Content.Source.fail(Throwable)`.

When reading a normal chunk, its `ByteBuffer` is typically a slice of a different `ByteBuffer` that has been read by a lower layer.
There may be multiple layers between the bottom layer (where the initial read typically happens) and the application layer that calls `Content.Source.read()`.

By slicing the `ByteBuffer` (rather than copying its bytes), there is no copy of the bytes between the layers, which yields greater performance.
However, this comes with the cost that the `ByteBuffer`, and the associated `Content.Chunk`, have an intrinsic lifecycle: the final consumer of a `Content.Chunk` at the application layer must indicate when it has consumed the chunk, so that the bottom layer may reuse/recycle the `ByteBuffer`.

Consuming the chunk means that the bytes in the `ByteBuffer` are read (or ignored), and that the application will not look at or reference that `ByteBuffer` ever again.

`Content.Chunk` offers a retain/release model to deal with the `ByteBuffer` lifecycle, with a simple rule:

IMPORTANT: A `Content.Chunk` returned by a call to `Content.Source.read()` **must** be released.
IMPORTANT: A `Content.Chunk` returned by a call to `Content.Source.read()` **must** be released, except for ``Content.Chunk``s that are failure chunks.
Failure chunks _may_ be released, but they do not _need_ to be.

The example below is the idiomatic way to read from a `Content.Source`:
The example below is the idiomatic way of reading from a `Content.Source`:

[source,java,indent=0]
----
Expand All @@ -258,7 +275,7 @@ include::{doc_code}/org/eclipse/jetty/docs/programming/ContentDocs.java[tags=idi
<1> The `read()` that must be paired with a `release()`.
<2> The `release()` that pairs the `read()`.

Note how the reads happens in a loop, consuming the `Content.Source` as soon as it has content available to be read, and therefore no backpressure is applied to the reads.
Note how the reads happen in a loop, consuming the `Content.Source` as soon as it has content available to be read, and therefore no backpressure is applied to the reads.

An alternative way to read from a `Content.Source`, to use when the chunk is consumed asynchronously, and you don't want to read again until the `Content.Chunk` is consumed, is the following:

Expand All @@ -273,7 +290,7 @@ Note how the reads do not happen in a loop, and therefore backpressure is applie

Since the `Chunk` is consumed asynchronously, you may need to retain it to extend its lifecycle, as explained in xref:pg-arch-io-content-source-chunk[this section].

You can use `Content.Source` static methods to conveniently read (in a blocking way or non-blocking way), for example via `static Content.Source.asStringAsync(Content.Source, Charset)`, or via an `InputStream` using `static Content.Source.asInputStream(Content.Source source)`.
You can use `Content.Source` static methods to conveniently read (in a blocking way or non-blocking way), for example via `static Content.Source.asStringAsync(Content.Source, Charset)`, or via an `InputStream` using `static Content.Source.asInputStream(Content.Source)`.

Refer to the `Content.Source` link:{javadoc-url}/org/eclipse/jetty/io/Content.Source.html[`javadocs`] for further details.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,17 @@ public void read(Content.Source source)
// If there is a failure reading, handle it.
if (Content.Chunk.isFailure(chunk))
{
handleFailure(chunk.getFailure());
return;
boolean fatal = chunk.isLast();
if (fatal)
{
handleFatalFailure(chunk.getFailure());
return;
}
else
{
handleTransientFailure(chunk.getFailure());
continue;
}
}

// A normal chunk of content, consume it.
Expand Down Expand Up @@ -93,10 +102,16 @@ public void read(Content.Source source)
return;
}

// If there is a failure reading, handle it.
// If there is a failure reading, always treat it as fatal.
if (Content.Chunk.isFailure(chunk))
{
handleFailure(chunk.getFailure());
// If the failure is transient, fail the source
// to indicate that there will be no more reads.
if (!chunk.isLast())
source.fail(chunk.getFailure());

// Handle the failure and stop reading by not demanding.
handleFatalFailure(chunk.getFailure());
return;
}

Expand All @@ -120,7 +135,7 @@ public void read(Content.Source source)
{
// If there is a failure reading, handle it,
// and stop reading by not demanding.
handleFailure(failure);
handleFatalFailure(failure);
}
});
}
Expand All @@ -132,7 +147,11 @@ private CompletableFuture<Void> consumeAsync(Content.Chunk chunk)
}
}

private static void handleFailure(Throwable failure)
private static void handleFatalFailure(Throwable failure)
{
}

private static void handleTransientFailure(Throwable failure)
{
}

Expand Down Expand Up @@ -189,7 +208,7 @@ public void run()

if (Content.Chunk.isFailure(chunk))
{
handleFailure(chunk.getFailure());
handleFatalFailure(chunk.getFailure());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ public default void beforeDecoding(Response response)

/**
* <p>Decodes the bytes in the given {@code buffer} and returns the decoded bytes.</p>
* <p>The returned {@link RetainableByteBuffer} containing the decoded bytes may
* be empty and <b>must</b> be released via {@link RetainableByteBuffer#release()}.</p>
* <p>The returned {@link RetainableByteBuffer} <b>will</b> eventually be released via
* {@link RetainableByteBuffer#release()} by the code that called this method.</p>
*
* @param buffer the buffer containing encoded bytes
* @return a buffer containing decoded bytes that must be released
* @return a buffer containing decoded bytes
*/
public abstract RetainableByteBuffer decode(ByteBuffer buffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ default void onContentSource(Response response, Content.Source contentSource)
if (Content.Chunk.isFailure(chunk))
{
response.abort(chunk.getFailure());
if (!chunk.isLast())
contentSource.fail(chunk.getFailure());
return;
}
if (chunk.isLast() && !chunk.hasRemaining())
Expand All @@ -207,6 +209,7 @@ default void onContentSource(Response response, Content.Source contentSource)
{
chunk.release();
response.abort(x);
contentSource.fail(x);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.content.ContentSourceTransformer;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.Destroyable;
import org.eclipse.jetty.util.thread.AutoLock;
Expand Down Expand Up @@ -587,7 +588,11 @@ protected Content.Chunk transform(Content.Chunk inputChunk)
if (_chunk == null)
return null;
if (Content.Chunk.isFailure(_chunk))
return _chunk;
{
Content.Chunk failure = _chunk;
_chunk = Content.Chunk.next(failure);
return failure;
}

// Retain the input chunk because its ByteBuffer will be referenced by the Inflater.
if (retain)
Expand All @@ -602,14 +607,25 @@ protected Content.Chunk transform(Content.Chunk inputChunk)
{
// The decoded ByteBuffer is a transformed "copy" of the
// compressed one, so it has its own reference counter.
if (LOG.isDebugEnabled())
LOG.debug("returning decoded content");
return Content.Chunk.asChunk(decodedBuffer.getByteBuffer(), false, decodedBuffer);
if (decodedBuffer.canRetain())
{
if (LOG.isDebugEnabled())
LOG.debug("returning decoded content");
return Content.Chunk.asChunk(decodedBuffer.getByteBuffer(), false, decodedBuffer);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("returning non-retainable decoded content");
return Content.Chunk.from(decodedBuffer.getByteBuffer(), false);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("decoding produced no content");
if (decodedBuffer != null)
decodedBuffer.release();

if (!_chunk.hasRemaining())
{
Expand Down Expand Up @@ -788,7 +804,13 @@ public boolean error(Throwable failure)
try (AutoLock ignored = lock.lock())
{
if (Content.Chunk.isFailure(currentChunk))
{
Throwable cause = currentChunk.getFailure();
if (!currentChunk.isLast())
currentChunk = Content.Chunk.from(cause, true);
ExceptionUtil.addSuppressedIfNotAssociated(cause, failure);
return false;
}
if (currentChunk != null)
currentChunk.release();
currentChunk = Content.Chunk.from(failure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,11 @@ protected Action process() throws Throwable
}

if (Content.Chunk.isFailure(chunk))
throw chunk.getFailure();
{
Content.Chunk failure = chunk;
chunk = Content.Chunk.next(failure);
throw failure.getFailure();
}

ByteBuffer buffer = chunk.getByteBuffer();
contentBuffer = buffer.asReadOnlyBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.content.ByteBufferContentSource;
import org.eclipse.jetty.util.ExceptionUtil;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -679,12 +680,20 @@ public void fail(Throwable failure)
if (LOG.isDebugEnabled())
LOG.debug("Content source #{} fail while current chunk is {}", index, currentChunk);
if (Content.Chunk.isFailure(currentChunk))
return;
if (currentChunk != null && currentChunk != ALREADY_READ_CHUNK)
currentChunk.release();
this.chunk = Content.Chunk.from(failure);
onDemandCallback();
{
Throwable cause = currentChunk.getFailure();
if (!currentChunk.isLast())
chunk = Content.Chunk.from(cause, true);
ExceptionUtil.addSuppressedIfNotAssociated(cause, failure);
}
else
{
if (currentChunk != null && currentChunk != ALREADY_READ_CHUNK)
currentChunk.release();
this.chunk = Content.Chunk.from(failure);
}
registerFailure(this, failure);
onDemandCallback();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//

package org.eclipse.jetty.client;

import java.io.Closeable;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.eclipse.jetty.client.transport.HttpConversation;
import org.eclipse.jetty.client.transport.HttpRequest;
import org.eclipse.jetty.client.transport.HttpResponse;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.content.ChunksContentSource;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;

public class AsyncContentListenerTest
{
@Test
public void testTransientFailureBecomesTerminal()
{
TestSource originalSource = new TestSource(
Content.Chunk.from(ByteBuffer.wrap(new byte[] {1}), false),
Content.Chunk.from(ByteBuffer.wrap(new byte[] {2}), false),
Content.Chunk.from(new NumberFormatException(), false),
Content.Chunk.from(ByteBuffer.wrap(new byte[] {3}), true)
);

List<Content.Chunk> collectedChunks = new ArrayList<>();
Response.AsyncContentListener asyncContentListener = (response, chunk, demander) ->
{
chunk.retain();
collectedChunks.add(chunk);
demander.run();
};

HttpResponse response = new HttpResponse(new HttpRequest(new HttpClient(), new HttpConversation(), URI.create("http://localhost")));
asyncContentListener.onContentSource(response, originalSource);

assertThat(collectedChunks.size(), is(2));
assertThat(collectedChunks.get(0).isLast(), is(false));
assertThat(collectedChunks.get(0).getByteBuffer().get(), is((byte)1));
assertThat(collectedChunks.get(0).getByteBuffer().hasRemaining(), is(false));
assertThat(collectedChunks.get(1).isLast(), is(false));
assertThat(collectedChunks.get(1).getByteBuffer().get(), is((byte)2));
assertThat(collectedChunks.get(1).getByteBuffer().hasRemaining(), is(false));

Content.Chunk chunk = originalSource.read();
assertThat(Content.Chunk.isFailure(chunk, true), is(true));
assertThat(chunk.getFailure(), instanceOf(NumberFormatException.class));

collectedChunks.forEach(Content.Chunk::release);
originalSource.close();
}

private static class TestSource extends ChunksContentSource implements Closeable
{
private Content.Chunk[] chunks;

public TestSource(Content.Chunk... chunks)
{
super(Arrays.asList(chunks));
this.chunks = chunks;
}

@Override
public void close()
{
if (chunks != null)
{
for (Content.Chunk chunk : chunks)
{
if (chunk != null)
chunk.release();
}
chunks = null;
}
}
}
}
Loading

0 comments on commit 7dcab84

Please sign in to comment.