Skip to content

Commit

Permalink
Issue #6254 - Total timeout not enforced for queued requests.
Browse files Browse the repository at this point in the history
Various code cleanups.
Renamed HttpDestination.TimeoutTask to RequestTimeouts for clarity.
Improved javadocs, code comments and logging.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed May 11, 2021
1 parent 003c313 commit 5f23689
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
private final ProxyConfiguration.Proxy proxy;
private final ClientConnectionFactory connectionFactory;
private final HttpField hostField;
private final TimeoutTask timeout;
private final RequestTimeouts requestTimeouts;
private ConnectionPool connectionPool;

public HttpDestination(HttpClient client, Origin origin)
Expand All @@ -78,7 +78,7 @@ public HttpDestination(HttpClient client, Origin origin)
this.requestNotifier = new RequestNotifier(client);
this.responseNotifier = new ResponseNotifier();

this.timeout = new TimeoutTask(client.getScheduler());
this.requestTimeouts = new RequestTimeouts(client.getScheduler());

ProxyConfiguration proxyConfig = client.getProxyConfiguration();
proxy = proxyConfig.match(origin);
Expand Down Expand Up @@ -272,7 +272,7 @@ public void send(HttpExchange exchange)
{
long expiresAt = request.getTimeoutAt();
if (expiresAt != -1)
timeout.schedule(expiresAt);
requestTimeouts.schedule(expiresAt);

if (!client.isRunning() && exchanges.remove(exchange))
{
Expand Down Expand Up @@ -425,7 +425,7 @@ public void close()
if (LOG.isDebugEnabled())
LOG.debug("Closed {}", this);
connectionPool.close();
timeout.destroy();
requestTimeouts.destroy();
}

public void release(Connection connection)
Expand Down Expand Up @@ -547,15 +547,15 @@ public String toString()
}

/**
* This class enforces the total timeout for exchanges that are still in the queue.
* The total timeout for exchanges that are not in the destination queue is enforced
* by {@link HttpChannel}.
* <p>Enforces the total timeout for for exchanges that are still in the queue.</p>
* <p>The total timeout for exchanges that are not in the destination queue
* is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.</p>
*/
private class TimeoutTask extends CyclicTimeout
private class RequestTimeouts extends CyclicTimeout
{
private final AtomicLong nextTimeout = new AtomicLong(Long.MAX_VALUE);
private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE);

private TimeoutTask(Scheduler scheduler)
private RequestTimeouts(Scheduler scheduler)
{
super(scheduler);
}
Expand All @@ -564,14 +564,17 @@ private TimeoutTask(Scheduler scheduler)
public void onTimeoutExpired()
{
if (LOG.isDebugEnabled())
LOG.debug("{} timeout expired", this);
LOG.debug("{} timeouts check", this);

nextTimeout.set(Long.MAX_VALUE);
long now = System.nanoTime();
long nextExpiresAt = Long.MAX_VALUE;

// Check all queued exchanges for those that have expired
// and to determine when the next check must be.
// Reset the earliest timeout so we can expire again.
// A concurrent call to schedule(long) may lose an earliest
// value, but the corresponding exchange is already enqueued
// and will be seen by scanning the exchange queue below.
earliestTimeout.set(Long.MAX_VALUE);

long earliest = Long.MAX_VALUE;
for (HttpExchange exchange : exchanges)
{
HttpRequest request = exchange.getRequest();
Expand All @@ -580,34 +583,27 @@ public void onTimeoutExpired()
continue;
if (expiresAt <= now)
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
else if (expiresAt < nextExpiresAt)
nextExpiresAt = expiresAt;
else if (expiresAt < earliest)
earliest = expiresAt;
}

if (nextExpiresAt < Long.MAX_VALUE && client.isRunning())
schedule(nextExpiresAt);
if (earliest < Long.MAX_VALUE && client.isRunning())
schedule(earliest);
}

