Skip to content

Commit

Permalink
Merge pull request #2965 from eclipse/jetty-9.4.x-2796-http2_max_conc…
Browse files Browse the repository at this point in the history
…urrent_streams

Issue #2796 - Max local stream count exceeded when request fails.
  • Loading branch information
sbordet authored Nov 1, 2018
2 parents 161f169 + 31cab3d commit 11abe53
Show file tree
Hide file tree
Showing 15 changed files with 187 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -550,14 +550,14 @@ public boolean abort(HttpExchange exchange, Throwable failure)
// respect to concurrency between request and response.
Result result = exchange.terminateResponse();
terminateResponse(exchange, result);
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
return false;
}

return true;
}

private boolean updateResponseState(ResponseState from, ResponseState to)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class HttpRequest implements Request
private String query;
private String method = HttpMethod.GET.asString();
private HttpVersion version = HttpVersion.HTTP_1_1;
private long idleTimeout;
private long idleTimeout = -1;
private long timeout;
private long timeoutAt;
private ContentProvider content;
Expand All @@ -99,7 +99,6 @@ protected HttpRequest(HttpClient client, HttpConversation conversation, URI uri)
extractParams(query);

followRedirects(client.isFollowRedirects());
idleTimeout = client.getIdleTimeout();
HttpField acceptEncodingField = client.getAcceptEncodingField();
if (acceptEncodingField != null)
headers.put(acceptEncodingField);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,14 +579,14 @@ public boolean abort(HttpExchange exchange, Throwable failure)
// respect to concurrency between request and response.
Result result = exchange.terminateRequest();
terminateRequest(exchange, failure, result);
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
return false;
}

return true;
}

private boolean updateRequestState(RequestState from, RequestState to)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ protected SendFailure send(HttpExchange exchange)
// Save the old idle timeout to restore it.
EndPoint endPoint = getEndPoint();
idleTimeout = endPoint.getIdleTimeout();
endPoint.setIdleTimeout(request.getIdleTimeout());
long requestIdleTimeout = request.getIdleTimeout();
if (requestIdleTimeout >= 0)
endPoint.setIdleTimeout(requestIdleTimeout);

// One channel per connection, just delegate the send.
return send(channel, exchange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public FCGIIdleTimeout(HttpConnectionOverFCGI connection, long idleTimeout)
{
super(connection.getHttpDestination().getHttpClient().getScheduler());
this.connection = connection;
setIdleTimeout(idleTimeout);
setIdleTimeout(idleTimeout >= 0 ? idleTimeout : connection.getEndPoint().getIdleTimeout());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listen
IStream stream = createLocalStream(streamId);
stream.setListener(listener);

ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream));
ControlEntry entry = new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream));
queued = flusher.append(entry);
}
// Iterate outside the synchronized block.
Expand Down Expand Up @@ -606,7 +606,7 @@ public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame
IStream pushStream = createLocalStream(streamId);
pushStream.setListener(listener);

ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream));
ControlEntry entry = new ControlEntry(frame, pushStream, new StreamPromiseCallback(promise, pushStream));
queued = flusher.append(entry);
}
// Iterate outside the synchronized block.
Expand Down Expand Up @@ -764,7 +764,8 @@ protected IStream createLocalStream(int streamId)
int localCount = localStreamCount.get();
int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount)
throw new IllegalStateException("Max local stream count " + maxCount + " exceeded");
// TODO: remove the dump() in the exception message.
throw new IllegalStateException("Max local stream count " + maxCount + " exceeded" + System.lineSeparator() + dump());
if (localStreamCount.compareAndSet(localCount, localCount + 1))
break;
}
Expand All @@ -780,6 +781,7 @@ protected IStream createLocalStream(int streamId)
}
else
{
localStreamCount.decrementAndGet();
throw new IllegalStateException("Duplicate stream " + streamId);
}
}
Expand Down Expand Up @@ -816,6 +818,7 @@ protected IStream createRemoteStream(int streamId)
}
else
{
remoteStreamCount.addAndGetHi(-1);
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream");
return null;
}
Expand Down Expand Up @@ -1461,21 +1464,21 @@ public void succeeded()
}
}

private static class PromiseCallback<C> implements Callback
private static class StreamPromiseCallback implements Callback
{
private final Promise<C> promise;
private final C value;
private final Promise<Stream> promise;
private final IStream stream;

private PromiseCallback(Promise<C> promise, C value)
private StreamPromiseCallback(Promise<Stream> promise, IStream stream)
{
this.promise = promise;
this.value = value;
this.stream = stream;
}

@Override
public void succeeded()
{
promise.succeeded(value);
promise.succeeded(stream);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ private boolean startWrite(Callback callback)
{
if (writing.compareAndSet(null, callback))
return true;
close();
callback.failed(new WritePendingException());
return false;
}
Expand Down Expand Up @@ -275,8 +276,6 @@ public void process(Frame frame, Callback callback)

private void onHeaders(HeadersFrame frame, Callback callback)
{
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);
MetaData metaData = frame.getMetaData();
if (metaData.isRequest() || metaData.isResponse())
{
Expand All @@ -286,6 +285,10 @@ private void onHeaders(HeadersFrame frame, Callback callback)
length = fields.getLongField(HttpHeader.CONTENT_LENGTH.asString());
dataLength = length >= 0 ? length : Long.MIN_VALUE;
}

if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);

callback.succeeded();
}

