Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #5105 - StatisticsHandler Graceful Shutdown of Async Requests #5175

Merged
merged 8 commits into from
Aug 28, 2020
6 changes: 5 additions & 1 deletion jetty-server/src/main/config/etc/jetty-stats.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<Call name="insertHandler">
<Arg>
<New id="StatsHandler" class="org.eclipse.jetty.server.handler.StatisticsHandler"></New>
<New id="StatsHandler" class="org.eclipse.jetty.server.handler.StatisticsHandler">
<Set name="waitForSuspendedRequestsOnShutdown">
<Property name="jetty.statistics.waitForSuspendedRequestsOnShutdown" default="true"/>
</Set>
</New>
</Arg>
</Call>
<Call class="org.eclipse.jetty.server.ServerConnectionStatistics" name="addToAllConnectors">
Expand Down
5 changes: 5 additions & 0 deletions jetty-server/src/main/config/modules/stats.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,8 @@ etc/jetty-stats.xml

[ini]
jetty.webapp.addServerClasses+=,-org.eclipse.jetty.servlet.StatisticsServlet

[ini-template]

## If the Graceful shutdown should wait for suspended requests as well as dispatched ones.
# jetty.statistics.waitForSuspendedRequestsOnShutdown=true
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.AsyncEvent;
Expand All @@ -29,7 +28,6 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.AsyncContextEvent;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpChannelState;
Expand Down Expand Up @@ -59,6 +57,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful

private final LongAdder _asyncDispatches = new LongAdder();
private final LongAdder _expires = new LongAdder();
private final LongAdder _errors = new LongAdder();

private final LongAdder _responses1xx = new LongAdder();
private final LongAdder _responses2xx = new LongAdder();
Expand All @@ -67,6 +66,8 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
private final LongAdder _responses5xx = new LongAdder();
private final LongAdder _responsesTotalBytes = new LongAdder();

private boolean waitForSuspendedRequestsOnShutdown = true;
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved

private final Graceful.Shutdown _shutdown = new Graceful.Shutdown()
{
@Override
Expand All @@ -76,25 +77,24 @@ protected FutureCallback newShutdownCallback()
}
};

private final AtomicBoolean _wrapWarning = new AtomicBoolean();

