Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for some invalid server behaviors when a client is aborting a request #11637

Merged
merged 11 commits into from
Apr 23, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.FillInterest;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
Expand Down Expand Up @@ -249,7 +250,19 @@ public boolean onReadable()
if (LOG.isDebugEnabled())
LOG.debug("stream #{} is readable, processing: {}", streamId, interested);
if (interested)
{
getFillInterest().fillable();
}
else
{
QuicStreamEndPoint streamEndPoint = getQuicSession().getStreamEndPoint(streamId);
if (streamEndPoint.isStreamFinished())
{
EofException e = new EofException();
streamEndPoint.getFillInterest().onFail(e);
streamEndPoint.getQuicSession().onFailure(e);
}
}
return interested;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1452,12 +1452,11 @@ public void succeeded()
response = httpChannelState._response;
stream = httpChannelState._stream;

// We are being tough on handler implementations and expect them
// to not have pending operations when calling succeeded or failed.
// We convert a call to succeeded with pending demand/write into a call to failed.
if (httpChannelState._onContentAvailable != null)
throw new IllegalStateException("demand pending");
failure = ExceptionUtil.combine(failure, new IllegalStateException("demand pending"));
lorban marked this conversation as resolved.
Show resolved Hide resolved
if (response.lockedIsWriting())
throw new IllegalStateException("write pending");
failure = ExceptionUtil.combine(failure, new IllegalStateException("write pending"));

if (lockedCompleteCallback())
return;
Expand All @@ -1476,7 +1475,7 @@ public void succeeded()
long committedContentLength = httpChannelState._committedContentLength;

if (committedContentLength >= 0 && committedContentLength != totalWritten && !(totalWritten == 0 && HttpMethod.HEAD.is(_request.getMethod())))
failure = new IOException("content-length %d != %d written".formatted(committedContentLength, totalWritten));
failure = ExceptionUtil.combine(failure, new IOException("content-length %d != %d written".formatted(committedContentLength, totalWritten)));

// Is the request fully consumed?
Throwable unconsumed = stream.consumeAvailable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ public void handle()
catch (Throwable t)
{
ExceptionUtil.addSuppressedIfNotAssociated(t, cause);
throw t;
abort(t);
}
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,5 +317,14 @@ public boolean isSecure()
case HTTPS, H2, H3 -> true;
};
}

public boolean isMultiplexed()
{
return switch (this)
{
case HTTP, HTTPS, FCGI -> false;
case H2C, H2, H3 -> true;
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,233 @@

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.servlet.AsyncContext;
import jakarta.servlet.AsyncEvent;
import jakarta.servlet.AsyncListener;
import jakarta.servlet.ReadListener;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletInputStream;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.AsyncRequestContent;
import org.eclipse.jetty.client.BytesRequestContent;
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.Request;
import org.eclipse.jetty.client.Result;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.Callback;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

public class RequestReaderTest extends AbstractTest
{
@ParameterizedTest
@MethodSource("transports")
public void testChannelStateSucceeded(Transport transport) throws Exception
{
CountDownLatch servletDoneLatch = new CountDownLatch(1);
start(transport, new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
request.startAsync();

ServletInputStream inputStream = request.getInputStream();
inputStream.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
while (inputStream.isReady() && !inputStream.isFinished())
{
int read = inputStream.read();
if (read < 0)
break;
}
}

@Override
public void onAllDataRead()
{
}

@Override
public void onError(Throwable t)
{
}
});

response.sendError(567);

new Thread(() ->
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
finally
{
servletDoneLatch.countDown();
}
}).start();
}
});
CountDownLatch callbackCompletedLatch = new CountDownLatch(1);
AtomicReference<Throwable> callbackFailure = new AtomicReference<>();
server.stop();
server.insertHandler(new Handler.Wrapper()
{
@Override
public boolean handle(org.eclipse.jetty.server.Request request, Response response, Callback callback) throws Exception
{
return super.handle(request, response, new Callback()
{
@Override
public void succeeded()
{
callback.succeeded();
callbackCompletedLatch.countDown();
}

@Override
public void failed(Throwable x)
{
callback.failed(x);
callbackFailure.set(x);
callbackCompletedLatch.countDown();
}
});
}
});
server.start();

AtomicReference<Result> resultRef = new AtomicReference<>();
try (AsyncRequestContent content = new AsyncRequestContent())
{
Request request = client.newRequest(newURI(transport))
.method("POST")
.timeout(5, TimeUnit.SECONDS)
.body(content);
request.send(resultRef::set);
assertTrue(servletDoneLatch.await(5, TimeUnit.SECONDS));
}

await().atMost(5, TimeUnit.SECONDS).until(resultRef::get, not(nullValue()));
Result result = resultRef.get();
assertThat(result.getResponse().getStatus(), is(567));
assertThat(callbackCompletedLatch.await(5, TimeUnit.SECONDS), is(true));
assertThat(callbackFailure.get(), is(nullValue()));
}

@ParameterizedTest
@MethodSource("transports")
public void testResetArrivingOnServer(Transport transport) throws Exception
{
assumeTrue(transport.isMultiplexed());

CountDownLatch servletOnDataAvailableLatch = new CountDownLatch(1);
AtomicReference<Throwable> serverError = new AtomicReference<>();
CountDownLatch errorDoneLatch = new CountDownLatch(1);
start(transport, new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
AsyncContext asyncContext = request.startAsync();
asyncContext.addListener(new AsyncListener()
{
@Override
public void onComplete(AsyncEvent event)
{
}

@Override
public void onTimeout(AsyncEvent event)
{
}

@Override
public void onError(AsyncEvent event) throws IOException
{
serverError.set(event.getThrowable());
response.sendError(567);
asyncContext.complete();
errorDoneLatch.countDown();
}

@Override
public void onStartAsync(AsyncEvent event)
{
}
});

ServletInputStream inputStream = request.getInputStream();
inputStream.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable()
{
servletOnDataAvailableLatch.countDown();
}

@Override
public void onAllDataRead()
{
}

@Override
public void onError(Throwable t)
{
}
});
}
});

AtomicReference<Result> resultRef = new AtomicReference<>();
try (AsyncRequestContent content = new AsyncRequestContent(ByteBuffer.allocate(16)))
{
Request request = client.newRequest(newURI(transport))
.method("POST")
.timeout(5, TimeUnit.SECONDS)
.body(content);
request.send(resultRef::set);
assertTrue(servletOnDataAvailableLatch.await(5, TimeUnit.SECONDS));
request.abort(new ArithmeticException());
}

assertTrue(errorDoneLatch.await(5, TimeUnit.SECONDS));
assertThat(serverError.get(), instanceOf(EofException.class));

await().atMost(5, TimeUnit.SECONDS).until(resultRef::get, not(nullValue()));
Result result = resultRef.get();
assertThat(result.getRequestFailure(), instanceOf(ArithmeticException.class));
assertThat(result.getResponseFailure(), instanceOf(ArithmeticException.class));
assertThat(result.getResponse().getStatus(), is(0));
}

@ParameterizedTest
@MethodSource("transports")
public void testRecyclingWhenUsingReader(Transport transport) throws Exception
Expand Down
Loading