Skip to content

Fixes #9121 - Flaky BlockedWritesWithSmallThreadPoolTest.testServerThreadsBlockedInWrites(). #12178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
Expand All @@ -45,7 +46,6 @@
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -100,19 +100,19 @@ public void dispose()
}

@Test
@Tag("flaky")
public void testServerThreadsBlockedInWrites() throws Exception
{
int contentLength = 16 * 1024 * 1024;
AtomicReference<AbstractEndPoint> serverEndPointRef = new AtomicReference<>();
start(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
public boolean handle(Request request, Response response, Callback callback) throws Exception
{
serverEndPointRef.compareAndSet(null, (AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint());
// Write a large content to cause TCP congestion.
response.write(true, ByteBuffer.wrap(new byte[contentLength]), callback);
// Blocking write a large content to cause TCP congestion.
Content.Sink.write(response, true, ByteBuffer.wrap(new byte[contentLength]));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we test both async and blocking writes here?

callback.succeeded();
return true;
}
});
Expand All @@ -138,21 +138,20 @@ public boolean handle(Request request, Response response, Callback callback)
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
try
{
// Block here to stop reading from the network
// to cause the server to TCP congest.
clientBlockLatch.await(5, SECONDS);
Stream.Data data = stream.readData();
data.release();
if (data.frame().isEndStream())
clientDataLatch.countDown();
else
stream.demand();
}
catch (InterruptedException x)
catch (InterruptedException ignored)
{
data.release();
}
}
});
Expand All @@ -172,16 +171,137 @@ public void onDataAvailable(Stream stream)
await().atMost(5, SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1);
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
long delaySeconds = 10;
CountDownLatch serverBlockLatch = new CountDownLatch(1);
assertTrue(serverThreads.tryExecute(() -> await().atMost(20, SECONDS).until(() -> serverBlockLatch.await(15, SECONDS), b -> true)));
assertTrue(serverThreads.tryExecute(() ->
{
try
{
serverBlockLatch.await(2 * delaySeconds, SECONDS);
}
catch (InterruptedException ignored)
{
}
}));

// No more threads are available on the server.
assertEquals(0, serverThreads.getReadyThreads());

// Unblock the client to read from the network, which should unblock the server write().
clientBlockLatch.countDown();

assertTrue(clientDataLatch.await(10, SECONDS), server.dump());
assertTrue(clientDataLatch.await(delaySeconds, SECONDS), server.dump());

// Unblock blocked threads.
serverBlockLatch.countDown();
}

@Test
public void testServerThreadsInPendingWrites() throws Exception
{
int contentLength = 16 * 1024 * 1024;
AtomicReference<AbstractEndPoint> serverEndPointRef = new AtomicReference<>();
start(new Handler.Abstract()
{
@Override
public boolean handle(Request request, Response response, Callback callback)
{
serverEndPointRef.set((AbstractEndPoint)request.getConnectionMetaData().getConnection().getEndPoint());
// Large write that will TCP congest, but it is non-blocking.
response.write(true, ByteBuffer.allocate(contentLength), callback);
return true;
}
});

client = new HTTP2Client();
// Set large flow control windows so the server hits TCP congestion.
int window = 2 * contentLength;
client.setInitialSessionRecvWindow(window);
client.setInitialStreamRecvWindow(window);
client.start();

CountDownLatch clientBlockLatch = new CountDownLatch(1);
CountDownLatch clientDataLatch = new CountDownLatch(1);
Session session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
.get(5, SECONDS);
HttpURI uri = HttpURI.build("http://localhost:" + connector.getLocalPort() + "/congest");
MetaData.Request request = new MetaData.Request("GET", uri, HttpVersion.HTTP_2, HttpFields.EMPTY);
session.newStream(new HeadersFrame(request, null, true), new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
try
{
// Block here to stop reading from the network
// to cause the server to TCP congest.
clientBlockLatch.await(5, SECONDS);
Stream.Data data = stream.readData();
data.release();
if (data.frame().isEndStream())
clientDataLatch.countDown();
else
stream.demand();
}
catch (InterruptedException ignored)
{
}
}
});

