Skip to content

Commit

Permalink
Fix connection pool's forcible stop (#9280)
Browse files Browse the repository at this point in the history
* #9275 fix forcible closure of connections upon stopping the connection pool


Signed-off-by: Ludovic Orban <[email protected]>
  • Loading branch information
lorban authored Feb 1, 2023
1 parent 1b1e18b commit e68c827
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -480,21 +480,26 @@ Collection<Connection> getActiveConnections()

private void close(Pool.Entry<Connection> entry)
{
assert pool.isTerminated();
// Forcibly release and remove entries to
// do our best effort calling the listeners.
// do our best effort calling the listeners;
// the pool is terminated so there is no
// need to release the entries, we can
// directly remove them.
Connection connection = entry.getPooled();
while (entry.isInUse())
while (true)
{
if (entry.release())
if (entry.remove())
{
released(connection);
break;
if (LOG.isDebugEnabled())
LOG.debug("Removed terminated entry {}", entry);
removed(connection);
IO.close(connection);
}
}
if (entry.remove())
{
removed(connection);
IO.close(connection);
if (!entry.isInUse())
break;
if (LOG.isDebugEnabled())
LOG.debug("Entry {} still in use, removing it again", entry);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
Expand All @@ -42,6 +43,7 @@
import org.eclipse.jetty.client.ContentResponse;
import org.eclipse.jetty.client.InputStreamRequestContent;
import org.eclipse.jetty.client.InputStreamResponseListener;
import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.client.OutputStreamRequestContent;
import org.eclipse.jetty.client.Response;
import org.eclipse.jetty.client.Result;
Expand All @@ -55,13 +57,16 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.component.LifeCycle;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -78,7 +83,7 @@ public class HttpClientStreamTest extends AbstractTest
public void testFileUpload(Transport transport) throws Exception
{
// Prepare a big file to upload
Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath();
Path targetTestsDir = MavenTestingUtils.getTargetTestingPath();
Files.createDirectories(targetTestsDir);
Path upload = Paths.get(targetTestsDir.toString(), "http_client_upload.big");
try (OutputStream output = Files.newOutputStream(upload, StandardOpenOption.CREATE))
Expand Down Expand Up @@ -1099,6 +1104,63 @@ public boolean process(Request request, org.eclipse.jetty.server.Response respon
assertTrue(result.isSucceeded());
}

@ParameterizedTest
@MethodSource("transports")
public void testMultiplexedConnectionsForcibleStop(Transport transport) throws Exception
{
Assumptions.assumeTrue(transport.isMultiplexed());

long timeoutInSeconds = 5;
AtomicInteger processCount = new AtomicInteger();
CountDownLatch processLatch = new CountDownLatch(1);
startServer(transport, new Handler.Abstract()
{
@Override
public boolean process(Request request, org.eclipse.jetty.server.Response response, Callback callback) throws Exception
{
processCount.incrementAndGet();
processLatch.await(timeoutInSeconds * 2, TimeUnit.SECONDS);
callback.succeeded();
return true;
}
});
startClient(transport);
// Set up the client such as:
// - only one connection can be created;
// - the connection can be used by two concurrent requests;
// - the connection is pre-created.
client.setMaxConnectionsPerDestination(1);
client.getTransport().setConnectionPoolFactory(destination ->
{
MultiplexConnectionPool pool = new MultiplexConnectionPool(destination, 1, 2);
LifeCycle.start(pool);
try
{
pool.preCreateConnections(1).get();
}
catch (InterruptedException | ExecutionException e)
{
throw new RuntimeException(e);
}
return pool;
});

// Send two parallel requests.
CountDownLatch clientLatch = new CountDownLatch(2);
client.newRequest(newURI(transport)).send(result -> clientLatch.countDown());
client.newRequest(newURI(transport)).send(result -> clientLatch.countDown());

// Wait until both requests are in-flight.
await().atMost(timeoutInSeconds, TimeUnit.SECONDS).until(processCount::get, is(2));

// Stop the client while it has requests in-flight.
Assertions.assertTimeout(Duration.ofSeconds(timeoutInSeconds), () -> LifeCycle.stop(client));

// Let the server threads go & wait for the requests to be processed.
processLatch.countDown();
assertTrue(clientLatch.await(timeoutInSeconds, TimeUnit.SECONDS));
}

private record HandlerContext(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
}
Expand Down

0 comments on commit e68c827

Please sign in to comment.