Skip to content

Commit

Permalink
Improved logging and toString() implementations,
Browse files Browse the repository at this point in the history
small refactorings in code and tests.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Aug 22, 2020
1 parent 70a679f commit 2d3f0e0
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public CompletableFuture<Void> preCreateConnections(int connectionCount)
CompletableFuture<?>[] futures = new CompletableFuture[connectionCount];
for (int i = 0; i < connectionCount; i++)
{
futures[i] = tryCreateReturningFuture(pool.getMaxEntries());
futures[i] = tryCreateAsync(getMaxConnectionCount());
}
return CompletableFuture.allOf(futures);
}
Expand Down Expand Up @@ -175,6 +175,8 @@ public Connection acquire()
*/
protected Connection acquire(boolean create)
{
if (LOG.isDebugEnabled())
LOG.debug("Acquiring create={} on {}", create, this);
Connection connection = activate();
if (connection == null && create)
{
Expand All @@ -196,20 +198,21 @@ protected Connection acquire(boolean create)
*/
protected void tryCreate(int maxPending)
{
tryCreateReturningFuture(maxPending);
tryCreateAsync(maxPending);
}

private CompletableFuture<Void> tryCreateReturningFuture(int maxPending)
private CompletableFuture<Void> tryCreateAsync(int maxPending)
{
int connectionCount = getConnectionCount();
if (LOG.isDebugEnabled())
LOG.debug("tryCreate {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending);
LOG.debug("Try creating connection {}/{} with {}/{} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount(), maxPending);

Pool<Connection>.Entry entry = pool.reserve(maxPending);
if (entry == null)
return CompletableFuture.completedFuture(null);

if (LOG.isDebugEnabled())
LOG.debug("newConnection {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending);
LOG.debug("Creating connection {}/{}", connectionCount, getMaxConnectionCount());

CompletableFuture<Void> future = new CompletableFuture<>();
destination.newConnection(new Promise<Connection>()
Expand All @@ -218,7 +221,7 @@ private CompletableFuture<Void> tryCreateReturningFuture(int maxPending)
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation succeeded {}", pool.size(), pool.getMaxEntries(), connection);
LOG.debug("Connection {}/{} creation succeeded {}", connectionCount, getMaxConnectionCount(), connection);
if (!(connection instanceof Attachable))
{
failed(new IllegalArgumentException("Invalid connection object: " + connection));
Expand All @@ -236,7 +239,7 @@ public void succeeded(Connection connection)
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection " + pool.size() + "/" + pool.getMaxEntries() + " creation failed", x);
LOG.debug("Connection {}/{} creation failed", connectionCount, getMaxConnectionCount(), x);
entry.remove();
future.completeExceptionally(x);
requester.failed(x);
Expand All @@ -257,7 +260,7 @@ protected Connection activate()
if (entry != null)
{
if (LOG.isDebugEnabled())
LOG.debug("activated {}", entry);
LOG.debug("Activated {} {}", entry, pool);
Connection connection = entry.getPooled();
acquired(connection);
return connection;
Expand All @@ -275,8 +278,6 @@ public boolean isActive(Connection connection)
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
if (entry == null)
return false;
if (LOG.isDebugEnabled())
LOG.debug("isActive {}", entry);
return !entry.isIdle();
}

Expand All @@ -300,7 +301,7 @@ protected boolean deactivate(Connection connection)
return true;
boolean reusable = pool.release(entry);
if (LOG.isDebugEnabled())
LOG.debug("Released ({}) {}", reusable, entry);
LOG.debug("Released ({}) {} {}", reusable, entry, pool);
if (reusable)
return true;
remove(connection);
Expand All @@ -325,7 +326,7 @@ protected boolean remove(Connection connection, boolean force)
attachable.setAttachment(null);
boolean removed = pool.remove(entry);
if (LOG.isDebugEnabled())
LOG.debug("Removed ({}) {}", removed, entry);
LOG.debug("Removed ({}) {} {}", removed, entry, pool);
if (removed || force)
{
released(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ private ProcessResult process(Connection connection)
{
// Aggressively send other queued requests
// in case connections are multiplexed.
return getHttpExchanges().size() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
return getQueuedRequestCount() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
}

if (LOG.isDebugEnabled())
Expand Down Expand Up @@ -438,7 +438,7 @@ public void release(Connection connection)
{
if (connectionPool.isActive(connection))
{
// trigger the next request after releasing the connection
// Trigger the next request after releasing the connection.
if (connectionPool.release(connection))
send(false);
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

Expand All @@ -63,10 +62,15 @@ public class ConnectionPoolTest
private HttpClient client;

public static Stream<ConnectionPoolFactory> pools()
{
return Stream.concat(poolsNoRoundRobin(),
Stream.of(new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination))));
}

public static Stream<ConnectionPoolFactory> poolsNoRoundRobin()
{
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
);
}
Expand Down Expand Up @@ -295,11 +299,11 @@ public void resolve(String host, int port, Promise<List<InetSocketAddress>> prom
}

@ParameterizedTest
@MethodSource("pools")
@MethodSource("poolsNoRoundRobin")
public void testConcurrentRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
{
// Round robin connection pool does open a few more connections than expected.
Assumptions.assumeFalse(factory.name.equals("round-robin"));
// Round robin connection pool does open a few more
// connections than expected, exclude it from this test.

startServer(new EmptyServerHandler());

Expand Down
11 changes: 9 additions & 2 deletions jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,12 @@ public void dump(Appendable out, String indent) throws IOException
@Override
public String toString()
{
return getClass().getSimpleName() + " size=" + sharedList.size() + " closed=" + closed + " entries=" + sharedList;
return String.format("%s@%x[size=%d closed=%s entries=%s]",
getClass().getSimpleName(),
hashCode(),
sharedList.size(),
closed,
sharedList);
}

public class Entry
Expand Down Expand Up @@ -544,11 +549,13 @@ public int getUsageCount()
public String toString()
{
long encoded = state.get();
return String.format("%s@%x{usage=%d,multiplex=%d,pooled=%s}",
return String.format("%s@%x{usage=%d/%d,multiplex=%d/%d,pooled=%s}",
getClass().getSimpleName(),
hashCode(),
AtomicBiInteger.getHi(encoded),
getMaxUsageCount(),
AtomicBiInteger.getLo(encoded),
getMaxMultiplex(),
pooled);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
package org.eclipse.jetty.http.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -56,7 +57,7 @@ public void testRoundRobin(Transport transport) throws Exception
{
init(transport);
AtomicBoolean record = new AtomicBoolean();
List<Integer> remotePorts = new ArrayList<>();
List<Integer> remotePorts = new CopyOnWriteArrayList<>();
scenario.start(new EmptyServerHandler()
{
@Override
Expand Down Expand Up @@ -115,8 +116,7 @@ public void testMultiplex(Transport transport) throws Exception
int maxConnections = 3;
int count = maxConnections * maxMultiplex;

AtomicBoolean record = new AtomicBoolean();
List<Integer> remotePorts = new ArrayList<>();
List<Integer> remotePorts = new CopyOnWriteArrayList<>();
AtomicReference<CountDownLatch> requestLatch = new AtomicReference<>();
CountDownLatch serverLatch = new CountDownLatch(count);
CyclicBarrier barrier = new CyclicBarrier(count + 1);
Expand All @@ -127,13 +127,10 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
{
try
{
if (record.get())
{
remotePorts.add(request.getRemotePort());
requestLatch.get().countDown();
serverLatch.countDown();
barrier.await();
}
remotePorts.add(request.getRemotePort());
requestLatch.get().countDown();
serverLatch.countDown();
barrier.await();
}
catch (Exception x)
{
Expand All @@ -144,17 +141,9 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r

scenario.client.getTransport().setConnectionPoolFactory(destination -> new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex));

// Prime the connections, so that they are all opened
// before we actually test the round robin behavior.
for (int i = 0; i < maxConnections; ++i)
{
ContentResponse response = scenario.client.newRequest(scenario.newURI())
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
}
// Do not prime the connections, to see if the behavior is
// correct even if the connections are not pre-created.

record.set(true);
CountDownLatch clientLatch = new CountDownLatch(count);
AtomicInteger requests = new AtomicInteger();
for (int i = 0; i < count; ++i)
Expand Down

0 comments on commit 2d3f0e0

Please sign in to comment.