Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RoundRobinLoadBalancer: better identify excess connections #2173

Merged
merged 1 commit into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection;
import static java.lang.Integer.toHexString;
import static java.lang.Math.min;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -103,7 +104,7 @@ final class RoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedConnec
* exhausting the full search space without sacrificing too much latency caused by the cost of a CAS operation per
* selection attempt.
*/
private static final int MIN_SEARCH_SPACE = 64;
private static final int MIN_RANDOM_SEARCH_SPACE = 64;

/**
* For larger search spaces, due to the cost of a CAS operation per selection attempt we see diminishing returns for
Expand All @@ -113,7 +114,7 @@ final class RoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedConnec
* The current heuristics were chosen based on a set of benchmarks under various circumstances, low connection
* counts, larger connection counts, low connection churn, high connection churn.
*/
private static final float SEARCH_FACTOR = 0.75f;
private static final float RANDOM_SEARCH_FACTOR = 0.75f;

@SuppressWarnings("unused")
private volatile int index;
Expand All @@ -123,6 +124,7 @@ final class RoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedConnec
private final Publisher<Object> eventStream;
private final SequentialCancellable discoveryCancellable = new SequentialCancellable();
private final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory;
private final int linearSearchSpace;
private final ListenableAsyncCloseable asyncCloseable;

/**
Expand All @@ -141,11 +143,13 @@ final class RoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedConnec
final String targetResourceName,
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory,
final int linearSearchSpace,
@Nullable final HealthCheckConfig healthCheckConfig) {
this.targetResource = requireNonNull(targetResourceName) + " (instance @" + toHexString(hashCode()) + ')';
Processor<Object, Object> eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32);
this.eventStream = fromSource(eventStreamProcessor);
this.connectionFactory = requireNonNull(connectionFactory);
this.linearSearchSpace = linearSearchSpace;

toSource(eventPublisher).subscribe(
new Subscriber<Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>>() {
Expand Down Expand Up @@ -352,23 +356,36 @@ private Single<C> selectConnection0(Predicate<C> selector) {
Host<ResolvedAddress, C> pickedHost = null;
for (int i = 0; i < usedHosts.size(); ++i) {
// for a particular iteration we maintain a local cursor without contention with other requests
int localCursor = (cursor + i) % usedHosts.size();
final int localCursor = (cursor + i) % usedHosts.size();
final Host<ResolvedAddress, C> host = usedHosts.get(localCursor);
assert host != null : "Host can't be null.";

// Try first to see if an existing connection can be used
final Object[] connections = host.connState.connections;
// With small enough search space, attempt all connections.
// Back off after exploring most of the search space, it gives diminishing returns.
final int attempts = connections.length < MIN_SEARCH_SPACE ?
connections.length : (int) (connections.length * SEARCH_FACTOR);
for (int j = 0; j < attempts; ++j) {
// Exhaust the linear search space first:
final int linearAttempts = min(connections.length, linearSearchSpace);
for (int j = 0; j < linearAttempts; ++j) {
@SuppressWarnings("unchecked")
final C connection = (C) connections[rnd.nextInt(connections.length)];
final C connection = (C) connections[j];
if (selector.test(connection)) {
return succeeded(connection);
}
}
// Try other connections randomly:
if (connections.length > linearAttempts) {
final int diff = connections.length - linearAttempts;
// With small enough search space, attempt number of times equal to number of remaining connections.
// Back off after exploring most of the search space, it gives diminishing returns.
final int randomAttempts = diff < MIN_RANDOM_SEARCH_SPACE ? diff :
(int) (diff * RANDOM_SEARCH_FACTOR);
for (int j = 0; j < randomAttempts; ++j) {
@SuppressWarnings("unchecked")
final C connection = (C) connections[rnd.nextInt(linearAttempts, connections.length)];
if (selector.test(connection)) {
return succeeded(connection);
}
}
}

// Don't open new connections for expired or unhealthy hosts, try a different one.
// Unhealthy hosts have no open connections – that's why we don't fail earlier, the loop will not progress.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,16 @@
public final class RoundRobinLoadBalancerFactory<ResolvedAddress, C extends LoadBalancedConnection>
implements LoadBalancerFactory<ResolvedAddress, C> {

static final Duration DEFAULT_HEALTH_CHECK_INTERVAL = Duration.ofSeconds(1);
private static final Duration DEFAULT_HEALTH_CHECK_INTERVAL = Duration.ofSeconds(1);
static final int DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD = 5; // higher than default for AutoRetryStrategy

private final int linearSearchSpace;
@Nullable
private final HealthCheckConfig healthCheckConfig;

private RoundRobinLoadBalancerFactory(@Nullable HealthCheckConfig healthCheckConfig) {
private RoundRobinLoadBalancerFactory(final int linearSearchSpace,
@Nullable final HealthCheckConfig healthCheckConfig) {
this.linearSearchSpace = linearSearchSpace;
this.healthCheckConfig = healthCheckConfig;
}

Expand All @@ -89,7 +92,7 @@ public <T extends C> LoadBalancer<T> newLoadBalancer(
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new RoundRobinLoadBalancer<>(requireNonNull(targetResource), eventPublisher, connectionFactory,
healthCheckConfig);
linearSearchSpace, healthCheckConfig);
}

@Override
Expand All @@ -105,6 +108,7 @@ public ExecutionStrategy requiredOffloads() {
* @param <C> The type of connection.
*/
public static final class Builder<ResolvedAddress, C extends LoadBalancedConnection> {
private int linearSearchSpace = 16;
@Nullable
private Executor backgroundExecutor;
private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL;
Expand All @@ -116,6 +120,28 @@ public static final class Builder<ResolvedAddress, C extends LoadBalancedConnect
public Builder() {
}

/**
* Sets the linear search space to find an available connection for the next host.
* <p>
* When the next host has already opened connections, this {@link LoadBalancer} will perform a linear search for
* a connection that can serve the next request up to a specified number of attempts. If there are more open
* connections, selection of remaining connections will be attempted randomly.
* <p>
* Higher linear search space may help to better identify excess connections in highly concurrent environments,
* but may result in slightly increased selection time.
*
* @param linearSearchSpace the number of attempts for a linear search space, {@code 0} enforces random
* selection all the time.
* @return {@code this}.
*/
public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> linearSearchSpace(int linearSearchSpace) {
if (linearSearchSpace < 0) {
throw new IllegalArgumentException("linearSearchSpace: " + linearSearchSpace + " (expected >=0)");
}
this.linearSearchSpace = linearSearchSpace;
return this;
}

/**
* This {@link LoadBalancer} may monitor hosts to which connection establishment has failed
* using health checks that run in the background. The health check tries to establish a new connection
Expand Down Expand Up @@ -185,14 +211,14 @@ public RoundRobinLoadBalancerFactory.Builder<ResolvedAddress, C> healthCheckFail
*/
public RoundRobinLoadBalancerFactory<ResolvedAddress, C> build() {
if (this.healthCheckFailedConnectionsThreshold < 0) {
return new RoundRobinLoadBalancerFactory<>(null);
return new RoundRobinLoadBalancerFactory<>(linearSearchSpace, null);
}

HealthCheckConfig healthCheckConfig = new HealthCheckConfig(
this.backgroundExecutor == null ? SharedExecutor.getInstance() : this.backgroundExecutor,
healthCheckInterval, healthCheckFailedConnectionsThreshold);

return new RoundRobinLoadBalancerFactory<>(healthCheckConfig);
return new RoundRobinLoadBalancerFactory<>(linearSearchSpace, healthCheckConfig);
}
}

Expand Down