Expand Down Expand Up @@ -507,6 +510,13 @@ public void close()
}
}

@Override
public void onClose()
{
super.onClose();
notifyClosed(this);
}

private void updateStreamCount(int deltaStream, int deltaClosing)
{
((HTTP2Session)session).updateStreamCount(isLocal(), deltaStream, deltaClosing);
Expand Down Expand Up @@ -612,6 +622,21 @@ private void notifyFailure(Stream stream, FailureFrame frame, Callback callback)
}
}

private void notifyClosed(Stream stream)
{
Listener listener = this.listener;
if (listener == null)
return;
try
{
listener.onClosed(stream);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}

@Override
public String dump()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ public interface Listener
*/
public void onData(Stream stream, DataFrame frame, Callback callback);

/**
* <p>Callback method invoked when a RST_STREAM frame has been received for this stream.</p>
*
* @param stream the stream
* @param frame the RST_FRAME received
* @param callback the callback to complete when the reset has been handled
*/
public default void onReset(Stream stream, ResetFrame frame, Callback callback)
{
try
Expand Down Expand Up @@ -214,11 +221,28 @@ public default boolean onIdleTimeout(Stream stream, Throwable x)
return true;
}

/**
* <p>Callback method invoked when the stream failed.</p>
*
* @param stream the stream
* @param error the error code
* @param reason the error reason, or null
* @param callback the callback to complete when the failure has been handled
*/
public default void onFailure(Stream stream, int error, String reason, Callback callback)
{
callback.succeeded();
}

/**
* <p>Callback method invoked after the stream has been closed.</p>
*
* @param stream the stream
*/
public default void onClosed(Stream stream)
{
}

/**
* <p>Empty implementation of {@link Listener}</p>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.ResetFrame;
Expand Down Expand Up @@ -101,6 +102,11 @@ public void release()
connection.release(this);
}

void onStreamClosed(IStream stream)
{
connection.onStreamClosed(stream, this);
}

@Override
public void exchangeTerminated(HttpExchange exchange, Result result)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,17 @@
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Sweeper;

public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);

private final Set<HttpChannel> activeChannels = ConcurrentHashMap.newKeySet();
private final Queue<HttpChannelOverHTTP2> idleChannels = new ConcurrentLinkedQueue<>();
private final AtomicBoolean closed = new AtomicBoolean();
Expand Down Expand Up @@ -87,23 +92,32 @@ protected HttpChannelOverHTTP2 newHttpChannel()

protected void release(HttpChannelOverHTTP2 channel)
{
// Only non-push channels are released.
if (LOG.isDebugEnabled())
LOG.debug("Released {}", channel);
if (activeChannels.remove(channel))
{
channel.setStream(null);
// Recycle only non-failed channels.
if (channel.isFailed())
channel.destroy();
else
idleChannels.offer(channel);
getHttpDestination().release(this);
}
else
{
channel.destroy();
}
}

void onStreamClosed(IStream stream, HttpChannelOverHTTP2 channel)
{
if (LOG.isDebugEnabled())
LOG.debug("{} closed for {}", stream, channel);
channel.setStream(null);
// Only non-push channels are released.
if (stream.isLocal())
getHttpDestination().release(this);
}

@Override
public boolean onIdleTimeout(long idleTimeout)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
Expand Down Expand Up @@ -171,8 +172,10 @@ public void onReset(Stream stream, ResetFrame frame)
@Override
public boolean onIdleTimeout(Stream stream, Throwable x)
{
responseFailure(x);
return true;
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return false;
return !exchange.abort(x);
}

@Override
Expand All @@ -182,6 +185,12 @@ public void onFailure(Stream stream, int error, String reason, Callback callback
callback.succeeded();
}

@Override
public void onClosed(Stream stream)
{
getHttpChannel().onStreamClosed((IStream)stream);
}

private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback)
{
contentNotifier.offer(new DataInfo(exchange, frame, callback));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public void succeeded(Stream stream)
{
channel.setStream(stream);
((IStream)stream).setAttachment(channel);
stream.setIdleTimeout(request.getIdleTimeout());
long idleTimeout = request.getIdleTimeout();
if (idleTimeout >= 0)
stream.setIdleTimeout(idleTimeout);

if (content.hasContent() && !expects100Continue(request))
{
Expand Down
Loading

0 comments on commit 11abe53

Please sign in to comment.