Skip to content

Commit

Permalink
Issue #2796 - Max local stream count exceeded when request fails.
Browse files Browse the repository at this point in the history
Reviewed other possible places where max local stream count may
overflow.

Fixed handling of HTTP/2 stream idle timeouts.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Oct 9, 2018
1 parent a056695 commit cec84cf
Show file tree
Hide file tree
Showing 12 changed files with 75 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -550,14 +550,14 @@ public boolean abort(HttpExchange exchange, Throwable failure)
// respect to concurrency between request and response.
Result result = exchange.terminateResponse();
terminateResponse(exchange, result);
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
return false;
}

return true;
}

private boolean updateResponseState(ResponseState from, ResponseState to)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public class HttpRequest implements Request
private String query;
private String method = HttpMethod.GET.asString();
private HttpVersion version = HttpVersion.HTTP_1_1;
private long idleTimeout;
private long idleTimeout = -1;
private long timeout;
private long timeoutAt;
private ContentProvider content;
Expand All @@ -99,7 +99,6 @@ protected HttpRequest(HttpClient client, HttpConversation conversation, URI uri)
extractParams(query);

followRedirects(client.isFollowRedirects());
idleTimeout = client.getIdleTimeout();
HttpField acceptEncodingField = client.getAcceptEncodingField();
if (acceptEncodingField != null)
headers.put(acceptEncodingField);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,14 +579,14 @@ public boolean abort(HttpExchange exchange, Throwable failure)
// respect to concurrency between request and response.
Result result = exchange.terminateRequest();
terminateRequest(exchange, failure, result);
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
return false;
}

return true;
}

private boolean updateRequestState(RequestState from, RequestState to)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ protected SendFailure send(HttpExchange exchange)
// Save the old idle timeout to restore it.
EndPoint endPoint = getEndPoint();
idleTimeout = endPoint.getIdleTimeout();
endPoint.setIdleTimeout(request.getIdleTimeout());
long requestIdleTimeout = request.getIdleTimeout();
if (requestIdleTimeout >= 0)
endPoint.setIdleTimeout(requestIdleTimeout);

// One channel per connection, just delegate the send.
return send(channel, exchange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public FCGIIdleTimeout(HttpConnectionOverFCGI connection, long idleTimeout)
{
super(connection.getHttpDestination().getHttpClient().getScheduler());
this.connection = connection;
setIdleTimeout(idleTimeout);
setIdleTimeout(idleTimeout >= 0 ? idleTimeout : connection.getEndPoint().getIdleTimeout());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public void onSettings(SettingsFrame frame, boolean reply)
case SettingsFrame.MAX_CONCURRENT_STREAMS:
{
if (LOG.isDebugEnabled())
LOG.debug("Updating max local concurrent streams to {} for {}", maxLocalStreams, this);
LOG.debug("Updating max local concurrent streams to {} for {}", value, this);
maxLocalStreams = value;
break;
}
Expand Down Expand Up @@ -561,7 +561,7 @@ public void newStream(HeadersFrame frame, Promise<Stream> promise, Stream.Listen
IStream stream = createLocalStream(streamId);
stream.setListener(listener);

ControlEntry entry = new ControlEntry(frame, stream, new PromiseCallback<>(promise, stream));
ControlEntry entry = new ControlEntry(frame, stream, new StreamPromiseCallback(promise, stream));
queued = flusher.append(entry);
}
// Iterate outside the synchronized block.
Expand Down Expand Up @@ -605,7 +605,7 @@ public void push(IStream stream, Promise<Stream> promise, PushPromiseFrame frame
IStream pushStream = createLocalStream(streamId);
pushStream.setListener(listener);

ControlEntry entry = new ControlEntry(frame, pushStream, new PromiseCallback<>(promise, pushStream));
ControlEntry entry = new ControlEntry(frame, pushStream, new StreamPromiseCallback(promise, pushStream));
queued = flusher.append(entry);
}
// Iterate outside the synchronized block.
Expand Down Expand Up @@ -779,6 +779,7 @@ protected IStream createLocalStream(int streamId)
}
else
{
localStreamCount.decrementAndGet();
throw new IllegalStateException("Duplicate stream " + streamId);
}
}
Expand Down Expand Up @@ -815,6 +816,7 @@ protected IStream createRemoteStream(int streamId)
}
else
{
remoteStreamCount.addAndGetHi(-1);
onConnectionFailure(ErrorCode.PROTOCOL_ERROR.code, "duplicate_stream");
return null;
}
Expand Down Expand Up @@ -1461,21 +1463,21 @@ public void succeeded()
}
}

