From 52d38b573c680c9ba144e440de8ebe64675bbc67 Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Fri, 1 Apr 2022 18:30:09 -0700 Subject: [PATCH] `RoundRobinLoadBalancer`: better identify excess connections Motivation: Cold start or traffic spikes can result in opening excess connections to handle unexpected load. The current selection algorithm in `RoundRobinLoadBalancer` attempts to pick a random connection. This approach does not allow the number of open connections to decrease when traffic stabilizes. Modifications: Introduce a `linearSearchSpace` configuration option for `RoundRobinLoadBalancerFactory` to perform a linear search before falling back to a random selection. Result: Allows configuring a more predictable selection of the connection. When configured with the `IDLE_TIMEOUT` socket option, it helps to reduce the number of connections that can serve multiplexed or pipelined requests. The default value of 16 has no performance impact on non-pipelined HTTP/1.1 connections, but significantly reduces latencies for HTTP/2 requests and the number of open HTTP/2 connections. 
--- .../loadbalancer/RoundRobinLoadBalancer.java | 35 +++++++++++++----- .../RoundRobinLoadBalancerFactory.java | 36 ++++++++++++++++--- 2 files changed, 57 insertions(+), 14 deletions(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index 6c197db638..15864026b6 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -71,6 +71,7 @@ import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection; import static java.lang.Integer.toHexString; +import static java.lang.Math.min; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static java.util.Objects.requireNonNull; @@ -103,7 +104,7 @@ final class RoundRobinLoadBalancer eventStream; private final SequentialCancellable discoveryCancellable = new SequentialCancellable(); private final ConnectionFactory connectionFactory; + private final int linearSearchSpace; private final ListenableAsyncCloseable asyncCloseable; /** @@ -141,11 +143,13 @@ final class RoundRobinLoadBalancer>> eventPublisher, final ConnectionFactory connectionFactory, + final int linearSearchSpace, @Nullable final HealthCheckConfig healthCheckConfig) { this.targetResource = requireNonNull(targetResourceName) + " (instance @" + toHexString(hashCode()) + ')'; Processor eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); this.eventStream = fromSource(eventStreamProcessor); this.connectionFactory = requireNonNull(connectionFactory); + this.linearSearchSpace = linearSearchSpace; toSource(eventPublisher).subscribe( new Subscriber>>() { @@ -352,23 +356,36 @@ private Single 
selectConnection0(Predicate selector) { Host pickedHost = null; for (int i = 0; i < usedHosts.size(); ++i) { // for a particular iteration we maintain a local cursor without contention with other requests - int localCursor = (cursor + i) % usedHosts.size(); + final int localCursor = (cursor + i) % usedHosts.size(); final Host host = usedHosts.get(localCursor); assert host != null : "Host can't be null."; // Try first to see if an existing connection can be used final Object[] connections = host.connState.connections; - // With small enough search space, attempt all connections. - // Back off after exploring most of the search space, it gives diminishing returns. - final int attempts = connections.length < MIN_SEARCH_SPACE ? - connections.length : (int) (connections.length * SEARCH_FACTOR); - for (int j = 0; j < attempts; ++j) { + // Exhaust the linear search space first: + final int linearAttempts = min(connections.length, linearSearchSpace); + for (int j = 0; j < linearAttempts; ++j) { @SuppressWarnings("unchecked") - final C connection = (C) connections[rnd.nextInt(connections.length)]; + final C connection = (C) connections[j]; if (selector.test(connection)) { return succeeded(connection); } } + // Try other connections randomly: + if (connections.length > linearAttempts) { + final int diff = connections.length - linearAttempts; + // With small enough search space, attempt number of times equal to number of remaining connections. + // Back off after exploring most of the search space, it gives diminishing returns. + final int randomAttempts = diff < MIN_RANDOM_SEARCH_SPACE ? diff : + (int) (diff * RANDOM_SEARCH_FACTOR); + for (int j = 0; j < randomAttempts; ++j) { + @SuppressWarnings("unchecked") + final C connection = (C) connections[rnd.nextInt(linearAttempts, connections.length)]; + if (selector.test(connection)) { + return succeeded(connection); + } + } + } // Don't open new connections for expired or unhealthy hosts, try a different one. 
// Unhealthy hosts have no open connections – that's why we don't fail earlier, the loop will not progress. diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index f631e5c980..9d83edbac1 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -73,13 +73,16 @@ public final class RoundRobinLoadBalancerFactory implements LoadBalancerFactory { - static final Duration DEFAULT_HEALTH_CHECK_INTERVAL = Duration.ofSeconds(1); + private static final Duration DEFAULT_HEALTH_CHECK_INTERVAL = Duration.ofSeconds(1); static final int DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD = 5; // higher than default for AutoRetryStrategy + private final int linearSearchSpace; @Nullable private final HealthCheckConfig healthCheckConfig; - private RoundRobinLoadBalancerFactory(@Nullable HealthCheckConfig healthCheckConfig) { + private RoundRobinLoadBalancerFactory(final int linearSearchSpace, + @Nullable final HealthCheckConfig healthCheckConfig) { + this.linearSearchSpace = linearSearchSpace; this.healthCheckConfig = healthCheckConfig; } @@ -89,7 +92,7 @@ public LoadBalancer newLoadBalancer( final Publisher>> eventPublisher, final ConnectionFactory connectionFactory) { return new RoundRobinLoadBalancer<>(requireNonNull(targetResource), eventPublisher, connectionFactory, - healthCheckConfig); + linearSearchSpace, healthCheckConfig); } @Override @@ -105,6 +108,7 @@ public ExecutionStrategy requiredOffloads() { * @param The type of connection. 
*/ public static final class Builder { + private int linearSearchSpace = 16; @Nullable private Executor backgroundExecutor; private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL; @@ -116,6 +120,28 @@ public static final class Builder + * When the next host has already opened connections, this {@link LoadBalancer} will perform a linear search for + * a connection that can serve the next request up to a specified number of attempts. If there are more open + * connections, selection of remaining connections will be attempted randomly. + *

+ * Higher linear search space may help to better identify excess connections in highly concurrent environments, + * but may result in slightly increased selection time. + * + * @param linearSearchSpace the number of attempts for a linear search space, {@code 0} enforces random + * selection all the time. + * @return {@code this}. + */ + public RoundRobinLoadBalancerFactory.Builder linearSearchSpace(int linearSearchSpace) { + if (linearSearchSpace < 0) { + throw new IllegalArgumentException("linearSearchSpace: " + linearSearchSpace + " (expected >=0)"); + } + this.linearSearchSpace = linearSearchSpace; + return this; + } + /** * This {@link LoadBalancer} may monitor hosts to which connection establishment has failed * using health checks that run in the background. The health check tries to establish a new connection @@ -185,14 +211,14 @@ public RoundRobinLoadBalancerFactory.Builder healthCheckFail */ public RoundRobinLoadBalancerFactory build() { if (this.healthCheckFailedConnectionsThreshold < 0) { - return new RoundRobinLoadBalancerFactory<>(null); + return new RoundRobinLoadBalancerFactory<>(linearSearchSpace, null); } HealthCheckConfig healthCheckConfig = new HealthCheckConfig( this.backgroundExecutor == null ? SharedExecutor.getInstance() : this.backgroundExecutor, healthCheckInterval, healthCheckFailedConnectionsThreshold); - return new RoundRobinLoadBalancerFactory<>(healthCheckConfig); + return new RoundRobinLoadBalancerFactory<>(linearSearchSpace, healthCheckConfig); } }