await().atMost(5, SECONDS).until(() ->
{
AbstractEndPoint serverEndPoint = serverEndPointRef.get();
return serverEndPoint != null && serverEndPoint.getWriteFlusher().isPending();
});
// Wait for NIO on the server to be OP_WRITE interested.
Thread.sleep(1000);

// Handler.handle() should have returned, make sure we block that thread.
long delaySeconds = 10;
await().atMost(5, SECONDS).until(() -> serverThreads.getIdleThreads() == 1);
CountDownLatch serverBlockLatch = new CountDownLatch(1);
serverThreads.execute(() ->
{
try
{
serverBlockLatch.await(2 * delaySeconds, SECONDS);
}
catch (InterruptedException ignored)
{
}
});

// Make sure there is a reserved thread.
if (serverThreads.getAvailableReservedThreads() != 1)
{
assertFalse(serverThreads.tryExecute(() -> {}));
await().atMost(5, SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1);
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
CountDownLatch reservedBlockLatch = new CountDownLatch(1);
assertTrue(serverThreads.tryExecute(() ->
{
try
{
reservedBlockLatch.await(2 * delaySeconds, SECONDS);
}
catch (InterruptedException ignored)
{
}
}));

// No more threads are available on the server.
assertEquals(0, serverThreads.getReadyThreads());

// Unblock the client to read from the network, which must unblock the server write() and send a response.
clientBlockLatch.countDown();

assertTrue(clientDataLatch.await(delaySeconds, SECONDS), server.dump());

// Unblock blocked threads.
serverBlockLatch.countDown();
reservedBlockLatch.countDown();
}

@Test
Expand All @@ -200,12 +320,12 @@ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@Override
public void onDataAvailable(Stream stream)
{
Stream.Data data = stream.readData();
try
{
// Block here to stop reading from the network
// to cause the client to TCP congest.
serverBlockLatch.await(5, SECONDS);
Stream.Data data = stream.readData();
data.release();
if (data.frame().isEndStream())
{
Expand All @@ -217,9 +337,8 @@ public void onDataAvailable(Stream stream)
stream.demand();
}
}
catch (InterruptedException x)
catch (InterruptedException ignored)
{
data.release();
}
}
};
Expand Down Expand Up @@ -277,14 +396,27 @@ public void onHeaders(Stream stream, HeadersFrame frame)
await().atMost(5, SECONDS).until(() -> clientThreads.getAvailableReservedThreads() == 1);
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
assertTrue(clientThreads.tryExecute(() -> await().until(() -> clientBlockLatch.await(15, SECONDS), b -> true)));
long delaySeconds = 10;
assertTrue(clientThreads.tryExecute(() ->
{
try
{
clientBlockLatch.await(2 * delaySeconds, SECONDS);
}
catch (InterruptedException ignored)
{
}
}));

// No more threads are available on the client.
await().atMost(5, SECONDS).until(() -> clientThreads.getReadyThreads() == 0);

// Unblock the server to read from the network, which should unblock the client.
serverBlockLatch.countDown();

assertTrue(latch.await(10, SECONDS), client.dump());
assertTrue(latch.await(delaySeconds, SECONDS), client.dump());

// Unblock blocked threads.
clientBlockLatch.countDown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@ static Throwable consumeAvailable(HttpStream stream, HttpConfiguration httpConfi
return CONTENT_NOT_CONSUMED;
}

@Override
default InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}

class Wrapper implements HttpStream
{
private final HttpStream _wrapped;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1636,7 +1636,6 @@ private boolean lockedCompleteCallback()
@Override
public InvocationType getInvocationType()
{
// TODO review this as it is probably not correct
return _request.getHttpStream().getInvocationType();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1610,12 +1610,6 @@ private void abort(Throwable failure)
{
getEndPoint().close(failure);
}

@Override
public InvocationType getInvocationType()
{
return HttpStream.super.getInvocationType();
}
}

private class TunnelSupportOverHTTP1 implements TunnelSupport
Expand Down
Loading