private void schedule(long expiresAt)
{
// Schedule a timeout for the soonest any known exchange can expire.
// If subsequently that exchange is removed from the queue, the
// timeout is not cancelled, instead the entire queue is swept
// for expired exchanges and a new timeout is set.
long timeoutAt = nextTimeout.getAndUpdate(e -> Math.min(e, expiresAt));
if (timeoutAt != expiresAt)
// Schedule a timeout for the earliest exchange that may expire.
// When the timeout expires, scan the exchange queue for the next
// earliest exchange that may expire, and reschedule a new timeout.
long earliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
if (expiresAt != earliest)
{
long delay = expiresAt - System.nanoTime();
if (delay <= 0)
{
onTimeoutExpired();
}
else
{
schedule(delay, TimeUnit.NANOSECONDS);
if (LOG.isDebugEnabled())
LOG.debug("{} scheduled timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay));
}
// A new request expires earlier than previous requests, schedule it.
long delay = Math.max(0, expiresAt - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay));
schedule(delay, TimeUnit.NANOSECONDS);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class TimeoutCompleteListener extends CyclicTimeout implements Response.C
{
private static final Logger LOG = Log.getLogger(TimeoutCompleteListener.class);

private final AtomicReference<Request> request = new AtomicReference<>();
private final AtomicReference<Request> requestTimeout = new AtomicReference<>();

public TimeoutCompleteListener(Scheduler scheduler)
{
Expand All @@ -44,7 +44,7 @@ public TimeoutCompleteListener(Scheduler scheduler)
@Override
public void onTimeoutExpired()
{
Request request = this.request.getAndSet(null);
Request request = requestTimeout.getAndSet(null);
if (LOG.isDebugEnabled())
LOG.debug("Total timeout {} ms elapsed for {} on {}", request.getTimeout(), request, this);
if (request != null)
Expand All @@ -54,7 +54,7 @@ public void onTimeoutExpired()
@Override
public void onComplete(Result result)
{
Request request = this.request.getAndSet(null);
Request request = requestTimeout.getAndSet(null);
if (request != null)
{
boolean cancelled = cancel();
Expand All @@ -65,19 +65,12 @@ public void onComplete(Result result)

void schedule(HttpRequest request, long timeoutAt)
{
if (this.request.compareAndSet(null, request))
if (requestTimeout.compareAndSet(null, request))
{
long delay = timeoutAt - System.nanoTime();
if (delay <= 0)
{
onTimeoutExpired();
}
else
{
schedule(delay, TimeUnit.NANOSECONDS);
if (LOG.isDebugEnabled())
LOG.debug("Scheduled timeout in {} ms for {} on {}", TimeUnit.NANOSECONDS.toMillis(delay), request, this);
}
long delay = Math.max(0, timeoutAt - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("Scheduling timeout in {} ms for {} on {}", TimeUnit.NANOSECONDS.toMillis(delay), request, this);
schedule(delay, TimeUnit.NANOSECONDS);
}
}
}
20 changes: 19 additions & 1 deletion jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,23 @@
* <p>Subclasses should implement {@link #onTimeoutExpired()}.</p>
* <p>This implementation is optimised assuming that the timeout
* will mostly be cancelled and then reused with a similar value.</p>
* <p>The typical scenario to use this class is when you have events
* that postpone (by re-scheduling), or cancel then re-schedule, a
* timeout for a single entity.
* For example: connection idleness, where for each connection there
* is a CyclicTimeout and a read/write postpones the timeout; when
* the timeout expires, the implementation checks against a timestamp
* if the connection is really idle.
* Another example: HTTP session expiration, where for each HTTP
* session there is a CyclicTimeout and at the beginning of the
* request processing the timeout is canceled (via cancel()), but at
* the end of the request processing the timeout is re-scheduled.</p>
* <p>Another typical scenario is for a parent entity to manage
* the timeouts of many children entities; the timeout is scheduled
* for the child entity that expires the earlier; when the timeout
* expires, the implementation scans the children entities to find
* the expired child entities and to find the next child entity
* that expires the earlier. </p>
* <p>This implementation has a {@link Timeout} holding the time
* at which the scheduled task should fire, and a linked list of
* {@link Wakeup}, each holding the actual scheduled task.</p>
Expand Down Expand Up @@ -102,8 +119,9 @@ public boolean schedule(long delay, TimeUnit units)
if (_timeout.compareAndSet(timeout, new Timeout(newTimeoutAt, wakeup)))
{
if (LOG.isDebugEnabled())
LOG.debug("Installed timeout in {} ms, waking up in {} ms",
LOG.debug("Installed timeout in {} ms, {} wake up in {} ms",
units.toMillis(delay),
newWakeup != null ? "new" : "existing",
TimeUnit.NANOSECONDS.toMillis(wakeup._at - now));
break;
}
Expand Down

0 comments on commit 5f23689

Please sign in to comment.