private final AsyncListener _onCompletion = new AsyncListener()
{
@Override
public void onTimeout(AsyncEvent event) throws IOException
public void onStartAsync(AsyncEvent event) throws IOException
{
_expires.increment();
event.getAsyncContext().addListener(this);
}

@Override
public void onStartAsync(AsyncEvent event) throws IOException
public void onTimeout(AsyncEvent event) throws IOException
{
event.getAsyncContext().addListener(this);
_expires.increment();
}

@Override
public void onError(AsyncEvent event) throws IOException
{
_errors.increment();
}

@Override
Expand All @@ -105,15 +105,15 @@ public void onComplete(AsyncEvent event) throws IOException
Request request = state.getBaseRequest();
final long elapsed = System.currentTimeMillis() - request.getTimeStamp();

long d = _requestStats.decrement();
long numRequests = _requestStats.decrement();
_requestTimeStats.record(elapsed);

updateResponse(request);

_asyncWaitStats.decrement();

// If we have no more dispatches, should we signal shutdown?
if (d == 0)
if (numRequests == 0 && waitForSuspendedRequestsOnShutdown)
{
FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
Expand Down Expand Up @@ -149,6 +149,10 @@ public void statsReset()
@Override
public void handle(String path, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
Handler handler = getHandler();
if (handler == null || !isStarted() || isShutdown())
return;

_dispatchedStats.increment();

final long start;
Expand All @@ -168,51 +172,41 @@ public void handle(String path, Request baseRequest, HttpServletRequest request,

try
{
Handler handler = getHandler();
if (handler != null && !_shutdown.isShutdown() && isStarted())
handler.handle(path, baseRequest, request, response);
else
{
if (!baseRequest.isHandled())
baseRequest.setHandled(true);
else if (_wrapWarning.compareAndSet(false, true))
LOG.warn("Bad statistics configuration. Latencies will be incorrect in {}", this);
if (!baseRequest.getResponse().isCommitted())
response.sendError(HttpStatus.SERVICE_UNAVAILABLE_503);
}
handler.handle(path, baseRequest, request, response);
}
finally
{
final long now = System.currentTimeMillis();
final long dispatched = now - start;

_dispatchedStats.decrement();
long numRequests = -1;
long numDispatches = _dispatchedStats.decrement();
_dispatchedTimeStats.record(dispatched);

if (state.isSuspended())
if (state.isInitial())
{
if (state.isInitial())
if (state.isAsyncStarted())
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
{
state.addListener(_onCompletion);
_asyncWaitStats.increment();
}
else
{
numRequests = _requestStats.decrement();
_requestTimeStats.record(dispatched);
updateResponse(baseRequest);
}
}
else if (state.isInitial())

FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
{
long d = _requestStats.decrement();
_requestTimeStats.record(dispatched);
updateResponse(baseRequest);
response.flushBuffer();

// If we have no more dispatches, should we signal shutdown?
FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
{
response.flushBuffer();
if (d == 0)
shutdown.succeeded();
}
// If we either have no more requests or dispatches, we can complete shutdown.
if (waitForSuspendedRequestsOnShutdown ? (numRequests == 0) : (numDispatches == 0))
shutdown.succeeded();
}
// else onCompletion will handle it.
}
}

Expand Down Expand Up @@ -251,6 +245,8 @@ protected void updateResponse(Request request)
@Override
protected void doStart() throws Exception
{
if (getHandler() == null)
throw new IllegalStateException("StatisticsHandler has no Wrapped Handler");
_shutdown.cancel();
super.doStart();
statsReset();
Expand All @@ -263,6 +259,16 @@ protected void doStop() throws Exception
super.doStop();
}

/**
* Set whether the graceful shutdown should wait for all requests to complete (including suspended requests)
* or whether it should only wait for all the actively dispatched requests to complete.
* @param waitForSuspendedRequests true to wait for suspended requests on graceful shutdown.
*/
public void waitForSuspendedRequestsOnShutdown(boolean waitForSuspendedRequests)
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
{
this.waitForSuspendedRequestsOnShutdown = waitForSuspendedRequests;
}

lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
/**
* @return the number of requests handled by this handler
* since {@link #statsReset()} was last called, excluding
Expand Down Expand Up @@ -467,6 +473,16 @@ public int getExpires()
return _expires.intValue();
}

/**
* @return the number of async errors that occurred.
* @see #getAsyncDispatches()
*/
@ManagedAttribute("number of async errors that occurred")
public int getErrors()
{
return _errors.intValue();
}

/**
* @return the number of responses with a 1xx status returned by this context
* since {@link #statsReset()} was last called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public void run()
).getBytes());
client2.getOutputStream().flush();
String response2 = IO.toString(client2.getInputStream());
assertThat(response2, containsString(" 503 "));
assertThat(response2, containsString(" 404 "));

now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
Thread.sleep(Math.max(1, end - now));
Expand All @@ -330,8 +330,9 @@ public void run()
assertThat(response, containsString(" 200 OK"));
assertThat(response, containsString("read 10/10"));

assertThat(stats.getRequests(), is(2));
assertThat(stats.getResponses5xx(), is(1));
// The StatisticsHandler was shutdown when it received the second request so does not contribute to the stats.
assertThat(stats.getRequests(), is(1));
assertThat(stats.getResponses4xx(), is(0));
}
}

Expand Down Expand Up @@ -631,7 +632,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques

// Check new connections rejected!
String unavailable = connector.getResponse("GET / HTTP/1.1\r\nHost:localhost\r\n\r\n");
assertThat(unavailable, containsString(" 503 Service Unavailable"));
assertThat(unavailable, containsString(" 404 Not Found"));
lachlan-roberts marked this conversation as resolved.
Show resolved Hide resolved
assertThat(unavailable, Matchers.containsString("Connection: close"));

// Check completed 200 has close
Expand Down
Loading