private static class PromiseCallback<C> implements Callback
private static class StreamPromiseCallback implements Callback
{
private final Promise<C> promise;
private final C value;
private final Promise<Stream> promise;
private final IStream stream;

private PromiseCallback(Promise<C> promise, C value)
private StreamPromiseCallback(Promise<Stream> promise, IStream stream)
{
this.promise = promise;
this.value = value;
this.stream = stream;
}

@Override
public void succeeded()
{
promise.succeeded(value);
promise.succeeded(stream);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ private boolean startWrite(Callback callback)
{
if (writing.compareAndSet(null, callback))
return true;
close();
callback.failed(new WritePendingException());
return false;
}
Expand Down Expand Up @@ -275,8 +276,6 @@ public void process(Frame frame, Callback callback)

private void onHeaders(HeadersFrame frame, Callback callback)
{
if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);
MetaData metaData = frame.getMetaData();
if (metaData.isRequest() || metaData.isResponse())
{
Expand All @@ -286,6 +285,10 @@ private void onHeaders(HeadersFrame frame, Callback callback)
length = fields.getLongField(HttpHeader.CONTENT_LENGTH.asString());
dataLength = length >= 0 ? length : Long.MIN_VALUE;
}

if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);

callback.succeeded();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ public interface Listener
*/
public void onData(Stream stream, DataFrame frame, Callback callback);

/**
* <p>Callback method invoked when a RST_STREAM frame has been received for this stream.</p>
*
* @param stream the stream
* @param frame the RST_FRAME received
* @param callback the callback to complete when the reset has been handled
*/
public default void onReset(Stream stream, ResetFrame frame, Callback callback)
{
try
Expand Down Expand Up @@ -214,6 +221,14 @@ public default boolean onIdleTimeout(Stream stream, Throwable x)
return true;
}

/**
* <p>Callback method invoked when the stream failed.</p>
*
* @param stream the stream
* @param error the error code
* @param reason the error reason, or null
* @param callback the callback to complete when the failure has been handled
*/
public default void onFailure(Stream stream, int error, String reason, Callback callback)
{
callback.succeeded();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,10 @@ public void onReset(Stream stream, ResetFrame frame)
@Override
public boolean onIdleTimeout(Stream stream, Throwable x)
{
responseFailure(x);
return true;
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return false;
return !exchange.abort(x);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public void succeeded(Stream stream)
{
channel.setStream(stream);
((IStream)stream).setAttachment(channel);
stream.setIdleTimeout(request.getIdleTimeout());
long idleTimeout = request.getIdleTimeout();
if (idleTimeout >= 0)
stream.setIdleTimeout(idleTimeout);

if (content.hasContent() && !expects100Continue(request))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,22 @@

package org.eclipse.jetty.http2.client.http;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.client.AbstractConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
Expand All @@ -43,21 +59,6 @@
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.Test;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -370,7 +371,7 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
}

@Test
public void testTwoConcurrentStreamsFirstTimesOut() throws Exception
public void testTwoStreamsFirstTimesOut() throws Exception
{
long timeout = 1000;
start(1, new EmptyServerHandler()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,6 @@

package org.eclipse.jetty.http.client;

import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -65,6 +56,15 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

public class HttpClientContinueTest extends AbstractTest<TransportScenario>
{
@Override
Expand Down Expand Up @@ -344,13 +344,14 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
}
});

scenario.client.setIdleTimeout(idleTimeout);
scenario.client.setIdleTimeout(2 * idleTimeout);

byte[] content = new byte[1024];
final CountDownLatch latch = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
.content(new BytesContentProvider(content))
.idleTimeout(idleTimeout, TimeUnit.MILLISECONDS)
.send(new BufferingResponseListener()
{
@Override
Expand Down

0 comments on commit cec84cf

Please sign in to comment.